mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-01-23 05:01:12 -05:00
atomic routing domain editor fixes
This commit is contained in:
parent
51b509221c
commit
9dcfcd02a0
@ -6,7 +6,7 @@ impl RoutingTable {
|
||||
let inner = self.inner.read();
|
||||
out += "Routing Table Info:\n";
|
||||
|
||||
out += &format!(" Node Id: {}\n", inner.node_id.encode());
|
||||
out += &format!(" Node Id: {}\n", self.unlocked_inner.node_id.encode());
|
||||
out += &format!(
|
||||
" Self Latency Stats Accounting: {:#?}\n\n",
|
||||
inner.self_latency_stats_accounting
|
||||
|
@ -212,7 +212,7 @@ impl RoutingTable {
|
||||
T: FnMut(DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
let inner = self.inner.read();
|
||||
let self_node_id = inner.node_id;
|
||||
let self_node_id = self.unlocked_inner.node_id;
|
||||
|
||||
// collect all the nodes for sorting
|
||||
let mut nodes =
|
||||
@ -340,7 +340,7 @@ impl RoutingTable {
|
||||
{
|
||||
let cur_ts = intf::get_timestamp();
|
||||
let node_count = {
|
||||
let c = self.config.get();
|
||||
let c = self.unlocked_inner.config.get();
|
||||
c.network.dht.max_find_node_count as usize
|
||||
};
|
||||
let out = self.find_peers_with_sort_and_filter(
|
||||
|
@ -34,41 +34,53 @@ pub struct RecentPeersEntry {
|
||||
|
||||
/// RoutingTable rwlock-internal data
|
||||
struct RoutingTableInner {
|
||||
// The current node's public DHT key
|
||||
node_id: DHTKey,
|
||||
node_id_secret: DHTKeySecret, // The current node's DHT key secret
|
||||
|
||||
buckets: Vec<Bucket>, // Routing table buckets that hold entries
|
||||
kick_queue: BTreeSet<usize>, // Buckets to kick on our next kick task
|
||||
bucket_entry_count: usize, // A fast counter for the number of entries in the table, total
|
||||
|
||||
public_internet_routing_domain: PublicInternetRoutingDomainDetail, // The public internet
|
||||
local_network_routing_domain: LocalInternetRoutingDomainDetail, // The dial info we use on the local network
|
||||
|
||||
self_latency_stats_accounting: LatencyStatsAccounting, // Interim accounting mechanism for this node's RPC latency to any other node
|
||||
self_transfer_stats_accounting: TransferStatsAccounting, // Interim accounting mechanism for the total bandwidth to/from this node
|
||||
self_transfer_stats: TransferStatsDownUp, // Statistics about the total bandwidth to/from this node
|
||||
recent_peers: LruCache<DHTKey, RecentPeersEntry>, // Peers we have recently communicated with
|
||||
/// Routing table buckets that hold entries
|
||||
buckets: Vec<Bucket>,
|
||||
/// A fast counter for the number of entries in the table, total
|
||||
bucket_entry_count: usize,
|
||||
/// The public internet routing domain
|
||||
public_internet_routing_domain: PublicInternetRoutingDomainDetail,
|
||||
/// The dial info we use on the local network
|
||||
local_network_routing_domain: LocalInternetRoutingDomainDetail,
|
||||
/// Interim accounting mechanism for this node's RPC latency to any other node
|
||||
self_latency_stats_accounting: LatencyStatsAccounting,
|
||||
/// Interim accounting mechanism for the total bandwidth to/from this node
|
||||
self_transfer_stats_accounting: TransferStatsAccounting,
|
||||
/// Statistics about the total bandwidth to/from this node
|
||||
self_transfer_stats: TransferStatsDownUp,
|
||||
/// Peers we have recently communicated with
|
||||
recent_peers: LruCache<DHTKey, RecentPeersEntry>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct RoutingTableHealth {
|
||||
/// Number of reliable (responsive) entries in the routing table
|
||||
pub reliable_entry_count: usize,
|
||||
/// Number of unreliable (occasionally unresponsive) entries in the routing table
|
||||
pub unreliable_entry_count: usize,
|
||||
/// Number of dead (always unresponsive) entries in the routing table
|
||||
pub dead_entry_count: usize,
|
||||
}
|
||||
|
||||
struct RoutingTableUnlockedInner {
|
||||
// Accessors
|
||||
config: VeilidConfig,
|
||||
network_manager: NetworkManager,
|
||||
|
||||
// Background processes
|
||||
/// The current node's public DHT key
|
||||
node_id: DHTKey,
|
||||
/// The current node's DHT key secret
|
||||
node_id_secret: DHTKeySecret,
|
||||
/// Buckets to kick on our next kick task
|
||||
kick_queue: Mutex<BTreeSet<usize>>,
|
||||
/// Background process for computing statistics
|
||||
rolling_transfers_task: TickTask<EyreReport>,
|
||||
/// Backgroup process to purge dead routing table entries when necessary
|
||||
kick_buckets_task: TickTask<EyreReport>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RoutingTable {
|
||||
config: VeilidConfig,
|
||||
inner: Arc<RwLock<RoutingTableInner>>,
|
||||
unlocked_inner: Arc<RoutingTableUnlockedInner>,
|
||||
}
|
||||
@ -76,10 +88,7 @@ pub struct RoutingTable {
|
||||
impl RoutingTable {
|
||||
fn new_inner() -> RoutingTableInner {
|
||||
RoutingTableInner {
|
||||
node_id: DHTKey::default(),
|
||||
node_id_secret: DHTKeySecret::default(),
|
||||
buckets: Vec::new(),
|
||||
kick_queue: BTreeSet::default(),
|
||||
public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(),
|
||||
local_network_routing_domain: LocalInternetRoutingDomainDetail::default(),
|
||||
bucket_entry_count: 0,
|
||||
@ -90,12 +99,16 @@ impl RoutingTable {
|
||||
}
|
||||
}
|
||||
fn new_unlocked_inner(
|
||||
_config: VeilidConfig,
|
||||
config: VeilidConfig,
|
||||
network_manager: NetworkManager,
|
||||
) -> RoutingTableUnlockedInner {
|
||||
//let c = config.get();
|
||||
let c = config.get();
|
||||
RoutingTableUnlockedInner {
|
||||
config: config.clone(),
|
||||
network_manager,
|
||||
node_id: c.network.node_id,
|
||||
node_id_secret: c.network.node_id_secret,
|
||||
kick_queue: Mutex::new(BTreeSet::default()),
|
||||
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
|
||||
kick_buckets_task: TickTask::new(1),
|
||||
}
|
||||
@ -103,7 +116,6 @@ impl RoutingTable {
|
||||
pub fn new(network_manager: NetworkManager) -> Self {
|
||||
let config = network_manager.config();
|
||||
let this = Self {
|
||||
config: config.clone(),
|
||||
inner: Arc::new(RwLock::new(Self::new_inner())),
|
||||
unlocked_inner: Arc::new(Self::new_unlocked_inner(config, network_manager)),
|
||||
};
|
||||
@ -137,11 +149,11 @@ impl RoutingTable {
|
||||
}
|
||||
|
||||
pub fn node_id(&self) -> DHTKey {
|
||||
self.inner.read().node_id
|
||||
self.unlocked_inner.node_id
|
||||
}
|
||||
|
||||
pub fn node_id_secret(&self) -> DHTKeySecret {
|
||||
self.inner.read().node_id_secret
|
||||
self.unlocked_inner.node_id_secret
|
||||
}
|
||||
|
||||
fn routing_domain_for_address_inner(
|
||||
@ -377,11 +389,6 @@ impl RoutingTable {
|
||||
inner.buckets.push(bucket);
|
||||
}
|
||||
|
||||
// make local copy of node id for easy access
|
||||
let c = self.config.get();
|
||||
inner.node_id = c.network.node_id;
|
||||
inner.node_id_secret = c.network.node_id_secret;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -482,8 +489,8 @@ impl RoutingTable {
|
||||
}
|
||||
}
|
||||
|
||||
fn find_bucket_index(inner: &RoutingTableInner, node_id: DHTKey) -> usize {
|
||||
distance(&node_id, &inner.node_id)
|
||||
fn find_bucket_index(&self, node_id: DHTKey) -> usize {
|
||||
distance(&node_id, &self.unlocked_inner.node_id)
|
||||
.first_nonzero_bit()
|
||||
.unwrap()
|
||||
}
|
||||
@ -598,9 +605,8 @@ impl RoutingTable {
|
||||
}
|
||||
|
||||
fn queue_bucket_kick(&self, node_id: DHTKey) {
|
||||
let mut inner = self.inner.write();
|
||||
let idx = Self::find_bucket_index(&*inner, node_id);
|
||||
inner.kick_queue.insert(idx);
|
||||
let idx = self.find_bucket_index(node_id);
|
||||
self.unlocked_inner.kick_queue.lock().insert(idx);
|
||||
}
|
||||
|
||||
// Create a node reference, possibly creating a bucket entry
|
||||
@ -620,7 +626,7 @@ impl RoutingTable {
|
||||
let mut inner = self.inner.write();
|
||||
|
||||
// Look up existing entry
|
||||
let idx = Self::find_bucket_index(&*inner, node_id);
|
||||
let idx = self.find_bucket_index(node_id);
|
||||
let noderef = {
|
||||
let bucket = &inner.buckets[idx];
|
||||
let entry = bucket.entry(&node_id);
|
||||
@ -641,7 +647,7 @@ impl RoutingTable {
|
||||
entry.with_mut(update_func);
|
||||
|
||||
// Kick the bucket
|
||||
inner.kick_queue.insert(idx);
|
||||
self.unlocked_inner.kick_queue.lock().insert(idx);
|
||||
log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, Self::get_entry_count_inner(&mut *inner, RoutingDomainSet::all(), BucketEntryState::Unreliable));
|
||||
|
||||
nr
|
||||
@ -662,12 +668,12 @@ impl RoutingTable {
|
||||
}
|
||||
|
||||
pub fn lookup_node_ref(&self, node_id: DHTKey) -> Option<NodeRef> {
|
||||
let inner = self.inner.read();
|
||||
if node_id == inner.node_id {
|
||||
if node_id == self.unlocked_inner.node_id {
|
||||
log_rtab!(debug "can't look up own node id in routing table");
|
||||
return None;
|
||||
}
|
||||
let idx = Self::find_bucket_index(&*inner, node_id);
|
||||
let idx = self.find_bucket_index(node_id);
|
||||
let inner = self.inner.read();
|
||||
let bucket = &inner.buckets[idx];
|
||||
bucket
|
||||
.entry(&node_id)
|
||||
@ -749,7 +755,7 @@ impl RoutingTable {
|
||||
self.unlocked_inner.rolling_transfers_task.tick().await?;
|
||||
|
||||
// Kick buckets task
|
||||
let kick_bucket_queue_count = { self.inner.read().kick_queue.len() };
|
||||
let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len();
|
||||
if kick_bucket_queue_count > 0 {
|
||||
self.unlocked_inner.kick_buckets_task.tick().await?;
|
||||
}
|
||||
|
@ -72,10 +72,10 @@ impl RoutingDomainEditor {
|
||||
pub async fn commit(self) {
|
||||
let mut changed = false;
|
||||
{
|
||||
let node_id = self.routing_table.node_id();
|
||||
|
||||
let mut inner = self.routing_table.inner.write();
|
||||
let inner = &mut *inner;
|
||||
let node_id = inner.node_id;
|
||||
|
||||
RoutingTable::with_routing_domain_mut(inner, self.routing_domain, |detail| {
|
||||
for change in self.changes {
|
||||
match change {
|
||||
|
@ -37,9 +37,10 @@ impl RoutingTable {
|
||||
_last_ts: u64,
|
||||
cur_ts: u64,
|
||||
) -> EyreResult<()> {
|
||||
let kick_queue: Vec<usize> = core::mem::take(&mut *self.unlocked_inner.kick_queue.lock())
|
||||
.into_iter()
|
||||
.collect();
|
||||
let mut inner = self.inner.write();
|
||||
let kick_queue: Vec<usize> = inner.kick_queue.iter().map(|v| *v).collect();
|
||||
inner.kick_queue.clear();
|
||||
for idx in kick_queue {
|
||||
Self::kick_bucket(&mut *inner, idx)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user