From 338dc6b39d329bcb4addf4802b28427d981906ea Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 9 Oct 2022 14:59:01 -0400 Subject: [PATCH] refactor checkpoint --- veilid-core/src/dht/key.rs | 6 + veilid-core/src/network_manager/mod.rs | 155 ++------ veilid-core/src/network_manager/native/mod.rs | 44 +-- .../native/network_class_discovery.rs | 16 +- veilid-core/src/network_manager/tasks.rs | 34 +- veilid-core/src/network_manager/wasm/mod.rs | 10 +- veilid-core/src/routing_table/bucket.rs | 17 +- veilid-core/src/routing_table/bucket_entry.rs | 40 ++- veilid-core/src/routing_table/debug.rs | 8 +- veilid-core/src/routing_table/find_nodes.rs | 129 +++---- veilid-core/src/routing_table/mod.rs | 260 +++++++++----- veilid-core/src/routing_table/node_ref.rs | 4 +- .../src/routing_table/route_spec_store.rs | 334 ++++++++++++++++++ .../routing_table/routing_domain_editor.rs | 121 ++++++- .../src/routing_table/routing_domains.rs | 242 ++++++++++--- veilid-core/src/routing_table/tasks.rs | 9 +- veilid-core/src/rpc_processor/destination.rs | 142 ++------ veilid-core/src/rpc_processor/mod.rs | 3 +- .../src/rpc_processor/rpc_find_node.rs | 26 +- .../rpc_processor/rpc_validate_dial_info.rs | 2 +- veilid-core/src/veilid_api/mod.rs | 19 - veilid-core/src/veilid_api/privacy.rs | 24 +- veilid-core/src/veilid_api/routing_context.rs | 26 +- .../src/veilid_api/serialize_helpers.rs | 15 + 24 files changed, 1122 insertions(+), 564 deletions(-) create mode 100644 veilid-core/src/routing_table/route_spec_store.rs diff --git a/veilid-core/src/dht/key.rs b/veilid-core/src/dht/key.rs index 8a020a38..c0c1934f 100644 --- a/veilid-core/src/dht/key.rs +++ b/veilid-core/src/dht/key.rs @@ -16,15 +16,21 @@ use serde::{Deserialize, Serialize}; ////////////////////////////////////////////////////////////////////// +/// Length of a DHT key in bytes #[allow(dead_code)] pub const DHT_KEY_LENGTH: usize = 32; +/// Length of a DHT key in bytes after encoding to base64url #[allow(dead_code)] pub const DHT_KEY_LENGTH_ENCODED: usize = 43; +/// Length of a DHT secret in bytes #[allow(dead_code)] pub const DHT_KEY_SECRET_LENGTH: usize = 32; +/// Length of a DHT secret in bytes after encoding to base64url #[allow(dead_code)] pub const DHT_KEY_SECRET_LENGTH_ENCODED: usize = 43; +/// Length of a DHT signature in bytes #[allow(dead_code)] +/// Length of a DHT signature in bytes after encoding to base64url pub const DHT_SIGNATURE_LENGTH: usize = 64; #[allow(dead_code)] pub const DHT_SIGNATURE_LENGTH_ENCODED: usize = 86; diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 4e3d0593..455d35fd 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -38,6 +38,7 @@ use xx::*; //////////////////////////////////////////////////////////////////////////////////////// pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; +pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes @@ -148,11 +149,6 @@ struct NetworkManagerInner { BTreeMap>, public_address_inconsistencies_table: BTreeMap>, - protocol_config: Option, - public_inbound_dial_info_filter: Option, - local_inbound_dial_info_filter: Option, - public_outbound_dial_info_filter: Option, - local_outbound_dial_info_filter: Option, } struct NetworkManagerUnlockedInner { @@ -163,6 +159,7 @@ struct NetworkManagerUnlockedInner { // Background processes rolling_transfers_task: TickTask, relay_management_task: TickTask, + private_route_management_task: TickTask, bootstrap_task: TickTask, peer_minimum_refresh_task: TickTask, ping_validator_task: TickTask, @@ -186,11 +183,6 @@ impl NetworkManager { client_whitelist: LruCache::new_unbounded(), public_address_check_cache: BTreeMap::new(), public_address_inconsistencies_table: BTreeMap::new(), - protocol_config: None, - public_inbound_dial_info_filter: None, - local_inbound_dial_info_filter: None, - public_outbound_dial_info_filter: None, - local_outbound_dial_info_filter: None, } } fn new_unlocked_inner(config: VeilidConfig) -> NetworkManagerUnlockedInner { @@ -201,6 +193,7 @@ impl NetworkManager { update_callback: RwLock::new(None), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), + private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), bootstrap_task: TickTask::new(1), peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms), ping_validator_task: TickTask::new(1), @@ -248,6 +241,23 @@ impl NetworkManager { ) }); } + // Set private route management tick task + { + let this2 = this.clone(); + this.unlocked_inner + .private_route_management_task + .set_routine(move |s, l, t| { + Box::pin( + this2 + .clone() + .private_route_management_task_routine(s, l, t) + .instrument(trace_span!( + parent: None, + "private route management task routine" + )), + ) + }); + } // Set bootstrap tick task { let this2 = this.clone(); @@ -434,41 +444,6 @@ impl NetworkManager { return Err(e); } - // Store copy of protocol config and dial info filters - { - let pc = self.net().get_protocol_config().unwrap(); - - let mut inner = self.inner.lock(); - - inner.public_inbound_dial_info_filter = Some( - DialInfoFilter::all() - .with_protocol_type_set(pc.inbound) - .with_address_type_set(pc.family_global), - ); - inner.local_inbound_dial_info_filter = Some( - DialInfoFilter::all() - .with_protocol_type_set(pc.inbound) - .with_address_type_set(pc.family_local), - ); - inner.public_outbound_dial_info_filter = Some( - DialInfoFilter::all() - .with_protocol_type_set(pc.outbound) - .with_address_type_set(pc.family_global), - ); - inner.local_outbound_dial_info_filter = Some( - DialInfoFilter::all() - .with_protocol_type_set(pc.outbound) - .with_address_type_set(pc.family_local), - ); - - inner.protocol_config = Some(pc); - } - - // Inform routing table entries that our dial info has changed - for rd in RoutingDomain::all() { - self.send_node_info_updates(rd, true).await; - } - // Inform api clients that things have changed self.send_network_update(); @@ -527,12 +502,7 @@ impl NetworkManager { // reset the state debug!("resetting network manager state"); { - let mut inner = self.inner.lock(); - inner.public_inbound_dial_info_filter = None; - inner.local_inbound_dial_info_filter = None; - inner.public_outbound_dial_info_filter = None; - inner.local_outbound_dial_info_filter = None; - inner.protocol_config = None; + *self.inner.lock() = NetworkManager::new_inner(); } // send update @@ -640,15 +610,6 @@ impl NetworkManager { Ok(()) } - // Return what network class we are in - pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option { - if let Some(components) = self.unlocked_inner.components.read().as_ref() { - components.net.get_network_class(routing_domain) - } else { - None - } - } - // Get our node's capabilities fn generate_public_internet_node_status(&self) -> PublicInternetNodeStatus { let node_info = self @@ -694,58 +655,6 @@ impl NetworkManager { } } - // Return what protocols we have enabled - pub fn get_protocol_config(&self) -> ProtocolConfig { - let inner = self.inner.lock(); - inner.protocol_config.as_ref().unwrap().clone() - } - - // Return a dial info filter for what we can receive - pub fn get_inbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { - let inner = self.inner.lock(); - match routing_domain { - RoutingDomain::PublicInternet => inner - .public_inbound_dial_info_filter - .as_ref() - .unwrap() - .clone(), - RoutingDomain::LocalNetwork => inner - .local_inbound_dial_info_filter - .as_ref() - .unwrap() - .clone(), - } - } - pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { - let dif = self.get_inbound_dial_info_filter(routing_domain); - NodeRefFilter::new() - .with_routing_domain(routing_domain) - .with_dial_info_filter(dif) - } - - // Return a dial info filter for what we can send out - pub fn get_outbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { - let inner = self.inner.lock(); - match routing_domain { - RoutingDomain::PublicInternet => inner - .public_outbound_dial_info_filter - .as_ref() - .unwrap() - .clone(), - RoutingDomain::LocalNetwork => inner - .local_outbound_dial_info_filter - .as_ref() - .unwrap() - .clone(), - } - } - pub fn get_outbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { - let dif = self.get_outbound_dial_info_filter(routing_domain); - NodeRefFilter::new() - .with_routing_domain(routing_domain) - .with_dial_info_filter(dif) - } - // Generates a multi-shot/normal receipt #[instrument(level = "trace", skip(self, extra_data, callback), err)] pub fn generate_receipt>( @@ -890,7 +799,7 @@ impl NetworkManager { }; // Get the udp direct dialinfo for the hole punch - let outbound_nrf = self + let outbound_nrf = routing_table .get_outbound_node_ref_filter(RoutingDomain::PublicInternet) .with_protocol_type(ProtocolType::UDP); peer_nr.set_filter(Some(outbound_nrf)); @@ -1027,7 +936,10 @@ impl NetworkManager { #[instrument(level = "trace", skip(self), ret)] fn get_contact_method_public(&self, target_node_ref: NodeRef) -> ContactMethod { // Scope noderef down to protocols we can do outbound - let public_outbound_nrf = self.get_outbound_node_ref_filter(RoutingDomain::PublicInternet); + let routing_table = self.routing_table(); + + let public_outbound_nrf = + routing_table.get_outbound_node_ref_filter(RoutingDomain::PublicInternet); let target_node_ref = target_node_ref.filtered_clone(public_outbound_nrf.clone()); // Get the best match internet dial info if we have it @@ -1047,16 +959,14 @@ impl NetworkManager { // Can we reach the inbound relay? if inbound_relay_nr.first_filtered_dial_info_detail().is_some() { // Can we receive anything inbound ever? - let our_network_class = self + let our_network_class = routing_table .get_network_class(RoutingDomain::PublicInternet) .unwrap_or(NetworkClass::Invalid); if matches!(our_network_class, NetworkClass::InboundCapable) { - let routing_table = self.routing_table(); - ///////// Reverse connection // Get the best match dial info for an reverse inbound connection - let reverse_dif = self + let reverse_dif = routing_table .get_inbound_dial_info_filter(RoutingDomain::PublicInternet) .filtered( &target_node_ref @@ -1090,7 +1000,7 @@ impl NetworkManager { udp_target_nr.first_filtered_dial_info_detail() { // Does the self node have a direct udp dialinfo the target can reach? - let inbound_udp_dif = self + let inbound_udp_dif = routing_table .get_inbound_dial_info_filter(RoutingDomain::PublicInternet) .filtered( &target_node_ref @@ -1151,7 +1061,10 @@ impl NetworkManager { #[instrument(level = "trace", skip(self), ret)] fn get_contact_method_local(&self, target_node_ref: NodeRef) -> ContactMethod { // Scope noderef down to protocols we can do outbound - let local_outbound_nrf = self.get_outbound_node_ref_filter(RoutingDomain::LocalNetwork); + let routing_table = self.routing_table(); + + let local_outbound_nrf = + routing_table.get_outbound_node_ref_filter(RoutingDomain::LocalNetwork); let target_node_ref = target_node_ref.filtered_clone(local_outbound_nrf); // Get the best matching local direct dial info if we have it @@ -1865,7 +1778,7 @@ impl NetworkManager { let mut bad_public_address_detection_punishment: Option< Box, > = None; - let public_internet_network_class = net + let public_internet_network_class = routing_table .get_network_class(RoutingDomain::PublicInternet) .unwrap_or(NetworkClass::Invalid); let needs_public_address_detection = diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 1a58218f..7994a752 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -40,11 +40,9 @@ struct NetworkInner { /// such as dhcp release or change of address or interfaces being added or removed network_needs_restart: bool, /// the calculated protocol configuration for inbound/outbound protocols - protocol_config: Option, + protocol_config: ProtocolConfig, /// set of statically configured protocols with public dialinfo static_public_dialinfo: ProtocolTypeSet, - /// network class per routing domain - network_class: [Option; RoutingDomain::count()], /// join handles for all the low level network background tasks join_handles: Vec>, /// stop source for shutting down the low level network background tasks @@ -120,9 +118,8 @@ impl Network { needs_public_dial_info_check: false, doing_public_dial_info_check: false, public_dial_info_check_punishment: None, - protocol_config: None, + protocol_config: Default::default(), static_public_dialinfo: ProtocolTypeSet::empty(), - network_class: [None, None], join_handles: Vec::new(), stop_source: None, udp_port: 0u16, @@ -620,7 +617,7 @@ impl Network { ///////////////////////////////////////////////////////////////// - pub fn get_protocol_config(&self) -> Option { + pub fn get_protocol_config(&self) -> ProtocolConfig { self.inner.lock().protocol_config } @@ -734,7 +731,8 @@ impl Network { family_local, } }; - inner.protocol_config = Some(protocol_config); + inner.protocol_config = protocol_config; + protocol_config }; @@ -771,27 +769,37 @@ impl Network { // that we have ports available to us self.free_bound_first_ports(); - // If we have static public dialinfo, upgrade our network class + // set up the routing table's network config + // if we have static public dialinfo, upgrade our network class + + editor_public_internet.setup_network( + protocol_config.inbound, + protocol_config.outbound, + protocol_config.family_global, + ); + editor_local_network.setup_network( + protocol_config.inbound, + protocol_config.outbound, + protocol_config.family_local, + ); let detect_address_changes = { let c = self.config.get(); c.network.detect_address_changes }; - if !detect_address_changes { - let mut inner = self.inner.lock(); + let inner = self.inner.lock(); if !inner.static_public_dialinfo.is_empty() { - inner.network_class[RoutingDomain::PublicInternet as usize] = - Some(NetworkClass::InboundCapable); + editor_public_internet.set_network_class(Some(NetworkClass::InboundCapable)); } } - info!("network started"); - self.inner.lock().network_started = true; - // commit routing table edits editor_public_internet.commit().await; editor_local_network.commit().await; + info!("network started"); + self.inner.lock().network_started = true; + Ok(()) } @@ -873,11 +881,6 @@ impl Network { inner.doing_public_dial_info_check } - pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option { - let inner = self.inner.lock(); - inner.network_class[routing_domain as usize] - } - ////////////////////////////////////////// #[instrument(level = "trace", skip(self), err)] @@ -939,6 +942,7 @@ impl Network { // If we need to figure out our network class, tick the task for it if detect_address_changes { let public_internet_network_class = self + .routing_table() .get_network_class(RoutingDomain::PublicInternet) .unwrap_or(NetworkClass::Invalid); let needs_public_dial_info_check = self.needs_public_dial_info_check(); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index b53fefad..aff9bd28 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -125,7 +125,7 @@ impl DiscoveryContext { RoutingDomain::PublicInternet, dial_info_filter.clone(), ); - let disallow_relays_filter = move |e: &BucketEntryInner| { + let disallow_relays_filter = move |_rti, e: &BucketEntryInner| { if let Some(n) = e.node_info(RoutingDomain::PublicInternet) { n.relay_peer_info.is_none() } else { @@ -610,12 +610,14 @@ impl Network { _l: u64, _t: u64, ) -> EyreResult<()> { + let routing_table = self.routing_table(); + // Figure out if we can optimize TCP/WS checking since they are often on the same port let (protocol_config, existing_network_class, tcp_same_port) = { let inner = self.inner.lock(); - let protocol_config = inner.protocol_config.unwrap_or_default(); + let protocol_config = inner.protocol_config; let existing_network_class = - inner.network_class[RoutingDomain::PublicInternet as usize]; + routing_table.get_network_class(RoutingDomain::PublicInternet); let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP) && protocol_config.inbound.contains(ProtocolType::WS) { @@ -625,7 +627,6 @@ impl Network { }; (protocol_config, existing_network_class, tcp_same_port) }; - let routing_table = self.routing_table(); // Process all protocol and address combinations let mut futures = FuturesUnordered::new(); @@ -849,17 +850,16 @@ impl Network { // Is the network class different? if existing_network_class != new_network_class { - self.inner.lock().network_class[RoutingDomain::PublicInternet as usize] = - new_network_class; + editor.set_network_class(new_network_class); changed = true; log_net!(debug "PublicInternet network class changed to {:?}", new_network_class); } } else if existing_network_class.is_some() { // Network class could not be determined editor.clear_dial_info_details(); - self.inner.lock().network_class[RoutingDomain::PublicInternet as usize] = None; + editor.set_network_class(None); changed = true; - log_net!(debug "network class cleared"); + log_net!(debug "PublicInternet network class cleared"); } // Punish nodes that told us our public address had changed when it didn't diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index 55396fe3..e41c8c06 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -495,8 +495,8 @@ impl NetworkManager { // even the unreliable ones, and ask them to find nodes close to our node too let noderefs = routing_table.find_fastest_nodes( min_peer_count, - |_k, _v| true, - |k: DHTKey, v: Option>| { + |_rti, _k, _v| true, + |_rti, k: DHTKey, v: Option>| { NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None) }, ); @@ -525,7 +525,7 @@ impl NetworkManager { // Get our node's current node info and network class and do the right thing let routing_table = self.routing_table(); let node_info = routing_table.get_own_node_info(RoutingDomain::PublicInternet); - let network_class = self.get_network_class(RoutingDomain::PublicInternet); + let network_class = routing_table.get_network_class(RoutingDomain::PublicInternet); // Get routing domain editor let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); @@ -594,6 +594,34 @@ impl NetworkManager { Ok(()) } + // Keep private routes assigned and accessible + #[instrument(level = "trace", skip(self), err)] + pub(super) async fn private_route_management_task_routine( + self, + _stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> EyreResult<()> { + // Get our node's current node info and network class and do the right thing + let routing_table = self.routing_table(); + let node_info = routing_table.get_own_node_info(RoutingDomain::PublicInternet); + let network_class = routing_table.get_network_class(RoutingDomain::PublicInternet); + + // Get routing domain editor + let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); + + // Do we know our network class yet? + if let Some(network_class) = network_class { + + // see if we have any routes that need extending + } + + // Commit the changes + editor.commit().await; + + Ok(()) + } + // Compute transfer statistics for the low level network #[instrument(level = "trace", skip(self), err)] pub(super) async fn rolling_transfers_task_routine( diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index c35167a9..d630f598 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -12,7 +12,7 @@ use std::io; struct NetworkInner { network_started: bool, network_needs_restart: bool, - protocol_config: Option, + protocol_config: ProtocolConfig, } struct NetworkUnlockedInner { @@ -34,7 +34,7 @@ impl Network { NetworkInner { network_started: false, network_needs_restart: false, - protocol_config: None, //join_handle: None, + protocol_config: Default::default(), } } @@ -247,7 +247,7 @@ impl Network { pub async fn startup(&self) -> EyreResult<()> { // get protocol config - self.inner.lock().protocol_config = Some({ + self.inner.lock().protocol_config = { let c = self.config.get(); let inbound = ProtocolTypeSet::new(); let mut outbound = ProtocolTypeSet::new(); @@ -269,7 +269,7 @@ impl Network { family_global, family_local, } - }); + }; self.inner.lock().network_started = true; Ok(()) @@ -337,7 +337,7 @@ impl Network { }; } - pub fn get_protocol_config(&self) -> Option { + pub fn get_protocol_config(&self) -> ProtocolConfig { self.inner.lock().protocol_config.clone() } diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 8312038d..88051c0c 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -48,13 +48,6 @@ impl Bucket { // newest_entry is updated by kick_bucket() } - pub(super) fn roll_transfers(&self, last_ts: u64, cur_ts: u64) { - // Called every ROLLING_TRANSFERS_INTERVAL_SECS - for (_k, v) in &self.entries { - v.with_mut(|e| e.roll_transfers(last_ts, cur_ts)); - } - } - pub(super) fn entry(&self, key: &DHTKey) -> Option> { self.entries.get(key).cloned() } @@ -63,7 +56,11 @@ impl Bucket { self.entries.iter() } - pub(super) fn kick(&mut self, bucket_depth: usize) -> Option> { + pub(super) fn kick( + &self, + inner: &mut RoutingTableInner, + bucket_depth: usize, + ) -> Option> { // Get number of entries to attempt to purge from bucket let bucket_len = self.entries.len(); @@ -87,8 +84,8 @@ impl Bucket { if a.0 == b.0 { return core::cmp::Ordering::Equal; } - a.1.with(|ea| { - b.1.with(|eb| { + a.1.with(inner, |rti, ea| { + b.1.with(rti, |_rti, eb| { let astate = state_ordering(ea.state(cur_ts)); let bstate = state_ordering(eb.state(cur_ts)); // first kick dead nodes, then unreliable nodes diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index f3f665e8..cb60d8ec 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -132,6 +132,28 @@ impl BucketEntryInner { } } + // Less is more reliable then older + pub fn cmp_oldest_reliable(cur_ts: u64, e1: &Self, e2: &Self) -> std::cmp::Ordering { + // Reverse compare so most reliable is at front + let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts)); + if ret != std::cmp::Ordering::Equal { + return ret; + } + + // Lower timestamp to the front, recent or no timestamp is at the end + if let Some(e1_ts) = &e1.peer_stats.rpc_stats.first_consecutive_seen_ts { + if let Some(e2_ts) = &e2.peer_stats.rpc_stats.first_consecutive_seen_ts { + e1_ts.cmp(&e2_ts) + } else { + std::cmp::Ordering::Less + } + } else if e2.peer_stats.rpc_stats.first_consecutive_seen_ts.is_some() { + std::cmp::Ordering::Greater + } else { + std::cmp::Ordering::Equal + } + } + pub fn sort_fastest_reliable_fn(cur_ts: u64) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering { move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2) } @@ -645,20 +667,26 @@ impl BucketEntry { } } - pub(super) fn with(&self, f: F) -> R + // Note, that this requires -also- holding the RoutingTable read lock, as an + // immutable reference to RoutingTableInner must be passed in to get this + // This ensures that an operation on the routing table can not change entries + // while it is being read from + pub(super) fn with(&self, rti: &RoutingTableInner, f: F) -> R where - F: FnOnce(&BucketEntryInner) -> R, + F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> R, { let inner = self.inner.read(); - f(&*inner) + f(rti, &*inner) } - pub(super) fn with_mut(&self, f: F) -> R + // Note, that this requires -also- holding the RoutingTable write lock, as a + // mutable reference to RoutingTableInner must be passed in to get this + pub(super) fn with_mut(&self, rti: &mut RoutingTableInner, f: F) -> R where - F: FnOnce(&mut BucketEntryInner) -> R, + F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> R, { let mut inner = self.inner.write(); - f(&mut *inner) + f(rti, &mut *inner) } } diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 05f8bdb8..3fa6fbc7 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -102,6 +102,7 @@ impl RoutingTable { pub fn debug_info_entries(&self, limit: usize, min_state: BucketEntryState) -> String { let inner = self.inner.read(); + let inner = &*inner; let cur_ts = intf::get_timestamp(); let mut out = String::new(); @@ -114,14 +115,14 @@ impl RoutingTable { let filtered_entries: Vec<(&DHTKey, &Arc)> = inner.buckets[b] .entries() .filter(|e| { - let state = e.1.with(|e| e.state(cur_ts)); + let state = e.1.with(inner, |_rti, e| e.state(cur_ts)); state >= min_state }) .collect(); if !filtered_entries.is_empty() { out += &format!(" Bucket #{}:\n", b); for e in filtered_entries { - let state = e.1.with(|e| e.state(cur_ts)); + let state = e.1.with(inner, |_rti, e| e.state(cur_ts)); out += &format!( " {} [{}]\n", e.0.encode(), @@ -161,6 +162,7 @@ impl RoutingTable { pub fn debug_info_buckets(&self, min_state: BucketEntryState) -> String { let inner = self.inner.read(); + let inner = &*inner; let cur_ts = intf::get_timestamp(); let mut out = String::new(); @@ -175,7 +177,7 @@ impl RoutingTable { while c < COLS { let mut cnt = 0; for e in inner.buckets[b].entries() { - if e.1.with(|e| e.state(cur_ts) >= min_state) { + if e.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) { cnt += 1; } } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 4485b510..3e3866d4 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -17,9 +17,9 @@ impl RoutingTable { pub fn make_inbound_dial_info_entry_filter( routing_domain: RoutingDomain, dial_info_filter: DialInfoFilter, - ) -> impl FnMut(&BucketEntryInner) -> bool { + ) -> impl FnMut(&RoutingTableInner, &BucketEntryInner) -> bool { // does it have matching public dial info? - move |e| { + move |_rti, e| { if let Some(ni) = e.node_info(routing_domain) { if ni .first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| { @@ -35,12 +35,12 @@ impl RoutingTable { } // Makes a filter that finds nodes capable of dialing a particular outbound dialinfo - pub fn make_outbound_dial_info_entry_filter( + pub fn make_outbound_dial_info_entry_filter<'s>( routing_domain: RoutingDomain, dial_info: DialInfo, - ) -> impl FnMut(&BucketEntryInner) -> bool { + ) -> impl FnMut(&RoutingTableInner, &'s BucketEntryInner) -> bool { // does the node's outbound capabilities match the dialinfo? - move |e| { + move |_rti, e| { if let Some(ni) = e.node_info(routing_domain) { let dif = DialInfoFilter::all() .with_protocol_type_set(ni.outbound_protocols) @@ -54,19 +54,19 @@ impl RoutingTable { } // Make a filter that wraps another filter - pub fn combine_entry_filters( + pub fn combine_entry_filters<'a, 'b, F, G>( mut f1: F, mut f2: G, - ) -> impl FnMut(&BucketEntryInner) -> bool + ) -> impl FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool where - F: FnMut(&BucketEntryInner) -> bool, - G: FnMut(&BucketEntryInner) -> bool, + F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, + G: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, { - move |e| { - if !f1(e) { + move |rti, e| { + if !f1(rti, e) { return false; } - if !f2(e) { + if !f2(rti, e) { return false; } true @@ -74,21 +74,21 @@ impl RoutingTable { } // Retrieve the fastest nodes in the routing table matching an entry filter - pub fn find_fast_public_nodes_filtered( + pub fn find_fast_public_nodes_filtered<'r, 'e, F>( &self, node_count: usize, mut entry_filter: F, ) -> Vec where - F: FnMut(&BucketEntryInner) -> bool, + F: FnMut(&'r RoutingTableInner, &'e BucketEntryInner) -> bool, { self.find_fastest_nodes( // count node_count, // filter - |_k: DHTKey, v: Option>| { + |rti, _k: DHTKey, v: Option>| { let entry = v.unwrap(); - entry.with(|e| { + entry.with(rti, |rti, e| { // skip nodes on local network if e.node_info(RoutingDomain::LocalNetwork).is_some() { return false; @@ -98,11 +98,11 @@ impl RoutingTable { return false; } // skip nodes that dont match entry filter - entry_filter(e) + entry_filter(rti, e) }) }, // transform - |k: DHTKey, v: Option>| { + |_rti, k: DHTKey, v: Option>| { NodeRef::new(self.clone(), k, v.unwrap().clone(), None) }, ) @@ -123,9 +123,9 @@ impl RoutingTable { // count protocol_types.len() * 2 * max_per_type, // filter - move |_k: DHTKey, v: Option>| { + move |rti, _k: DHTKey, v: Option>| { let entry = v.unwrap(); - entry.with(|e| { + entry.with(rti, |_rti, e| { // skip nodes on our local network here if e.has_node_info(RoutingDomain::LocalNetwork.into()) { return false; @@ -164,20 +164,21 @@ impl RoutingTable { }) }, // transform - |k: DHTKey, v: Option>| { + |_rti, k: DHTKey, v: Option>| { NodeRef::new(self.clone(), k, v.unwrap().clone(), None) }, ) } - pub fn filter_has_valid_signed_node_info( - &self, + pub fn filter_has_valid_signed_node_info_inner( + inner: &RoutingTableInner, routing_domain: RoutingDomain, + has_valid_own_node_info: bool, v: Option>, ) -> bool { match v { - None => self.has_valid_own_node_info(routing_domain), - Some(entry) => entry.with(|e| { + None => has_valid_own_node_info, + Some(entry) => entry.with(inner, |_rti, e| { e.signed_node_info(routing_domain.into()) .map(|sni| sni.has_valid_signature()) .unwrap_or(false) @@ -185,15 +186,18 @@ impl RoutingTable { } } - pub fn transform_to_peer_info( - &self, + pub fn transform_to_peer_info_inner( + inner: &RoutingTableInner, routing_domain: RoutingDomain, + own_peer_info: PeerInfo, k: DHTKey, v: Option>, ) -> PeerInfo { match v { - None => self.get_own_peer_info(routing_domain), - Some(entry) => entry.with(|e| e.make_peer_info(k, routing_domain).unwrap()), + None => own_peer_info, + Some(entry) => entry.with(inner, |_rti, e| { + e.make_peer_info(k, routing_domain).unwrap() + }), } } @@ -206,14 +210,16 @@ impl RoutingTable { mut transform: T, ) -> Vec where - F: FnMut(DHTKey, Option>) -> bool, + F: FnMut(&RoutingTableInner, DHTKey, Option>) -> bool, C: FnMut( + &RoutingTableInner, &(DHTKey, Option>), &(DHTKey, Option>), ) -> core::cmp::Ordering, - T: FnMut(DHTKey, Option>) -> O, + T: FnMut(&RoutingTableInner, DHTKey, Option>) -> O, { let inner = self.inner.read(); + let inner = &*inner; let self_node_id = self.unlocked_inner.node_id; // collect all the nodes for sorting @@ -221,27 +227,32 @@ impl RoutingTable { Vec::<(DHTKey, Option>)>::with_capacity(inner.bucket_entry_count + 1); // add our own node (only one of there with the None entry) - if filter(self_node_id, None) { + if filter(inner, self_node_id, None) { nodes.push((self_node_id, None)); } // add all nodes from buckets - Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { - // Apply filter - if filter(k, Some(v.clone())) { - nodes.push((k, Some(v.clone()))); - } - Option::<()>::None - }); + Self::with_entries( + &*inner, + cur_ts, + BucketEntryState::Unreliable, + |rti, k, v| { + // Apply filter + if filter(rti, k, Some(v.clone())) { + nodes.push((k, Some(v.clone()))); + } + Option::<()>::None + }, + ); // sort by preference for returning nodes - nodes.sort_by(compare); + nodes.sort_by(|a, b| compare(inner, a, b)); // return transformed vector for filtered+sorted nodes let cnt = usize::min(node_count, nodes.len()); let mut out = Vec::::with_capacity(cnt); for node in nodes { - let val = transform(node.0, node.1); + let val = transform(inner, node.0, node.1); out.push(val); } @@ -255,21 +266,21 @@ impl RoutingTable { transform: T, ) -> Vec where - F: FnMut(DHTKey, Option>) -> bool, - T: FnMut(DHTKey, Option>) -> O, + F: FnMut(&RoutingTableInner, DHTKey, Option>) -> bool, + T: FnMut(&RoutingTableInner, DHTKey, Option>) -> O, { let cur_ts = intf::get_timestamp(); let out = self.find_peers_with_sort_and_filter( node_count, cur_ts, // filter - |k, v| { + |rti, k, v| { if let Some(entry) = &v { // always filter out dead nodes - if entry.with(|e| e.state(cur_ts) == BucketEntryState::Dead) { + if entry.with(rti, |_rti, e| e.state(cur_ts) == BucketEntryState::Dead) { false } else { - filter(k, v) + filter(rti, k, v) } } else { // always filter out self peer, as it is irrelevant to the 'fastest nodes' search @@ -277,7 +288,7 @@ impl RoutingTable { } }, // sort - |(a_key, a_entry), (b_key, b_entry)| { + |rti, (a_key, a_entry), (b_key, b_entry)| { // same nodes are always the same if a_key == b_key { return core::cmp::Ordering::Equal; @@ -292,8 +303,8 @@ impl RoutingTable { // reliable nodes come first let ae = a_entry.as_ref().unwrap(); let be = b_entry.as_ref().unwrap(); - ae.with(|ae| { - be.with(|be| { + ae.with(rti, |rti, ae| { + be.with(rti, |_rti, be| { let ra = ae.check_reliable(cur_ts); let rb = be.check_reliable(cur_ts); if ra != rb { @@ -337,8 +348,8 @@ impl RoutingTable { mut transform: T, ) -> Vec where - F: FnMut(DHTKey, Option>) -> bool, - T: FnMut(DHTKey, Option>) -> O, + F: FnMut(&RoutingTableInner, DHTKey, Option>) -> bool, + T: FnMut(&RoutingTableInner, DHTKey, Option>) -> O, { let cur_ts = intf::get_timestamp(); let node_count = { @@ -351,7 +362,7 @@ impl RoutingTable { // filter filter, // sort - |(a_key, a_entry), (b_key, b_entry)| { + |rti, (a_key, a_entry), (b_key, b_entry)| { // same nodes are always the same if a_key == b_key { return core::cmp::Ordering::Equal; @@ -360,10 +371,10 @@ impl RoutingTable { // reliable nodes come first, pessimistically treating our own node as unreliable let ra = a_entry .as_ref() - .map_or(false, |x| x.with(|x| x.check_reliable(cur_ts))); + .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); let rb = b_entry .as_ref() - .map_or(false, |x| x.with(|x| x.check_reliable(cur_ts))); + .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); if ra != rb { if ra { return core::cmp::Ordering::Less; @@ -420,9 +431,7 @@ impl RoutingTable { fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { // Get all our outbound protocol/address types - let outbound_dif = self - .network_manager() - .get_outbound_dial_info_filter(RoutingDomain::PublicInternet); + let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet); let mapped_port_info = self.get_low_level_port_info(); move |e: &BucketEntryInner| { @@ -481,9 +490,9 @@ impl RoutingTable { let mut best_inbound_relay: Option<(DHTKey, Arc)> = None; // Iterate all known nodes for candidates - Self::with_entries(inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + Self::with_entries(inner, cur_ts, BucketEntryState::Unreliable, |rti, k, v| { let v2 = v.clone(); - v.with(|e| { + v.with(rti, |rti, e| { // Ensure we have the node's status if let Some(node_status) = e.node_status(routing_domain) { // Ensure the node will relay @@ -491,7 +500,7 @@ impl RoutingTable { // Compare against previous candidate if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { // Less is faster - let better = best_inbound_relay.1.with(|best| { + let better = best_inbound_relay.1.with(rti, |_rti, best| { BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) == std::cmp::Ordering::Less }); diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index cc831641..1e86c2d6 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -3,6 +3,7 @@ mod bucket_entry; mod debug; mod find_nodes; mod node_ref; +mod route_spec_store; mod routing_domain_editor; mod routing_domains; mod stats_accounting; @@ -19,6 +20,7 @@ pub use debug::*; pub use find_nodes::*; use hashlink::LruCache; pub use node_ref::*; +pub use route_spec_store::*; pub use routing_domain_editor::*; pub use routing_domains::*; pub use stats_accounting::*; @@ -41,7 +43,7 @@ struct RoutingTableInner { /// The public internet routing domain public_internet_routing_domain: PublicInternetRoutingDomainDetail, /// The dial info we use on the local network - local_network_routing_domain: LocalInternetRoutingDomainDetail, + local_network_routing_domain: LocalNetworkRoutingDomainDetail, /// Interim accounting mechanism for this node's RPC latency to any other node self_latency_stats_accounting: LatencyStatsAccounting, /// Interim accounting mechanism for the total bandwidth to/from this node @@ -50,6 +52,8 @@ struct RoutingTableInner { self_transfer_stats: TransferStatsDownUp, /// Peers we have recently communicated with recent_peers: LruCache, + /// Storage for private/safety RouteSpecs + route_spec_store: RouteSpecStore, } #[derive(Clone, Debug, Default)] @@ -90,12 +94,13 @@ impl RoutingTable { RoutingTableInner { buckets: Vec::new(), public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), - local_network_routing_domain: LocalInternetRoutingDomainDetail::default(), + local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(), bucket_entry_count: 0, self_latency_stats_accounting: LatencyStatsAccounting::new(), self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats: TransferStatsDownUp::default(), recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), + route_spec_store: RouteSpecStore::new(), } } fn new_unlocked_inner( @@ -214,17 +219,21 @@ impl RoutingTable { pub fn relay_node(&self, domain: RoutingDomain) -> Option { let inner = self.inner.read(); - Self::with_routing_domain(&*inner, domain, |rd| rd.relay_node()) + Self::with_routing_domain(&*inner, domain, |rd| rd.common().relay_node()) } pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { let inner = self.inner.read(); - Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details().is_empty()) + Self::with_routing_domain(&*inner, domain, |rd| { + !rd.common().dial_info_details().is_empty() + }) } pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec { let inner = self.inner.read(); - Self::with_routing_domain(&*inner, domain, |rd| rd.dial_info_details().clone()) + Self::with_routing_domain(&*inner, domain, |rd| { + rd.common().dial_info_details().clone() + }) } pub fn first_filtered_dial_info_detail( @@ -235,7 +244,7 @@ impl RoutingTable { let inner = self.inner.read(); for routing_domain in routing_domain_set { let did = Self::with_routing_domain(&*inner, routing_domain, |rd| { - for did in rd.dial_info_details() { + for did in rd.common().dial_info_details() { if did.matches_filter(filter) { return Some(did.clone()); } @@ -258,7 +267,7 @@ impl RoutingTable { let mut ret = Vec::new(); for routing_domain in routing_domain_set { Self::with_routing_domain(&*inner, routing_domain, |rd| { - for did in rd.dial_info_details() { + for did in rd.common().dial_info_details() { if did.matches_filter(filter) { ret.push(did.clone()); } @@ -321,8 +330,8 @@ impl RoutingTable { fn reset_all_seen_our_node_info(inner: &mut RoutingTableInner, routing_domain: RoutingDomain) { let cur_ts = intf::get_timestamp(); - Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { - v.with_mut(|e| { + Self::with_entries_mut(inner, cur_ts, BucketEntryState::Dead, |rti, _, v| { + v.with_mut(rti, |_rti, e| { e.set_seen_our_node_info(routing_domain, false); }); Option::<()>::None @@ -331,50 +340,83 @@ impl RoutingTable { fn reset_all_updated_since_last_network_change(inner: &mut RoutingTableInner) { let cur_ts = intf::get_timestamp(); - Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { - v.with_mut(|e| e.set_updated_since_last_network_change(false)); + Self::with_entries_mut(inner, cur_ts, BucketEntryState::Dead, |rti, _, v| { + v.with_mut(rti, |_rti, e| { + e.set_updated_since_last_network_change(false) + }); Option::<()>::None }); } + /// Return a copy of our node's peerinfo pub fn get_own_peer_info(&self, routing_domain: RoutingDomain) -> PeerInfo { - PeerInfo::new( - NodeId::new(self.node_id()), - self.get_own_signed_node_info(routing_domain), - ) + let inner = &*self.inner.read(); + Self::with_routing_domain(inner, routing_domain, |rdd| { + rdd.common().with_peer_info(|pi| pi.clone()) + }) } + /// Return a copy of our node's signednodeinfo pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedNodeInfo { - let node_id = NodeId::new(self.node_id()); - let secret = self.node_id_secret(); - SignedNodeInfo::with_secret(self.get_own_node_info(routing_domain), node_id, &secret) - .unwrap() + let inner = &*self.inner.read(); + Self::with_routing_domain(inner, routing_domain, |rdd| { + rdd.common() + .with_peer_info(|pi| pi.signed_node_info.clone()) + }) } + /// Return a copy of our node's nodeinfo pub fn get_own_node_info(&self, routing_domain: RoutingDomain) -> NodeInfo { - let netman = self.network_manager(); - let relay_node = self.relay_node(routing_domain); - let pc = netman.get_protocol_config(); - NodeInfo { - network_class: netman - .get_network_class(routing_domain) - .unwrap_or(NetworkClass::Invalid), - outbound_protocols: pc.outbound, - address_types: pc.family_global, - min_version: MIN_VERSION, - max_version: MAX_VERSION, - dial_info_detail_list: self.dial_info_details(routing_domain), - relay_peer_info: relay_node - .and_then(|rn| rn.make_peer_info(routing_domain).map(Box::new)), - } + let inner = &*self.inner.read(); + Self::with_routing_domain(inner, routing_domain, |rdd| { + rdd.common() + .with_peer_info(|pi| pi.signed_node_info.node_info.clone()) + }) } + /// Return our currently registered network class pub fn has_valid_own_node_info(&self, routing_domain: RoutingDomain) -> bool { - let netman = self.network_manager(); - let nc = netman - .get_network_class(routing_domain) - .unwrap_or(NetworkClass::Invalid); - !matches!(nc, NetworkClass::Invalid) + let inner = &*self.inner.read(); + Self::with_routing_domain(inner, routing_domain, |rdd| { + rdd.common().has_valid_own_node_info() + }) + } + + /// Return the domain's currently registered network class + pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option { + let inner = &*self.inner.read(); + Self::with_routing_domain(inner, routing_domain, |rdd| rdd.common().network_class()) + } + + /// Return the domain's filter for what we can receivein the form of a dial info filter + pub fn get_inbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { + let inner = &*self.inner.read(); + Self::with_routing_domain(inner, routing_domain, |rdd| { + rdd.common().inbound_dial_info_filter() + }) + } + + /// Return the domain's filter for what we can receive in the form of a node ref filter + pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { + let dif = self.get_inbound_dial_info_filter(routing_domain); + NodeRefFilter::new() + .with_routing_domain(routing_domain) + .with_dial_info_filter(dif) + } + + /// Return the domain's filter for what we can send out in the form of a dial info filter + pub fn get_outbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { + let inner = &*self.inner.read(); + Self::with_routing_domain(inner, routing_domain, |rdd| { + rdd.common().outbound_dial_info_filter() + }) + } + /// Return the domain's filter for what we can receive in the form of a node ref filter + pub fn get_outbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { + let dif = self.get_outbound_dial_info_filter(routing_domain); + NodeRefFilter::new() + .with_routing_domain(routing_domain) + .with_dial_info_filter(dif) } fn bucket_depth(index: usize) -> usize { @@ -434,8 +476,8 @@ impl RoutingTable { // If the local network topology has changed, nuke the existing local node info and let new local discovery happen if changed { let cur_ts = intf::get_timestamp(); - Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_rti, e| { - e.with_mut(|e| { + Self::with_entries_mut(&mut *inner, cur_ts, BucketEntryState::Dead, |rti, _, e| { + e.with_mut(rti, |_rti, e| { e.clear_signed_node_info(RoutingDomain::LocalNetwork); e.set_seen_our_node_info(RoutingDomain::LocalNetwork, false); e.set_updated_since_last_network_change(false); @@ -449,12 +491,13 @@ impl RoutingTable { // should only be performed when there are no node_refs (detached) pub fn purge_buckets(&self) { let mut inner = self.inner.write(); + let inner = &mut *inner; log_rtab!( "Starting routing table buckets purge. Table currently has {} nodes", inner.bucket_entry_count ); - for bucket in &mut inner.buckets { - bucket.kick(0); + for bucket in &inner.buckets { + bucket.kick(inner, 0); } log_rtab!(debug "Routing table buckets purge complete. Routing table now has {} nodes", @@ -465,13 +508,14 @@ impl RoutingTable { // Attempt to remove last_connections from entries pub fn purge_last_connections(&self) { let mut inner = self.inner.write(); + let inner = &mut *inner; log_rtab!( "Starting routing table last_connections purge. Table currently has {} nodes", inner.bucket_entry_count ); - for bucket in &mut inner.buckets { + for bucket in &inner.buckets { for entry in bucket.entries() { - entry.1.with_mut(|e| { + entry.1.with_mut(inner, |_rti, e| { e.clear_last_connections(); }); } @@ -488,7 +532,7 @@ impl RoutingTable { let bucket = &mut inner.buckets[idx]; let bucket_depth = Self::bucket_depth(idx); - if let Some(dead_node_ids) = bucket.kick(bucket_depth) { + if let Some(dead_node_ids) = bucket.kick(inner, bucket_depth) { // Remove counts inner.bucket_entry_count -= dead_node_ids.len(); log_rtab!(debug "Routing table now has {} nodes", inner.bucket_entry_count); @@ -524,8 +568,8 @@ impl RoutingTable { ) -> usize { let mut count = 0usize; let cur_ts = intf::get_timestamp(); - Self::with_entries(inner, cur_ts, min_state, |_, e| { - if e.with(|e| e.best_routing_domain(routing_domain_set)) + Self::with_entries(inner, cur_ts, min_state, |rti, _, e| { + if e.with(rti, |_rti, e| e.best_routing_domain(routing_domain_set)) .is_some() { count += 1; @@ -535,7 +579,7 @@ impl RoutingTable { count } - fn with_entries) -> Option>( + fn with_entries) -> Option>( inner: &RoutingTableInner, cur_ts: u64, min_state: BucketEntryState, @@ -543,8 +587,29 @@ impl RoutingTable { ) -> Option { for bucket in &inner.buckets { for entry in bucket.entries() { - if entry.1.with(|e| e.state(cur_ts) >= min_state) { - if let Some(out) = f(*entry.0, entry.1.clone()) { + if entry.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) { + if let Some(out) = f(inner, *entry.0, entry.1.clone()) { + return Some(out); + } + } + } + } + None + } + + fn with_entries_mut< + T, + F: FnMut(&mut RoutingTableInner, DHTKey, Arc) -> Option, + >( + inner: &mut RoutingTableInner, + cur_ts: u64, + min_state: BucketEntryState, + mut f: F, + ) -> Option { + for bucket in &inner.buckets { + for entry in bucket.entries() { + if entry.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) { + if let Some(out) = f(inner, *entry.0, entry.1.clone()) { return Some(out); } } @@ -561,18 +626,23 @@ impl RoutingTable { ) -> Vec { let inner = self.inner.read(); let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); - Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { - // Only update nodes that haven't seen our node info yet - if all || !v.with(|e| e.has_seen_our_node_info(routing_domain)) { - node_refs.push(NodeRef::new( - self.clone(), - k, - v, - Some(NodeRefFilter::new().with_routing_domain(routing_domain)), - )); - } - Option::<()>::None - }); + Self::with_entries( + &*inner, + cur_ts, + BucketEntryState::Unreliable, + |rti, k, v| { + // Only update nodes that haven't seen our node info yet + if all || !v.with(rti, |_rti, e| e.has_seen_our_node_info(routing_domain)) { + node_refs.push(NodeRef::new( + self.clone(), + k, + v, + Some(NodeRefFilter::new().with_routing_domain(routing_domain)), + )); + } + Option::<()>::None + }, + ); node_refs } @@ -585,35 +655,45 @@ impl RoutingTable { // Collect relay nodes let opt_relay_id = Self::with_routing_domain(&*inner, routing_domain, |rd| { - rd.relay_node().map(|rn| rn.node_id()) + rd.common().relay_node().map(|rn| rn.node_id()) }); // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); - Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { - if v.with(|e| { - e.has_node_info(routing_domain.into()) - && e.needs_ping(cur_ts, opt_relay_id == Some(k)) - }) { - node_refs.push(NodeRef::new( - self.clone(), - k, - v, - Some(NodeRefFilter::new().with_routing_domain(routing_domain)), - )); - } - Option::<()>::None - }); + Self::with_entries( + &*inner, + cur_ts, + BucketEntryState::Unreliable, + |rti, k, v| { + if v.with(rti, |_rti, e| { + e.has_node_info(routing_domain.into()) + && e.needs_ping(cur_ts, opt_relay_id == Some(k)) + }) { + node_refs.push(NodeRef::new( + self.clone(), + k, + v, + Some(NodeRefFilter::new().with_routing_domain(routing_domain)), + )); + } + Option::<()>::None + }, + ); node_refs } pub fn get_all_nodes(&self, cur_ts: u64) -> Vec { let inner = self.inner.read(); let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); - Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { - node_refs.push(NodeRef::new(self.clone(), k, v, None)); - Option::<()>::None - }); + Self::with_entries( + &*inner, + cur_ts, + BucketEntryState::Unreliable, + |_rti, k, v| { + node_refs.push(NodeRef::new(self.clone(), k, v, None)); + Option::<()>::None + }, + ); node_refs } @@ -627,7 +707,7 @@ impl RoutingTable { // in a locked fashion as to ensure the bucket entry state is always valid pub fn create_node_ref(&self, node_id: DHTKey, update_func: F) -> Option where - F: FnOnce(&mut BucketEntryInner), + F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner), { // Ensure someone isn't trying register this node itself if node_id == self.node_id() { @@ -637,6 +717,7 @@ impl RoutingTable { // Lock this entire operation let mut inner = self.inner.write(); + let inner = &mut *inner; // Look up existing entry let idx = self.find_bucket_index(node_id); @@ -657,7 +738,7 @@ impl RoutingTable { // Update the entry let entry = bucket.entry(&node_id).unwrap(); - entry.with_mut(update_func); + entry.with_mut(inner, update_func); // Kick the bucket self.unlocked_inner.kick_queue.lock().insert(idx); @@ -669,9 +750,7 @@ impl RoutingTable { // Update the entry let bucket = &mut inner.buckets[idx]; let entry = bucket.entry(&node_id).unwrap(); - entry.with_mut(|e| { - update_func(e); - }); + entry.with_mut(inner, update_func); nr } @@ -731,7 +810,7 @@ impl RoutingTable { } } - self.create_node_ref(node_id, |e| { + self.create_node_ref(node_id, |_rti, e| { e.update_signed_node_info(routing_domain, signed_node_info); }) .map(|mut nr| { @@ -750,7 +829,7 @@ impl RoutingTable { descriptor: ConnectionDescriptor, timestamp: u64, ) -> Option { - let out = self.create_node_ref(node_id, |e| { + let out = self.create_node_ref(node_id, |_rti, e| { // this node is live because it literally just connected to us e.touch_last_seen(timestamp); }); @@ -783,9 +862,10 @@ impl RoutingTable { let mut health = RoutingTableHealth::default(); let cur_ts = intf::get_timestamp(); let inner = self.inner.read(); + let inner = &*inner; for bucket in &inner.buckets { for (_, v) in bucket.entries() { - match v.with(|e| e.state(cur_ts)) { + match v.with(inner, |_rti, e| e.state(cur_ts)) { BucketEntryState::Reliable => { health.reliable_entry_count += 1; } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index d686e84d..4586189c 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -103,7 +103,7 @@ impl NodeRef { F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T, { let inner = &*self.routing_table.inner.read(); - self.entry.with(|e| f(inner, e)) + self.entry.with(inner, f) } pub(super) fn operate_mut(&self, f: F) -> T @@ -111,7 +111,7 @@ impl NodeRef { F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T, { let inner = &mut *self.routing_table.inner.write(); - self.entry.with_mut(|e| f(inner, e)) + self.entry.with_mut(inner, f) } // Filtering diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs new file mode 100644 index 00000000..480f6a7f --- /dev/null +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -0,0 +1,334 @@ +use super::*; +use crate::veilid_api::*; +use serde::*; + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct RouteSpecDetail { + /// The actual route spec + #[serde(with = "arc_serialize")] + route_spec: Arc, + /// Transfers up and down + transfer_stats_down_up: TransferStatsDownUp, + /// Latency stats + latency_stats: LatencyStats, + /// Accounting mechanism for this route's RPC latency + #[serde(skip)] + latency_stats_accounting: LatencyStatsAccounting, + /// Accounting mechanism for the bandwidth across this route + #[serde(skip)] + transfer_stats_accounting: TransferStatsAccounting, + /// Published private route, do not reuse for ephemeral routes + #[serde(skip)] + published: bool, + /// Timestamp of when the route was created + timestamp: u64, +} + +/// The core representation of the RouteSpecStore that can be serialized +#[derive(Debug, Serialize, Deserialize)] +pub struct RouteSpecStoreContent { + /// All of the routes we have allocated so far + details: HashMap, +} + +/// Ephemeral data used to help the RouteSpecStore operate efficiently +#[derive(Debug, Default)] +pub struct RouteSpecStoreCache { + /// The fastest routes by latency + fastest_routes: Vec, + /// The most reliable routes by node lifetime longevity + reliable_routes: Vec, + /// How many times nodes have been used + used_nodes: HashMap, + /// How many times nodes have been used at the terminal point of a route + used_end_nodes: HashMap, + /// Route spec hop cache, used to quickly disqualify routes + hop_cache: HashSet>, +} + +#[derive(Debug)] +pub struct RouteSpecStore { + /// Serialize RouteSpecStore content + content: RouteSpecStoreContent, + /// RouteSpecStore cache + cache: RouteSpecStoreCache, +} + +fn route_spec_to_hop_cache(spec: Arc) -> Vec { + let mut cache: Vec = Vec::with_capacity(spec.hops.len() * DHT_KEY_LENGTH); + for hop in spec.hops { + cache.extend_from_slice(&hop.dial_info.node_id.key.bytes); + } + cache +} + +fn node_sublist_to_hop_cache( + nodes: &[(DHTKey, Arc)], + start: usize, + len: usize, +) -> Vec { + let mut cache: Vec = Vec::with_capacity(len * DHT_KEY_LENGTH); + for node in &nodes[start..start + len] { + cache.extend_from_slice(&node.0.bytes) + } + cache +} + +impl RouteSpecStore { + pub fn new() -> Self { + Self { + content: RouteSpecStoreContent { + details: HashMap::new(), + }, + cache: Default::default(), + } + } + + pub fn from_cbor( + routing_table: RoutingTable, + cbor: &[u8], + ) -> Result { + let content: RouteSpecStoreContent = serde_cbor::from_slice(cbor) + .map_err(|e| VeilidAPIError::parse_error("invalid route spec store content", e))?; + let rss = RouteSpecStore { + content, + cache: Default::default(), + }; + rss.rebuild_cache(); + Ok(rss) + } + + pub fn to_cbor(&self) -> Vec { + serde_cbor::to_vec(&self.content).unwrap() + } + + fn rebuild_cache(&mut self) { + // + } + + fn detail_mut(&mut self, spec: Arc) -> &mut RouteSpecDetail { + self.content.details.get_mut(&spec.public_key).unwrap() + } + + /// Create a new route + /// Prefers nodes that are not currently in use by another route + /// The route is not yet tested for its reachability + /// Returns None if no route could be allocated at this time + pub fn allocate_route( + &mut self, + routing_table: RoutingTable, + reliable: bool, + hop_count: usize, + ) -> Option> { + use core::cmp::Ordering; + + let max_route_hop_count = { + let config = routing_table.network_manager().config(); + let c = config.get(); + let max_route_hop_count = c.network.rpc.max_route_hop_count; + max_route_hop_count.into() + }; + + if hop_count < 2 { + log_rtab!(error "Not allocating route less than two hops in length"); + return None; + } + + if hop_count > max_route_hop_count { + log_rtab!(error "Not allocating route longer than max route hop count"); + return None; + } + + // Get list of all nodes, and sort them for selection + let cur_ts = intf::get_timestamp(); + let dial_info_sort = if reliable { + Some(DialInfoDetail::reliable_sort) + } else { + None + }; + let filter = |rti, k: DHTKey, v: Option>| -> bool { + // Exclude our own node from routes + if v.is_none() { + return false; + } + let v = v.unwrap(); + + // Exclude nodes on our local network + let on_local_network = v.with(rti, |_rti, e| { + e.node_info(RoutingDomain::LocalNetwork).is_some() + }); + if on_local_network { + return false; + } + + // Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route + v.with(rti, |_rti, e| { + let node_info_ok = if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) { + ni.has_any_dial_info() + } else { + false + }; + let node_status_ok = if let Some(ns) = e.node_status(RoutingDomain::PublicInternet) + { + ns.will_route() + } else { + false + }; + + node_info_ok && node_status_ok + }) + }; + let compare = |rti, + v1: &(DHTKey, Option>), + v2: &(DHTKey, Option>)| + -> Ordering { + // deprioritize nodes that we have already used as end points + let e1_used_end = self + .cache + .used_end_nodes + .get(&v1.0) + .cloned() + .unwrap_or_default(); + let e2_used_end = self + .cache + .used_end_nodes + .get(&v2.0) + .cloned() + .unwrap_or_default(); + let cmp_used_end = e1_used_end.cmp(&e2_used_end); + if !matches!(cmp_used_end, Ordering::Equal) { + return cmp_used_end; + } + + // deprioritize nodes we have used already anywhere + let e1_used = self + .cache + .used_nodes + .get(&v1.0) + .cloned() + .unwrap_or_default(); + let e2_used = self + .cache + .used_nodes + .get(&v2.0) + .cloned() + .unwrap_or_default(); + let cmp_used = e1_used.cmp(&e2_used); + if !matches!(cmp_used, Ordering::Equal) { + return cmp_used; + } + + // always prioritize reliable nodes, but sort by oldest or fastest + let cmpout = v1.1.unwrap().with(rti, |rti, e1| { + v2.1.unwrap().with(rti, |_rti, e2| { + if reliable { + BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2) + } else { + BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2) + } + }) + }); + cmpout + }; + let transform = |rti, k: DHTKey, v: Option>| -> (DHTKey, NodeInfo) { + // Return the key and the nodeinfo for that key + ( + k, + v.unwrap().with(rti, |_rti, e| { + e.node_info(RoutingDomain::PublicInternet.into()) + .unwrap() + .clone() + }), + ) + }; + + // Pull the whole routing table in sorted order + let node_count = routing_table.get_entry_count( + RoutingDomain::PublicInternet.into(), + BucketEntryState::Unreliable, + ); + let mut nodes = routing_table + .find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform); + + // If we couldn't find enough nodes, wait until we have more nodes in the routing table + if nodes.len() < hop_count { + log_rtab!(debug "Not enough nodes to construct route at this time. Try again later."); + return None; + } + + // Now go through nodes and try to build a route we haven't seen yet + let mut route_nodes = None; + for start in 0..(nodes.len() - hop_count) { + // Get the route cache key + let key = node_sublist_to_hop_cache(&nodes, start, hop_count); + + // try each route until we find a unique one + if !self.cache.hop_cache.contains(&key) { + route_nodes = Some(&nodes[start..start + hop_count]); + break; + } + } + if route_nodes.is_none() { + return None; + } + let route_node = route_nodes.unwrap(); + + // Got a unique route, lets build the detail, register it, and return it + let hops: Vec = route_node + .into_iter() + .map(|v| RouteHopSpec { + dial_info: NodeDialInfo { + node_id: NodeId::new(v.0), + dial_info: xxx, + }, + }) + .collect(); + + let (public_key, secret_key) = generate_secret(); + let route_spec = Arc::new(RouteSpec { + public_key, + secret_key, + hops, + }); + + let rsd = RouteSpecDetail { + route_spec, + transfer_stats_down_up: Default::default(), + latency_stats: Default::default(), + latency_stats_accounting: Default::default(), + transfer_stats_accounting: Default::default(), + published: false, + timestamp: cur_ts, + }; + + None + } + + pub fn release_route(&mut self, spec: Arc) {} + + pub fn best_route(&mut self, reliable: bool) -> Arc {} + + /// Mark route as published + /// When first deserialized, routes must be re-published in order to ensure they remain + /// in the RouteSpecStore. + pub fn publish_route(&mut self, spec: Arc) { + //compile private route here? + } + + pub fn record_latency( + &mut self, + spec: Arc, + latency: u64, + ) -> veilid_api::LatencyStats { + } + + pub fn add_down(&mut self, spec: Arc, bytes: u64) { + self.current_transfer.down += bytes; + } + + pub fn add_up(&mut self, spec: Arc, bytes: u64) {} + + pub fn roll_transfers(&mut self) { + // + } +} diff --git a/veilid-core/src/routing_table/routing_domain_editor.rs b/veilid-core/src/routing_table/routing_domain_editor.rs index fac64af7..c507b3fa 100644 --- a/veilid-core/src/routing_table/routing_domain_editor.rs +++ b/veilid-core/src/routing_table/routing_domain_editor.rs @@ -3,8 +3,24 @@ use super::*; enum RoutingDomainChange { ClearDialInfoDetails, ClearRelayNode, - SetRelayNode { relay_node: NodeRef }, - AddDialInfoDetail { dial_info_detail: DialInfoDetail }, + SetRelayNode { + relay_node: NodeRef, + }, + AddDialInfoDetail { + dial_info_detail: DialInfoDetail, + }, + SetupNode { + node_id: DHTKey, + node_id_secret: DHTKeySecret, + }, + SetupNetwork { + outbound_protocols: ProtocolTypeSet, + inbound_protocols: ProtocolTypeSet, + address_types: AddressTypeSet, + }, + SetNetworkClass { + network_class: Option, + }, } pub struct RoutingDomainEditor { @@ -67,9 +83,40 @@ impl RoutingDomainEditor { Ok(()) } + #[instrument(level = "debug", skip(self))] + pub fn setup_node(&mut self, node_id: DHTKey, node_id_secret: DHTKeySecret) { + self.changes.push(RoutingDomainChange::SetupNode { + node_id, + node_id_secret, + }) + } + #[instrument(level = "debug", skip(self))] + pub fn setup_network( + &mut self, + outbound_protocols: ProtocolTypeSet, + inbound_protocols: ProtocolTypeSet, + address_types: AddressTypeSet, + ) { + self.changes.push(RoutingDomainChange::SetupNetwork { + outbound_protocols, + inbound_protocols, + address_types, + }) + } + + #[instrument(level = "debug", skip(self))] + pub fn set_network_class(&mut self, network_class: Option) { + self.changes + .push(RoutingDomainChange::SetNetworkClass { network_class }) + } #[instrument(level = "debug", skip(self))] pub async fn commit(self) { + // No locking if we have nothing to do + if self.changes.is_empty() { + return; + } + let mut changed = false; { let node_id = self.routing_table.node_id(); @@ -81,17 +128,17 @@ impl RoutingDomainEditor { match change { RoutingDomainChange::ClearDialInfoDetails => { debug!("[{:?}] cleared dial info details", self.routing_domain); - detail.clear_dial_info_details(); + detail.common_mut().clear_dial_info_details(); changed = true; } RoutingDomainChange::ClearRelayNode => { debug!("[{:?}] cleared relay node", self.routing_domain); - detail.set_relay_node(None); + detail.common_mut().set_relay_node(None); changed = true; } RoutingDomainChange::SetRelayNode { relay_node } => { debug!("[{:?}] set relay node: {}", self.routing_domain, relay_node); - detail.set_relay_node(Some(relay_node)); + detail.common_mut().set_relay_node(Some(relay_node)); changed = true; } RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => { @@ -99,7 +146,9 @@ impl RoutingDomainEditor { "[{:?}] add dial info detail: {:?}", self.routing_domain, dial_info_detail ); - detail.add_dial_info_detail(dial_info_detail.clone()); + detail + .common_mut() + .add_dial_info_detail(dial_info_detail.clone()); info!( "{:?} Dial Info: {}", @@ -112,8 +161,68 @@ impl RoutingDomainEditor { ); changed = true; } + RoutingDomainChange::SetupNode { + node_id, + node_id_secret, + } => { + debug!( + "[{:?}] setup node: {}", + self.routing_domain, + node_id.encode() + ); + detail.common_mut().setup_node(node_id, node_id_secret); + changed = true; + } + RoutingDomainChange::SetupNetwork { + outbound_protocols, + inbound_protocols, + address_types, + } => { + let old_outbound_protocols = detail.common().outbound_protocols(); + let old_inbound_protocols = detail.common().inbound_protocols(); + let old_address_types = detail.common().address_types(); + + let this_changed = old_outbound_protocols != outbound_protocols + || old_inbound_protocols != inbound_protocols + || old_address_types != address_types; + + debug!( + "[{:?}] setup network: {:?} {:?} {:?}", + self.routing_domain, + outbound_protocols, + inbound_protocols, + address_types + ); + + detail.common_mut().setup_network( + outbound_protocols, + inbound_protocols, + address_types, + ); + if this_changed { + changed = true; + } + } + RoutingDomainChange::SetNetworkClass { network_class } => { + let old_network_class = detail.common().network_class(); + + let this_changed = old_network_class != network_class; + + debug!( + "[{:?}] set network class: {:?}", + self.routing_domain, network_class, + ); + + detail.common_mut().set_network_class(network_class); + if this_changed { + changed = true; + } + } } } + if changed { + detail.common_mut().clear_cache() + } }); if changed { RoutingTable::reset_all_seen_our_node_info(inner, self.routing_domain); diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs index ee29fa73..80ed405c 100644 --- a/veilid-core/src/routing_table/routing_domains.rs +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -1,62 +1,204 @@ use super::*; +#[derive(Debug)] +pub struct RoutingDomainDetailCommon { + routing_domain: RoutingDomain, + node_id: DHTKey, + node_id_secret: DHTKeySecret, + network_class: Option, + outbound_protocols: ProtocolTypeSet, + inbound_protocols: ProtocolTypeSet, + address_types: AddressTypeSet, + relay_node: Option, + dial_info_details: Vec, + // caches + cached_peer_info: Mutex>, +} + +impl RoutingDomainDetailCommon { + pub fn new(routing_domain: RoutingDomain) -> Self { + Self { + routing_domain, + node_id: Default::default(), + node_id_secret: Default::default(), + network_class: Default::default(), + outbound_protocols: Default::default(), + inbound_protocols: Default::default(), + address_types: Default::default(), + relay_node: Default::default(), + dial_info_details: Default::default(), + cached_peer_info: Mutex::new(Default::default()), + } + } + + // Set from routing table + pub(super) fn setup_node(&mut self, node_id: DHTKey, node_id_secret: DHTKeySecret) { + self.node_id = node_id; + self.node_id_secret = node_id_secret; + self.clear_cache(); + } + // Set from network manager + pub(super) fn setup_network( + &mut self, + outbound_protocols: ProtocolTypeSet, + inbound_protocols: ProtocolTypeSet, + address_types: AddressTypeSet, + ) { + self.outbound_protocols = outbound_protocols; + self.inbound_protocols = inbound_protocols; + self.address_types = address_types; + } + + pub fn node_id(&self) -> DHTKey { + self.node_id + } + pub fn node_id_secret(&self) -> DHTKeySecret { + self.node_id_secret + } + pub(super) fn set_network_class(&mut self, network_class: Option) { + self.network_class = network_class; + } + pub fn network_class(&self) -> Option { + self.network_class + } + pub fn outbound_protocols(&self) -> ProtocolTypeSet { + self.outbound_protocols + } + pub fn inbound_protocols(&self) -> ProtocolTypeSet { + self.inbound_protocols + } + pub fn address_types(&self) -> AddressTypeSet { + self.address_types + } + pub fn relay_node(&self) -> Option { + self.relay_node.clone() + } + pub(super) fn set_relay_node(&mut self, opt_relay_node: Option) { + self.relay_node = opt_relay_node.map(|nr| { + nr.filtered_clone(NodeRefFilter::new().with_routing_domain(self.routing_domain)) + }) + } + pub fn dial_info_details(&self) -> &Vec { + &self.dial_info_details + } + pub(super) fn clear_dial_info_details(&mut self) { + self.dial_info_details.clear(); + } + pub(super) fn add_dial_info_detail(&mut self, did: DialInfoDetail) { + self.dial_info_details.push(did); + self.dial_info_details.sort(); + } + + pub fn has_valid_own_node_info(&self) -> bool { + self.network_class.unwrap_or(NetworkClass::Invalid) != NetworkClass::Invalid + } + + pub fn with_peer_info(&self, f: F) -> R + where + F: FnOnce(&PeerInfo) -> R, + { + let cpi = self.cached_peer_info.lock(); + if cpi.is_none() { + // Regenerate peer info + let pi = PeerInfo::new( + NodeId::new(self.node_id), + SignedNodeInfo::with_secret( + NodeInfo { + network_class: self.network_class.unwrap_or(NetworkClass::Invalid), + outbound_protocols: self.outbound_protocols, + address_types: self.address_types, + min_version: MIN_VERSION, + max_version: MAX_VERSION, + dial_info_detail_list: self.dial_info_details.clone(), + relay_peer_info: self + .relay_node + .and_then(|rn| rn.make_peer_info(self.routing_domain).map(Box::new)), + }, + NodeId::new(self.node_id), + &self.node_id_secret, + ) + .unwrap(), + ); + // Cache the peer info + *cpi = Some(pi); + } + f(cpi.as_ref().unwrap()) + } + + pub fn inbound_dial_info_filter(&self) -> DialInfoFilter { + DialInfoFilter::all() + .with_protocol_type_set(self.inbound_protocols) + .with_address_type_set(self.address_types) + } + pub fn outbound_dial_info_filter(&self) -> DialInfoFilter { + DialInfoFilter::all() + .with_protocol_type_set(self.outbound_protocols) + .with_address_type_set(self.address_types) + } + + pub(super) fn clear_cache(&self) { + *self.cached_peer_info.lock() = None; + } +} + /// General trait for all routing domains pub trait RoutingDomainDetail { + // Common accessors + fn common(&self) -> &RoutingDomainDetailCommon; + fn common_mut(&mut self) -> &mut RoutingDomainDetailCommon; + + // Per-domain accessors fn can_contain_address(&self, address: Address) -> bool; - fn relay_node(&self) -> Option; - fn set_relay_node(&mut self, opt_relay_node: Option); - fn dial_info_details(&self) -> &Vec; - fn clear_dial_info_details(&mut self); - fn add_dial_info_detail(&mut self, did: DialInfoDetail); } +///////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /// Public Internet routing domain internals -#[derive(Debug, Default)] +#[derive(Debug)] pub struct PublicInternetRoutingDomainDetail { - /// An optional node we relay through for this domain - relay_node: Option, - /// The dial infos on this domain we can be reached by - dial_info_details: Vec, + /// Common implementation for all routing domains + common: RoutingDomainDetailCommon, +} + +impl Default for PublicInternetRoutingDomainDetail { + fn default() -> Self { + Self { + common: RoutingDomainDetailCommon::new(RoutingDomain::PublicInternet), + } + } } impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { + fn common(&self) -> &RoutingDomainDetailCommon { + &self.common + } + fn common_mut(&mut self) -> &mut RoutingDomainDetailCommon { + &mut self.common + } fn can_contain_address(&self, address: Address) -> bool { address.is_global() } - fn relay_node(&self) -> Option { - self.relay_node.clone() - } - fn set_relay_node(&mut self, opt_relay_node: Option) { - self.relay_node = opt_relay_node.map(|nr| { - nr.filtered_clone( - NodeRefFilter::new().with_routing_domain(RoutingDomain::PublicInternet), - ) - }) - } - fn dial_info_details(&self) -> &Vec { - &self.dial_info_details - } - fn clear_dial_info_details(&mut self) { - self.dial_info_details.clear(); - } - fn add_dial_info_detail(&mut self, did: DialInfoDetail) { - self.dial_info_details.push(did); - self.dial_info_details.sort(); - } } /// Local Network routing domain internals -#[derive(Debug, Default)] -pub struct LocalInternetRoutingDomainDetail { - /// An optional node we relay through for this domain - relay_node: Option, - /// The dial infos on this domain we can be reached by - dial_info_details: Vec, +#[derive(Debug)] +pub struct LocalNetworkRoutingDomainDetail { /// The local networks this domain will communicate with local_networks: Vec<(IpAddr, IpAddr)>, + /// Common implementation for all routing domains + common: RoutingDomainDetailCommon, } -impl LocalInternetRoutingDomainDetail { +impl Default for LocalNetworkRoutingDomainDetail { + fn default() -> Self { + Self { + local_networks: Default::default(), + common: RoutingDomainDetailCommon::new(RoutingDomain::LocalNetwork), + } + } +} + +impl LocalNetworkRoutingDomainDetail { pub fn set_local_networks(&mut self, mut local_networks: Vec<(IpAddr, IpAddr)>) -> bool { local_networks.sort(); if local_networks == self.local_networks { @@ -67,7 +209,13 @@ impl LocalInternetRoutingDomainDetail { } } -impl RoutingDomainDetail for LocalInternetRoutingDomainDetail { +impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { + fn common(&self) -> &RoutingDomainDetailCommon { + &self.common + } + fn common_mut(&mut self) -> &mut RoutingDomainDetailCommon { + &mut self.common + } fn can_contain_address(&self, address: Address) -> bool { let ip = address.to_ip_addr(); for localnet in &self.local_networks { @@ -77,22 +225,4 @@ impl RoutingDomainDetail for LocalInternetRoutingDomainDetail { } false } - fn relay_node(&self) -> Option { - self.relay_node.clone() - } - fn set_relay_node(&mut self, opt_relay_node: Option) { - self.relay_node = opt_relay_node.map(|nr| { - nr.filtered_clone(NodeRefFilter::new().with_routing_domain(RoutingDomain::LocalNetwork)) - }); - } - fn dial_info_details(&self) -> &Vec { - &self.dial_info_details - } - fn clear_dial_info_details(&mut self) { - self.dial_info_details.clear(); - } - fn add_dial_info_detail(&mut self, did: DialInfoDetail) { - self.dial_info_details.push(did); - self.dial_info_details.sort(); - } } diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs index 943f650b..d43ed5a4 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks.rs @@ -22,8 +22,13 @@ impl RoutingTable { ); // Roll all bucket entry transfers - for b in &mut inner.buckets { - b.roll_transfers(last_ts, cur_ts); + let entries: Vec> = inner + .buckets + .iter() + .flat_map(|b| b.entries().map(|(_k, v)| v.clone())) + .collect(); + for v in entries { + v.with_mut(inner, |_rti, e| e.roll_transfers(last_ts, cur_ts)); } Ok(()) } diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index d98adf30..3510f826 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -7,8 +7,8 @@ pub enum Destination { Direct { /// The node to send to target: NodeRef, - /// An optional safety route specification to send from for sender privacy - safety_route_spec: Option>, + /// Require safety route or not + safety: bool, }, /// Send to node for relay purposes Relay { @@ -16,15 +16,17 @@ pub enum Destination { relay: NodeRef, /// The final destination the relay should send to target: DHTKey, - /// An optional safety route specification to send from for sender privacy - safety_route_spec: Option>, + /// Require safety route or not + safety: bool, }, /// Send to private route (privateroute) PrivateRoute { /// A private route to send to private_route: PrivateRoute, - /// An optional safety route specification to send from for sender privacy - safety_route_spec: Option>, + /// Require safety route or not + safety: bool, + /// Prefer reliability or not + reliable: bool, }, } @@ -32,115 +34,47 @@ impl Destination { pub fn direct(target: NodeRef) -> Self { Self::Direct { target, - safety_route_spec: None, + safety: false, } } pub fn relay(relay: NodeRef, target: DHTKey) -> Self { Self::Relay { relay, target, - safety_route_spec: None, + safety: false, } } - pub fn private_route(private_route: PrivateRoute) -> Self { + pub fn private_route(private_route: PrivateRoute, reliable: bool) -> Self { Self::PrivateRoute { private_route, - safety_route_spec: None, + safety: false, + reliable, } } - // pub fn target_id(&self) -> DHTKey { - // match self { - // Destination::Direct { - // target, - // safety_route_spec, - // } => target.node_id(), - // Destination::Relay { - // relay, - // target, - // safety_route_spec, - // } => *target, - // Destination::PrivateRoute { - // private_route, - // safety_route_spec, - // } => {} - // } - // } - // pub fn best_routing_domain(&self) -> RoutingDomain { - // match self { - // Destination::Direct { - // target, - // safety_route_spec, - // } => { - // if safety_route_spec.is_some() { - // RoutingDomain::PublicInternet - // } else { - // target - // .best_routing_domain() - // .unwrap_or(RoutingDomain::PublicInternet) - // } - // } - // Destination::Relay { - // relay, - // target, - // safety_route_spec, - // } => { - // if safety_route_spec.is_some() { - // RoutingDomain::PublicInternet - // } else { - // relay - // .best_routing_domain() - // .unwrap_or(RoutingDomain::PublicInternet) - // } - // } - // Destination::PrivateRoute { - // private_route: _, - // safety_route_spec: _, - // } => RoutingDomain::PublicInternet, - // } - // } - - pub fn safety_route_spec(&self) -> Option> { + pub fn with_safety(self) -> Self { match self { - Destination::Direct { - target: _, - safety_route_spec, - } => safety_route_spec.clone(), - Destination::Relay { - relay: _, - target: _, - safety_route_spec, - } => safety_route_spec.clone(), - Destination::PrivateRoute { - private_route: _, - safety_route_spec, - } => safety_route_spec.clone(), - } - } - pub fn with_safety_route_spec(self, safety_route_spec: Arc) -> Self { - match self { - Destination::Direct { + Destination::Direct { target, safety: _ } => Self::Direct { target, - safety_route_spec: _, - } => Self::Direct { - target, - safety_route_spec: Some(safety_route_spec), + safety: true, }, Destination::Relay { relay, target, - safety_route_spec: _, + safety: _, } => Self::Relay { relay, target, - safety_route_spec: Some(safety_route_spec), + safety: true, }, Destination::PrivateRoute { private_route, - safety_route_spec: _, + safety: _, + reliable, } => Self::PrivateRoute { private_route, - safety_route_spec: Some(safety_route_spec), + safety: true, + reliable, }, } } @@ -149,39 +83,29 @@ impl Destination { impl fmt::Display for Destination { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Destination::Direct { - target, - safety_route_spec, - } => { - let sr = safety_route_spec - .as_ref() - .map(|_sr| "+SR".to_owned()) - .unwrap_or_default(); + Destination::Direct { target, safety } => { + let sr = if *safety { "+SR" } else { "" }; - write!(f, "{:?}{}", target, sr) + write!(f, "{}{}", target, sr) } Destination::Relay { relay, target, - safety_route_spec, + safety, } => { - let sr = safety_route_spec - .as_ref() - .map(|_sr| "+SR".to_owned()) - .unwrap_or_default(); + let sr = if *safety { "+SR" } else { "" }; - write!(f, "{:?}@{:?}{}", target.encode(), relay, sr) + write!(f, "{}@{}{}", target.encode(), relay, sr) } Destination::PrivateRoute { private_route, - safety_route_spec, + safety, + reliable, } => { - let sr = safety_route_spec - .as_ref() - .map(|_sr| "+SR".to_owned()) - .unwrap_or_default(); + let sr = if *safety { "+SR" } else { "" }; + let rl = if *reliable { "+RL" } else { "" }; - write!(f, "{}{}", private_route, sr) + write!(f, "{}{}{}", private_route, sr, rl) } } } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 9de635db..2867d719 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -544,7 +544,8 @@ impl RPCProcessor { } // Don't do this if our own signed node info isn't valid yet let routing_table = self.routing_table(); - if !routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet) { + let network_manager = self.network_manager(); + if !RoutingTable::has_valid_own_node_info(network_manager, RoutingDomain::PublicInternet) { return None; } diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 60eea998..2448b9b8 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -62,16 +62,34 @@ impl RPCProcessor { // add node information for the requesting node to our routing table let routing_table = self.routing_table(); - let rt2 = routing_table.clone(); - let rt3 = routing_table.clone(); + let network_manager = self.network_manager(); + let has_valid_own_node_info = + routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet); + let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet); // find N nodes closest to the target node in our routing table let closest_nodes = routing_table.find_closest_nodes( find_node_q.node_id, // filter - move |_k, v| rt2.filter_has_valid_signed_node_info(RoutingDomain::PublicInternet, v), + |rti, _k, v| { + RoutingTable::filter_has_valid_signed_node_info_inner( + rti, + RoutingDomain::PublicInternet, + has_valid_own_node_info, + v, + ) + }, // transform - move |k, v| rt3.transform_to_peer_info(RoutingDomain::PublicInternet, k, v), + |rti, k, v| { + let own_peer_info = own_peer_info.clone(); + RoutingTable::transform_to_peer_info_inner( + rti, + RoutingDomain::PublicInternet, + own_peer_info, + k, + v, + ) + }, ); // Make status answer diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index e1cc7788..9c73baf2 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -87,7 +87,7 @@ impl RPCProcessor { routing_domain, dial_info.clone(), ); - let will_validate_dial_info_filter = |e: &BucketEntryInner| { + let will_validate_dial_info_filter = |_rti, e: &BucketEntryInner| { if let Some(status) = &e.node_status(routing_domain) { status.will_validate_dial_info() } else { diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index f330ef44..1fea8f86 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -2002,25 +2002,6 @@ impl VeilidAPI { .map_err(|e| VeilidAPIError::internal(e)) } - //////////////////////////////////////////////////////////////// - // Safety / Private Route Handling - - #[instrument(level = "debug", err, skip(self))] - pub async fn new_safety_route_spec( - &self, - _hops: u8, - ) -> Result { - panic!("unimplemented"); - } - - #[instrument(level = "debug", err, skip(self))] - pub async fn new_private_route_spec( - &self, - _hops: u8, - ) -> Result { - panic!("unimplemented"); - } - //////////////////////////////////////////////////////////////// // Routing Context diff --git a/veilid-core/src/veilid_api/privacy.rs b/veilid-core/src/veilid_api/privacy.rs index 046785a9..6ddc3ef0 100644 --- a/veilid-core/src/veilid_api/privacy.rs +++ b/veilid-core/src/veilid_api/privacy.rs @@ -9,35 +9,17 @@ pub struct RouteHopSpec { } #[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct PrivateRouteSpec { +pub struct RouteSpec { // pub public_key: DHTKey, pub secret_key: DHTKeySecret, pub hops: Vec, } -impl PrivateRouteSpec { +impl RouteSpec { pub fn new() -> Self { let (pk, sk) = generate_secret(); - PrivateRouteSpec { - public_key: pk, - secret_key: sk, - hops: Vec::new(), - } - } -} - -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct SafetyRouteSpec { - pub public_key: DHTKey, - pub secret_key: DHTKeySecret, - pub hops: Vec, -} - -impl SafetyRouteSpec { - pub fn new() -> Self { - let (pk, sk) = generate_secret(); - SafetyRouteSpec { + RouteSpec { public_key: pk, secret_key: sk, hops: Vec::new(), diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index b095fd3a..215a8b2c 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -10,10 +10,8 @@ pub enum Target { pub struct RoutingContextInner {} pub struct RoutingContextUnlockedInner { - /// Safety route specified here is for _this_ node's anonymity as a sender, used via the 'route' operation - safety_route_spec: Option>, - /// Private route specified here is for _this_ node's anonymity as a receiver, passed out via the 'respond_to' field for replies - private_route_spec: Option>, + /// Enforce use of private routing + privacy: bool, /// Choose reliable protocols over unreliable/faster protocols when available reliable: bool, } @@ -43,24 +41,18 @@ impl RoutingContext { api, inner: Arc::new(Mutex::new(RoutingContextInner {})), unlocked_inner: Arc::new(RoutingContextUnlockedInner { - safety_route_spec: None, - private_route_spec: None, + privacy: false, reliable: false, }), } } - pub fn with_privacy( - self, - safety_route_spec: SafetyRouteSpec, - private_route_spec: PrivateRouteSpec, - ) -> Self { + pub fn with_privacy(self) -> Self { Self { api: self.api.clone(), inner: Arc::new(Mutex::new(RoutingContextInner {})), unlocked_inner: Arc::new(RoutingContextUnlockedInner { - safety_route_spec: Some(Arc::new(safety_route_spec)), - private_route_spec: Some(Arc::new(private_route_spec)), + privacy: true, reliable: self.unlocked_inner.reliable, }), } @@ -71,8 +63,7 @@ impl RoutingContext { api: self.api.clone(), inner: Arc::new(Mutex::new(RoutingContextInner {})), unlocked_inner: Arc::new(RoutingContextUnlockedInner { - safety_route_spec: self.unlocked_inner.safety_route_spec.clone(), - private_route_spec: self.unlocked_inner.private_route_spec.clone(), + privacy: self.unlocked_inner.privacy, reliable: true, }), } @@ -102,12 +93,13 @@ impl RoutingContext { } Ok(rpc_processor::Destination::Direct { target: nr, - safety_route_spec: self.unlocked_inner.safety_route_spec.clone(), + safety: self.unlocked_inner.privacy, }) } Target::PrivateRoute(pr) => Ok(rpc_processor::Destination::PrivateRoute { private_route: pr, - safety_route_spec: self.unlocked_inner.safety_route_spec.clone(), + safety: self.unlocked_inner.privacy, + reliable: self.unlocked_inner.reliable, }), } } diff --git a/veilid-core/src/veilid_api/serialize_helpers.rs b/veilid-core/src/veilid_api/serialize_helpers.rs index 58b51909..3f843e4b 100644 --- a/veilid-core/src/veilid_api/serialize_helpers.rs +++ b/veilid-core/src/veilid_api/serialize_helpers.rs @@ -113,3 +113,18 @@ pub mod opt_json_as_string { } } } + +pub mod arc_serialize { + use alloc::sync::Arc; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize(v: &Arc, s: S) -> Result { + T::serialize(v.as_ref(), s) + } + + pub fn deserialize<'de, T: Deserialize<'de>, D: Deserializer<'de>>( + d: D, + ) -> Result, D::Error> { + Ok(Arc::new(T::deserialize(d)?)) + } +}