From f7873aba88a892eeff6bdaebc149f977b1650b36 Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 7 Apr 2022 09:55:09 -0400 Subject: [PATCH] add keepalives for route nodes --- veilid-core/Cargo.toml | 3 +- veilid-core/proto/veilid.capnp | 8 +- veilid-core/src/dht/key.rs | 5 ++ veilid-core/src/intf/native/system.rs | 4 + veilid-core/src/intf/wasm/system.rs | 37 ++++++++ veilid-core/src/network_manager.rs | 74 +++++++++++++++- veilid-core/src/routing_table/bucket_entry.rs | 87 ++++++++++++++++--- veilid-core/src/routing_table/mod.rs | 36 +++++++- veilid-core/src/veilid_api/mod.rs | 40 ++++++++- 9 files changed, 267 insertions(+), 27 deletions(-) diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 69d00223..a8584793 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -95,7 +95,8 @@ send_wrapper = "^0" [target.'cfg(target_arch = "wasm32")'.dependencies.web-sys] version = "^0" features = [ - # 'Document', + 'Document', + 'HtmlDocument', # 'Element', # 'HtmlElement', # 'Node', diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 770595ae..e2bbcdb0 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -171,11 +171,11 @@ enum NetworkClass { server @0; # S = Device with public IP and no UDP firewall mapped @1; # M = Device with portmap behind any NAT fullConeNAT @2; # F = Device without portmap behind full-cone NAT - addressRestrictedNAT @3; # R1 = Device without portmap behind address-only restricted NAT - portRestrictedNAT @4; # R2 = Device without portmap behind address-and-port restricted NAT + addressRestrictedNAT @3; # A = Device without portmap behind address-only restricted NAT + portRestrictedNAT @4; # P = Device without portmap behind address-and-port restricted NAT outboundOnly @5; # O = Outbound only - webApp @6; # W = PWA in either normal or tor web browser - invalid @7; # X = Invalid + webApp @6; # W = PWA + invalid @7; # I = Invalid } struct NodeInfo { diff --git 
a/veilid-core/src/dht/key.rs b/veilid-core/src/dht/key.rs index fba0c927..3b9cd313 100644 --- a/veilid-core/src/dht/key.rs +++ b/veilid-core/src/dht/key.rs @@ -444,3 +444,8 @@ pub fn distance(key1: &DHTKey, key2: &DHTKey) -> DHTKeyDistance { DHTKeyDistance::new(bytes) } + +#[allow(dead_code)] +pub fn sort_closest_fn(key: DHTKey) -> impl FnMut(&DHTKey, &DHTKey) -> std::cmp::Ordering { + move |k1, k2| distance(k1, &key).cmp(&distance(k2, &key)) +} diff --git a/veilid-core/src/intf/native/system.rs b/veilid-core/src/intf/native/system.rs index 6ea0b3ae..507cfb3e 100644 --- a/veilid-core/src/intf/native/system.rs +++ b/veilid-core/src/intf/native/system.rs @@ -92,6 +92,10 @@ pub fn get_concurrency() -> u32 { num_cpus::get() as u32 } +pub async fn get_outbound_relay_peer() -> Option<PeerInfo> { + panic!("Native Veilid should never require an outbound relay"); +} + /* pub fn async_callback<F, OF, EF>(fut: F, ok_fn: OF, err_fn: EF) where diff --git a/veilid-core/src/intf/wasm/system.rs b/veilid-core/src/intf/wasm/system.rs index f0cbda61..6b691383 100644 --- a/veilid-core/src/intf/wasm/system.rs +++ b/veilid-core/src/intf/wasm/system.rs @@ -164,3 +164,40 @@ where pub fn get_concurrency() -> u32 { 1 } + +pub async fn get_outbound_relay_peer() -> Option<PeerInfo> { + // unimplemented! 
+ None +} + +// pub async fn get_pwa_web_server_config() -> { +// if utils::is_browser() { + +// let win = window().unwrap(); +// let doc = win.document().unwrap(); +// let html_document = document.dyn_into::<HtmlDocument>().unwrap(); +// let cookie = html_document.cookie().unwrap(); + +// // let wait_millis = if millis > u32::MAX { +// // i32::MAX +// // } else { +// // millis as i32 +// // }; +// // let promise = Promise::new(&mut |yes, _| { +// // let win = window().unwrap(); +// // win.set_timeout_with_callback_and_timeout_and_arguments_0(&yes, wait_millis) +// // .unwrap(); +// // }); + +// // JsFuture::from(promise).await.unwrap(); +// } else if utils::is_nodejs() { +// // let promise = Promise::new(&mut |yes, _| { +// // nodejs_global_set_timeout_with_callback_and_timeout_and_arguments_0(&yes, millis) +// // .unwrap(); +// // }); + +// // JsFuture::from(promise).await.unwrap(); +// } else { +// panic!("WASM requires browser or nodejs environment"); +// } +// } \ No newline at end of file diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index b134b288..8d77ec0c 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -10,6 +10,7 @@ use xx::*; //////////////////////////////////////////////////////////////////////////////////////// +pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes @@ -97,11 +98,13 @@ struct NetworkManagerInner { network_class: Option<NetworkClass>, stats: NetworkManagerStats, client_whitelist: LruCache<DHTKey, ClientWhitelistEntry>, + relay_node: Option<NodeRef>, } struct NetworkManagerUnlockedInner { // Background processes rolling_transfers_task: TickTask, + relay_management_task: TickTask, } #[derive(Clone)] @@ -121,12 +124,14 @@ impl NetworkManager { network_class: None, stats: NetworkManagerStats::default(), client_whitelist: LruCache::new_unbounded(), + 
relay_node: None, } } fn new_unlocked_inner(_config: VeilidConfig) -> NetworkManagerUnlockedInner { //let c = config.get(); NetworkManagerUnlockedInner { rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), + relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), } } @@ -147,6 +152,15 @@ impl NetworkManager { Box::pin(this2.clone().rolling_transfers_task_routine(l, t)) }); } + // Set relay management tick task + { + let this2 = this.clone(); + this.unlocked_inner + .relay_management_task + .set_routine(move |l, t| { + Box::pin(this2.clone().relay_management_task_routine(l, t)) + }); + } this } pub fn config(&self) -> VeilidConfig { @@ -192,6 +206,10 @@ impl NetworkManager { .clone() } + pub fn relay_node(&self) -> Option<NodeRef> { + self.inner.lock().relay_node.clone() + } + pub async fn init(&self) -> Result<(), String> { let routing_table = RoutingTable::new(self.clone()); routing_table.init().await?; @@ -353,10 +371,10 @@ impl NetworkManager { pub fn generate_node_info(&self) -> NodeInfo { let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); - let will_route = network_class.can_relay(); // xxx: eventually this may have more criteria added - let will_tunnel = network_class.can_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point + let will_route = network_class.can_inbound_relay(); // xxx: eventually this may have more criteria added + let will_tunnel = network_class.can_inbound_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point let will_signal = network_class.can_signal(); - let will_relay = network_class.can_relay(); + let will_relay = network_class.can_inbound_relay(); let will_validate_dial_info = network_class.can_validate_dial_info(); NodeInfo { @@ -665,6 +683,56 @@ impl NetworkManager { Ok(true) } + // Keep relays assigned and accessible + async fn relay_management_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), 
String> { + log_net!("--- network manager relay_management task"); + + // Get our node's current network class and do the right thing + let network_class = self.get_network_class(); + + // Do we know our network class yet? + if let Some(network_class) = network_class { + let routing_table = self.routing_table(); + + // If we already have a relay, see if it is dead, or if we don't need it any more + { + let mut inner = self.inner.lock(); + if let Some(relay_node) = inner.relay_node.clone() { + let state = relay_node.operate(|e| e.state(cur_ts)); + if matches!(state, BucketEntryState::Dead) || !network_class.needs_relay() { + // Relay node is dead or no longer needed + inner.relay_node = None; + } + } + } + + // Do we need an outbound relay? + if network_class.outbound_wants_relay() { + // The outbound relay is the host of the PWA + if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await { + let mut inner = self.inner.lock(); + + // Register new outbound relay + let nr = routing_table.register_node_with_dial_info( + outbound_relay_peerinfo.node_id.key, + &outbound_relay_peerinfo.dial_infos, + )?; + inner.relay_node = Some(nr); + } + } else if network_class.needs_relay() { + // Find a node in our routing table that is an acceptable inbound relay + if let Some(nr) = routing_table.find_inbound_relay(cur_ts) { + let mut inner = self.inner.lock(); + inner.relay_node = Some(nr); + } + } + } else { + // If we don't know our network class, we do nothing here and wait until we do + } + + Ok(()) + } + // Compute transfer statistics for the low level network async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { log_net!("--- network manager rolling_transfers task"); diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index cc634886..f6a6638b 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -19,6 
+19,11 @@ const RELIABLE_PING_INTERVAL_MULTIPLIER: f64 = 2.0; const UNRELIABLE_PING_SPAN_SECS: u32 = 60; const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5; +// Keepalive pings are done occasionally to ensure holepunched public dialinfo +// remains valid, as well as to make sure we remain in any relay node's routing table +const KEEPALIVE_PING_INTERVAL_SECS: u32 = 30; + +// Do not change order here, it will mess up other sorts #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum BucketEntryState { Dead, @@ -60,6 +65,46 @@ impl BucketEntry { } } + pub fn sort_fastest(e1: &Self, e2: &Self) -> std::cmp::Ordering { + // Lower latency to the front + if let Some(e1_latency) = &e1.peer_stats.latency { + if let Some(e2_latency) = &e2.peer_stats.latency { + e1_latency.average.cmp(&e2_latency.average) + } else { + std::cmp::Ordering::Less + } + } else if e2.peer_stats.latency.is_some() { + std::cmp::Ordering::Greater + } else { + std::cmp::Ordering::Equal + } + } + + pub fn cmp_fastest_reliable(cur_ts: u64, e1: &Self, e2: &Self) -> std::cmp::Ordering { + // Reverse compare so most reliable is at front + let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts)); + if ret != std::cmp::Ordering::Equal { + return ret; + } + + // Lower latency to the front + if let Some(e1_latency) = &e1.peer_stats.latency { + if let Some(e2_latency) = &e2.peer_stats.latency { + e1_latency.average.cmp(&e2_latency.average) + } else { + std::cmp::Ordering::Less + } + } else if e2.peer_stats.latency.is_some() { + std::cmp::Ordering::Greater + } else { + std::cmp::Ordering::Equal + } + } + + pub fn sort_fastest_reliable_fn(cur_ts: u64) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering { + move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2) + } + pub fn update_dial_infos(&mut self, dial_infos: &[DialInfo]) { self.dial_infos = dial_infos.to_vec(); self.dial_infos.sort(); @@ -185,15 +230,35 @@ impl BucketEntry { } } - // Check if this node needs a ping right now to validate it is still reachable - 
pub(super) fn needs_ping(&self, cur_ts: u64) -> bool { - // See which ping pattern we are to use - let mut state = self.state(cur_ts); + fn needs_constant_ping(&self, cur_ts: u64, interval: u64) -> bool { + match self.peer_stats.ping_stats.last_pinged { + None => true, + Some(last_pinged) => cur_ts.saturating_sub(last_pinged) >= (interval * 1000000u64), + } + } - // If the current dial info hasn't been recognized, - // then we gotta ping regardless so treat the node as unreliable, briefly + // Check if this node needs a ping right now to validate it is still reachable + pub(super) fn needs_ping( + &self, + routing_table: RoutingTable, + node_id: &DHTKey, + cur_ts: u64, + ) -> bool { + let netman = routing_table.network_manager(); + let relay_node = netman.relay_node(); + + // See which ping pattern we are to use + let state = self.state(cur_ts); + + // If the current dial info hasn't been recognized then we gotta ping regardless if !self.seen_our_dial_info && matches!(state, BucketEntryState::Reliable) { - state = BucketEntryState::Unreliable; + return self.needs_constant_ping(cur_ts, UNRELIABLE_PING_INTERVAL_SECS as u64); + } + // If this entry is our relay node, then we should ping it regularly + else if let Some(relay_node) = relay_node { + if relay_node.node_id() == *node_id { + return self.needs_constant_ping(cur_ts, KEEPALIVE_PING_INTERVAL_SECS as u64); + } } match state { @@ -225,13 +290,7 @@ impl BucketEntry { } BucketEntryState::Unreliable => { // If we are in an unreliable state, we need a ping every UNRELIABLE_PING_INTERVAL_SECS seconds - match self.peer_stats.ping_stats.last_pinged { - None => true, - Some(last_pinged) => { - cur_ts.saturating_sub(last_pinged) - >= (UNRELIABLE_PING_INTERVAL_SECS as u64 * 1000000u64) - } - } + self.needs_constant_ping(cur_ts, UNRELIABLE_PING_INTERVAL_SECS as u64) } BucketEntryState::Dead => false, } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 117bb0f7..246ad9bd 100644 
--- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -477,6 +477,38 @@ impl RoutingTable { f(entry) } + pub fn find_inbound_relay(&self, cur_ts: u64) -> Option<NodeRef> { + let mut inner = self.inner.lock(); + let mut best_inbound_relay: Option<NodeRef> = None; + + // Iterate all known nodes for candidates + for b in &mut inner.buckets { + for (k, entry) in b.entries_mut() { + // Ensure it's not dead + if !matches!(entry.state(cur_ts), BucketEntryState::Dead) { + // Ensure we have a node info + if let Some(node_info) = &entry.peer_stats().node_info { + // Ensure network class can relay + if node_info.network_class.can_inbound_relay() { + if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { + if best_inbound_relay.operate(|best| { + BucketEntry::cmp_fastest_reliable(cur_ts, best, entry) + }) == std::cmp::Ordering::Greater + { + *best_inbound_relay = NodeRef::new(self.clone(), *k, entry); + } + } else { + best_inbound_relay = Some(NodeRef::new(self.clone(), *k, entry)); + } + } + } + } + } + + best_inbound_relay + } + pub async fn find_self(&self, node_ref: NodeRef) -> Result<Vec<PeerInfo>, String> { let node_id = self.node_id(); let rpc_processor = self.rpc_processor(); @@ -635,7 +667,7 @@ impl RoutingTable { let mut inner = self.inner.lock(); for b in &mut inner.buckets { for (k, entry) in b.entries_mut() { - if entry.needs_ping(cur_ts) { + if entry.needs_ping(self.clone(), k, cur_ts) { let nr = NodeRef::new(self.clone(), *k, entry); log_rtab!( " --- ping validating: {:?} ({})", @@ -690,6 +722,8 @@ impl RoutingTable { // Ping validate some nodes to groom the table self.unlocked_inner.ping_validator_task.tick().await?; + // Keepalive + Ok(()) } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index be2503a7..7f5f3857 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -236,14 +236,15 @@ pub enum NetworkClass { Server = 0, // S = Device with public IP and no UDP firewall Mapped 
= 1, // M = Device with portmap behind any NAT FullConeNAT = 2, // F = Device without portmap behind full-cone NAT - AddressRestrictedNAT = 3, // R1 = Device without portmap behind address-only restricted NAT - PortRestrictedNAT = 4, // R2 = Device without portmap behind address-and-port restricted NAT + AddressRestrictedNAT = 3, // A = Device without portmap behind address-only restricted NAT + PortRestrictedNAT = 4, // P = Device without portmap behind address-and-port restricted NAT OutboundOnly = 5, // O = Outbound only WebApp = 6, // W = PWA Invalid = 7, // I = Invalid network class, unreachable or can not send packets } impl NetworkClass { + // Can the node receive inbound requests without a relay? pub fn inbound_capable(&self) -> bool { matches!( self, @@ -254,21 +255,52 @@ impl NetworkClass { | Self::PortRestrictedNAT ) } + + // Should an outbound relay be kept available? + pub fn outbound_wants_relay(&self) -> bool { + matches!(self, Self::WebApp) + } + + // Is a signal required to do an inbound hole-punch? pub fn inbound_requires_signal(&self) -> bool { matches!(self, Self::AddressRestrictedNAT | Self::PortRestrictedNAT) } + + // Is some relay required either for signal or inbound relay or outbound relay? + pub fn needs_relay(&self) -> bool { + matches!( + self, + Self::AddressRestrictedNAT + | Self::PortRestrictedNAT + | Self::OutboundOnly + | Self::WebApp + ) + } + + // Must keepalive be used to preserve the public dialinfo in use? + // Keepalive can be to either a pub fn dialinfo_requires_keepalive(&self) -> bool { matches!( self, - Self::FullConeNAT | Self::AddressRestrictedNAT | Self::PortRestrictedNAT + Self::FullConeNAT + | Self::AddressRestrictedNAT + | Self::PortRestrictedNAT + | Self::OutboundOnly + | Self::WebApp ) } + + // Can this node assist with signalling? Yes but only if it doesn't require signalling, itself. 
pub fn can_signal(&self) -> bool { self.inbound_capable() && !self.inbound_requires_signal() } - pub fn can_relay(&self) -> bool { + + // Can this node relay be an inbound relay? + pub fn can_inbound_relay(&self) -> bool { matches!(self, Self::Server | Self::Mapped | Self::FullConeNAT) } + + // Is this node capable of validating dial info pub fn can_validate_dial_info(&self) -> bool { matches!(self, Self::Server | Self::Mapped | Self::FullConeNAT) }