diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 1966bd4e..05f8bdb8 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -6,7 +6,7 @@ impl RoutingTable { let inner = self.inner.read(); out += "Routing Table Info:\n"; - out += &format!(" Node Id: {}\n", inner.node_id.encode()); + out += &format!(" Node Id: {}\n", self.unlocked_inner.node_id.encode()); out += &format!( " Self Latency Stats Accounting: {:#?}\n\n", inner.self_latency_stats_accounting diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 3519f733..581c98a4 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -212,7 +212,7 @@ impl RoutingTable { T: FnMut(DHTKey, Option>) -> O, { let inner = self.inner.read(); - let self_node_id = inner.node_id; + let self_node_id = self.unlocked_inner.node_id; // collect all the nodes for sorting let mut nodes = @@ -340,7 +340,7 @@ impl RoutingTable { { let cur_ts = intf::get_timestamp(); let node_count = { - let c = self.config.get(); + let c = self.unlocked_inner.config.get(); c.network.dht.max_find_node_count as usize }; let out = self.find_peers_with_sort_and_filter( diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 1b17d374..6a9cb6a8 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -34,41 +34,53 @@ pub struct RecentPeersEntry { /// RoutingTable rwlock-internal data struct RoutingTableInner { - // The current node's public DHT key - node_id: DHTKey, - node_id_secret: DHTKeySecret, // The current node's DHT key secret - - buckets: Vec, // Routing table buckets that hold entries - kick_queue: BTreeSet, // Buckets to kick on our next kick task - bucket_entry_count: usize, // A fast counter for the number of entries in the table, total - - public_internet_routing_domain: PublicInternetRoutingDomainDetail, // The public internet - local_network_routing_domain: LocalInternetRoutingDomainDetail, // The dial info we use on the local network - - self_latency_stats_accounting: LatencyStatsAccounting, // Interim accounting mechanism for this node's RPC latency to any other node - self_transfer_stats_accounting: TransferStatsAccounting, // Interim accounting mechanism for the total bandwidth to/from this node - self_transfer_stats: TransferStatsDownUp, // Statistics about the total bandwidth to/from this node - recent_peers: LruCache, // Peers we have recently communicated with + /// Routing table buckets that hold entries + buckets: Vec, + /// A fast counter for the number of entries in the table, total + bucket_entry_count: usize, + /// The public internet routing domain + public_internet_routing_domain: PublicInternetRoutingDomainDetail, + /// The dial info we use on the local network + local_network_routing_domain: LocalInternetRoutingDomainDetail, + /// Interim accounting mechanism for this node's RPC latency to any other node + self_latency_stats_accounting: LatencyStatsAccounting, + /// Interim accounting mechanism for the total bandwidth to/from this node + self_transfer_stats_accounting: TransferStatsAccounting, + /// Statistics about the total bandwidth to/from this node + self_transfer_stats: TransferStatsDownUp, + /// Peers we have recently communicated with + recent_peers: LruCache, } #[derive(Clone, Debug, Default)] pub struct RoutingTableHealth { + /// Number of reliable (responsive) entries in the routing table pub reliable_entry_count: usize, + /// Number of unreliable (occasionally unresponsive) entries in the routing table pub unreliable_entry_count: usize, + /// Number of dead (always unresponsive) entries in the routing table pub dead_entry_count: usize, } struct RoutingTableUnlockedInner { + // Accessors + config: VeilidConfig, network_manager: NetworkManager, - // Background processes + /// The current node's public DHT key + node_id: DHTKey, + /// The current node's DHT key secret + node_id_secret: DHTKeySecret, + /// Buckets to kick on our next kick task + kick_queue: Mutex>, + /// Background process for computing statistics rolling_transfers_task: TickTask, + /// Backgroup process to purge dead routing table entries when necessary kick_buckets_task: TickTask, } #[derive(Clone)] pub struct RoutingTable { - config: VeilidConfig, inner: Arc>, unlocked_inner: Arc, } @@ -76,10 +88,7 @@ pub struct RoutingTable { impl RoutingTable { fn new_inner() -> RoutingTableInner { RoutingTableInner { - node_id: DHTKey::default(), - node_id_secret: DHTKeySecret::default(), buckets: Vec::new(), - kick_queue: BTreeSet::default(), public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), local_network_routing_domain: LocalInternetRoutingDomainDetail::default(), bucket_entry_count: 0, @@ -90,12 +99,16 @@ impl RoutingTable { } } fn new_unlocked_inner( - _config: VeilidConfig, + config: VeilidConfig, network_manager: NetworkManager, ) -> RoutingTableUnlockedInner { - //let c = config.get(); + let c = config.get(); RoutingTableUnlockedInner { + config: config.clone(), network_manager, + node_id: c.network.node_id, + node_id_secret: c.network.node_id_secret, + kick_queue: Mutex::new(BTreeSet::default()), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), kick_buckets_task: TickTask::new(1), } @@ -103,7 +116,6 @@ impl RoutingTable { pub fn new(network_manager: NetworkManager) -> Self { let config = network_manager.config(); let this = Self { - config: config.clone(), inner: Arc::new(RwLock::new(Self::new_inner())), unlocked_inner: Arc::new(Self::new_unlocked_inner(config, network_manager)), }; @@ -137,11 +149,11 @@ impl RoutingTable { } pub fn node_id(&self) -> DHTKey { - self.inner.read().node_id + self.unlocked_inner.node_id } pub fn node_id_secret(&self) -> DHTKeySecret { - self.inner.read().node_id_secret + self.unlocked_inner.node_id_secret } fn routing_domain_for_address_inner( @@ -377,11 +389,6 @@ impl RoutingTable { inner.buckets.push(bucket); } - // make local copy of node id for easy access - let c = self.config.get(); - inner.node_id = c.network.node_id; - inner.node_id_secret = c.network.node_id_secret; - Ok(()) } @@ -482,8 +489,8 @@ impl RoutingTable { } } - fn find_bucket_index(inner: &RoutingTableInner, node_id: DHTKey) -> usize { - distance(&node_id, &inner.node_id) + fn find_bucket_index(&self, node_id: DHTKey) -> usize { + distance(&node_id, &self.unlocked_inner.node_id) .first_nonzero_bit() .unwrap() } @@ -598,9 +605,8 @@ impl RoutingTable { } fn queue_bucket_kick(&self, node_id: DHTKey) { - let mut inner = self.inner.write(); - let idx = Self::find_bucket_index(&*inner, node_id); - inner.kick_queue.insert(idx); + let idx = self.find_bucket_index(node_id); + self.unlocked_inner.kick_queue.lock().insert(idx); } // Create a node reference, possibly creating a bucket entry @@ -620,7 +626,7 @@ impl RoutingTable { let mut inner = self.inner.write(); // Look up existing entry - let idx = Self::find_bucket_index(&*inner, node_id); + let idx = self.find_bucket_index(node_id); let noderef = { let bucket = &inner.buckets[idx]; let entry = bucket.entry(&node_id); @@ -641,7 +647,7 @@ impl RoutingTable { entry.with_mut(update_func); // Kick the bucket - inner.kick_queue.insert(idx); + self.unlocked_inner.kick_queue.lock().insert(idx); log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, Self::get_entry_count_inner(&mut *inner, RoutingDomainSet::all(), BucketEntryState::Unreliable)); nr @@ -662,12 +668,12 @@ impl RoutingTable { } pub fn lookup_node_ref(&self, node_id: DHTKey) -> Option { - let inner = self.inner.read(); - if node_id == inner.node_id { + if node_id == self.unlocked_inner.node_id { log_rtab!(debug "can't look up own node id in routing table"); return None; } - let idx = Self::find_bucket_index(&*inner, node_id); + let idx = self.find_bucket_index(node_id); + let inner = self.inner.read(); let bucket = &inner.buckets[idx]; bucket .entry(&node_id) @@ -749,7 +755,7 @@ impl RoutingTable { self.unlocked_inner.rolling_transfers_task.tick().await?; // Kick buckets task - let kick_bucket_queue_count = { self.inner.read().kick_queue.len() }; + let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len(); if kick_bucket_queue_count > 0 { self.unlocked_inner.kick_buckets_task.tick().await?; } diff --git a/veilid-core/src/routing_table/routing_domain_editor.rs b/veilid-core/src/routing_table/routing_domain_editor.rs index c0bc8ccd..fac64af7 100644 --- a/veilid-core/src/routing_table/routing_domain_editor.rs +++ b/veilid-core/src/routing_table/routing_domain_editor.rs @@ -72,10 +72,10 @@ impl RoutingDomainEditor { pub async fn commit(self) { let mut changed = false; { + let node_id = self.routing_table.node_id(); + let mut inner = self.routing_table.inner.write(); let inner = &mut *inner; - let node_id = inner.node_id; - RoutingTable::with_routing_domain_mut(inner, self.routing_domain, |detail| { for change in self.changes { match change { diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs index 8284aa05..943f650b 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks.rs @@ -37,9 +37,10 @@ impl RoutingTable { _last_ts: u64, cur_ts: u64, ) -> EyreResult<()> { + let kick_queue: Vec = core::mem::take(&mut *self.unlocked_inner.kick_queue.lock()) + .into_iter() + .collect(); let mut inner = self.inner.write(); - let kick_queue: Vec = inner.kick_queue.iter().map(|v| *v).collect(); - inner.kick_queue.clear(); for idx in kick_queue { Self::kick_bucket(&mut *inner, idx) }