From e0a5b1bd691a1d41c0e6182a8cf1d7cce94c6685 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 3 Sep 2022 13:57:25 -0400 Subject: [PATCH] refactor checkpoint --- scripts/local-test.yml | 1 - veilid-core/proto/veilid.capnp | 77 +++-- veilid-core/src/network_manager/mod.rs | 186 ++++++----- veilid-core/src/network_manager/native/mod.rs | 23 ++ .../network_manager/native/start_protocols.rs | 6 +- veilid-core/src/network_manager/tasks.rs | 151 +++++++-- veilid-core/src/routing_table/bucket_entry.rs | 90 +++--- veilid-core/src/routing_table/find_nodes.rs | 95 ++---- veilid-core/src/routing_table/mod.rs | 266 +++++++++------- veilid-core/src/routing_table/node_ref.rs | 299 ++++++++++-------- .../src/routing_table/routing_domains.rs | 94 ++++++ .../src/rpc_processor/coders/node_status.rs | 73 ++++- .../coders/operations/operation.rs | 36 ++- .../coders/operations/question.rs | 8 +- .../coders/operations/respond_to.rs | 20 +- veilid-core/src/rpc_processor/destination.rs | 135 ++++++++ veilid-core/src/rpc_processor/mod.rs | 293 +++-------------- .../src/rpc_processor/private_route.rs | 8 +- .../src/rpc_processor/rpc_find_node.rs | 10 +- .../src/rpc_processor/rpc_node_info_update.rs | 6 +- veilid-core/src/rpc_processor/rpc_status.rs | 78 ++++- .../src/tests/common/test_veilid_config.rs | 2 - veilid-core/src/veilid_api/debug.rs | 51 ++- veilid-core/src/veilid_api/mod.rs | 36 ++- veilid-core/src/veilid_config.rs | 2 - veilid-core/src/xx/ip_extra.rs | 45 +++ veilid-flutter/example/lib/config.dart | 1 - veilid-flutter/lib/veilid.dart | 4 - veilid-server/src/cmdline.rs | 9 +- veilid-server/src/settings.rs | 7 - 30 files changed, 1274 insertions(+), 838 deletions(-) create mode 100644 veilid-core/src/routing_table/routing_domains.rs create mode 100644 veilid-core/src/rpc_processor/destination.rs diff --git a/scripts/local-test.yml b/scripts/local-test.yml index 1e8a7258..6e73ef0c 100644 --- a/scripts/local-test.yml +++ b/scripts/local-test.yml @@ -3,5 +3,4 @@ core: network: dht: min_peer_count: 1 - enable_local_peer_scope: true bootstrap: [] diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index dc2d67fc..86bdcd95 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -175,10 +175,6 @@ struct ValueData { # Operations ############################## -struct OperationStatusQ { - nodeStatus @0 :NodeStatus; # node status update about the statusq sender -} - enum NetworkClass { inboundCapable @0; # I = Inbound capable without relay, may require signal outboundOnly @1; # O = Outbound only, inbound relay required except with reverse connect signal @@ -199,7 +195,7 @@ struct DialInfoDetail { class @1 :DialInfoClass; } -struct NodeStatus { +struct PublicInternetNodeStatus { willRoute @0 :Bool; willTunnel @1 :Bool; willSignal @2 :Bool; @@ -207,6 +203,18 @@ struct NodeStatus { willValidateDialInfo @4 :Bool; } +struct LocalNetworkNodeStatus { + willRelay @0 :Bool; + willValidateDialInfo @1 :Bool; +} + +struct NodeStatus { + union { + publicInternet @0 :PublicInternetNodeStatus; + localNetwork @1 :LocalNetworkNodeStatus; + } +} + struct ProtocolTypeSet { udp @0 :Bool; tcp @1 :Bool; @@ -239,6 +247,21 @@ struct SenderInfo { socketAddress @0 :SocketAddress; # socket address was available for peer } +struct PeerInfo { + nodeId @0 :NodeID; # node id for 'closer peer' + signedNodeInfo @1 :SignedNodeInfo; # signed node info for 'closer peer' +} + +struct RoutedOperation { + signatures @0 :List(Signature); # signatures from nodes that have handled the private route + nonce @1 :Nonce; # nonce Xmsg + data @2 :Data; # Operation encrypted with ENC(Xmsg,DH(PKapr,SKbsr)) +} + +struct OperationStatusQ { + nodeStatus @0 :NodeStatus; # node status update about the statusq sender +} + struct OperationStatusA { nodeStatus @0 :NodeStatus; # returned node status senderInfo @1 :SenderInfo; # info about StatusQ sender from the perspective of the replier @@ -258,21 +281,10 @@ struct OperationFindNodeQ { nodeId @0 :NodeID; # node id to locate } -struct PeerInfo { - nodeId @0 :NodeID; # node id for 'closer peer' - signedNodeInfo @1 :SignedNodeInfo; # signed node info for 'closer peer' -} - struct OperationFindNodeA { peers @0 :List(PeerInfo); # returned 'closer peer' information } -struct RoutedOperation { - signatures @0 :List(Signature); # signatures from nodes that have handled the private route - nonce @1 :Nonce; # nonce Xmsg - data @2 :Data; # Operation encrypted with ENC(Xmsg,DH(PKapr,SKbsr)) -} - struct OperationRoute { safetyRoute @0 :SafetyRoute; # Where this should go operation @1 :RoutedOperation; # The operation to be routed @@ -419,26 +431,25 @@ struct OperationCancelTunnelA { # Things that want an answer struct Question { respondTo :union { - sender @0 :Void; # sender without node info - senderWithInfo @1 :SignedNodeInfo; # some envelope-sender signed node info to be used for reply - privateRoute @2 :PrivateRoute; # embedded private route to be used for reply + sender @0 :Void; # sender + privateRoute @1 :PrivateRoute; # embedded private route to be used for reply } detail :union { # Direct operations - statusQ @3 :OperationStatusQ; - findNodeQ @4 :OperationFindNodeQ; + statusQ @2 :OperationStatusQ; + findNodeQ @3 :OperationFindNodeQ; # Routable operations - getValueQ @5 :OperationGetValueQ; - setValueQ @6 :OperationSetValueQ; - watchValueQ @7 :OperationWatchValueQ; - supplyBlockQ @8 :OperationSupplyBlockQ; - findBlockQ @9 :OperationFindBlockQ; + getValueQ @4 :OperationGetValueQ; + setValueQ @5 :OperationSetValueQ; + watchValueQ @6 :OperationWatchValueQ; + supplyBlockQ @7 :OperationSupplyBlockQ; + findBlockQ @8 :OperationFindBlockQ; # Tunnel operations - startTunnelQ @10 :OperationStartTunnelQ; - completeTunnelQ @11 :OperationCompleteTunnelQ; - cancelTunnelQ @12 :OperationCancelTunnelQ; + startTunnelQ @9 :OperationStartTunnelQ; + completeTunnelQ @10 :OperationCompleteTunnelQ; + cancelTunnelQ @11 :OperationCancelTunnelQ; } } @@ -480,10 +491,10 @@ struct Answer { struct Operation { opId @0 :UInt64; # Random RPC ID. Must be random to foil reply forgery attacks. - + senderInfo @1 :SignedNodeInfo; # (optional) SignedNodeInfo for the sender to be cached by the receiver. kind :union { - question @1 :Question; - statement @2 :Statement; - answer @3 :Answer; + question @2 :Question; + statement @3 :Statement; + answer @4 :Answer; } } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index fb7986fe..44e5075d 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -552,33 +552,29 @@ impl NetworkManager { // Run the rolling transfers task self.unlocked_inner.rolling_transfers_task.tick().await?; - // Process global peer scope ticks - // These only make sense when connected to the actual internet and not just the local network - // Must have at least one outbound protocol enabled, and one global peer scope address family enabled - let global_peer_scope_enabled = - !protocol_config.outbound.is_empty() && !protocol_config.family_global.is_empty(); - if global_peer_scope_enabled { - // Run the relay management task - self.unlocked_inner.relay_management_task.tick().await?; + // Run the relay management task + self.unlocked_inner.relay_management_task.tick().await?; - // If routing table has no live entries, then add the bootstrap nodes to it - let live_entry_count = routing_table.get_entry_count(BucketEntryState::Unreliable); - if live_entry_count == 0 { - self.unlocked_inner.bootstrap_task.tick().await?; - } - - // If we still don't have enough peers, find nodes until we do - let min_peer_count = { - let c = self.config.get(); - c.network.dht.min_peer_count as usize - }; - if live_entry_count < min_peer_count { - self.unlocked_inner.peer_minimum_refresh_task.tick().await?; - } - - // Ping validate some nodes to groom the table - self.unlocked_inner.ping_validator_task.tick().await?; + // See how many live PublicInternet entries we have + let live_public_internet_entry_count = routing_table.get_entry_count( + RoutingTableSet::only(RoutingTable::PublicInternet), + BucketEntryState::Unreliable, + ); + let min_peer_count = { + let c = self.config.get(); + c.network.dht.min_peer_count as usize + }; + // If none, then add the bootstrap nodes to it + if live_public_internet_entry_count == 0 { + self.unlocked_inner.bootstrap_task.tick().await?; } + // If we still don't have enough peers, find nodes until we do + else if live_public_internet_entry_count < min_peer_count { + self.unlocked_inner.peer_minimum_refresh_task.tick().await?; + } + + // Ping validate some nodes to groom the table + self.unlocked_inner.ping_validator_task.tick().await?; // Run the routing table tick routing_table.tick().await?; @@ -605,21 +601,18 @@ impl NetworkManager { } // Get our node's capabilities - pub fn generate_node_status(&self) -> NodeStatus { - let peer_info = self + fn generate_public_internet_node_status(&self) -> PublicInternetNodeStatus { + let node_info = self .routing_table() - .get_own_peer_info(RoutingDomain::PublicInternet); + .get_own_node_info(RoutingDomain::PublicInternet); - let will_route = peer_info.signed_node_info.node_info.can_inbound_relay(); // xxx: eventually this may have more criteria added - let will_tunnel = peer_info.signed_node_info.node_info.can_inbound_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point - let will_signal = peer_info.signed_node_info.node_info.can_signal(); - let will_relay = peer_info.signed_node_info.node_info.can_inbound_relay(); - let will_validate_dial_info = peer_info - .signed_node_info - .node_info - .can_validate_dial_info(); + let will_route = node_info.can_inbound_relay(); // xxx: eventually this may have more criteria added + let will_tunnel = node_info.can_inbound_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point + let will_signal = node_info.can_signal(); + let will_relay = node_info.can_inbound_relay(); + let will_validate_dial_info = node_info.can_validate_dial_info(); - NodeStatus { + PublicInternetNodeStatus { will_route, will_tunnel, will_signal, @@ -627,6 +620,30 @@ impl NetworkManager { will_validate_dial_info, } } + fn generate_local_network_node_status(&self) -> LocalNetworkNodeStatus { + let node_info = self + .routing_table() + .get_own_node_info(RoutingDomain::LocalNetwork); + + let will_relay = node_info.can_inbound_relay(); + let will_validate_dial_info = node_info.can_validate_dial_info(); + + LocalNetworkNodeStatus { + will_relay, + will_validate_dial_info, + } + } + + pub fn generate_node_status(&self, routing_domain: RoutingDomain) -> NodeStatus { + match routing_domain { + RoutingDomain::PublicInternet => { + NodeStatus::PublicInternet(self.generate_public_internet_node_status()) + } + RoutingDomain::LocalNetwork => { + NodeStatus::LocalNetwork(self.generate_local_network_node_status()) + } + } + } // Return what protocols we have enabled pub fn get_protocol_config(&self) -> ProtocolConfig { @@ -650,6 +667,12 @@ impl NetworkManager { .clone(), } } + pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { + let dif = self.get_inbound_dial_info_filter(routing_domain); + NodeRefFilter::new() + .with_routing_domain(routing_domain) + .with_dial_info_filter(dif) + } // Return a dial info filter for what we can send out pub fn get_outbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { @@ -667,6 +690,12 @@ impl NetworkManager { .clone(), } } + pub fn get_outbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { + let dif = self.get_outbound_dial_info_filter(routing_domain); + NodeRefFilter::new() + .with_routing_domain(routing_domain) + .with_dial_info_filter(dif) + } // Generates a multi-shot/normal receipt #[instrument(level = "trace", skip(self, extra_data, callback), err)] @@ -949,12 +978,11 @@ impl NetworkManager { #[instrument(level = "trace", skip(self), ret)] fn get_contact_method_public(&self, target_node_ref: NodeRef) -> ContactMethod { // Scope noderef down to protocols we can do outbound - let public_outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet); - let target_node_ref = target_node_ref.filtered_clone(public_outbound_dif.clone()); + let public_outbound_nrf = self.get_outbound_node_ref_filter(RoutingDomain::PublicInternet); + let target_node_ref = target_node_ref.filtered_clone(public_outbound_nrf.clone()); // Get the best match internet dial info if we have it - let opt_target_public_did = - target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)); + let opt_target_public_did = target_node_ref.first_filtered_dial_info_detail(); if let Some(target_public_did) = opt_target_public_did { // Do we need to signal before going inbound? if !target_public_did.class.requires_signal() { @@ -966,12 +994,9 @@ impl NetworkManager { // Note that .relay() never returns our own node. We can't relay to ourselves. if let Some(inbound_relay_nr) = target_node_ref.relay(RoutingDomain::PublicInternet) { // Scope down to protocols we can do outbound - let inbound_relay_nr = inbound_relay_nr.filtered_clone(public_outbound_dif.clone()); + let inbound_relay_nr = inbound_relay_nr.filtered_clone(public_outbound_nrf.clone()); // Can we reach the inbound relay? - if inbound_relay_nr - .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) - .is_some() - { + if inbound_relay_nr.first_filtered_dial_info_detail().is_some() { // Can we receive anything inbound ever? let our_network_class = self .get_network_class(RoutingDomain::PublicInternet) @@ -985,11 +1010,11 @@ impl NetworkManager { let reverse_dif = self .get_inbound_dial_info_filter(RoutingDomain::PublicInternet) .filtered( - target_node_ref + &target_node_ref .node_info_outbound_filter(RoutingDomain::PublicInternet), ); if let Some(reverse_did) = routing_table.first_filtered_dial_info_detail( - Some(RoutingDomain::PublicInternet), + RoutingDomainSet::only(RoutingDomain::PublicInternet), &reverse_dif, ) { // Ensure we aren't on the same public IP address (no hairpin nat) @@ -1010,16 +1035,16 @@ impl NetworkManager { // Does the target have a direct udp dialinfo we can reach? let udp_target_nr = target_node_ref.filtered_clone( - DialInfoFilter::global().with_protocol_type(ProtocolType::UDP), + NodeRefFilter::new().with_protocol_type(ProtocolType::UDP), ); - if let Some(target_udp_dialinfo_detail) = udp_target_nr - .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) + if let Some(target_udp_dialinfo_detail) = + udp_target_nr.first_filtered_dial_info_detail() { // Does the self node have a direct udp dialinfo the target can reach? let inbound_udp_dif = self .get_inbound_dial_info_filter(RoutingDomain::PublicInternet) .filtered( - target_node_ref + &target_node_ref .node_info_outbound_filter(RoutingDomain::PublicInternet), ) .filtered( @@ -1027,7 +1052,7 @@ impl NetworkManager { ); if let Some(self_udp_dialinfo_detail) = routing_table .first_filtered_dial_info_detail( - Some(RoutingDomain::PublicInternet), + RoutingDomainSet::only(RoutingDomain::PublicInternet), &inbound_udp_dif, ) { @@ -1056,7 +1081,7 @@ impl NetworkManager { { // Can we reach the full relay? if target_inbound_relay_nr - .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) + .first_filtered_dial_info_detail() .is_some() { return ContactMethod::InboundRelay(target_inbound_relay_nr); @@ -1078,15 +1103,13 @@ impl NetworkManager { fn get_contact_method_local(&self, target_node_ref: NodeRef) -> ContactMethod { // Scope noderef down to protocols we can do outbound let local_outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::LocalNetwork); - let target_node_ref = target_node_ref.filtered_clone(local_outbound_dif); + let target_node_ref = target_node_ref.filtered_clone(NodeRefFilter::local_outbound_dif); // Get the best matching local direct dial info if we have it - // ygjghhtiygiukuymyg if target_node_ref.is_filter_dead() { return ContactMethod::Unreachable; } - let opt_target_local_did = - target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::LocalNetwork)); + let opt_target_local_did = target_node_ref.first_filtered_dial_info_detail(); if let Some(target_local_did) = opt_target_local_did { return ContactMethod::Direct(target_local_did.dial_info); } @@ -1096,21 +1119,18 @@ impl NetworkManager { // Figure out how to reach a node #[instrument(level = "trace", skip(self), ret)] pub(crate) fn get_contact_method(&self, target_node_ref: NodeRef) -> ContactMethod { - // Try local first - let out = self.get_contact_method_local(target_node_ref.clone()); - if !matches!(out, ContactMethod::Unreachable) { - return out; - } + let routing_domain = match target_node_ref.best_routing_domain() { + Some(rd) => rd, + None => { + log_net!("no routing domain for node {:?}", target_node_ref); + return ContactMethod::Unreachable; + } + }; - // Try public next - let out = self.get_contact_method_public(target_node_ref.clone()); - if !matches!(out, ContactMethod::Unreachable) { - return out; + match routing_domain { + RoutingDomain::LocalNetwork => self.get_contact_method_local(target_node_ref), + RoutingDomain::PublicInternet => self.get_contact_method_public(target_node_ref), } - - // Otherwise, we can't reach this node - log_net!("unable to reach node {:?}", target_node_ref); - ContactMethod::Unreachable } // Send a reverse connection signal and wait for the return receipt over it @@ -1461,6 +1481,18 @@ impl NetworkManager { return Ok(false); } + // Get the routing domain for this data + let routing_domain = match self + .routing_table() + .routing_domain_for_address(connection_descriptor.remote().address()) + { + Some(rd) => rd, + None => { + log_net!(debug "no routing domain for envelope received from {:?}", connection_descriptor); + return Ok(false); + } + }; + // Is this a direct bootstrap request instead of an envelope? if data[0..4] == *BOOT_MAGIC { network_result_value_or_log!(debug self.handle_boot_request(connection_descriptor).await? => {}); @@ -1588,7 +1620,13 @@ impl NetworkManager { // xxx: deal with spoofing and flooding here? // Pass message to RPC system - rpc.enqueue_message(envelope, body, source_noderef, connection_descriptor)?; + rpc.enqueue_message( + envelope, + body, + source_noderef, + connection_descriptor, + routing_domain, + )?; // Inform caller that we dealt with the envelope locally Ok(true) @@ -1669,7 +1707,7 @@ impl NetworkManager { // Determine if a local IP address has changed // this means we should restart the low level network and and recreate all of our dial info // Wait until we have received confirmation from N different peers - pub fn report_local_socket_address( + pub fn report_local_network_socket_address( &self, _socket_address: SocketAddress, _connection_descriptor: ConnectionDescriptor, @@ -1681,7 +1719,7 @@ impl NetworkManager { // Determine if a global IP address has changed // this means we should recreate our public dial info if it is not static and rediscover it // Wait until we have received confirmation from N different peers - pub fn report_global_socket_address( + pub fn report_public_internet_socket_address( &self, socket_address: SocketAddress, // the socket address as seen by the remote peer connection_descriptor: ConnectionDescriptor, // the connection descriptor used diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 95136b18..e66244ed 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -603,6 +603,29 @@ impl Network { // initialize interfaces self.unlocked_inner.interfaces.refresh().await?; + // build the set of networks we should consider for the 'LocalNetwork' routing domain + let mut local_networks: HashSet<(IpAddr, IpAddr)> = HashSet::new(); + self.unlocked_inner + .interfaces + .with_interfaces(|interfaces| { + for (name, intf) in interfaces { + // Skip networks that we should never encounter + if intf.is_loopback() || !intf.is_running() { + return; + } + // Add network to local networks table + for addr in intf.addrs { + let netmask = addr.if_addr().netmask(); + let network_ip = ipaddr_apply_netmask(addr.if_addr().ip(), netmask); + local_networks.insert((network_ip, netmask)); + } + } + }); + let local_networks: Vec<(IpAddr, IpAddr)> = local_networks.into_iter().collect(); + self.unlocked_inner + .routing_table + .configure_local_network_routing_domain(local_networks); + // determine if we have ipv4/ipv6 addresses { let mut inner = self.inner.lock(); diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index e61ca869..dc1e814a 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -367,13 +367,12 @@ xxx write routing table sieve for routing domain from dialinfo and local network pub(super) async fn start_ws_listeners(&self) -> EyreResult<()> { trace!("starting ws listeners"); let routing_table = self.routing_table(); - let (listen_address, url, path, enable_local_peer_scope, detect_address_changes) = { + let (listen_address, url, path, detect_address_changes) = { let c = self.config.get(); ( c.network.protocol.ws.listen_address.clone(), c.network.protocol.ws.url.clone(), c.network.protocol.ws.path.clone(), - c.network.enable_local_peer_scope, c.network.detect_address_changes, ) }; @@ -586,12 +585,11 @@ xxx write routing table sieve for routing domain from dialinfo and local network trace!("starting tcp listeners"); let routing_table = self.routing_table(); - let (listen_address, public_address, enable_local_peer_scope, detect_address_changes) = { + let (listen_address, public_address, detect_address_changes) = { let c = self.config.get(); ( c.network.protocol.tcp.listen_address.clone(), c.network.protocol.tcp.public_address.clone(), - c.network.enable_local_peer_scope, c.network.detect_address_changes, ) }; diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index 01c705cf..3d6af1b6 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -197,7 +197,11 @@ impl NetworkManager { let routing_table = routing_table.clone(); unord.push( // lets ask bootstrap to find ourselves now - async move { routing_table.reverse_find_node(nr, true).await }, + async move { + routing_table + .reverse_find_node(RoutingDomain::PublicInternet, nr, true) + .await + }, ); } } @@ -299,7 +303,9 @@ impl NetworkManager { unord.push(async move { // Need VALID signed peer info, so ask bootstrap to find_node of itself // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = routing_table.find_target(nr.clone()).await; + let _ = routing_table + .find_target(RoutingDomain::PublicInternet, nr.clone()) + .await; // Ensure we got the signed peer info if !nr.has_valid_signed_node_info(Some(RoutingDomain::PublicInternet)) { @@ -310,7 +316,9 @@ impl NetworkManager { // If this node info is invalid, it will time out after being unpingable } else { // otherwise this bootstrap is valid, lets ask it to find ourselves now - routing_table.reverse_find_node(nr, true).await + routing_table + .reverse_find_node(RoutingDomain::PublicInternet, nr, true) + .await } }); } @@ -324,33 +332,35 @@ impl NetworkManager { // Ping each node in the routing table if they need to be pinged // to determine their reliability #[instrument(level = "trace", skip(self), err)] - pub(super) async fn ping_validator_task_routine( - self, - stop_token: StopToken, - _last_ts: u64, + fn ping_validator_public_internet( + &self, cur_ts: u64, + unord: &mut FuturesUnordered, ) -> EyreResult<()> { let rpc = self.rpc_processor(); let routing_table = self.routing_table(); - let mut unord = FuturesUnordered::new(); + // Get all nodes needing pings in the PublicInternet routing domain + let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts); - let node_refs = routing_table.get_nodes_needing_ping(cur_ts); + // Look up any NAT mappings we may need to try to preserve with keepalives let mut mapped_port_info = routing_table.get_mapped_port_info(); - let opt_public_internet_relay_nr = routing_table.relay_node(RoutingDomain::PublicInternet); - let opt_public_internet_relay_id = opt_public_internet_relay_nr.map(|nr| nr.node_id()); - // let opt_local_network_relay_nr = routing_table.relay_node(RoutingDomain::LocalNetwork); - // let opt_local_network_relay_id = opt_local_network_relay_nr.map(|nr| nr.node_id()); - // Public Internet Routing Domain + // Get the PublicInternet relay if we are using one + let opt_relay_nr = routing_table.relay_node(RoutingDomain::PublicInternet); + let opt_relay_id = opt_relay_nr.map(|nr| nr.node_id()); + + // Get our publicinternet dial info let dids = routing_table.all_filtered_dial_info_details( - Some(RoutingDomain::PublicInternet), - &DialInfoFilter::global(), + RoutingDomainSet::only(RoutingDomain::PublicInternet), + &DialInfoFilter::all(), ); + // For all nodes needing pings, figure out how many and over what protocols for nr in node_refs { - let rpc = rpc.clone(); - if Some(nr.node_id()) == opt_public_internet_relay_id { + // If this is a relay, let's check for NAT keepalives + let mut did_pings = false; + if Some(nr.node_id()) == opt_relay_id { // Relay nodes get pinged over all protocols we have inbound dialinfo for // This is so we can preserve the inbound NAT mappings at our router for did in &dids { @@ -370,19 +380,84 @@ impl NetworkManager { }; if needs_ping { let rpc = rpc.clone(); - let dif = did.dial_info.make_filter(true); + let dif = did.dial_info.make_filter(); let nr_filtered = nr.filtered_clone(dif); log_net!("--> Keepalive ping to {:?}", nr_filtered); - unord.push(async move { rpc.rpc_call_status(nr_filtered).await }.boxed()); + unord.push( + async move { + rpc.rpc_call_status(Some(routing_domain), nr_filtered).await + } + .boxed(), + ); + did_pings = true; } } - } else { - // Just do a single ping with the best protocol for all the other nodes - unord.push(async move { rpc.rpc_call_status(nr).await }.boxed()); + } + // Just do a single ping with the best protocol for all the other nodes, + // ensuring that we at least ping a relay with -something- even if we didnt have + // any mapped ports to preserve + if !did_pings { + let rpc = rpc.clone(); + unord.push( + async move { rpc.rpc_call_status(Some(routing_domain), nr).await }.boxed(), + ); } } - // Wait for futures to complete + Ok(()) + } + + // Ping each node in the LocalNetwork routing domain if they + // need to be pinged to determine their reliability + #[instrument(level = "trace", skip(self), err)] + fn ping_validator_local_network( + &self, + cur_ts: u64, + unord: &mut FuturesUnordered, + ) -> EyreResult<()> { + let rpc = self.rpc_processor(); + let routing_table = self.routing_table(); + + // Get all nodes needing pings in the LocalNetwork routing domain + let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::LocalNetwork, cur_ts); + + // Get our LocalNetwork dial info + let dids = routing_table.all_filtered_dial_info_details( + RoutingDomainSet::only(RoutingDomain::LocalNetwork), + &DialInfoFilter::all(), + ); + + // For all nodes needing pings, figure out how many and over what protocols + for nr in node_refs { + let rpc = rpc.clone(); + + // Just do a single ping with the best protocol for all the nodes + unord.push(async move { rpc.rpc_call_status(Some(routing_domain), nr).await }.boxed()); + } + + Ok(()) + } + + // Ping each node in the routing table if they need to be pinged + // to determine their reliability + #[instrument(level = "trace", skip(self), err)] + pub(super) async fn ping_validator_task_routine( + self, + stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> EyreResult<()> { + let rpc = self.rpc_processor(); + let routing_table = self.routing_table(); + let mut unord = FuturesUnordered::new(); + + // PublicInternet + self.ping_validator_public_internet(cur_ts, &mut unord)?; + + // LocalNetwork + self.ping_validator_local_network(cur_ts, &mut unord)?; + + // Wait for ping futures to complete in parallel while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} Ok(()) @@ -390,24 +465,38 @@ impl NetworkManager { // Ask our remaining peers to give us more peers before we go // back to the bootstrap servers to keep us from bothering them too much + // This only adds PublicInternet routing domain peers. The discovery + // mechanism for LocalNetwork suffices for locating all the local network + // peers that are available. This, however, may query other LocalNetwork + // nodes for their PublicInternet peers, which is a very fast way to get + // a new node online. #[instrument(level = "trace", skip(self), err)] pub(super) async fn peer_minimum_refresh_task_routine( self, stop_token: StopToken, ) -> EyreResult<()> { let routing_table = self.routing_table(); - let cur_ts = intf::get_timestamp(); + let mut unord = FuturesOrdered::new(); + let min_peer_count = { + let c = self.config.get(); + c.network.dht.min_peer_count as usize + }; - // get list of all peers we know about, even the unreliable ones, and ask them to find nodes close to our node too - let noderefs = routing_table.get_all_nodes(cur_ts); - - // do peer minimum search concurrently - let mut unord = FuturesUnordered::new(); + // For the PublicInternet routing domain, get list of all peers we know about + // even the unreliable ones, and ask them to find nodes close to our node too + let noderefs = routing_table.find_fastest_nodes( + min_peer_count, + None, + |k: DHTKey, v: Option>| { + NodeRef::new(self.clone(), k, v.unwrap().clone(), None) + }, + ); for nr in noderefs { - log_net!("--- peer minimum search with {:?}", nr); let routing_table = routing_table.clone(); unord.push(async move { routing_table.reverse_find_node(nr, false).await }); } + + // do peer minimum search in order from fastest to slowest while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} Ok(()) diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 6c63ee0a..71d3a6cd 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -39,7 +39,7 @@ pub enum BucketEntryState { } #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] -struct LastConnectionKey(ProtocolType, AddressType); +struct LastConnectionKey(RoutingDomain, ProtocolType, AddressType); /// Bucket entry information specific to the LocalNetwork RoutingDomain #[derive(Debug)] @@ -136,10 +136,18 @@ impl BucketEntryInner { move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2) } + pub fn clear_signed_node_info(&mut self, routing_domain: RoutingDomain) { + // Get the correct signed_node_info for the chosen routing domain + let opt_current_sni = match routing_domain { + RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + }; + *opt_current_sni = None; + } + // Retuns true if the node info changed pub fn update_signed_node_info( &mut self, - routing_domain: RoutingDomain, signed_node_info: SignedNodeInfo, allow_invalid_signature: bool, ) { @@ -150,7 +158,7 @@ impl BucketEntryInner { } // Get the correct signed_node_info for the chosen routing domain - let opt_current_sni = match routing_domain { + let opt_current_sni = match signed_node_info.routing_domain { RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, }; @@ -160,7 +168,8 @@ impl BucketEntryInner { // If the timestamp hasn't changed or is less, ignore this update if signed_node_info.timestamp <= current_sni.timestamp { // If we received a node update with the same timestamp - // we can make this node live again, but only if our network hasn't changed + // we can make this node live again, but only if our network has recently changed + // which may make nodes that were unreachable now reachable with the same dialinfo if !self.updated_since_last_network_change && signed_node_info.timestamp == current_sni.timestamp { @@ -185,50 +194,34 @@ impl BucketEntryInner { self.touch_last_seen(intf::get_timestamp()); } - pub fn has_node_info(&self, opt_routing_domain: Option) -> bool { - if let Some(routing_domain) = opt_routing_domain { + pub fn has_node_info(&self, routing_domain_set: RoutingDomainSet) -> bool { + for routing_domain in routing_domain_set { // Get the correct signed_node_info for the chosen routing domain let opt_current_sni = match routing_domain { RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, }; - opt_current_sni.is_some() - } else { - if self.local_network.signed_node_info.is_some() { - true - } else if self.public_internet.signed_node_info.is_some() { - true - } else { - false + if opt_current_sni.is_some() { + return true; } } + false } - pub fn has_valid_signed_node_info(&self, opt_routing_domain: Option) -> bool { - if let Some(routing_domain) = opt_routing_domain { + pub fn has_valid_signed_node_info(&self, routing_domain_set: RoutingDomainSet) -> bool { + for routing_domain in routing_domain_set { // Get the correct signed_node_info for the chosen routing domain let opt_current_sni = match routing_domain { RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, }; if let Some(sni) = opt_current_sni { - sni.is_valid() - } else { - false - } - } else { - if let Some(sni) = self.local_network.signed_node_info { if sni.is_valid() { return true; } } - if let Some(sni) = self.public_internet.signed_node_info { - if sni.is_valid() { - return true; - } - } - false } + false } pub fn node_info(&self, routing_domain: RoutingDomain) -> Option { @@ -250,8 +243,28 @@ impl BucketEntryInner { }) } - fn descriptor_to_key(last_connection: ConnectionDescriptor) -> LastConnectionKey { + pub fn best_routing_domain( + &self, + routing_domain_set: RoutingDomainSet, + ) -> Option { + for routing_domain in routing_domain_set { + let opt_current_sni = match routing_domain { + RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + }; + if let Some(sni) = opt_current_sni { + if sni.is_valid() { + return Some(routing_domain); + } + } + } + None + } + + fn descriptor_to_key(&self, last_connection: ConnectionDescriptor) -> LastConnectionKey { + let routing_domain = self.routing_domain_for_address(last_connection.remote().address()); LastConnectionKey( + routing_domain, last_connection.protocol_type(), last_connection.address_type(), ) @@ -259,7 +272,7 @@ impl BucketEntryInner { // Stores a connection descriptor in this entry's table of last connections pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: u64) { - let key = Self::descriptor_to_key(last_connection); + let key = self.descriptor_to_key(last_connection); self.last_connections .insert(key, (last_connection, timestamp)); } @@ -269,18 +282,21 @@ impl BucketEntryInner { self.last_connections.clear(); } - // Gets the best 'last connection' that matches a set of protocol types and address types + // Gets the best 'last connection' that matches a set of routing domain, protocol types and address types pub fn last_connection( &self, + routing_domain_set: RoutingDomainSet, dial_info_filter: Option, ) -> Option<(ConnectionDescriptor, u64)> { // Iterate peer scopes and protocol types and address type in order to ensure we pick the preferred protocols if all else is the same let dif = dial_info_filter.unwrap_or_default(); - for pt in dif.protocol_type_set { - for at in dif.address_type_set { - let key = LastConnectionKey(pt, at); - if let Some(v) = self.last_connections.get(&key) { - return Some(*v); + for rd in routing_domain_set { + for pt in dif.protocol_type_set { + for at in dif.address_type_set { + let key = LastConnectionKey(rd, pt, at); + if let Some(v) = self.last_connections.get(&key) { + return Some(*v); + } } } } @@ -325,7 +341,7 @@ impl BucketEntryInner { .node_status .map(|ln| NodeStatus::LocalNetwork(ln)), RoutingDomain::PublicInternet => self - .local_network + .public_internet .node_status .map(|pi| NodeStatus::PublicInternet(pi)), } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index a17a98f6..0c8b52a1 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -122,7 +122,7 @@ impl RoutingTable { let entry = v.unwrap(); entry.with(|e| { // skip nodes on our local network here - if e.has_node_info(Some(RoutingDomain::LocalNetwork)) { + if e.has_node_info(RoutingDomainSet::only(RoutingDomain::LocalNetwork)) { return false; } @@ -130,25 +130,22 @@ impl RoutingTable { let filter = |n: NodeInfo| { let mut keep = false; for did in n.dial_info_detail_list { - if did.dial_info.is_global() { - if matches!(did.dial_info.address_type(), AddressType::IPV4) { - for (n, protocol_type) in protocol_types.iter().enumerate() { - if nodes_proto_v4[n] < max_per_type - && did.dial_info.protocol_type() == *protocol_type - { - nodes_proto_v4[n] += 1; - keep = true; - } + if matches!(did.dial_info.address_type(), AddressType::IPV4) { + for (n, protocol_type) in protocol_types.iter().enumerate() { + if nodes_proto_v4[n] < max_per_type + && did.dial_info.protocol_type() == *protocol_type + { + nodes_proto_v4[n] += 1; + keep = true; } - } else if matches!(did.dial_info.address_type(), AddressType::IPV6) - { - for (n, protocol_type) in protocol_types.iter().enumerate() { - if nodes_proto_v6[n] < max_per_type - && did.dial_info.protocol_type() == *protocol_type - { - nodes_proto_v6[n] += 1; - keep = true; - } + } + } else if matches!(did.dial_info.address_type(), AddressType::IPV6) { + for (n, protocol_type) in protocol_types.iter().enumerate() { + if nodes_proto_v6[n] < max_per_type + && did.dial_info.protocol_type() == *protocol_type + { + nodes_proto_v6[n] += 1; + keep = true; } } } @@ -168,48 +165,16 @@ impl RoutingTable { ) } - // Get our own node's peer info (public node info) so we can share it with other nodes - pub fn get_own_peer_info(&self, routing_domain: RoutingDomain) -> PeerInfo { - PeerInfo::new( - NodeId::new(self.node_id()), - self.get_own_signed_node_info(routing_domain), - ) - } - - pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedNodeInfo { - let node_id = NodeId::new(self.node_id()); - let secret = self.node_id_secret(); - SignedNodeInfo::with_secret(self.get_own_node_info(routing_domain), node_id, &secret) - .unwrap() - } - - pub fn get_own_node_info(&self, routing_domain: RoutingDomain) -> NodeInfo { - let netman = self.network_manager(); - let relay_node = self.relay_node(routing_domain); - let pc = netman.get_protocol_config(); - NodeInfo { - network_class: netman - .get_network_class(routing_domain) - .unwrap_or(NetworkClass::Invalid), - outbound_protocols: pc.outbound, - address_types: pc.family_global, - min_version: MIN_VERSION, - max_version: MAX_VERSION, - dial_info_detail_list: self.dial_info_details(routing_domain), - relay_peer_info: relay_node.and_then(|rn| rn.peer_info(routing_domain).map(Box::new)), - } - } - pub fn filter_has_valid_signed_node_info( &self, v: Option>, own_peer_info_is_valid: bool, - opt_routing_domain: Option, + routing_domain_set: RoutingDomainSet, ) -> bool { let routing_table = self.clone(); match v { None => own_peer_info_is_valid, - Some(entry) => entry.with(|e| e.has_valid_signed_node_info(opt_routing_domain)), + Some(entry) => entry.with(|e| e.has_valid_signed_node_info(routing_domain_set)), } } @@ -425,7 +390,7 @@ impl RoutingTable { let mut protocol_to_port = BTreeMap::<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>::new(); let our_dids = self.all_filtered_dial_info_details( - Some(RoutingDomain::PublicInternet), + RoutingDomainSet::only(RoutingDomain::PublicInternet), &DialInfoFilter::all(), ); for did in our_dids { @@ -452,12 +417,12 @@ impl RoutingTable { // Get all our outbound protocol/address types let outbound_dif = self .network_manager() - .get_outbound_dial_info_filter(RoutingDomain::PublicInternet); + .get_outbound_node_ref_filter(RoutingDomain::PublicInternet); let mapped_port_info = self.get_mapped_port_info(); move |e: &BucketEntryInner| { // Ensure this node is not on the local network - if e.has_node_info(Some(RoutingDomain::LocalNetwork)) { + if e.has_node_info(RoutingDomainSet::only(RoutingDomain::LocalNetwork)) { return false; } @@ -542,11 +507,7 @@ impl RoutingTable { } #[instrument(level = "trace", skip(self), ret)] - pub fn register_find_node_answer( - &self, - routing_domain: RoutingDomain, - peers: Vec, - ) -> Vec { + pub fn register_find_node_answer(&self, peers: Vec) -> Vec { let node_id = self.node_id(); // register nodes we'd found @@ -566,7 +527,7 @@ impl RoutingTable { // register the node if it's new if let Some(nr) = self.register_node_with_signed_node_info( - routing_domain, + RoutingDomain::PublicInternet, p.node_id.key, p.signed_node_info.clone(), false, @@ -580,7 +541,6 @@ impl RoutingTable { #[instrument(level = "trace", skip(self), ret, err)] pub async fn find_node( &self, - routing_domain: RoutingDomain, node_ref: NodeRef, node_id: DHTKey, ) -> EyreResult>> { @@ -589,18 +549,13 @@ impl RoutingTable { let res = network_result_try!( rpc_processor .clone() - .rpc_call_find_node( - Destination::direct(node_ref.clone()).with_routing_domain(routing_domain), - node_id, - None, - rpc_processor.make_respond_to_sender(routing_domain, node_ref.clone()), - ) + .rpc_call_find_node(Destination::direct(node_ref), node_id,) .await? ); // register nodes we'd found Ok(NetworkResult::value( - self.register_find_node_answer(routing_domain, res.answer), + self.register_find_node_answer(res.answer), )) } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 6ed25b7c..4c062c73 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -3,6 +3,7 @@ mod bucket_entry; mod debug; mod find_nodes; mod node_ref; +mod routing_domains; mod stats_accounting; mod tasks; @@ -17,34 +18,31 @@ pub use debug::*; pub use find_nodes::*; use hashlink::LruCache; pub use node_ref::*; +pub use routing_domains::*; pub use stats_accounting::*; const RECENT_PEERS_TABLE_SIZE: usize = 64; ////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Default)] -pub struct RoutingDomainDetail { - relay_node: Option, - dial_info_details: Vec, -} - #[derive(Debug, Clone, Copy)] pub struct RecentPeersEntry { last_connection: ConnectionDescriptor, } +/// RoutingTable rwlock-internal data struct RoutingTableInner { network_manager: NetworkManager, - node_id: DHTKey, // The current node's public DHT key + // The current node's public DHT key + node_id: DHTKey, node_id_secret: DHTKeySecret, // The current node's DHT key secret buckets: Vec, // Routing table buckets that hold entries kick_queue: BTreeSet, // Buckets to kick on our next kick task bucket_entry_count: usize, // A fast counter for the number of entries in the table, total - public_internet_routing_domain: RoutingDomainDetail, // The dial info we use on the public internet - local_network_routing_domain: RoutingDomainDetail, // The dial info we use on the local network + public_internet_routing_domain: PublicInternetRoutingDomainDetail, // The public internet + local_network_routing_domain: LocalInternetRoutingDomainDetail, // The dial info we use on the local network self_latency_stats_accounting: LatencyStatsAccounting, // Interim accounting mechanism for this node's RPC latency to any other node self_transfer_stats_accounting: TransferStatsAccounting, // Interim accounting mechanism for the total bandwidth to/from this node @@ -80,8 +78,8 @@ impl RoutingTable { node_id_secret: DHTKeySecret::default(), buckets: Vec::new(), kick_queue: BTreeSet::default(), - public_internet_routing_domain: RoutingDomainDetail::default(), - local_network_routing_domain: RoutingDomainDetail::default(), + public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), + local_network_routing_domain: LocalInternetRoutingDomainDetail::default(), bucket_entry_count: 0, self_latency_stats_accounting: LatencyStatsAccounting::new(), self_transfer_stats_accounting: TransferStatsAccounting::new(), @@ -140,9 +138,21 @@ impl RoutingTable { self.inner.read().node_id_secret } + pub fn routing_domain_for_address(&self, address: Address) -> Option { + let inner = self.inner.read(); + for rd in RoutingDomain::all() { + let can_contain = + Self::with_routing_domain(&*inner, rd, |rdd| rdd.can_contain_address(address)); + if can_contain { + return Some(rd); + } + } + None + } + fn with_routing_domain(inner: &RoutingTableInner, domain: RoutingDomain, f: F) -> R where - F: FnOnce(&RoutingDomainDetail) -> R, + F: FnOnce(&dyn RoutingDomainDetail) -> R, { match domain { RoutingDomain::PublicInternet => f(&inner.public_internet_routing_domain), @@ -156,7 +166,7 @@ impl RoutingTable { f: F, ) -> R where - F: FnOnce(&mut RoutingDomainDetail) -> R, + F: FnOnce(&mut dyn RoutingDomainDetail) -> R, { match domain { RoutingDomain::PublicInternet => f(&mut inner.public_internet_routing_domain), @@ -166,79 +176,56 @@ impl RoutingTable { pub fn relay_node(&self, domain: RoutingDomain) -> Option { let inner = self.inner.read(); - Self::with_routing_domain(&*inner, domain, |rd| rd.relay_node.clone()) + Self::with_routing_domain(&*inner, domain, |rd| rd.relay_node()) } pub fn set_relay_node(&self, domain: RoutingDomain, opt_relay_node: Option) { let inner = self.inner.write(); - Self::with_routing_domain(&mut *inner, domain, |rd| rd.relay_node = opt_relay_node); + Self::with_routing_domain_mut(&mut *inner, domain, |rd| rd.set_relay_node(opt_relay_node)); } pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { let inner = self.inner.read(); - Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details.is_empty()) + Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details().is_empty()) } pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec { let inner = self.inner.read(); - Self::with_routing_domain(&*inner, domain, |rd| rd.dial_info_details.clone()) + Self::with_routing_domain(&*inner, domain, |rd| rd.dial_info_details().clone()) } pub fn first_filtered_dial_info_detail( &self, - domain: Option, + routing_domain_set: RoutingDomainSet, filter: &DialInfoFilter, ) -> Option { let inner = self.inner.read(); - // Prefer local network first if it isn't filtered out - if domain == None || domain == Some(RoutingDomain::LocalNetwork) { - Self::with_routing_domain(&*inner, RoutingDomain::LocalNetwork, |rd| { - for did in &rd.dial_info_details { + for routing_domain in routing_domain_set { + let did = Self::with_routing_domain(&*inner, routing_domain, |rd| { + for did in rd.dial_info_details() { if did.matches_filter(filter) { return Some(did.clone()); } } None - }) - } else { - None - } - .or_else(|| { - if domain == None || domain == Some(RoutingDomain::PublicInternet) { - Self::with_routing_domain(&*inner, RoutingDomain::PublicInternet, |rd| { - for did in &rd.dial_info_details { - if did.matches_filter(filter) { - return Some(did.clone()); - } - } - None - }) - } else { - None + }); + if did.is_some() { + return did; } - }) + } + None } pub fn all_filtered_dial_info_details( &self, - domain: Option, + routing_domain_set: RoutingDomainSet, filter: &DialInfoFilter, ) -> Vec { let inner = self.inner.read(); let mut ret = Vec::new(); - - if domain == None || domain == Some(RoutingDomain::LocalNetwork) { - Self::with_routing_domain(&*inner, RoutingDomain::LocalNetwork, |rd| { - for did in &rd.dial_info_details { - if did.matches_filter(filter) { - ret.push(did.clone()); - } - } - }); - } - if domain == None || domain == Some(RoutingDomain::PublicInternet) { - Self::with_routing_domain(&*inner, RoutingDomain::PublicInternet, |rd| { - for did in &rd.dial_info_details { + for routing_domain in routing_domain_set { + Self::with_routing_domain(&*inner, routing_domain, |rd| { + for did in rd.dial_info_details() { if did.matches_filter(filter) { ret.push(did.clone()); } @@ -250,17 +237,13 @@ impl RoutingTable { } pub fn ensure_dial_info_is_valid(&self, domain: RoutingDomain, dial_info: &DialInfo) -> bool { - let enable_local_peer_scope = { - let config = self.network_manager().config(); - let c = config.get(); - c.network.enable_local_peer_scope - }; + let address = dial_info.socket_address().address(); + let inner = self.inner.read(); + let can_contain_address = + Self::with_routing_domain(&*inner, domain, |rd| rd.can_contain_address(address)); - if !enable_local_peer_scope - && matches!(domain, RoutingDomain::PublicInternet) - && dial_info.is_local() - { - log_rtab!(debug "shouldn't be registering local addresses as public"); + if !can_contain_address { + log_rtab!(debug "can not add dial info to this routing domain"); return false; } if !dial_info.is_valid() { @@ -281,25 +264,20 @@ impl RoutingTable { class: DialInfoClass, ) -> EyreResult<()> { if !self.ensure_dial_info_is_valid(domain, &dial_info) { - return Err(eyre!("dial info is not valid")); + return Err(eyre!("dial info is not valid in this routing domain")); } let mut inner = self.inner.write(); Self::with_routing_domain_mut(&mut *inner, domain, |rd| { - rd.dial_info_details.push(DialInfoDetail { + rd.add_dial_info_detail(DialInfoDetail { dial_info: dial_info.clone(), class, }); - rd.dial_info_details.sort(); }); - let domain_str = match domain { - RoutingDomain::PublicInternet => "Public", - RoutingDomain::LocalNetwork => "Local", - }; info!( - "{} Dial Info: {}", - domain_str, + "{:?} Dial Info: {}", + domain, NodeDialInfo { node_id: NodeId::new(inner.node_id), dial_info @@ -308,19 +286,18 @@ impl RoutingTable { ); debug!(" Class: {:?}", class); - // Public dial info changed, go through all nodes and reset their 'seen our node info' bit - if matches!(domain, RoutingDomain::PublicInternet) { - Self::reset_all_seen_our_node_info(&mut *inner); - Self::reset_all_updated_since_last_network_change(&mut *inner); - } + Self::reset_all_seen_our_node_info(&mut *inner, domain); + Self::reset_all_updated_since_last_network_change(&mut *inner); Ok(()) } - fn reset_all_seen_our_node_info(inner: &mut RoutingTableInner) { + fn reset_all_seen_our_node_info(inner: &mut RoutingTableInner, routing_domain: RoutingDomain) { let cur_ts = intf::get_timestamp(); Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { - v.with_mut(|e| e.set_seen_our_node_info(false)); + v.with_mut(|e| { + e.set_seen_our_node_info(routing_domain, false); + }); Option::<()>::None }); } @@ -333,20 +310,57 @@ impl RoutingTable { }); } - pub fn clear_dial_info_details(&self, domain: RoutingDomain) { - trace!("clearing dial info domain: {:?}", domain); + pub fn clear_dial_info_details(&self, routing_domain: RoutingDomain) { + trace!("clearing dial info domain: {:?}", routing_domain); let mut inner = self.inner.write(); - Self::with_routing_domain_mut(&mut *inner, domain, |rd| { - rd.dial_info_details.clear(); + Self::with_routing_domain_mut(&mut *inner, routing_domain, |rd| { + rd.clear_dial_info_details(); }); // Public dial info changed, go through all nodes and reset their 'seen our node info' bit - if matches!(domain, RoutingDomain::PublicInternet) { - Self::reset_all_seen_our_node_info(&mut *inner); + Self::reset_all_seen_our_node_info(&mut *inner, routing_domain); + } + + pub fn get_own_peer_info(&self, routing_domain: RoutingDomain) -> PeerInfo { + PeerInfo::new( + NodeId::new(self.node_id()), + self.get_own_signed_node_info(routing_domain), + ) + } + + pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedNodeInfo { + let node_id = NodeId::new(self.node_id()); + let secret = self.node_id_secret(); + SignedNodeInfo::with_secret(self.get_own_node_info(routing_domain), node_id, &secret) + .unwrap() + } + + pub fn get_own_node_info(&self, routing_domain: RoutingDomain) -> NodeInfo { + let netman = self.network_manager(); + let relay_node = self.relay_node(routing_domain); + let pc = netman.get_protocol_config(); + NodeInfo { + network_class: netman + .get_network_class(routing_domain) + .unwrap_or(NetworkClass::Invalid), + outbound_protocols: pc.outbound, + address_types: pc.family_global, + min_version: MIN_VERSION, + max_version: MAX_VERSION, + dial_info_detail_list: self.dial_info_details(routing_domain), + relay_peer_info: relay_node.and_then(|rn| rn.peer_info(routing_domain).map(Box::new)), } } + pub fn has_valid_own_node_info(&self, routing_domain: RoutingDomain) -> bool { + let netman = self.network_manager(); + let nc = netman + .get_network_class(routing_domain) + .unwrap_or(NetworkClass::Invalid); + !matches!(nc, NetworkClass::Invalid) + } + fn bucket_depth(index: usize) -> usize { match index { 0 => 256, @@ -398,6 +412,24 @@ impl RoutingTable { debug!("finished routing table terminate"); } + pub fn configure_local_network_routing_domain(&self, local_networks: Vec<(IpAddr, IpAddr)>) { + let mut inner = self.inner.write(); + let changed = inner + .local_network_routing_domain + .set_local_networks(local_networks); + + // If the local network topology has changed, nuke the existing local node info and let new local discovery happen + if changed { + let cur_ts = intf::get_timestamp(); + Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_rti, e| { + e.with_mut(|e| { + e.clear_signed_node_info(RoutingDomain::LocalNetwork); + }); + Option::<()>::None + }); + } + } + // Attempt to empty the routing table // should only be performed when there are no node_refs (detached) pub fn purge_buckets(&self) { @@ -461,16 +493,28 @@ impl RoutingTable { .unwrap() } - pub fn get_entry_count(&self, min_state: BucketEntryState) -> usize { + pub fn get_entry_count( + &self, + routing_domain_set: RoutingDomainSet, + min_state: BucketEntryState, + ) -> usize { let inner = self.inner.read(); - Self::get_entry_count_inner(&*inner, min_state) + Self::get_entry_count_inner(&*inner, routing_domain_set, min_state) } - fn get_entry_count_inner(inner: &RoutingTableInner, min_state: BucketEntryState) -> usize { + fn get_entry_count_inner( + inner: &RoutingTableInner, + routing_domain_set: RoutingDomainSet, + min_state: BucketEntryState, + ) -> usize { let mut count = 0usize; let cur_ts = intf::get_timestamp(); - Self::with_entries(inner, cur_ts, min_state, |_, _| { - count += 1; + Self::with_entries(inner, cur_ts, min_state, |_, e| { + if e.with(|e| e.best_routing_domain(routing_domain_set)) + .is_some() + { + count += 1; + } Option::<()>::None }); count @@ -505,38 +549,32 @@ impl RoutingTable { Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { // Only update nodes that haven't seen our node info yet if all || !v.with(|e| e.has_seen_our_node_info(routing_domain)) { - node_refs.push(NodeRef::new( - self.clone(), - k, - v, - RoutingDomainSet::only(routing_domain), - None, - )); + node_refs.push(NodeRef::new(self.clone(), k, v, None)); } Option::<()>::None }); node_refs } - pub fn get_nodes_needing_ping(&self, cur_ts: u64) -> Vec { + pub fn get_nodes_needing_ping( + &self, + routing_domain: RoutingDomain, + cur_ts: u64, + ) -> Vec { let inner = self.inner.read(); // Collect relay nodes - let mut relays: HashSet = HashSet::new(); - for rd in RoutingDomain::all() { - let opt_relay_id = - Self::with_routing_domain(&*inner, RoutingDomain::PublicInternet, |rd| { - rd.relay_node.map(|rn| rn.node_id()) - }); - if let Some(relay_id) = opt_relay_id { - relays.insert(relay_id); - } - } + let opt_relay_id = Self::with_routing_domain(&*inner, routing_domain, |rd| { + rd.relay_node().map(|rn| rn.node_id()) + }); // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { - if v.with(|e| e.has_node_info(None) && e.needs_ping(&k, cur_ts, relays.contains(&k))) { + if v.with(|e| { + e.has_node_info(RoutingDomainSet::only(routing_domain)) + && e.needs_ping(&k, cur_ts, opt_relay_id == Some(k)) + }) { node_refs.push(NodeRef::new(self.clone(), k, v, None)); } Option::<()>::None @@ -599,7 +637,7 @@ impl RoutingTable { // Kick the bucket inner.kick_queue.insert(idx); - log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, Self::get_entry_count_inner(&mut *inner, BucketEntryState::Unreliable)); + log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, Self::get_entry_count_inner(&mut *inner, RoutingDomainSet::all(), BucketEntryState::Unreliable)); nr } @@ -635,7 +673,6 @@ impl RoutingTable { // and add the dial info we have for it, since that's pretty common pub fn register_node_with_signed_node_info( &self, - routing_domain: RoutingDomain, node_id: DHTKey, signed_node_info: SignedNodeInfo, allow_invalid_signature: bool, @@ -651,9 +688,12 @@ impl RoutingTable { return None; } } - self.create_node_ref(node_id, |e| { - e.update_signed_node_info(routing_domain, signed_node_info, allow_invalid_signature); + e.update_signed_node_info(signed_node_info, allow_invalid_signature); + }) + .map(|mut nr| { + nr.set_filter(Some(NodeRefFilter::new().with_routing_domain(signed_node_info.routing_domain))) + nr }) } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index f86094ca..f02c9260 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -6,11 +6,71 @@ use alloc::fmt; // We should ping them with some frequency and 30 seconds is typical timeout const CONNECTIONLESS_TIMEOUT_SECS: u32 = 29; +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct NodeRefFilter { + pub routing_domain_set: RoutingDomainSet, + pub dial_info_filter: DialInfoFilter, +} + +impl Default for NodeRefFilter { + fn default() -> Self { + self.new() + } +} + +impl NodeRefFilter { + pub fn new() -> Self { + Self { + routing_domain_set: RoutingDomainSet::all(), + dial_info_filter: DialInfoFilter::all(), + } + } + + pub fn with_routing_domain(mut self, routing_domain: RoutingDomain) -> Self { + self.routing_domain_set = RoutingDomainSet::only(routing_domain); + self + } + pub fn with_routing_domain_set(mut self, routing_domain_set: RoutingDomainSet) -> Self { + self.routing_domain_set = routing_domain_set; + self + } + pub fn with_dial_info_filter(mut self, dial_info_filter: DialInfoFilter) -> Self { + self.dial_info_filter = dial_info_filter; + self + } + pub fn with_protocol_type(mut self, protocol_type: ProtocolType) -> Self { + self.dial_info_filter = self.dial_info_filter.with_protocol_type(protocol_type); + self + } + pub fn with_protocol_type_set(mut self, protocol_set: ProtocolTypeSet) -> Self { + self.dial_info_filter = self.dial_info_filter.with_protocol_type_set(protocol_set); + self + } + pub fn with_address_type(mut self, address_type: AddressType) -> Self { + self.dial_info_filter = self.dial_info_filter.with_address_type(address_type); + self + } + pub fn with_address_type_set(mut self, address_set: AddressTypeSet) -> Self { + self.dial_info_filter = self.dial_info_filter.with_address_type_set(address_set); + self + } + pub fn filtered(mut self, other_filter: &NodeRefFilter) -> Self { + self.routing_domain_set &= other_filter.routing_domain_set; + self.dial_info_filter = self + .dial_info_filter + .filtered(&other_filter.dial_info_filter); + self + } + pub fn is_dead(&self) -> bool { + self.dial_info_filter.is_empty() || self.routing_domain_set.is_empty() + } +} + pub struct NodeRef { routing_table: RoutingTable, node_id: DHTKey, entry: Arc, - filter: Option, + filter: Option, #[cfg(feature = "tracking")] track_id: usize, } @@ -20,7 +80,7 @@ impl NodeRef { routing_table: RoutingTable, node_id: DHTKey, entry: Arc, - filter: Option, + filter: Option, ) -> Self { entry.ref_count.fetch_add(1u32, Ordering::Relaxed); @@ -34,43 +94,7 @@ impl NodeRef { } } - pub fn node_id(&self) -> DHTKey { - self.node_id - } - - pub fn filter_ref(&self) -> Option<&DialInfoFilter> { - self.filter.as_ref() - } - - pub fn take_filter(&mut self) -> Option { - self.filter.take() - } - - pub fn set_filter(&mut self, filter: Option) { - self.filter = filter - } - - pub fn merge_filter(&mut self, filter: DialInfoFilter) { - if let Some(self_filter) = self.filter.take() { - self.filter = Some(self_filter.filtered(&filter)); - } else { - self.filter = Some(filter); - } - } - - pub fn filtered_clone(&self, filter: DialInfoFilter) -> Self { - let mut out = self.clone(); - out.merge_filter(filter); - out - } - - pub fn is_filter_dead(&self) -> bool { - if let Some(filter) = &self.filter { - filter.is_dead() - } else { - false - } - } + // Operate on entry accessors pub(super) fn operate(&self, f: F) -> T where @@ -88,17 +112,67 @@ impl NodeRef { self.entry.with_mut(|e| f(inner, e)) } - pub fn peer_info(&self, routing_domain: RoutingDomain) -> Option { - self.operate(|_rti, e| e.peer_info(self.node_id(), routing_domain)) + // Filtering + + pub fn filter_ref(&self) -> Option<&NodeRefFilter> { + self.filter.as_ref() } - pub fn has_valid_signed_node_info(&self, opt_routing_domain: Option) -> bool { - self.operate(|_rti, e| e.has_valid_signed_node_info(opt_routing_domain)) + + pub fn take_filter(&mut self) -> Option { + self.filter.take() } - pub fn has_seen_our_node_info(&self, routing_domain: RoutingDomain) -> bool { - self.operate(|_rti, e| e.has_seen_our_node_info(routing_domain)) + + pub fn set_filter(&mut self, filter: Option) { + self.filter = filter } - pub fn set_seen_our_node_info(&self, routing_domain: RoutingDomain) { - self.operate_mut(|_rti, e| e.set_seen_our_node_info(routing_domain, true)); + + pub fn merge_filter(&mut self, filter: NodeRefFilter) { + if let Some(self_filter) = self.filter.take() { + self.filter = Some(self_filter.filtered(&filter)); + } else { + self.filter = Some(filter); + } + } + + pub fn filtered_clone(&self, filter: NodeRefFilter) -> Self { + let mut out = self.clone(); + out.merge_filter(filter); + out + } + + pub fn is_filter_dead(&self) -> bool { + if let Some(filter) = &self.filter { + filter.is_dead() + } else { + false + } + } + + pub fn routing_domain_set(&self) -> RoutingDomainSet { + self.filter + .map(|f| f.routing_domain_set) + .unwrap_or(RoutingDomainSet::all()) + } + + pub fn best_routing_domain(&self) -> Option { + self.operate(|_rti, e| { + e.best_routing_domain( + self.filter + .map(|f| f.routing_domain_set) + .unwrap_or(RoutingDomainSet::all()), + ) + }) + } + + // Accessors + pub fn routing_table(&self) -> RoutingTable { + self.routing_table.clone() + } + pub fn node_id(&self) -> DHTKey { + self.node_id + } + pub fn has_valid_signed_node_info(&self) -> bool { + self.operate(|_rti, e| e.has_valid_signed_node_info(self.routing_domain_set())) } pub fn has_updated_since_last_network_change(&self) -> bool { self.operate(|_rti, e| e.has_updated_since_last_network_change()) @@ -106,24 +180,31 @@ impl NodeRef { pub fn set_updated_since_last_network_change(&self) { self.operate_mut(|_rti, e| e.set_updated_since_last_network_change(true)); } - pub fn update_node_status(&self, node_status: NodeStatus) { self.operate_mut(|_rti, e| { e.update_node_status(node_status); }); } - pub fn min_max_version(&self) -> Option<(u8, u8)> { self.operate(|_rti, e| e.min_max_version()) } pub fn set_min_max_version(&self, min_max_version: (u8, u8)) { self.operate_mut(|_rti, e| e.set_min_max_version(min_max_version)) } - pub fn state(&self, cur_ts: u64) -> BucketEntryState { self.operate(|_rti, e| e.state(cur_ts)) } + // Per-RoutingDomain accessors + pub fn peer_info(&self, routing_domain: RoutingDomain) -> Option { + self.operate(|_rti, e| e.peer_info(self.node_id(), routing_domain)) + } + pub fn has_seen_our_node_info(&self, routing_domain: RoutingDomain) -> bool { + self.operate(|_rti, e| e.has_seen_our_node_info(routing_domain)) + } + pub fn set_seen_our_node_info(&self, routing_domain: RoutingDomain) { + self.operate_mut(|_rti, e| e.set_seen_our_node_info(routing_domain, true)); + } pub fn network_class(&self, routing_domain: RoutingDomain) -> Option { self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.network_class)) } @@ -143,7 +224,6 @@ impl NodeRef { } dif } - pub fn relay(&self, routing_domain: RoutingDomain) -> Option { let target_rpi = self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.relay_peer_info))?; @@ -155,83 +235,50 @@ impl NodeRef { } // Register relay node and return noderef - self.routing_table - .register_node_with_signed_node_info( - routing_domain, - t.node_id.key, - t.signed_node_info, - false, - ) - .map(|mut nr| { - nr.set_filter(self.filter_ref().cloned()); - nr - }) - }) - } - pub fn first_filtered_dial_info_detail( - &self, - routing_domain: Option, - ) -> Option { - self.operate(|_rt, e| { - // Prefer local dial info first unless it is filtered out - if routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork) { - e.node_info(RoutingDomain::LocalNetwork).and_then(|l| { - l.first_filtered_dial_info_detail(|did| { - if let Some(filter) = self.filter.as_ref() { - did.matches_filter(filter) - } else { - true - } - }) - }) - } else { - None - } - .or_else(|| { - if routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet) { - e.node_info(RoutingDomain::PublicInternet).and_then(|n| { - n.first_filtered_dial_info_detail(|did| { - if let Some(filter) = self.filter.as_ref() { - did.matches_filter(filter) - } else { - true - } - }) - }) - } else { - None - } - }) + self.routing_table.register_node_with_signed_node_info( + t.node_id.key, + t.signed_node_info, + false, + ) }) } - pub fn all_filtered_dial_info_details( - &self, - routing_domain: Option, - ) -> Vec { - let mut out = Vec::new(); + // Filtered accessors + pub fn first_filtered_dial_info_detail(&self) -> Option { + let routing_domain_set = self.routing_domain_set(); self.operate(|_rt, e| { - // Prefer local dial info first unless it is filtered out - if routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork) { - if let Some(ni) = e.node_info(RoutingDomain::LocalNetwork) { - out.append(&mut ni.all_filtered_dial_info_details(|did| { - if let Some(filter) = self.filter.as_ref() { - did.matches_filter(filter) - } else { - true - } - })) + for routing_domain in routing_domain_set { + if let Some(ni) = e.node_info(routing_domain) { + let filter = |did: &DialInfoDetail| { + self.filter + .as_ref() + .map(|f| did.matches_filter(f)) + .unwrap_or(true) + }; + if let Some(did) = ni.first_filtered_dial_info_detail(filter) { + return Some(did); + } } } - if routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet) { - if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) { - out.append(&mut ni.all_filtered_dial_info_details(|did| { - if let Some(filter) = self.filter.as_ref() { - did.matches_filter(filter) - } else { - true - } - })) + None + }) + } + + pub fn all_filtered_dial_info_details(&self) -> Vec { + let routing_domain_set = self.routing_domain_set(); + let mut out = Vec::new(); + self.operate(|_rt, e| { + for routing_domain in routing_domain_set { + if let Some(ni) = e.node_info(routing_domain) { + let filter = |did: &DialInfoDetail| { + self.filter + .as_ref() + .map(|f| did.matches_filter(f)) + .unwrap_or(true) + }; + if let Some(did) = ni.first_filtered_dial_info_detail(filter) { + out.push(did); + } } } }); @@ -241,8 +288,12 @@ impl NodeRef { pub async fn last_connection(&self) -> Option { // Get the last connection and the last time we saw anything with this connection - let (last_connection, last_seen) = - self.operate(|_rti, e| e.last_connection(self.filter.clone()))?; + let (last_connection, last_seen) = self.operate(|_rti, e| { + e.last_connection( + self.filter.routing_domain_set, + self.filter.dial_info_filter.clone(), + ) + })?; // Should we check the connection table? if last_connection.protocol_type().is_connection_oriented() { diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs new file mode 100644 index 00000000..e23966aa --- /dev/null +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -0,0 +1,94 @@ +use super::*; + +/// General trait for all routing domains +pub trait RoutingDomainDetail { + fn can_contain_address(&self, address: Address) -> bool; + fn relay_node(&self) -> Option; + fn set_relay_node(&mut self, opt_relay_node: Option); + fn dial_info_details(&self) -> &Vec; + fn clear_dial_info_details(&mut self); + fn add_dial_info_detail(&mut self, did: DialInfoDetail); +} + +/// Public Internet routing domain internals +#[derive(Debug, Default)] +pub struct PublicInternetRoutingDomainDetail { + /// An optional node we relay through for this domain + relay_node: Option, + /// The dial infos on this domain we can be reached by + dial_info_details: Vec, +} + +impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { + fn can_contain_address(&self, address: Address) -> bool { + address.is_global() + } + fn relay_node(&self) -> Option { + self.relay_node.clone() + } + fn set_relay_node(&mut self, opt_relay_node: Option) { + self.relay_node = opt_relay_node + .map(|nr| nr.filtered_clone(NodeRefFilter::new().with_routing_domain(PublicInternet))) + } + fn dial_info_details(&self) -> &Vec { + &self.dial_info_details + } + fn clear_dial_info_details(&mut self) { + self.dial_info_details.clear(); + } + fn add_dial_info_detail(&mut self, did: DialInfoDetail) { + self.dial_info_details.push(did); + self.dial_info_details.sort(); + } +} + +/// Local Network routing domain internals +#[derive(Debug, Default)] +pub struct LocalInternetRoutingDomainDetail { + /// An optional node we relay through for this domain + relay_node: Option, + /// The dial infos on this domain we can be reached by + dial_info_details: Vec, + /// The local networks this domain will communicate with + local_networks: Vec<(IpAddr, IpAddr)>, +} + +impl LocalInternetRoutingDomainDetail { + pub fn set_local_networks(&mut self, local_networks: Vec<(IpAddr, IpAddr)>) -> bool { + local_networks.sort(); + if local_networks == self.local_networks { + return false; + } + self.local_networks = local_networks; + true + } +} + +impl RoutingDomainDetail for LocalInternetRoutingDomainDetail { + fn can_contain_address(&self, address: Address) -> bool { + let ip = address.to_ip_addr(); + for localnet in self.local_networks { + if ipaddr_in_network(ip, localnet.0, localnet.1) { + return true; + } + } + false + } + fn relay_node(&self) -> Option { + self.relay_node.clone() + } + fn set_relay_node(&mut self, opt_relay_node: Option) { + self.relay_node = opt_relay_node + .map(|nr| nr.filtered_clone(NodeRefFilter::new().with_routing_domain(LocalNetwork))); + } + fn dial_info_details(&self) -> &Vec { + &self.dial_info_details + } + fn clear_dial_info_details(&mut self) { + self.dial_info_details.clear(); + } + fn add_dial_info_detail(&mut self, did: DialInfoDetail) { + self.dial_info_details.push(did); + self.dial_info_details.sort(); + } +} diff --git a/veilid-core/src/rpc_processor/coders/node_status.rs b/veilid-core/src/rpc_processor/coders/node_status.rs index 5e42a42f..3948c7c6 100644 --- a/veilid-core/src/rpc_processor/coders/node_status.rs +++ b/veilid-core/src/rpc_processor/coders/node_status.rs @@ -1,9 +1,9 @@ use crate::*; use rpc_processor::*; -pub fn encode_node_status( - node_status: &NodeStatus, - builder: &mut veilid_capnp::node_status::Builder, +pub fn encode_public_internet_node_status( + public_internet_node_status: &PublicInternetNodeStatus, + builder: &mut veilid_capnp::public_internet_node_status::Builder, ) -> Result<(), RPCError> { builder.set_will_route(node_status.will_route); builder.set_will_tunnel(node_status.will_tunnel); @@ -14,10 +14,10 @@ pub fn encode_node_status( Ok(()) } -pub fn decode_node_status( - reader: &veilid_capnp::node_status::Reader, -) -> Result { - Ok(NodeStatus { +pub fn decode_public_internet_node_status( + reader: &veilid_capnp::public_internet_node_status::Reader, +) -> Result { + Ok(PublicInternetNodeStatus { will_route: reader.reborrow().get_will_route(), will_tunnel: reader.reborrow().get_will_tunnel(), will_signal: reader.reborrow().get_will_signal(), @@ -25,3 +25,62 @@ pub fn decode_node_status( will_validate_dial_info: reader.reborrow().get_will_validate_dial_info(), }) } + +pub fn encode_local_network_node_status( + local_network_node_status: &LocalNetworkNodeStatus, + builder: &mut veilid_capnp::local_network_node_status::Builder, +) -> Result<(), RPCError> { + builder.set_will_relay(node_status.will_relay); + builder.set_will_validate_dial_info(node_status.will_validate_dial_info); + + Ok(()) +} + +pub fn decode_local_network_node_status( + reader: &veilid_capnp::local_network_node_status::Reader, +) -> Result { + Ok(NodeStatus { + will_relay: reader.reborrow().get_will_relay(), + will_validate_dial_info: reader.reborrow().get_will_validate_dial_info(), + }) +} + +pub fn encode_node_status( + node_status: &NodeStatus, + builder: &mut veilid_capnp::node_status::Builder, +) -> Result<(), RPCError> { + match node_status { + NodeStatus::PublicInternetNodeStatus(ns) => { + let mut pi_builder = builder.reborrow().init_public_internet(); + encode_public_internet_node_status(&ns, &mut pi_builder) + } + NodeStatus::LocalNetworkNodeStatus(ns) => { + let mut ln_builder = builder.reborrow().init_local_network(); + encode_local_network_node_status(&ns, &mut ln_builder) + } + } + + Ok(()) +} + +pub fn decode_node_status( + reader: &veilid_capnp::node_status::Reader, +) -> Result { + Ok( + match reader + .which() + .map_err(RPCError::map_internal("invalid node status"))? + { + veilid_capnp::node_status::PublicInternet(pi) => { + let r = r.map_err(RPCError::protocol)?; + let pins = decode_public_internet_node_status(&r)?; + NodeStatus::PublicInternet(pins) + } + veilid_capnp::node_status::LocalNetwork(ln) => { + let r = ln.map_err(RPCError::protocol)?; + let lnns = decode_local_network_node_status(&r)?; + NodeStatus::LocalNetwork(lnns) + } + }, + ) +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs index c09ce08e..11491693 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -58,26 +58,34 @@ impl RPCOperationKind { #[derive(Debug, Clone)] pub struct RPCOperation { op_id: u64, + sender_info: Option, kind: RPCOperationKind, } impl RPCOperation { - pub fn new_question(question: RPCQuestion) -> Self { + pub fn new_question(question: RPCQuestion, sender_info: Option) -> Self { Self { op_id: intf::get_random_u64(), + sender_info, kind: RPCOperationKind::Question(question), } } - pub fn new_statement(statement: RPCStatement) -> Self { + pub fn new_statement(statement: RPCStatement, sender_info: Option) -> Self { Self { op_id: intf::get_random_u64(), + sender_info, kind: RPCOperationKind::Statement(statement), } } - pub fn new_answer(request: &RPCOperation, answer: RPCAnswer) -> Self { + pub fn new_answer( + request: &RPCOperation, + answer: RPCAnswer, + sender_info: Option, + ) -> Self { Self { op_id: request.op_id, + sender_info, kind: RPCOperationKind::Answer(answer), } } @@ -86,6 +94,10 @@ impl RPCOperation { self.op_id } + pub fn sender_info(&self) -> Option<&SignedNodeInfo> { + self.sender_info.as_ref() + } + pub fn kind(&self) -> &RPCOperationKind { &self.kind } @@ -100,16 +112,32 @@ impl RPCOperation { ) -> Result { let op_id = operation_reader.get_op_id(); + let sender_info = if operation_reader.has_sender_info() { + let sni_reader = operation_reader.get_sender_info(); + let sni = decode_signed_node_info(&sni_reader, sender_node_id, true)?; + Some(sni) + } else { + None + }; + let kind_reader = operation_reader.get_kind(); let kind = RPCOperationKind::decode(&kind_reader, sender_node_id)?; - Ok(RPCOperation { op_id, kind }) + Ok(RPCOperation { + op_id, + sender_info, + kind, + }) } pub fn encode(&self, builder: &mut veilid_capnp::operation::Builder) -> Result<(), RPCError> { builder.set_op_id(self.op_id); let mut k_builder = builder.reborrow().init_kind(); self.kind.encode(&mut k_builder)?; + if let Some(sender_info) = self.sender_info { + let si_builder = builder.reborrow().init_sender_info(); + encode_signed_node_info(&self.sender_info, &mut si_builder)?; + } Ok(()) } } diff --git a/veilid-core/src/rpc_processor/coders/operations/question.rs b/veilid-core/src/rpc_processor/coders/operations/question.rs index 5287cfa1..eb0bae02 100644 --- a/veilid-core/src/rpc_processor/coders/operations/question.rs +++ b/veilid-core/src/rpc_processor/coders/operations/question.rs @@ -18,12 +18,6 @@ impl RPCQuestion { pub fn detail(&self) -> &RPCQuestionDetail { &self.detail } - // pub fn into_detail(self) -> RPCQuestionDetail { - // self.detail - // } - // pub fn into_respond_to_detail(self) -> (RespondTo, RPCQuestionDetail) { - // (self.respond_to, self.detail) - // } pub fn desc(&self) -> &'static str { self.detail.desc() } @@ -32,7 +26,7 @@ impl RPCQuestion { sender_node_id: &DHTKey, ) -> Result { let rt_reader = reader.get_respond_to(); - let respond_to = RespondTo::decode(&rt_reader, sender_node_id)?; + let respond_to = RespondTo::decode(&rt_reader)?; let d_reader = reader.get_detail(); let detail = RPCQuestionDetail::decode(&d_reader)?; Ok(RPCQuestion { respond_to, detail }) diff --git a/veilid-core/src/rpc_processor/coders/operations/respond_to.rs b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs index 40e50247..79c4e358 100644 --- a/veilid-core/src/rpc_processor/coders/operations/respond_to.rs +++ b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub enum RespondTo { - Sender(Option), + Sender, PrivateRoute(PrivateRoute), } @@ -13,11 +13,7 @@ impl RespondTo { builder: &mut veilid_capnp::question::respond_to::Builder, ) -> Result<(), RPCError> { match self { - Self::Sender(Some(sni)) => { - let mut sni_builder = builder.reborrow().init_sender_with_info(); - encode_signed_node_info(sni, &mut sni_builder)?; - } - Self::Sender(None) => { + Self::Sender => { builder.reborrow().set_sender(()); } Self::PrivateRoute(pr) => { @@ -28,17 +24,9 @@ impl RespondTo { Ok(()) } - pub fn decode( - reader: &veilid_capnp::question::respond_to::Reader, - sender_node_id: &DHTKey, - ) -> Result { + pub fn decode(reader: &veilid_capnp::question::respond_to::Reader) -> Result { let respond_to = match reader.which().map_err(RPCError::protocol)? { - veilid_capnp::question::respond_to::Sender(()) => RespondTo::Sender(None), - veilid_capnp::question::respond_to::SenderWithInfo(sender_ni_reader) => { - let sender_ni_reader = sender_ni_reader.map_err(RPCError::protocol)?; - let sni = decode_signed_node_info(&sender_ni_reader, sender_node_id, true)?; - RespondTo::Sender(Some(sni)) - } + veilid_capnp::question::respond_to::Sender(()) => RespondTo::Sender, veilid_capnp::question::respond_to::PrivateRoute(pr_reader) => { let pr_reader = pr_reader.map_err(RPCError::protocol)?; let pr = decode_private_route(&pr_reader)?; diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs new file mode 100644 index 00000000..995f17c0 --- /dev/null +++ b/veilid-core/src/rpc_processor/destination.rs @@ -0,0 +1,135 @@ +use super::*; + +/// Where to send an RPC message +#[derive(Debug, Clone)] +pub enum Destination { + /// Send to node directly + Direct { + /// The node to send to + target: NodeRef, + /// An optional safety route specification to send from for sender privacy + safety_route_spec: Option>, + }, + /// Send to node for relay purposes + Relay { + /// The relay to send to + relay: NodeRef, + /// The final destination the relay should send to + target: DHTKey, + /// An optional safety route specification to send from for sender privacy + safety_route_spec: Option>, + }, + /// Send to private route (privateroute) + PrivateRoute { + /// A private route to send to + private_route: PrivateRoute, + /// An optional safety route specification to send from for sender privacy + safety_route_spec: Option>, + }, +} + +impl Destination { + pub fn direct(target: NodeRef) -> Self { + Self::Direct { + target, + safety_route_spec: None, + } + } + pub fn relay(relay: NodeRef, target: DHTKey) -> Self { + Self::Relay { + relay, + target, + safety_route_spec: None, + } + } + pub fn private_route(private_route: PrivateRoute) -> Self { + Self::PrivateRoute { + private_route, + safety_route_spec: None, + } + } + + pub fn safety_route_spec(&self) -> Option> { + match self { + Destination::Direct { + target, + safety_route_spec, + } => safety_route_spec.clone(), + Destination::Relay { + relay, + target, + safety_route_spec, + } => safety_route_spec.clone(), + Destination::PrivateRoute { + private_route, + safety_route_spec, + } => safety_route_spec.clone(), + } + } + pub fn with_safety_route_spec(self, safety_route_spec: Arc) -> Self { + match self { + Destination::Direct { + target, + safety_route_spec: _, + } => Self::Direct { + target, + safety_route_spec: Some(safety_route_spec), + }, + Destination::Relay { + relay, + target, + safety_route_spec: _, + } => Self::Relay { + relay, + target, + safety_route_spec: Some(safety_route_spec), + }, + Destination::PrivateRoute { + private_route, + safety_route_spec: _, + } => Self::PrivateRoute { + private_route, + safety_route_spec: Some(safety_route_spec), + }, + } + } +} + +impl fmt::Display for Destination { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Destination::Direct { + target, + routing_domain, + safety_route_spec, + } => { + let sr = safety_route_spec + .map(|_sr| "+SR".to_owned()) + .unwrap_or_default(); + + write!(f, "{:?}{}", target, sr) + } + Destination::Relay { + relay, + target, + safety_route_spec, + } => { + let sr = safety_route_spec + .map(|_sr| "+SR".to_owned()) + .unwrap_or_default(); + + write!(f, "{:?}@{:?}{}", target.encode(), relay, sr) + } + Destination::PrivateRoute { + private_route, + safety_route_spec, + } => { + let sr = safety_route_spec + .map(|_sr| "+SR".to_owned()) + .unwrap_or_default(); + + write!(f, "{}{}", private_route, sr) + } + } + } +} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index d0edd24a..3366d7a1 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1,4 +1,5 @@ mod coders; +mod destination; mod private_route; mod rpc_cancel_tunnel; mod rpc_complete_tunnel; @@ -18,6 +19,7 @@ mod rpc_validate_dial_info; mod rpc_value_changed; mod rpc_watch_value; +pub use destination::*; pub use private_route::*; pub use rpc_error::*; @@ -36,208 +38,6 @@ use stop_token::future::FutureExt; type OperationId = u64; -/// Where to send an RPC message -#[derive(Debug, Clone)] -pub enum Destination { - /// Send to node directly - Direct { - /// The node to send to - target: NodeRef, - /// An optional routing domain to require - routing_domain: Option, - /// An optional safety route specification to send from for sender privacy - safety_route_spec: Option>, - }, - /// Send to node for relay purposes - Relay { - /// The relay to send to - relay: NodeRef, - /// The final destination the relay should send to - target: DHTKey, - /// An optional routing domain to require - routing_domain: Option, - /// An optional safety route specification to send from for sender privacy - safety_route_spec: Option>, - }, - /// Send to private route (privateroute) - PrivateRoute { - /// A private route to send to - private_route: PrivateRoute, - /// An optional safety route specification to send from for sender privacy - safety_route_spec: Option>, - }, -} - -impl Destination { - pub fn direct(target: NodeRef) -> Self { - Self::Direct { - target, - routing_domain: None, - safety_route_spec: None, - } - } - pub fn relay(relay: NodeRef, target: DHTKey) -> Self { - Self::Relay { - relay, - target, - routing_domain: None, - safety_route_spec: None, - } - } - pub fn private_route(private_route: PrivateRoute) -> Self { - Self::PrivateRoute { - private_route, - safety_route_spec: None, - } - } - - pub fn - - pub fn routing_domain(&self) -> Option { - match self { - Destination::Direct { - target, - routing_domain, - safety_route_spec, - } => *routing_domain, - Destination::Relay { - relay, - target, - routing_domain, - safety_route_spec, - } => *routing_domain, - Destination::PrivateRoute { - private_route, - safety_route_spec, - } => Some(RoutingDomain::PublicInternet), - } - } - pub fn safety_route_spec(&self) -> Option> { - match self { - Destination::Direct { - target, - routing_domain, - safety_route_spec, - } => safety_route_spec.clone(), - Destination::Relay { - relay, - target, - routing_domain, - safety_route_spec, - } => safety_route_spec.clone(), - Destination::PrivateRoute { - private_route, - safety_route_spec, - } => safety_route_spec.clone(), - } - } - pub fn with_routing_domain(self, routing_domain: RoutingDomain) -> Self { - match self { - Destination::Direct { - target, - routing_domain: _, - safety_route_spec, - } => Self::Direct { - target, - routing_domain: Some(routing_domain), - safety_route_spec, - }, - Destination::Relay { - relay, - target, - routing_domain: _, - safety_route_spec, - } => Self::Relay { - relay, - target, - routing_domain: Some(routing_domain), - safety_route_spec, - }, - Destination::PrivateRoute { - private_route: _, - safety_route_spec: _, - } => panic!("Private route is only valid in PublicInternet routing domain"), - } - } - pub fn with_safety_route_spec(self, safety_route_spec: Arc) -> Self { - match self { - Destination::Direct { - target, - routing_domain, - safety_route_spec: _, - } => Self::Direct { - target, - routing_domain, - safety_route_spec: Some(safety_route_spec), - }, - Destination::Relay { - relay, - target, - routing_domain, - safety_route_spec: _, - } => Self::Relay { - relay, - target, - routing_domain, - safety_route_spec: Some(safety_route_spec), - }, - Destination::PrivateRoute { - private_route, - safety_route_spec: _, - } => Self::PrivateRoute { - private_route, - safety_route_spec: Some(safety_route_spec), - }, - } - } -} - -impl fmt::Display for Destination { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Destination::Direct { - target, - routing_domain, - safety_route_spec, - } => { - let rd = routing_domain - .map(|rd| format!("#{:?}", rd)) - .unwrap_or_default(); - let sr = safety_route_spec - .map(|_sr| "+SR".to_owned()) - .unwrap_or_default(); - - write!(f, "{:?}{}{}", target, rd, sr) - } - Destination::Relay { - relay, - target, - routing_domain, - safety_route_spec, - } => { - let rd = routing_domain - .map(|rd| format!("#{:?}", rd)) - .unwrap_or_default(); - let sr = safety_route_spec - .map(|_sr| "+SR".to_owned()) - .unwrap_or_default(); - - write!(f, "{:?}@{:?}{}{}", target.encode(), relay, rd, sr) - } - Destination::PrivateRoute { - private_route, - safety_route_spec, - } => { - let sr = safety_route_spec - .map(|_sr| "+SR".to_owned()) - .unwrap_or_default(); - - write!(f, "{}{}", private_route, sr) - } - } - } -} - /// The decoded header of an RPC message #[derive(Debug, Clone)] struct RPCMessageHeader { @@ -251,6 +51,8 @@ struct RPCMessageHeader { peer_noderef: NodeRef, /// The connection from the peer sent the message (not the original sender) connection_descriptor: ConnectionDescriptor, + /// The routing domain the message was sent through + routing_domain: RoutingDomain, } #[derive(Debug)] @@ -352,7 +154,6 @@ pub struct RPCProcessorInner { pub struct RPCProcessor { crypto: Crypto, config: VeilidConfig, - enable_local_peer_scope: bool, inner: Arc>, } @@ -375,11 +176,6 @@ impl RPCProcessor { Self { crypto: network_manager.crypto(), config: network_manager.config(), - enable_local_peer_scope: network_manager - .config() - .get() - .network - .enable_local_peer_scope, inner: Arc::new(Mutex::new(Self::new_inner(network_manager))), } } @@ -402,12 +198,8 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// - fn filter_peer_scope(&self, node_info: &NodeInfo) -> bool { - // if local peer scope is enabled, then don't reject any peer info - if self.enable_local_peer_scope { - return true; - } - + /// Determine if a NodeInfo can be placed into the specified routing domain + fn filter_node_info(&self, routing_domain: RoutingDomain, node_info: &NodeInfo) -> bool { // reject attempts to include non-public addresses in results for did in &node_info.dial_info_detail_list { if !did.dial_info.is_global() { @@ -584,31 +376,6 @@ impl RPCProcessor { out } - /// Gets a 'RespondTo::Sender' that contains either our dial info, - /// or None if the peer has seen our dial info before or our node info is not yet valid - /// because of an unknown network class - pub fn make_respond_to_sender( - &self, - routing_domain: RoutingDomain, - peer: NodeRef, - ) -> RespondTo { - if peer.has_seen_our_node_info(routing_domain) - || matches!( - self.network_manager() - .get_network_class(routing_domain) - .unwrap_or(NetworkClass::Invalid), - NetworkClass::Invalid - ) - { - RespondTo::Sender(None) - } else { - let our_sni = self - .routing_table() - .get_own_signed_node_info(routing_domain); - RespondTo::Sender(Some(our_sni)) - } - } - /// Produce a byte buffer that represents the wire encoding of the entire /// unencrypted envelope body for a RPC message. This incorporates /// wrapping a private and/or safety route if they are specified. @@ -617,7 +384,7 @@ impl RPCProcessor { &self, dest: Destination, operation: &RPCOperation, - ) -> Result { xxx continue propagating safetyroutespec + ) -> Result { let out_node_id; // Envelope Node Id let mut out_node_ref: Option = None; // Node to send envelope to let out_hop_count: usize; // Total safety + private route hop count @@ -634,12 +401,28 @@ impl RPCProcessor { // To where are we sending the request match dest { - Destination::Direct(ref node_ref) | Destination::Relay(ref node_ref, _) => { + Destination::Direct { + target: node_ref, + routing_domain, + safety_route_spec, + } + | Destination::Relay { + relay: node_ref, + target: _, + routing_domain, + safety_route_spec, + } => { // Send to a node without a private route // -------------------------------------- // Get the actual destination node id accounting for relays - let (node_ref, node_id) = if let Destination::Relay(_, dht_key) = dest { + let (node_ref, node_id) = if let Destination::Relay { + relay: _, + target: dht_key, + routing_domain: _, + safety_route_spec: _, + } = dest + { (node_ref.clone(), dht_key.clone()) } else { let node_id = node_ref.node_id(); @@ -676,7 +459,10 @@ impl RPCProcessor { } }; } - Destination::PrivateRoute(private_route) => { + Destination::PrivateRoute { + private_route, + safety_route_spec, + } => { // Send to private route // --------------------- // Reply with 'route' operation @@ -723,12 +509,22 @@ impl RPCProcessor { } // Issue a question over the network, possibly using an anonymized route - #[instrument(level = "debug", skip(self, question, safety_route_spec), err)] + #[instrument(level = "debug", skip(self, question), err)] async fn question( &self, dest: Destination, question: RPCQuestion, ) -> Result, RPCError> { + + // Get sender info if we should send that + let opt_sender_info = if dest.safety_route_spec().is_none() && matches!(question.respond_to(), RespondTo::Sender) { + // Sender is not private, send sender info if needed + // Get the noderef of the eventual destination or first route hop + if let Some(target_nr) = self.routing_table().lookup_node_ref(dest.get_target_id()) { + if target_nr.has_seen_our_node_info(R) + } + } + // Wrap question in operation let operation = RPCOperation::new_question(question); let op_id = operation.op_id(); @@ -951,6 +747,10 @@ impl RPCProcessor { &self, encoded_msg: RPCMessageEncoded, ) -> Result<(), RPCError> { + + // Get the routing domain + let routing_domain = encoded_msg.header.routing_domain; + // Decode the operation let sender_node_id = encoded_msg.header.envelope.get_sender_id(); @@ -971,12 +771,13 @@ impl RPCProcessor { match q.respond_to() { RespondTo::Sender(Some(sender_ni)) => { // Sender NodeInfo was specified, update our routing table with it - if !self.filter_peer_scope(&sender_ni.node_info) { + if !self.filter_node_info(&sender_ni.node_info) { return Err(RPCError::invalid_format( "respond_to_sender_signed_node_info has invalid peer scope", )); } opt_sender_nr = self.routing_table().register_node_with_signed_node_info( + routing_domain, sender_node_id, sender_ni.clone(), false, @@ -1168,6 +969,7 @@ impl RPCProcessor { body: Vec, peer_noderef: NodeRef, connection_descriptor: ConnectionDescriptor, + routing_domain: RoutingDomain, ) -> EyreResult<()> { let msg = RPCMessageEncoded { header: RPCMessageHeader { @@ -1176,6 +978,7 @@ impl RPCProcessor { body_len: body.len() as u64, peer_noderef, connection_descriptor, + routing_domain, }, data: RPCMessageData { contents: body }, }; diff --git a/veilid-core/src/rpc_processor/private_route.rs b/veilid-core/src/rpc_processor/private_route.rs index d5a59991..179b4052 100644 --- a/veilid-core/src/rpc_processor/private_route.rs +++ b/veilid-core/src/rpc_processor/private_route.rs @@ -4,7 +4,7 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// fn compile_safety_route( &self, - safety_route_spec: &SafetyRouteSpec, + safety_route_spec: Arc, private_route: PrivateRoute, ) -> Result { // Ensure the total hop count isn't too long for our config @@ -111,15 +111,15 @@ impl RPCProcessor { // Wrap an operation inside a route pub(super) fn wrap_with_route( &self, - safety_route_spec: Option<&SafetyRouteSpec>, + safety_route_spec: Option>, private_route: PrivateRoute, message_data: Vec, ) -> Result, RPCError> { // Encrypt routed operation // Xmsg + ENC(Xmsg, DH(PKapr, SKbsr)) let nonce = Crypto::get_random_nonce(); - let stub_safety_route_spec = SafetyRouteSpec::new(); - let safety_route_spec = safety_route_spec.unwrap_or(&stub_safety_route_spec); + let safety_route_spec = + safety_route_spec.unwrap_or_else(|| Arc::new(SafetyRouteSpec::new())); let dh_secret = self .crypto .cached_dh(&private_route.public_key, &safety_route_spec.secret_key) diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 9fff8034..e1d633e9 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -8,15 +8,11 @@ impl RPCProcessor { self, dest: Destination, key: DHTKey, - safety_route: Option<&SafetyRouteSpec>, - respond_to: RespondTo, ) -> Result>>, RPCError> { - let find_node_q = RPCOperationFindNodeQ { node_id: key }; - let question = RPCQuestion::new(respond_to, RPCQuestionDetail::FindNodeQ(find_node_q)); + let find_node_q = RPCQuestionDetail::FindNodeQ(RPCOperationFindNodeQ { node_id: key }); // Send the find_node request - let waitable_reply = - network_result_try!(self.question(dest, question, safety_route).await?); + let waitable_reply = network_result_try!(self.question(dest, find_node_q).await?); // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { @@ -35,7 +31,7 @@ impl RPCProcessor { // Verify peers are in the correct peer scope for peer_info in &find_node_a.peers { - if !self.filter_peer_scope(&peer_info.signed_node_info.node_info) { + if !self.filter_node_info(&peer_info.signed_node_info.node_info) { return Err(RPCError::invalid_format( "find_node response has invalid peer scope", )); diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index 924283b8..aa721d3d 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -2,20 +2,20 @@ use super::*; impl RPCProcessor { // Sends a our node info to another node - // Can be sent via all methods including relays and routes #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_node_info_update( self, target: NodeRef, routing_domain: RoutingDomain, ) -> Result, RPCError> { + // Get the signed node info for the desired routing domain to send update with let signed_node_info = self .routing_table() .get_own_signed_node_info(routing_domain); let node_info_update = RPCOperationNodeInfoUpdate { signed_node_info }; let statement = RPCStatement::new(RPCStatementDetail::NodeInfoUpdate(node_info_update)); - // Send the node_info_update request + // Send the node_info_update request to the specific routing domain requested network_result_try!( self.statement( Destination::direct(target).with_routing_domain(routing_domain), @@ -41,7 +41,7 @@ impl RPCProcessor { }; // Update our routing table with signed node info - if !self.filter_peer_scope(&node_info_update.signed_node_info.node_info) { + if !self.filter_node_info(&node_info_update.signed_node_info.node_info) { log_rpc!(debug "node_info_update has invalid peer scope from {}", sender_node_id ); diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 25c04bcf..b7285701 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -8,14 +8,21 @@ impl RPCProcessor { self, peer: NodeRef, ) -> Result>, RPCError> { - let node_status = self.network_manager().generate_node_status(); + let routing_domain = match peer.best_routing_domain() { + Some(rd) => rd, + None => { + return Ok(NetworkResult::no_connection_other( + "no routing domain for peer", + )) + } + }; + let node_status = self.network_manager().generate_node_status(routing_domain); let status_q = RPCOperationStatusQ { node_status }; - let respond_to = self.make_respond_to_sender(peer.clone()); - let question = RPCQuestion::new(respond_to, RPCQuestionDetail::StatusQ(status_q)); + let question = RPCQuestion::new(RespondTo::Sender, RPCQuestionDetail::StatusQ(status_q)); // Send the info request let waitable_reply = network_result_try!( - self.question(Destination::Direct(peer.clone()), question, None) + self.question(Destination::direct(peer.clone()), question) .await? ); @@ -37,26 +44,48 @@ impl RPCProcessor { _ => return Err(RPCError::invalid_format("not an answer")), }; + // Ensure the returned node status is the kind for the routing domain we asked for + match routing_domain { + RoutingDomain::PublicInternet => { + if !matches!(status_a.node_status, NodeStatus::PublicInternet(_)) { + return Ok(NetworkResult::invalid_message( + "node status doesn't match PublicInternet routing domain", + )); + } + } + RoutingDomain::LocalNetwork => { + if !matches!(status_a.node_status, NodeStatus::LocalNetwork(_)) { + return Ok(NetworkResult::invalid_message( + "node status doesn't match LocalNetwork routing domain", + )); + } + } + } + // Update latest node status in routing table - peer.update_node_status(status_a.node_status.clone()); + peer.update_node_status(status_a.node_status); // Report sender_info IP addresses to network manager + // Don't need to validate these addresses for the current routing domain + // the address itself is irrelevant, and the remote node can lie anyway if let Some(socket_address) = status_a.sender_info.socket_address { match send_data_kind { - SendDataKind::Direct(connection_descriptor) => { - match connection_descriptor.peer_scope() { - PeerScope::Global => self.network_manager().report_global_socket_address( + SendDataKind::Direct(connection_descriptor) => match routing_domain { + RoutingDomain::PublicInternet => self + .network_manager() + .report_public_internet_socket_address( socket_address, connection_descriptor, peer, ), - PeerScope::Local => self.network_manager().report_local_socket_address( + RoutingDomain::LocalNetwork => { + self.network_manager().report_local_network_socket_address( socket_address, connection_descriptor, peer, - ), + ) } - } + }, SendDataKind::Indirect => { // Do nothing in this case, as the socket address returned here would be for any node other than ours } @@ -75,6 +104,7 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> { let connection_descriptor = msg.header.connection_descriptor; + let routing_domain = msg.header.routing_domain; // Get the question let status_q = match msg.operation.kind() { @@ -85,6 +115,24 @@ impl RPCProcessor { _ => panic!("not a question"), }; + // Ensure the node status from the question is the kind for the routing domain we received the request in + match routing_domain { + RoutingDomain::PublicInternet => { + if !matches!(status_a.node_status, NodeStatus::PublicInternet(_)) { + return Ok(NetworkResult::invalid_message( + "node status doesn't match PublicInternet routing domain", + )); + } + } + RoutingDomain::LocalNetwork => { + if !matches!(status_a.node_status, NodeStatus::LocalNetwork(_)) { + return Ok(NetworkResult::invalid_message( + "node status doesn't match LocalNetwork routing domain", + )); + } + } + } + // update node status for the requesting node to our routing table if let Some(sender_nr) = msg.opt_sender_nr.clone() { // Update latest node status in routing table for the statusq sender @@ -92,7 +140,7 @@ impl RPCProcessor { } // Make status answer - let node_status = self.network_manager().generate_node_status(); + let node_status = self.network_manager().generate_node_status(routing_domain); // Get the peer address in the returned sender info let sender_info = SenderInfo { @@ -106,11 +154,7 @@ impl RPCProcessor { // Send status answer let res = self - .answer( - msg, - RPCAnswer::new(RPCAnswerDetail::StatusA(status_a)), - None, - ) + .answer(msg, RPCAnswer::new(RPCAnswerDetail::StatusA(status_a))) .await?; tracing::Span::current().record("res", &tracing::field::display(res)); Ok(()) diff --git a/veilid-core/src/tests/common/test_veilid_config.rs b/veilid-core/src/tests/common/test_veilid_config.rs index 47793a48..a84d7d11 100644 --- a/veilid-core/src/tests/common/test_veilid_config.rs +++ b/veilid-core/src/tests/common/test_veilid_config.rs @@ -224,7 +224,6 @@ fn config_callback(key: String) -> ConfigCallbackReturn { "network.upnp" => Ok(Box::new(false)), "network.natpmp" => Ok(Box::new(false)), "network.detect_address_changes" => Ok(Box::new(true)), - "network.enable_local_peer_scope" => Ok(Box::new(false)), "network.restricted_nat_retries" => Ok(Box::new(3u32)), "network.tls.certificate_path" => Ok(Box::new(get_certfile_path())), "network.tls.private_key_path" => Ok(Box::new(get_keyfile_path())), @@ -354,7 +353,6 @@ pub async fn test_config() { assert_eq!(inner.network.upnp, false); assert_eq!(inner.network.natpmp, false); assert_eq!(inner.network.detect_address_changes, true); - assert_eq!(inner.network.enable_local_peer_scope, false); assert_eq!(inner.network.restricted_nat_retries, 3u32); assert_eq!(inner.network.tls.certificate_path, get_certfile_path()); assert_eq!(inner.network.tls.private_key_path, get_keyfile_path()); diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 22866268..b4fb3423 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -45,6 +45,16 @@ fn get_address_type(text: &str) -> Option { None } } +fn get_routing_domain(text: &str) -> Option { + let lctext = text.to_ascii_lowercase(); + if "publicinternet".starts_with(&lctext) { + Some(RoutingDomain::PublicInternet) + } else if "localnetwork".starts_with(&lctext) { + Some(RoutingDomain::LocalNetwork) + } else { + None + } +} fn get_debug_argument Option>( value: &str, @@ -331,27 +341,40 @@ impl VeilidAPI { None => return Ok("Node id not found in routing table".to_owned()), }; - if args.len() >= 2 { - let pt = - get_debug_argument_at(&args, 1, "debug_ping", "protocol_type", get_protocol_type)?; - nr.merge_filter(DialInfoFilter::all().with_protocol_type(pt)); - if args.len() >= 3 { - let at = get_debug_argument_at( - &args, - 2, - "debug_ping", - "address_type", - get_address_type, - )?; + let mut ai = 1; + let mut routing_domain = None; + while ai < args.len() { + if let Ok(pt) = + get_debug_argument_at(&args, ai, "debug_ping", "protocol_type", get_protocol_type) + { + nr.merge_filter(DialInfoFilter::all().with_protocol_type(pt)); + } else if let Ok(at) = + get_debug_argument_at(&args, ai, "debug_ping", "address_type", get_address_type) + { nr.merge_filter(DialInfoFilter::all().with_address_type(at)); + } else if let Ok(rd) = get_debug_argument_at( + &args, + ai, + "debug_ping", + "routing_domain", + get_routing_domain, + ) { + if routing_domain.is_none() { + routing_domain = Some(rd); + } else { + return Ok("Multiple routing domains specified".to_owned()); + } + } else { + return Ok(format!("Invalid argument specified: {}", args[ai])); } + ai += 1; } let rpc = self.network_manager()?.rpc_processor(); // Dump routing table entry let out = match rpc - .rpc_call_status(nr) + .rpc_call_status(routing_domain, nr) .await .map_err(VeilidAPIError::internal)? { @@ -383,7 +406,7 @@ impl VeilidAPI { attach detach restart network - ping [protocol_type [address_type]] + ping [protocol_type][address_type][routing_domain] contact [protocol_type [address_type]] "# .to_owned()) diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 1e0b2df6..7f90eda9 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -460,9 +460,31 @@ pub struct NodeInfo { } impl NodeInfo { - pub fn is_valid(&self) -> bool { - !matches!(self.network_class, NetworkClass::Invalid) + pub fn is_valid_in_routing_domain( + &self, + routing_table: RoutingTable, + routing_domain: RoutingDomain, + ) -> bool { + // Should not be passing around nodeinfo with an invalid network class + if matches!(self.network_class, NetworkClass::Invalid) { + return false; + } + // Ensure all of the dial info works in this routing domain + for did in self.dial_info_detail_list { + if !routing_table.ensure_dial_info_is_valid(routing_domain, &did.dial_info) { + return false; + } + } + // Ensure the relay is also valid in this routing domain if it is provided + if let Some(relay_peer_info) = self.relay_peer_info { + let relay_ni = &relay_peer_info.signed_node_info.node_info; + if !relay_ni.is_valid_in_routing_domain(routing_table, routing_domain) { + return false; + } + } + true } + pub fn first_filtered_dial_info_detail(&self, filter: F) -> Option where F: Fn(&DialInfoDetail) -> bool, @@ -783,7 +805,7 @@ impl FromStr for SocketAddress { ////////////////////////////////////////////////////////////////// -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct DialInfoFilter { pub protocol_type_set: ProtocolTypeSet, pub address_type_set: AddressTypeSet, @@ -1096,6 +1118,14 @@ impl DialInfo { pub fn address_type(&self) -> AddressType { self.socket_address().address_type() } + pub fn address(&self) -> Address { + match self { + Self::UDP(di) => di.socket_address.address, + Self::TCP(di) => di.socket_address.address, + Self::WS(di) => di.socket_address.address, + Self::WSS(di) => di.socket_address.address, + } + } pub fn socket_address(&self) -> SocketAddress { match self { Self::UDP(di) => di.socket_address, diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs index db9b469c..cc09e317 100644 --- a/veilid-core/src/veilid_config.rs +++ b/veilid-core/src/veilid_config.rs @@ -137,7 +137,6 @@ pub struct VeilidConfigNetwork { pub upnp: bool, pub natpmp: bool, pub detect_address_changes: bool, - pub enable_local_peer_scope: bool, pub restricted_nat_retries: u32, pub tls: VeilidConfigTLS, pub application: VeilidConfigApplication, @@ -356,7 +355,6 @@ impl VeilidConfig { get_config!(inner.network.upnp); get_config!(inner.network.natpmp); get_config!(inner.network.detect_address_changes); - get_config!(inner.network.enable_local_peer_scope); get_config!(inner.network.restricted_nat_retries); get_config!(inner.network.tls.certificate_path); get_config!(inner.network.tls.private_key_path); diff --git a/veilid-core/src/xx/ip_extra.rs b/veilid-core/src/xx/ip_extra.rs index 2623a27c..140a9042 100644 --- a/veilid-core/src/xx/ip_extra.rs +++ b/veilid-core/src/xx/ip_extra.rs @@ -215,3 +215,48 @@ pub fn ip_to_ipblock(ip6_prefix_size: usize, addr: IpAddr) -> IpAddr { } } } + +pub fn ipaddr_apply_netmask(addr: IpAddr, netmask: IpAddr) -> IpAddr { + match addr { + IpAddr::V4(v4) => { + let v4mask = match netmask { + IpAddr::V4(v4mask) => v4mask, + IpAddr::V6(_) => { + panic!("netmask doesn't match ipv4 address"); + } + }; + let v4 = v4.octets(); + let v4mask = v4mask.octets(); + IpAddr::V4(Ipv4Addr::new( + v4[0] & v4mask[0], + v4[1] & v4mask[1], + v4[2] & v4mask[2], + v4[3] & v4mask[3], + )) + } + IpAddr::V6(v6) => { + let v6mask = match netmask { + IpAddr::V4(_) => { + panic!("netmask doesn't match ipv6 address"); + } + IpAddr::V6(v6mask) => v6mask, + }; + let v6 = v6.segments(); + let v6mask = v6mask.segments(); + IpAddr::V6(Ipv6Addr::new( + v6[0] & v6mask[0], + v6[1] & v6mask[1], + v6[2] & v6mask[2], + v6[3] & v6mask[3], + v6[4] & v6mask[4], + v6[5] & v6mask[5], + v6[6] & v6mask[6], + v6[7] & v6mask[7], + )) + } + } +} + +pub fn ipaddr_in_network(addr: IpAddr, netaddr: IpAddr, netmask: IpAddr) -> bool { + ipaddr_apply_netmask(netaddr, netmask) == ipaddr_apply_netmask(addr, netmask) +} diff --git a/veilid-flutter/example/lib/config.dart b/veilid-flutter/example/lib/config.dart index e3bfcd60..5f8b4e6d 100644 --- a/veilid-flutter/example/lib/config.dart +++ b/veilid-flutter/example/lib/config.dart @@ -85,7 +85,6 @@ Future getDefaultVeilidConfig() async { upnp: true, natpmp: true, detectAddressChanges: true, - enableLocalPeerScope: false, restrictedNatRetries: 0, tls: VeilidConfigTLS( certificatePath: "", diff --git a/veilid-flutter/lib/veilid.dart b/veilid-flutter/lib/veilid.dart index 18263aa4..dffbb33c 100644 --- a/veilid-flutter/lib/veilid.dart +++ b/veilid-flutter/lib/veilid.dart @@ -741,7 +741,6 @@ class VeilidConfigNetwork { bool upnp; bool natpmp; bool detectAddressChanges; - bool enableLocalPeerScope; int restrictedNatRetries; VeilidConfigTLS tls; VeilidConfigApplication application; @@ -767,7 +766,6 @@ class VeilidConfigNetwork { required this.upnp, required this.natpmp, required this.detectAddressChanges, - required this.enableLocalPeerScope, required this.restrictedNatRetries, required this.tls, required this.application, @@ -795,7 +793,6 @@ class VeilidConfigNetwork { 'upnp': upnp, 'natpmp': natpmp, 'detect_address_changes': detectAddressChanges, - 'enable_local_peer_scope': enableLocalPeerScope, 'restricted_nat_retries': restrictedNatRetries, 'tls': tls.json, 'application': application.json, @@ -826,7 +823,6 @@ class VeilidConfigNetwork { upnp = json['upnp'], natpmp = json['natpmp'], detectAddressChanges = json['detect_address_changes'], - enableLocalPeerScope = json['enable_local_peer_scope'], restrictedNatRetries = json['restricted_nat_retries'], tls = VeilidConfigTLS.fromJson(json['tls']), application = VeilidConfigApplication.fromJson(json['application']), diff --git a/veilid-server/src/cmdline.rs b/veilid-server/src/cmdline.rs index 8d317744..f1048bbb 100644 --- a/veilid-server/src/cmdline.rs +++ b/veilid-server/src/cmdline.rs @@ -130,11 +130,7 @@ fn do_clap_matches(default_config_path: &OsStr) -> Result EyreResult<(Settings, ArgMatches)> { if matches.is_present("attach") { settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true")); } - if matches.is_present("local") { - settingsrw.core.network.enable_local_peer_scope = true; - } if matches.occurrences_of("delete-protected-store") != 0 { settingsrw.core.protected_store.delete = true; } diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index d7e24c46..a8dba9f8 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -99,7 +99,6 @@ core: upnp: true natpmp: false detect_address_changes: true - enable_local_peer_scope: false restricted_nat_retries: 0 tls: certificate_path: '%CERTIFICATE_PATH%' @@ -589,7 +588,6 @@ pub struct Network { pub upnp: bool, pub natpmp: bool, pub detect_address_changes: bool, - pub enable_local_peer_scope: bool, pub restricted_nat_retries: u32, pub tls: Tls, pub application: Application, @@ -976,7 +974,6 @@ impl Settings { set_config_value!(inner.core.network.upnp, value); set_config_value!(inner.core.network.natpmp, value); set_config_value!(inner.core.network.detect_address_changes, value); - set_config_value!(inner.core.network.enable_local_peer_scope, value); set_config_value!(inner.core.network.restricted_nat_retries, value); set_config_value!(inner.core.network.tls.certificate_path, value); set_config_value!(inner.core.network.tls.private_key_path, value); @@ -1177,9 +1174,6 @@ impl Settings { "network.detect_address_changes" => { Ok(Box::new(inner.core.network.detect_address_changes)) } - "network.enable_local_peer_scope" => { - Ok(Box::new(inner.core.network.enable_local_peer_scope)) - } "network.restricted_nat_retries" => { Ok(Box::new(inner.core.network.restricted_nat_retries)) } @@ -1503,7 +1497,6 @@ mod tests { assert_eq!(s.core.network.upnp, true); assert_eq!(s.core.network.natpmp, false); assert_eq!(s.core.network.detect_address_changes, true); - assert_eq!(s.core.network.enable_local_peer_scope, false); assert_eq!(s.core.network.restricted_nat_retries, 0u32); // assert_eq!(