diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index b2e8f223..3ed4d486 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -184,7 +184,7 @@ impl ConnectionTable { Some(out.get_handle()) } - pub fn get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec { + pub fn _get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec { let inner = self.inner.lock(); inner .ids_by_remote diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index a95a5648..868561fa 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -34,32 +34,56 @@ pub const PEEK_DETECT_LEN: usize = 64; ///////////////////////////////////////////////////////////////// struct NetworkInner { + /// true if the low-level network is running network_started: bool, + /// set if the network needs to be restarted due to a low level configuration change + /// such as dhcp release or change of address or interfaces being added or removed network_needs_restart: bool, + /// the calculated protocol configuration for inbound/outbound protocols protocol_config: Option, + /// set of statically configured protocols with public dialinfo static_public_dialinfo: ProtocolTypeSet, + /// network class per routing domain network_class: [Option; RoutingDomain::count()], + /// join handles for all the low level network background tasks join_handles: Vec>, + /// stop source for shutting down the low level network background tasks stop_source: Option, + /// port we are binding raw udp listen to udp_port: u16, + /// port we are binding raw tcp listen to tcp_port: u16, + /// port we are binding websocket listen to ws_port: u16, + /// port we are binding secure websocket listen to wss_port: u16, + /// does our network have ipv4 on any network? enable_ipv4: bool, + /// does our network have ipv6 on the global internet? enable_ipv6_global: bool, + /// does our network have ipv6 on the local network? enable_ipv6_local: bool, - // public dial info check + /// set if we need to calculate our public dial info again needs_public_dial_info_check: bool, + /// set during the actual execution of the public dial info check to ensure we don't do it more than once doing_public_dial_info_check: bool, + /// the punishment closure to enax public_dial_info_check_punishment: Option>, - // udp + /// udp socket record for bound-first sockets, which are used to guarantee a port is available before + /// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using bound_first_udp: BTreeMap>, + /// mapping of protocol handlers to accept messages from a set of bound socket addresses inbound_udp_protocol_handlers: BTreeMap, + /// outbound udp protocol handler for udpv4 outbound_udpv4_protocol_handler: Option, + /// outbound udp protocol handler for udpv6 outbound_udpv6_protocol_handler: Option, - //tcp + /// tcp socket record for bound-first sockets, which are used to guarantee a port is available before + /// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using bound_first_tcp: BTreeMap>, + /// TLS handling socket controller tls_acceptor: Option, + /// Multiplexer record for protocols on low level TCP sockets listener_states: BTreeMap>>, } @@ -712,18 +736,32 @@ impl Network { protocol_config }; + // Start editing routing table + let mut editor_public_internet = self + .unlocked_inner + .routing_table + .edit_routing_domain(RoutingDomain::PublicInternet); + let mut editor_local_network = self + .unlocked_inner + .routing_table + .edit_routing_domain(RoutingDomain::LocalNetwork); + // start listeners if protocol_config.inbound.contains(ProtocolType::UDP) { - self.start_udp_listeners().await?; + self.start_udp_listeners(&mut editor_public_internet, &mut editor_local_network) + .await?; } if protocol_config.inbound.contains(ProtocolType::WS) { - self.start_ws_listeners().await?; + self.start_ws_listeners(&mut editor_public_internet, &mut editor_local_network) + .await?; } if protocol_config.inbound.contains(ProtocolType::WSS) { - self.start_wss_listeners().await?; + self.start_wss_listeners(&mut editor_public_internet, &mut editor_local_network) + .await?; } if protocol_config.inbound.contains(ProtocolType::TCP) { - self.start_tcp_listeners().await?; + self.start_tcp_listeners(&mut editor_public_internet, &mut editor_local_network) + .await?; } // release caches of available listener ports @@ -748,6 +786,10 @@ impl Network { info!("network started"); self.inner.lock().network_started = true; + // commit routing table edits + editor_public_internet.commit().await; + editor_local_network.commit().await; + Ok(()) } @@ -792,9 +834,16 @@ impl Network { while unord.next().await.is_some() {} debug!("clearing dial info"); - // Drop all dial info - routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); - routing_table.clear_dial_info_details(RoutingDomain::LocalNetwork); + + let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); + editor.disable_node_info_updates(); + editor.clear_dial_info_details(); + editor.commit().await; + + let mut editor = routing_table.edit_routing_domain(RoutingDomain::LocalNetwork); + editor.disable_node_info_updates(); + editor.clear_dial_info_details(); + editor.commit().await; // Reset state including network class *self.inner.lock() = Self::new_inner(); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 798bbfa1..b4c1568d 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -608,7 +608,6 @@ impl Network { (protocol_config, existing_network_class, tcp_same_port) }; let routing_table = self.routing_table(); - let network_manager = self.network_manager(); // Process all protocol and address combinations let mut futures = FuturesUnordered::new(); @@ -771,6 +770,7 @@ impl Network { // If a network class could be determined // see about updating our public dial info let mut changed = false; + let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); if new_network_class.is_some() { // Get existing public dial info let existing_public_dial_info: HashSet = routing_table @@ -814,13 +814,9 @@ impl Network { // Is the public dial info different? if existing_public_dial_info != new_public_dial_info { // If so, clear existing public dial info and re-register the new public dial info - routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); + editor.clear_dial_info_details(); for did in new_public_dial_info { - if let Err(e) = routing_table.register_dial_info( - RoutingDomain::PublicInternet, - did.dial_info, - did.class, - ) { + if let Err(e) = editor.register_dial_info(did.dial_info, did.class) { log_net!(error "Failed to register detected public dial info: {}", e); } } @@ -836,7 +832,7 @@ impl Network { } } else if existing_network_class.is_some() { // Network class could not be determined - routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); + editor.clear_dial_info_details(); self.inner.lock().network_class[RoutingDomain::PublicInternet as usize] = None; changed = true; log_net!(debug "network class cleared"); @@ -849,9 +845,7 @@ impl Network { } } else { // Send updates to everyone - network_manager - .send_node_info_updates(RoutingDomain::PublicInternet, true) - .await; + editor.commit().await; } Ok(()) diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index e14d5ea6..bf862fa4 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -250,7 +250,11 @@ impl Network { ///////////////////////////////////////////////////// - pub(super) async fn start_udp_listeners(&self) -> EyreResult<()> { + pub(super) async fn start_udp_listeners( + &self, + editor_public_internet: &mut RoutingDomainEditor, + editor_local_network: &mut RoutingDomainEditor, + ) -> EyreResult<()> { trace!("starting udp listeners"); let routing_table = self.routing_table(); let (listen_address, public_address, detect_address_changes) = { @@ -293,20 +297,12 @@ impl Network { && public_address.is_none() && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di) { - routing_table.register_dial_info( - RoutingDomain::PublicInternet, - di.clone(), - DialInfoClass::Direct, - )?; + editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?; static_public = true; } // Register interface dial info as well since the address is on the local interface - routing_table.register_dial_info( - RoutingDomain::LocalNetwork, - di.clone(), - DialInfoClass::Direct, - )?; + editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?; } // Add static public dialinfo if it's configured @@ -322,11 +318,8 @@ impl Network { // Register the public address if !detect_address_changes { - routing_table.register_dial_info( - RoutingDomain::PublicInternet, - pdi.clone(), - DialInfoClass::Direct, - )?; + editor_public_internet + .register_dial_info(pdi.clone(), DialInfoClass::Direct)?; static_public = true; } @@ -341,8 +334,7 @@ impl Network { })(); if !local_dial_info_list.contains(&pdi) && is_interface_address { - routing_table.register_dial_info( - RoutingDomain::LocalNetwork, + editor_local_network.register_dial_info( DialInfo::udp_from_socketaddr(pdi_addr), DialInfoClass::Direct, )?; @@ -361,7 +353,11 @@ impl Network { self.create_udp_listener_tasks().await } - pub(super) async fn start_ws_listeners(&self) -> EyreResult<()> { + pub(super) async fn start_ws_listeners( + &self, + editor_public_internet: &mut RoutingDomainEditor, + editor_local_network: &mut RoutingDomainEditor, + ) -> EyreResult<()> { trace!("starting ws listeners"); let routing_table = self.routing_table(); let (listen_address, url, path, detect_address_changes) = { @@ -418,11 +414,8 @@ impl Network { .wrap_err("try_ws failed")?; if !detect_address_changes { - routing_table.register_dial_info( - RoutingDomain::PublicInternet, - pdi.clone(), - DialInfoClass::Direct, - )?; + editor_public_internet + .register_dial_info(pdi.clone(), DialInfoClass::Direct)?; static_public = true; } @@ -430,11 +423,7 @@ impl Network { if !registered_addresses.contains(&gsa.ip()) && self.is_usable_interface_address(gsa.ip()) { - routing_table.register_dial_info( - RoutingDomain::LocalNetwork, - pdi, - DialInfoClass::Direct, - )?; + editor_local_network.register_dial_info(pdi, DialInfoClass::Direct)?; } registered_addresses.insert(gsa.ip()); @@ -455,20 +444,13 @@ impl Network { && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &local_di) { // Register public dial info - routing_table.register_dial_info( - RoutingDomain::PublicInternet, - local_di.clone(), - DialInfoClass::Direct, - )?; + editor_public_internet + .register_dial_info(local_di.clone(), DialInfoClass::Direct)?; static_public = true; } // Register local dial info - routing_table.register_dial_info( - RoutingDomain::LocalNetwork, - local_di, - DialInfoClass::Direct, - )?; + editor_local_network.register_dial_info(local_di, DialInfoClass::Direct)?; } if static_public { @@ -481,10 +463,13 @@ impl Network { Ok(()) } - pub(super) async fn start_wss_listeners(&self) -> EyreResult<()> { + pub(super) async fn start_wss_listeners( + &self, + editor_public_internet: &mut RoutingDomainEditor, + editor_local_network: &mut RoutingDomainEditor, + ) -> EyreResult<()> { trace!("starting wss listeners"); - let routing_table = self.routing_table(); let (listen_address, url, detect_address_changes) = { let c = self.config.get(); ( @@ -543,11 +528,8 @@ impl Network { .wrap_err("try_wss failed")?; if !detect_address_changes { - routing_table.register_dial_info( - RoutingDomain::PublicInternet, - pdi.clone(), - DialInfoClass::Direct, - )?; + editor_public_internet + .register_dial_info(pdi.clone(), DialInfoClass::Direct)?; static_public = true; } @@ -555,11 +537,7 @@ impl Network { if !registered_addresses.contains(&gsa.ip()) && self.is_usable_interface_address(gsa.ip()) { - routing_table.register_dial_info( - RoutingDomain::LocalNetwork, - pdi, - DialInfoClass::Direct, - )?; + editor_local_network.register_dial_info(pdi, DialInfoClass::Direct)?; } registered_addresses.insert(gsa.ip()); @@ -578,7 +556,11 @@ impl Network { Ok(()) } - pub(super) async fn start_tcp_listeners(&self) -> EyreResult<()> { + pub(super) async fn start_tcp_listeners( + &self, + editor_public_internet: &mut RoutingDomainEditor, + editor_local_network: &mut RoutingDomainEditor, + ) -> EyreResult<()> { trace!("starting tcp listeners"); let routing_table = self.routing_table(); @@ -624,19 +606,11 @@ impl Network { && public_address.is_none() && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di) { - routing_table.register_dial_info( - RoutingDomain::PublicInternet, - di.clone(), - DialInfoClass::Direct, - )?; + editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?; static_public = true; } // Register interface dial info - routing_table.register_dial_info( - RoutingDomain::LocalNetwork, - di.clone(), - DialInfoClass::Direct, - )?; + editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?; registered_addresses.insert(socket_address.to_ip_addr()); } @@ -656,21 +630,14 @@ impl Network { let pdi = DialInfo::tcp_from_socketaddr(pdi_addr); if !detect_address_changes { - routing_table.register_dial_info( - RoutingDomain::PublicInternet, - pdi.clone(), - DialInfoClass::Direct, - )?; + editor_public_internet + .register_dial_info(pdi.clone(), DialInfoClass::Direct)?; static_public = true; } // See if this public address is also a local interface address if self.is_usable_interface_address(pdi_addr.ip()) { - routing_table.register_dial_info( - RoutingDomain::LocalNetwork, - pdi, - DialInfoClass::Direct, - )?; + editor_local_network.register_dial_info(pdi, DialInfoClass::Direct)?; } } } diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index ebb9dea0..bc571210 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -500,7 +500,9 @@ impl NetworkManager { let routing_table = self.routing_table(); let node_info = routing_table.get_own_node_info(RoutingDomain::PublicInternet); let network_class = self.get_network_class(RoutingDomain::PublicInternet); - let mut node_info_changed = false; + + // Get routing domain editor + let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); // Do we know our network class yet? if let Some(network_class) = network_class { @@ -511,16 +513,14 @@ impl NetworkManager { // Relay node is dead or no longer needed if matches!(state, BucketEntryState::Dead) { info!("Relay node died, dropping relay {}", relay_node); - routing_table.set_relay_node(RoutingDomain::PublicInternet, None); - node_info_changed = true; + editor.clear_relay_node(); false } else if !node_info.requires_relay() { info!( "Relay node no longer required, dropping relay {}", relay_node ); - routing_table.set_relay_node(RoutingDomain::PublicInternet, None); - node_info_changed = true; + editor.clear_relay_node(); false } else { true @@ -544,8 +544,7 @@ impl NetworkManager { false, ) { info!("Outbound relay node selected: {}", nr); - routing_table.set_relay_node(RoutingDomain::PublicInternet, Some(nr)); - node_info_changed = true; + editor.set_relay_node(nr); } } // Otherwise we must need an inbound relay @@ -555,18 +554,14 @@ impl NetworkManager { routing_table.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) { info!("Inbound relay node selected: {}", nr); - routing_table.set_relay_node(RoutingDomain::PublicInternet, Some(nr)); - node_info_changed = true; + editor.set_relay_node(nr); } } } } - // Re-send our node info if we selected a relay - if node_info_changed { - self.send_node_info_updates(RoutingDomain::PublicInternet, true) - .await; - } + // Commit the changes + editor.commit().await; Ok(()) } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 1bffb04b..1b17d374 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -192,11 +192,6 @@ impl RoutingTable { Self::with_routing_domain(&*inner, domain, |rd| rd.relay_node()) } - pub fn set_relay_node(&self, domain: RoutingDomain, opt_relay_node: Option) { - let mut inner = self.inner.write(); - Self::with_routing_domain_mut(&mut *inner, domain, |rd| rd.set_relay_node(opt_relay_node)); - } - pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { let inner = self.inner.read(); Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details().is_empty()) @@ -299,46 +294,6 @@ impl RoutingTable { RoutingDomainEditor::new(self.clone(), domain) } - #[instrument(level = "debug", skip(self), err)] - pub fn register_dial_info( - &self, - domain: RoutingDomain, - dial_info: DialInfo, - class: DialInfoClass, - ) -> EyreResult<()> { - if !self.ensure_dial_info_is_valid(domain, &dial_info) { - return Err(eyre!( - "dial info '{}' is not valid in routing domain '{:?}'", - dial_info, - domain - )); - } - - let mut inner = self.inner.write(); - Self::with_routing_domain_mut(&mut *inner, domain, |rd| { - rd.add_dial_info_detail(DialInfoDetail { - dial_info: dial_info.clone(), - class, - }); - }); - - info!( - "{:?} Dial Info: {}", - domain, - NodeDialInfo { - node_id: NodeId::new(inner.node_id), - dial_info - } - .to_string(), - ); - debug!(" Class: {:?}", class); - - Self::reset_all_seen_our_node_info(&mut *inner, domain); - Self::reset_all_updated_since_last_network_change(&mut *inner); - - Ok(()) - } - fn reset_all_seen_our_node_info(inner: &mut RoutingTableInner, routing_domain: RoutingDomain) { let cur_ts = intf::get_timestamp(); Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { @@ -357,18 +312,6 @@ impl RoutingTable { }); } - pub fn clear_dial_info_details(&self, routing_domain: RoutingDomain) { - trace!("clearing dial info domain: {:?}", routing_domain); - - let mut inner = self.inner.write(); - Self::with_routing_domain_mut(&mut *inner, routing_domain, |rd| { - rd.clear_dial_info_details(); - }); - - // Public dial info changed, go through all nodes and reset their 'seen our node info' bit - Self::reset_all_seen_our_node_info(&mut *inner, routing_domain); - } - pub fn get_own_peer_info(&self, routing_domain: RoutingDomain) -> PeerInfo { PeerInfo::new( NodeId::new(self.node_id()), @@ -858,9 +801,6 @@ impl RoutingTable { let mut dead = true; if let Some(nr) = self.lookup_node_ref(*e) { if let Some(last_connection) = nr.last_connection() { - - - out.push((*e, RecentPeersEntry { last_connection })); dead = false; } diff --git a/veilid-core/src/routing_table/routing_domain_editor.rs b/veilid-core/src/routing_table/routing_domain_editor.rs index 53cbb395..c0bc8ccd 100644 --- a/veilid-core/src/routing_table/routing_domain_editor.rs +++ b/veilid-core/src/routing_table/routing_domain_editor.rs @@ -1,11 +1,17 @@ use super::*; -enum RoutingDomainChange {} +enum RoutingDomainChange { + ClearDialInfoDetails, + ClearRelayNode, + SetRelayNode { relay_node: NodeRef }, + AddDialInfoDetail { dial_info_detail: DialInfoDetail }, +} pub struct RoutingDomainEditor { routing_table: RoutingTable, routing_domain: RoutingDomain, changes: Vec, + send_node_info_updates: bool, } impl RoutingDomainEditor { @@ -14,8 +20,111 @@ impl RoutingDomainEditor { routing_table, routing_domain, changes: Vec::new(), + send_node_info_updates: true, } } + #[instrument(level = "debug", skip(self))] + pub fn disable_node_info_updates(&mut self) { + self.send_node_info_updates = false; + } - pub fn commit(self) {} + #[instrument(level = "debug", skip(self))] + pub fn clear_dial_info_details(&mut self) { + self.changes.push(RoutingDomainChange::ClearDialInfoDetails); + } + #[instrument(level = "debug", skip(self))] + pub fn clear_relay_node(&mut self) { + self.changes.push(RoutingDomainChange::ClearRelayNode); + } + #[instrument(level = "debug", skip(self))] + pub fn set_relay_node(&mut self, relay_node: NodeRef) { + self.changes + .push(RoutingDomainChange::SetRelayNode { relay_node }) + } + #[instrument(level = "debug", skip(self), err)] + pub fn register_dial_info( + &mut self, + dial_info: DialInfo, + class: DialInfoClass, + ) -> EyreResult<()> { + if !self + .routing_table + .ensure_dial_info_is_valid(self.routing_domain, &dial_info) + { + return Err(eyre!( + "dial info '{}' is not valid in routing domain '{:?}'", + dial_info, + self.routing_domain + )); + } + + self.changes.push(RoutingDomainChange::AddDialInfoDetail { + dial_info_detail: DialInfoDetail { + dial_info: dial_info.clone(), + class, + }, + }); + + Ok(()) + } + + #[instrument(level = "debug", skip(self))] + pub async fn commit(self) { + let mut changed = false; + { + let mut inner = self.routing_table.inner.write(); + let inner = &mut *inner; + let node_id = inner.node_id; + + RoutingTable::with_routing_domain_mut(inner, self.routing_domain, |detail| { + for change in self.changes { + match change { + RoutingDomainChange::ClearDialInfoDetails => { + debug!("[{:?}] cleared dial info details", self.routing_domain); + detail.clear_dial_info_details(); + changed = true; + } + RoutingDomainChange::ClearRelayNode => { + debug!("[{:?}] cleared relay node", self.routing_domain); + detail.set_relay_node(None); + changed = true; + } + RoutingDomainChange::SetRelayNode { relay_node } => { + debug!("[{:?}] set relay node: {}", self.routing_domain, relay_node); + detail.set_relay_node(Some(relay_node)); + changed = true; + } + RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => { + debug!( + "[{:?}] add dial info detail: {:?}", + self.routing_domain, dial_info_detail + ); + detail.add_dial_info_detail(dial_info_detail.clone()); + + info!( + "{:?} Dial Info: {}", + self.routing_domain, + NodeDialInfo { + node_id: NodeId::new(node_id), + dial_info: dial_info_detail.dial_info + } + .to_string(), + ); + changed = true; + } + } + } + }); + if changed { + RoutingTable::reset_all_seen_our_node_info(inner, self.routing_domain); + RoutingTable::reset_all_updated_since_last_network_change(inner); + } + } + if changed && self.send_node_info_updates { + let network_manager = self.routing_table.unlocked_inner.network_manager.clone(); + network_manager + .send_node_info_updates(self.routing_domain, true) + .await; + } + } }