From 79cda4a71260c6feeb95590b9af95e15dea5782a Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 4 Sep 2022 14:17:28 -0400 Subject: [PATCH] refactor checkpoint --- veilid-core/proto/veilid.capnp | 2 +- veilid-core/src/network_manager/mod.rs | 67 ++++---- veilid-core/src/network_manager/native/mod.rs | 2 +- .../native/network_class_discovery.rs | 14 +- .../network_manager/native/start_protocols.rs | 13 +- veilid-core/src/network_manager/tasks.rs | 54 +++--- veilid-core/src/routing_table/bucket_entry.rs | 72 ++++---- veilid-core/src/routing_table/find_nodes.rs | 82 ++++----- veilid-core/src/routing_table/mod.rs | 93 ++++++++-- veilid-core/src/routing_table/node_ref.rs | 53 +++--- .../routing_table/routing_domain_editor.rs | 21 +++ .../src/routing_table/routing_domains.rs | 12 +- .../src/rpc_processor/coders/node_status.rs | 24 ++- .../coders/operations/operation.rs | 35 ++-- veilid-core/src/rpc_processor/destination.rs | 52 +++++- veilid-core/src/rpc_processor/mod.rs | 162 ++++++++++-------- .../src/rpc_processor/private_route.rs | 2 +- .../src/rpc_processor/rpc_find_node.rs | 30 ++-- .../src/rpc_processor/rpc_node_info_update.rs | 12 +- .../src/rpc_processor/rpc_return_receipt.rs | 3 +- veilid-core/src/rpc_processor/rpc_signal.rs | 4 +- veilid-core/src/rpc_processor/rpc_status.rs | 14 +- .../rpc_processor/rpc_validate_dial_info.rs | 16 +- veilid-core/src/veilid_api/debug.rs | 59 +++++-- veilid-core/src/veilid_api/mod.rs | 31 +--- 25 files changed, 531 insertions(+), 398 deletions(-) create mode 100644 veilid-core/src/routing_table/routing_domain_editor.rs diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 86bdcd95..a0f791a4 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -491,7 +491,7 @@ struct Answer { struct Operation { opId @0 :UInt64; # Random RPC ID. Must be random to foil reply forgery attacks. - senderInfo @1 :SignedNodeInfo; # (optional) SignedNodeInfo for the sender to be cached by the receiver. + senderNodeInfo @1 :SignedNodeInfo; # (optional) SignedNodeInfo for the sender to be cached by the receiver. kind :union { question @2 :Question; statement @3 :Statement; diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 44e5075d..3747c55e 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -23,7 +23,7 @@ use connection_handle::*; use connection_limits::*; use connection_manager::*; use dht::*; -use futures_util::stream::{FuturesUnordered, StreamExt}; +use futures_util::stream::{FuturesOrdered, FuturesUnordered, StreamExt}; use hashlink::LruCache; use intf::*; #[cfg(not(target_arch = "wasm32"))] @@ -388,22 +388,22 @@ impl NetworkManager { .unwrap(); inner.public_inbound_dial_info_filter = Some( - DialInfoFilter::global() + DialInfoFilter::all() .with_protocol_type_set(pc.inbound) .with_address_type_set(pc.family_global), ); inner.local_inbound_dial_info_filter = Some( - DialInfoFilter::local() + DialInfoFilter::all() .with_protocol_type_set(pc.inbound) .with_address_type_set(pc.family_local), ); inner.public_outbound_dial_info_filter = Some( - DialInfoFilter::global() + DialInfoFilter::all() .with_protocol_type_set(pc.outbound) .with_address_type_set(pc.family_global), ); inner.local_outbound_dial_info_filter = Some( - DialInfoFilter::local() + DialInfoFilter::all() .with_protocol_type_set(pc.outbound) .with_address_type_set(pc.family_local), ); @@ -557,7 +557,7 @@ impl NetworkManager { // See how many live PublicInternet entries we have let live_public_internet_entry_count = routing_table.get_entry_count( - RoutingTableSet::only(RoutingTable::PublicInternet), + RoutingDomain::PublicInternet.into(), BucketEntryState::Unreliable, ); let min_peer_count = { @@ -816,7 +816,7 @@ impl NetworkManager { }; // Make a reverse connection to the peer and send the receipt to it - rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt) + rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt) .await .wrap_err("rpc failure") } @@ -841,12 +841,12 @@ impl NetworkManager { }; // Get the udp direct dialinfo for the hole punch - let outbound_dif = self - .get_outbound_dial_info_filter(RoutingDomain::PublicInternet) + let outbound_nrf = self + .get_outbound_node_ref_filter(RoutingDomain::PublicInternet) .with_protocol_type(ProtocolType::UDP); - peer_nr.set_filter(Some(outbound_dif)); + peer_nr.set_filter(Some(outbound_nrf)); let hole_punch_dial_info_detail = peer_nr - .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) + .first_filtered_dial_info_detail() .ok_or_else(|| eyre!("No hole punch capable dialinfo found for node"))?; // Now that we picked a specific dialinfo, further restrict the noderef to the specific address type @@ -872,7 +872,7 @@ impl NetworkManager { peer_nr.set_last_connection(connection_descriptor, intf::get_timestamp()); // Return the receipt using the same dial info send the receipt to it - rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt) + rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt) .await .wrap_err("rpc failure") } @@ -1014,7 +1014,7 @@ impl NetworkManager { .node_info_outbound_filter(RoutingDomain::PublicInternet), ); if let Some(reverse_did) = routing_table.first_filtered_dial_info_detail( - RoutingDomainSet::only(RoutingDomain::PublicInternet), + RoutingDomain::PublicInternet.into(), &reverse_dif, ) { // Ensure we aren't on the same public IP address (no hairpin nat) @@ -1048,11 +1048,11 @@ impl NetworkManager { .node_info_outbound_filter(RoutingDomain::PublicInternet), ) .filtered( - DialInfoFilter::global().with_protocol_type(ProtocolType::UDP), + &DialInfoFilter::all().with_protocol_type(ProtocolType::UDP), ); if let Some(self_udp_dialinfo_detail) = routing_table .first_filtered_dial_info_detail( - RoutingDomainSet::only(RoutingDomain::PublicInternet), + RoutingDomain::PublicInternet.into(), &inbound_udp_dif, ) { @@ -1102,8 +1102,8 @@ impl NetworkManager { #[instrument(level = "trace", skip(self), ret)] fn get_contact_method_local(&self, target_node_ref: NodeRef) -> ContactMethod { // Scope noderef down to protocols we can do outbound - let local_outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::LocalNetwork); - let target_node_ref = target_node_ref.filtered_clone(NodeRefFilter::local_outbound_dif); + let local_outbound_nrf = self.get_outbound_node_ref_filter(RoutingDomain::LocalNetwork); + let target_node_ref = target_node_ref.filtered_clone(local_outbound_nrf); // Get the best matching local direct dial info if we have it if target_node_ref.is_filter_dead() { @@ -1157,8 +1157,7 @@ impl NetworkManager { let rpc = self.rpc_processor(); network_result_try!(rpc .rpc_call_signal( - Destination::Relay(relay_nr.clone(), target_nr.node_id()), - None, + Destination::relay(relay_nr, target_nr.node_id()), SignalInfo::ReverseConnect { receipt, peer_info }, ) .await @@ -1214,9 +1213,16 @@ impl NetworkManager { data: Vec, ) -> EyreResult> { // Ensure we are filtered down to UDP (the only hole punch protocol supported today) + // and only in the PublicInternet routing domain assert!(target_nr .filter_ref() - .map(|dif| dif.protocol_type_set == ProtocolTypeSet::only(ProtocolType::UDP)) + .map(|nrf| nrf.dial_info_filter.protocol_type_set + == ProtocolTypeSet::only(ProtocolType::UDP)) + .unwrap_or_default()); + assert!(target_nr + .filter_ref() + .map(|nrf| nrf.routing_domain_set + == RoutingDomainSet::only(RoutingDomain::PublicInternet)) .unwrap_or_default()); // Build a return receipt for the signal @@ -1229,7 +1235,7 @@ impl NetworkManager { // Get the udp direct dialinfo for the hole punch let hole_punch_did = target_nr - .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) + .first_filtered_dial_info_detail() .ok_or_else(|| eyre!("No hole punch capable dialinfo found for node"))?; // Do our half of the hole punch by sending an empty packet @@ -1246,8 +1252,7 @@ impl NetworkManager { let rpc = self.rpc_processor(); network_result_try!(rpc .rpc_call_signal( - Destination::Relay(relay_nr.clone(), target_nr.node_id()), - None, + Destination::relay(relay_nr, target_nr.node_id()), SignalInfo::HolePunch { receipt, peer_info }, ) .await @@ -1394,7 +1399,7 @@ impl NetworkManager { // Serialize out peer info let bootstrap_peerinfo: Vec = bootstrap_nodes .iter() - .filter_map(|b| b.peer_info(RoutingDomain::PublicInternet)) + .filter_map(|nr| nr.make_peer_info(RoutingDomain::PublicInternet)) .collect(); let json_bytes = serialize_json(bootstrap_peerinfo).as_bytes().to_vec(); @@ -1484,7 +1489,7 @@ impl NetworkManager { // Get the routing domain for this data let routing_domain = match self .routing_table() - .routing_domain_for_address(connection_descriptor.remote().address()) + .routing_domain_for_address(connection_descriptor.remote_address().address()) { Some(rd) => rd, None => { @@ -1778,7 +1783,7 @@ impl NetworkManager { // Get current external ip/port from registered global dialinfo let current_addresses: BTreeSet = routing_table .all_filtered_dial_info_details( - Some(RoutingDomain::PublicInternet), + RoutingDomain::PublicInternet.into(), &dial_info_filter, ) .iter() @@ -1896,12 +1901,8 @@ impl NetworkManager { .unlocked_inner .node_info_update_single_future .single_spawn(async move { - // Only update if we actually have a valid network class - if matches!( - this.get_network_class(routing_domain) - .unwrap_or(NetworkClass::Invalid), - NetworkClass::Invalid - ) { + // Only update if we actually have valid signed node info for this routing domain + if !this.routing_table().has_valid_own_node_info(routing_domain) { trace!( "not sending node info update because our network class is not yet valid" ); @@ -1922,7 +1923,7 @@ impl NetworkManager { unord.push(async move { // Update the node if let Err(e) = rpc - .rpc_call_node_info_update(nr.clone, routing_domain) + .rpc_call_node_info_update(nr.clone(), routing_domain) .await { // Not fatal, but we should be able to see if this is happening diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index e66244ed..b221d259 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -614,7 +614,7 @@ impl Network { return; } // Add network to local networks table - for addr in intf.addrs { + for addr in &intf.addrs { let netmask = addr.if_addr().netmask(); let network_ip = ipaddr_apply_netmask(addr.if_addr().ip(), netmask); local_networks.insert((network_ip, netmask)); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index b1ff1d6e..699c168a 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -132,8 +132,10 @@ impl DiscoveryContext { false } }; - let filter = - RoutingTable::combine_filters(inbound_dial_info_entry_filter, disallow_relays_filter); + let filter = RoutingTable::combine_entry_filters( + inbound_dial_info_entry_filter, + disallow_relays_filter, + ); // Find public nodes matching this filter let peers = self @@ -155,7 +157,11 @@ impl DiscoveryContext { continue; } } - peer.set_filter(Some(dial_info_filter.clone())); + peer.set_filter(Some( + NodeRefFilter::new() + .with_routing_domain(RoutingDomain::PublicInternet) + .with_dial_info_filter(dial_info_filter.clone()), + )); if let Some(sa) = self.request_public_address(peer.clone()).await { return Some((sa, peer)); } @@ -764,7 +770,7 @@ impl Network { // Get existing public dial info let existing_public_dial_info: HashSet = routing_table .all_filtered_dial_info_details( - Some(RoutingDomain::PublicInternet), + RoutingDomain::PublicInternet.into(), &DialInfoFilter::all(), ) .into_iter() diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index dc1e814a..e14d5ea6 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -287,14 +287,11 @@ impl Network { // Register local dial info for di in &local_dial_info_list { - -xxx write routing table sieve for routing domain from dialinfo and local network detection and registration - - // If the local interface address is global, or we are enabling local peer scope - // register global dial info if no public address is specified + // If the local interface address is global, then register global dial info + // if no other public address is specified if !detect_address_changes && public_address.is_none() - && (di.is_global() || enable_local_peer_scope) + && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di) { routing_table.register_dial_info( RoutingDomain::PublicInternet, @@ -455,7 +452,7 @@ xxx write routing table sieve for routing domain from dialinfo and local network if !detect_address_changes && url.is_none() - && (socket_address.address().is_global() || enable_local_peer_scope) + && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &local_di) { // Register public dial info routing_table.register_dial_info( @@ -625,7 +622,7 @@ xxx write routing table sieve for routing domain from dialinfo and local network // Register global dial info if no public address is specified if !detect_address_changes && public_address.is_none() - && (di.is_global() || enable_local_peer_scope) + && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di) { routing_table.register_dial_info( RoutingDomain::PublicInternet, diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index 3d6af1b6..16511a17 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -197,11 +197,7 @@ impl NetworkManager { let routing_table = routing_table.clone(); unord.push( // lets ask bootstrap to find ourselves now - async move { - routing_table - .reverse_find_node(RoutingDomain::PublicInternet, nr, true) - .await - }, + async move { routing_table.reverse_find_node(nr, true).await }, ); } } @@ -303,12 +299,10 @@ impl NetworkManager { unord.push(async move { // Need VALID signed peer info, so ask bootstrap to find_node of itself // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = routing_table - .find_target(RoutingDomain::PublicInternet, nr.clone()) - .await; + let _ = routing_table.find_target(nr.clone()).await; // Ensure we got the signed peer info - if !nr.has_valid_signed_node_info(Some(RoutingDomain::PublicInternet)) { + if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { log_net!(warn "bootstrap at {:?} did not return valid signed node info", nr @@ -316,9 +310,7 @@ impl NetworkManager { // If this node info is invalid, it will time out after being unpingable } else { // otherwise this bootstrap is valid, lets ask it to find ourselves now - routing_table - .reverse_find_node(RoutingDomain::PublicInternet, nr, true) - .await + routing_table.reverse_find_node(nr, true).await } }); } @@ -335,7 +327,9 @@ impl NetworkManager { fn ping_validator_public_internet( &self, cur_ts: u64, - unord: &mut FuturesUnordered, + unord: &mut FuturesUnordered< + SendPinBoxFuture>, RPCError>>, + >, ) -> EyreResult<()> { let rpc = self.rpc_processor(); let routing_table = self.routing_table(); @@ -352,7 +346,7 @@ impl NetworkManager { // Get our publicinternet dial info let dids = routing_table.all_filtered_dial_info_details( - RoutingDomainSet::only(RoutingDomain::PublicInternet), + RoutingDomain::PublicInternet.into(), &DialInfoFilter::all(), ); @@ -381,14 +375,10 @@ impl NetworkManager { if needs_ping { let rpc = rpc.clone(); let dif = did.dial_info.make_filter(); - let nr_filtered = nr.filtered_clone(dif); + let nr_filtered = + nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif)); log_net!("--> Keepalive ping to {:?}", nr_filtered); - unord.push( - async move { - rpc.rpc_call_status(Some(routing_domain), nr_filtered).await - } - .boxed(), - ); + unord.push(async move { rpc.rpc_call_status(nr_filtered).await }.boxed()); did_pings = true; } } @@ -398,9 +388,7 @@ impl NetworkManager { // any mapped ports to preserve if !did_pings { let rpc = rpc.clone(); - unord.push( - async move { rpc.rpc_call_status(Some(routing_domain), nr).await }.boxed(), - ); + unord.push(async move { rpc.rpc_call_status(nr).await }.boxed()); } } @@ -413,7 +401,9 @@ impl NetworkManager { fn ping_validator_local_network( &self, cur_ts: u64, - unord: &mut FuturesUnordered, + unord: &mut FuturesUnordered< + SendPinBoxFuture>, RPCError>>, + >, ) -> EyreResult<()> { let rpc = self.rpc_processor(); let routing_table = self.routing_table(); @@ -423,7 +413,7 @@ impl NetworkManager { // Get our LocalNetwork dial info let dids = routing_table.all_filtered_dial_info_details( - RoutingDomainSet::only(RoutingDomain::LocalNetwork), + RoutingDomain::LocalNetwork.into(), &DialInfoFilter::all(), ); @@ -432,7 +422,7 @@ impl NetworkManager { let rpc = rpc.clone(); // Just do a single ping with the best protocol for all the nodes - unord.push(async move { rpc.rpc_call_status(Some(routing_domain), nr).await }.boxed()); + unord.push(async move { rpc.rpc_call_status(nr).await }.boxed()); } Ok(()) @@ -476,7 +466,7 @@ impl NetworkManager { stop_token: StopToken, ) -> EyreResult<()> { let routing_table = self.routing_table(); - let mut unord = FuturesOrdered::new(); + let mut ord = FuturesOrdered::new(); let min_peer_count = { let c = self.config.get(); c.network.dht.min_peer_count as usize @@ -486,18 +476,18 @@ impl NetworkManager { // even the unreliable ones, and ask them to find nodes close to our node too let noderefs = routing_table.find_fastest_nodes( min_peer_count, - None, + |_k, _v| true, |k: DHTKey, v: Option>| { - NodeRef::new(self.clone(), k, v.unwrap().clone(), None) + NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None) }, ); for nr in noderefs { let routing_table = routing_table.clone(); - unord.push(async move { routing_table.reverse_find_node(nr, false).await }); + ord.push_back(async move { routing_table.reverse_find_node(nr, false).await }); } // do peer minimum search in order from fastest to slowest - while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + while let Ok(Some(_)) = ord.next().timeout_at(stop_token.clone()).await {} Ok(()) } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 71d3a6cd..0c6aa75a 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -39,7 +39,7 @@ pub enum BucketEntryState { } #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] -struct LastConnectionKey(RoutingDomain, ProtocolType, AddressType); +struct LastConnectionKey(ProtocolType, AddressType); /// Bucket entry information specific to the LocalNetwork RoutingDomain #[derive(Debug)] @@ -148,17 +148,11 @@ impl BucketEntryInner { // Retuns true if the node info changed pub fn update_signed_node_info( &mut self, + routing_domain: RoutingDomain, signed_node_info: SignedNodeInfo, - allow_invalid_signature: bool, ) { - // Don't allow invalid signatures unless we are explicitly allowing it - if !allow_invalid_signature && !signed_node_info.signature.valid { - log_rtab!(debug "Invalid signature on signed node info: {:?}", signed_node_info); - return; - } - // Get the correct signed_node_info for the chosen routing domain - let opt_current_sni = match signed_node_info.routing_domain { + let opt_current_sni = match routing_domain { RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, }; @@ -208,31 +202,23 @@ impl BucketEntryInner { false } - pub fn has_valid_signed_node_info(&self, routing_domain_set: RoutingDomainSet) -> bool { - for routing_domain in routing_domain_set { - // Get the correct signed_node_info for the chosen routing domain - let opt_current_sni = match routing_domain { - RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, - RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, - }; - if let Some(sni) = opt_current_sni { - if sni.is_valid() { - return true; - } - } - } - false - } - - pub fn node_info(&self, routing_domain: RoutingDomain) -> Option { + pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> { let opt_current_sni = match routing_domain { RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, }; - opt_current_sni.as_ref().map(|s| s.node_info.clone()) + opt_current_sni.as_ref().map(|s| &s.node_info) } - pub fn peer_info(&self, key: DHTKey, routing_domain: RoutingDomain) -> Option { + pub fn signed_node_info(&self, routing_domain: RoutingDomain) -> Option<&SignedNodeInfo> { + let opt_current_sni = match routing_domain { + RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + }; + opt_current_sni.as_ref().map(|s| s.as_ref()) + } + + pub fn make_peer_info(&self, key: DHTKey, routing_domain: RoutingDomain) -> Option { let opt_current_sni = match routing_domain { RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, @@ -253,18 +239,14 @@ impl BucketEntryInner { RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, }; if let Some(sni) = opt_current_sni { - if sni.is_valid() { - return Some(routing_domain); - } + return Some(routing_domain); } } None } fn descriptor_to_key(&self, last_connection: ConnectionDescriptor) -> LastConnectionKey { - let routing_domain = self.routing_domain_for_address(last_connection.remote().address()); LastConnectionKey( - routing_domain, last_connection.protocol_type(), last_connection.address_type(), ) @@ -285,17 +267,23 @@ impl BucketEntryInner { // Gets the best 'last connection' that matches a set of routing domain, protocol types and address types pub fn last_connection( &self, - routing_domain_set: RoutingDomainSet, - dial_info_filter: Option, + routing_table_inner: &RoutingTableInner, + node_ref_filter: &Option, ) -> Option<(ConnectionDescriptor, u64)> { // Iterate peer scopes and protocol types and address type in order to ensure we pick the preferred protocols if all else is the same - let dif = dial_info_filter.unwrap_or_default(); - for rd in routing_domain_set { - for pt in dif.protocol_type_set { - for at in dif.address_type_set { - let key = LastConnectionKey(rd, pt, at); - if let Some(v) = self.last_connections.get(&key) { - return Some(*v); + let nrf = node_ref_filter.unwrap_or_default(); + for pt in nrf.dial_info_filter.protocol_type_set { + for at in nrf.dial_info_filter.address_type_set { + let key = LastConnectionKey(pt, at); + if let Some(v) = self.last_connections.get(&key) { + // Verify this connection could be in the filtered routing domain + let address = v.0.remote_address().address(); + if let Some(rd) = + RoutingTable::routing_domain_for_address_inner(routing_table_inner, address) + { + if nrf.routing_domain_set.contains(rd) { + return Some(*v); + } } } } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 0c8b52a1..49929d98 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -52,7 +52,10 @@ impl RoutingTable { } // Make a filter that wraps another filter - pub fn combine_filters(mut f1: F, mut f2: G) -> impl FnMut(&BucketEntryInner) -> bool + pub fn combine_entry_filters( + mut f1: F, + mut f2: G, + ) -> impl FnMut(&BucketEntryInner) -> bool where F: FnMut(&BucketEntryInner) -> bool, G: FnMut(&BucketEntryInner) -> bool, @@ -81,7 +84,7 @@ impl RoutingTable { // count node_count, // filter - Some(|_k: DHTKey, v: Option>| { + |_k: DHTKey, v: Option>| { let entry = v.unwrap(); entry.with(|e| { // skip nodes on local network @@ -95,7 +98,7 @@ impl RoutingTable { // skip nodes that dont match entry filter entry_filter(e) }) - }), + }, // transform |k: DHTKey, v: Option>| { NodeRef::new(self.clone(), k, v.unwrap().clone(), None) @@ -118,16 +121,16 @@ impl RoutingTable { // count protocol_types.len() * 2 * max_per_type, // filter - Some(move |_k: DHTKey, v: Option>| { + move |_k: DHTKey, v: Option>| { let entry = v.unwrap(); entry.with(|e| { // skip nodes on our local network here - if e.has_node_info(RoutingDomainSet::only(RoutingDomain::LocalNetwork)) { + if e.has_node_info(RoutingDomain::LocalNetwork.into()) { return false; } // does it have some dial info we need? - let filter = |n: NodeInfo| { + let filter = |n: &NodeInfo| { let mut keep = false; for did in n.dial_info_detail_list { if matches!(did.dial_info.address_type(), AddressType::IPV4) { @@ -157,7 +160,7 @@ impl RoutingTable { .map(filter) .unwrap_or(false) }) - }), + }, // transform |k: DHTKey, v: Option>| { NodeRef::new(self.clone(), k, v.unwrap().clone(), None) @@ -167,14 +170,16 @@ impl RoutingTable { pub fn filter_has_valid_signed_node_info( &self, + routing_domain: RoutingDomain, v: Option>, - own_peer_info_is_valid: bool, - routing_domain_set: RoutingDomainSet, ) -> bool { - let routing_table = self.clone(); match v { - None => own_peer_info_is_valid, - Some(entry) => entry.with(|e| e.has_valid_signed_node_info(routing_domain_set)), + None => self.has_valid_own_node_info(routing_domain), + Some(entry) => entry.with(|e| { + e.signed_node_info(routing_domain.into()) + .map(|sni| sni.has_valid_signature()) + .unwrap_or(false) + }), } } @@ -183,12 +188,10 @@ impl RoutingTable { routing_domain: RoutingDomain, k: DHTKey, v: Option>, - own_peer_info: &PeerInfo, ) -> PeerInfo { - let routing_table = self.clone(); match v { - None => own_peer_info.clone(), - Some(entry) => entry.with(|e| e.peer_info(k, routing_domain).unwrap()), + None => self.get_own_peer_info(routing_domain), + Some(entry) => entry.with(|e| e.make_peer_info(k, routing_domain).unwrap()), } } @@ -246,8 +249,8 @@ impl RoutingTable { pub fn find_fastest_nodes( &self, node_count: usize, - mut filter: Option, - transform: T, + mut filter: F, + mut transform: T, ) -> Vec where F: FnMut(DHTKey, Option>) -> bool, @@ -264,7 +267,7 @@ impl RoutingTable { if entry.with(|e| e.state(cur_ts) == BucketEntryState::Dead) { false } else { - filter.as_mut().map(|f| f(k, v)).unwrap_or(true) + filter(k, v) } } else { // always filter out self peer, as it is irrelevant to the 'fastest nodes' search @@ -328,12 +331,12 @@ impl RoutingTable { pub fn find_closest_nodes( &self, node_id: DHTKey, - mut filter: Option, + mut filter: F, mut transform: T, ) -> Vec where - T: FnMut(DHTKey, Option>) -> O, F: FnMut(DHTKey, Option>) -> bool, + T: FnMut(DHTKey, Option>) -> O, { let cur_ts = intf::get_timestamp(); let node_count = { @@ -344,7 +347,7 @@ impl RoutingTable { node_count, cur_ts, // filter - |k, v| filter.as_mut().map(|f| f(k, v)).unwrap_or(true), + filter, // sort |(a_key, a_entry), (b_key, b_entry)| { // same nodes are always the same @@ -390,7 +393,7 @@ impl RoutingTable { let mut protocol_to_port = BTreeMap::<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>::new(); let our_dids = self.all_filtered_dial_info_details( - RoutingDomainSet::only(RoutingDomain::PublicInternet), + RoutingDomain::PublicInternet.into(), &DialInfoFilter::all(), ); for did in our_dids { @@ -417,12 +420,12 @@ impl RoutingTable { // Get all our outbound protocol/address types let outbound_dif = self .network_manager() - .get_outbound_node_ref_filter(RoutingDomain::PublicInternet); + .get_outbound_dial_info_filter(RoutingDomain::PublicInternet); let mapped_port_info = self.get_mapped_port_info(); move |e: &BucketEntryInner| { // Ensure this node is not on the local network - if e.has_node_info(RoutingDomainSet::only(RoutingDomain::LocalNetwork)) { + if e.has_node_info(RoutingDomain::LocalNetwork.into()) { return false; } @@ -549,7 +552,7 @@ impl RoutingTable { let res = network_result_try!( rpc_processor .clone() - .rpc_call_find_node(Destination::direct(node_ref), node_id,) + .rpc_call_find_node(Destination::direct(node_ref), node_id) .await? ); @@ -560,37 +563,24 @@ impl RoutingTable { } #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_self( - &self, - routing_domain: RoutingDomain, - node_ref: NodeRef, - ) -> EyreResult>> { + pub async fn find_self(&self, node_ref: NodeRef) -> EyreResult>> { let node_id = self.node_id(); - self.find_node(routing_domain, node_ref, node_id).await + self.find_node(node_ref, node_id).await } #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_target( - &self, - routing_domain: RoutingDomain, - node_ref: NodeRef, - ) -> EyreResult>> { + pub async fn find_target(&self, node_ref: NodeRef) -> EyreResult>> { let node_id = node_ref.node_id(); - self.find_node(routing_domain, node_ref, node_id).await + self.find_node(node_ref, node_id).await } #[instrument(level = "trace", skip(self))] - pub async fn reverse_find_node( - &self, - routing_domain: RoutingDomain, - node_ref: NodeRef, - wide: bool, - ) { + pub async fn reverse_find_node(&self, node_ref: NodeRef, wide: bool) { // Ask bootstrap node to 'find' our own node so we can get some more nodes near ourselves // and then contact those nodes to inform -them- that we exist // Ask bootstrap server for nodes closest to our own node - let closest_nodes = network_result_value_or_log!(debug match self.find_self(routing_domain, node_ref.clone()).await { + let closest_nodes = network_result_value_or_log!(debug match self.find_self(node_ref.clone()).await { Err(e) => { log_rtab!(error "find_self failed for {:?}: {:?}", @@ -606,7 +596,7 @@ impl RoutingTable { // Ask each node near us to find us as well if wide { for closest_nr in closest_nodes { - network_result_value_or_log!(debug match self.find_self(routing_domain, closest_nr.clone()).await { + network_result_value_or_log!(debug match self.find_self(closest_nr.clone()).await { Err(e) => { log_rtab!(error "find_self failed for {:?}: {:?}", diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 4c062c73..7d92a890 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -3,6 +3,7 @@ mod bucket_entry; mod debug; mod find_nodes; mod node_ref; +mod routing_domain_editor; mod routing_domains; mod stats_accounting; mod tasks; @@ -18,6 +19,7 @@ pub use debug::*; pub use find_nodes::*; use hashlink::LruCache; pub use node_ref::*; +pub use routing_domain_editor::*; pub use routing_domains::*; pub use stats_accounting::*; @@ -138,11 +140,13 @@ impl RoutingTable { self.inner.read().node_id_secret } - pub fn routing_domain_for_address(&self, address: Address) -> Option { - let inner = self.inner.read(); + pub fn routing_domain_for_address_inner( + inner: &RoutingTableInner, + address: Address, + ) -> Option { for rd in RoutingDomain::all() { let can_contain = - Self::with_routing_domain(&*inner, rd, |rdd| rdd.can_contain_address(address)); + Self::with_routing_domain(inner, rd, |rdd| rdd.can_contain_address(address)); if can_contain { return Some(rd); } @@ -150,6 +154,11 @@ impl RoutingTable { None } + pub fn routing_domain_for_address(&self, address: Address) -> Option { + let inner = self.inner.read(); + Self::routing_domain_for_address_inner(&*inner, address) + } + fn with_routing_domain(inner: &RoutingTableInner, domain: RoutingDomain, f: F) -> R where F: FnOnce(&dyn RoutingDomainDetail) -> R, @@ -256,6 +265,36 @@ impl RoutingTable { true } + pub fn node_info_is_valid_in_routing_domain( + &self, + routing_domain: RoutingDomain, + node_info: &NodeInfo, + ) -> bool { + // Should not be passing around nodeinfo with an invalid network class + if matches!(node_info.network_class, NetworkClass::Invalid) { + return false; + } + // Ensure all of the dial info works in this routing domain + for did in node_info.dial_info_detail_list { + if !self.ensure_dial_info_is_valid(routing_domain, &did.dial_info) { + return false; + } + } + // Ensure the relay is also valid in this routing domain if it is provided + if let Some(relay_peer_info) = node_info.relay_peer_info { + let relay_ni = &relay_peer_info.signed_node_info.node_info; + if !self.node_info_is_valid_in_routing_domain(routing_domain, relay_ni) { + return false; + } + } + true + } + + #[instrument(level = "debug", skip(self))] + pub fn edit_routing_domain(&self, domain: RoutingDomain) -> RoutingDomainEditor { + RoutingDomainEditor::new(self.clone(), domain) + } + #[instrument(level = "debug", skip(self), err)] pub fn register_dial_info( &self, @@ -349,7 +388,8 @@ impl RoutingTable { min_version: MIN_VERSION, max_version: MAX_VERSION, dial_info_detail_list: self.dial_info_details(routing_domain), - relay_peer_info: relay_node.and_then(|rn| rn.peer_info(routing_domain).map(Box::new)), + relay_peer_info: relay_node + .and_then(|rn| rn.make_peer_info(routing_domain).map(Box::new)), } } @@ -424,6 +464,8 @@ impl RoutingTable { Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_rti, e| { e.with_mut(|e| { e.clear_signed_node_info(RoutingDomain::LocalNetwork); + e.set_seen_our_node_info(RoutingDomain::LocalNetwork, false); + e.set_updated_since_last_network_change(false); }); Option::<()>::None }); @@ -549,7 +591,12 @@ impl RoutingTable { Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { // Only update nodes that haven't seen our node info yet if all || !v.with(|e| e.has_seen_our_node_info(routing_domain)) { - node_refs.push(NodeRef::new(self.clone(), k, v, None)); + node_refs.push(NodeRef::new( + self.clone(), + k, + v, + Some(NodeRefFilter::new().with_routing_domain(routing_domain)), + )); } Option::<()>::None }); @@ -572,10 +619,15 @@ impl RoutingTable { let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { if v.with(|e| { - e.has_node_info(RoutingDomainSet::only(routing_domain)) + e.has_node_info(routing_domain.into()) && e.needs_ping(&k, cur_ts, opt_relay_id == Some(k)) }) { - node_refs.push(NodeRef::new(self.clone(), k, v, None)); + node_refs.push(NodeRef::new( + self.clone(), + k, + v, + Some(NodeRefFilter::new().with_routing_domain(routing_domain)), + )); } Option::<()>::None }); @@ -670,12 +722,14 @@ impl RoutingTable { } // Shortcut function to add a node to our routing table if it doesn't exist - // and add the dial info we have for it, since that's pretty common + // and add the dial info we have for it. Returns a noderef filtered to + // the routing domain in which this node was registered for convenience. pub fn register_node_with_signed_node_info( &self, + routing_domain: RoutingDomain, node_id: DHTKey, signed_node_info: SignedNodeInfo, - allow_invalid_signature: bool, + allow_invalid: bool, ) -> Option { // validate signed node info is not something malicious if node_id == self.node_id() { @@ -688,11 +742,28 @@ impl RoutingTable { return None; } } + if !allow_invalid { + // verify signature + if !signed_node_info.has_valid_signature() { + log_rtab!(debug "signed node info for {} has invalid signature", node_id); + return None; + } + // verify signed node info is valid in this routing domain + if !self + .node_info_is_valid_in_routing_domain(routing_domain, &signed_node_info.node_info) + { + log_rtab!(debug "signed node info for {} not valid in the {:?} routing domain", node_id, routing_domain); + return None; + } + } + self.create_node_ref(node_id, |e| { - e.update_signed_node_info(signed_node_info, allow_invalid_signature); + e.update_signed_node_info(routing_domain, signed_node_info); }) .map(|mut nr| { - nr.set_filter(Some(NodeRefFilter::new().with_routing_domain(signed_node_info.routing_domain))) + nr.set_filter(Some( + NodeRefFilter::new().with_routing_domain(routing_domain), + )); nr }) } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index f02c9260..326b351d 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -14,7 +14,7 @@ pub struct NodeRefFilter { impl Default for NodeRefFilter { fn default() -> Self { - self.new() + Self::new() } } @@ -27,7 +27,7 @@ impl NodeRefFilter { } pub fn with_routing_domain(mut self, routing_domain: RoutingDomain) -> Self { - self.routing_domain_set = RoutingDomainSet::only(routing_domain); + self.routing_domain_set = routing_domain.into(); self } pub fn with_routing_domain_set(mut self, routing_domain_set: RoutingDomainSet) -> Self { @@ -62,7 +62,7 @@ impl NodeRefFilter { self } pub fn is_dead(&self) -> bool { - self.dial_info_filter.is_empty() || self.routing_domain_set.is_empty() + self.dial_info_filter.is_dead() || self.routing_domain_set.is_empty() } } @@ -154,6 +154,12 @@ impl NodeRef { .unwrap_or(RoutingDomainSet::all()) } + pub fn dial_info_filter(&self) -> DialInfoFilter { + self.filter + .map(|f| f.dial_info_filter) + .unwrap_or(DialInfoFilter::all()) + } + pub fn best_routing_domain(&self) -> Option { self.operate(|_rti, e| { e.best_routing_domain( @@ -171,9 +177,6 @@ impl NodeRef { pub fn node_id(&self) -> DHTKey { self.node_id } - pub fn has_valid_signed_node_info(&self) -> bool { - self.operate(|_rti, e| e.has_valid_signed_node_info(self.routing_domain_set())) - } pub fn has_updated_since_last_network_change(&self) -> bool { self.operate(|_rti, e| e.has_updated_since_last_network_change()) } @@ -196,8 +199,15 @@ impl NodeRef { } // Per-RoutingDomain accessors - pub fn peer_info(&self, routing_domain: RoutingDomain) -> Option { - self.operate(|_rti, e| e.peer_info(self.node_id(), routing_domain)) + pub fn make_peer_info(&self, routing_domain: RoutingDomain) -> Option { + self.operate(|_rti, e| e.make_peer_info(self.node_id(), routing_domain)) + } + pub fn signed_node_info_has_valid_signature(&self, routing_domain: RoutingDomain) -> bool { + self.operate(|_rti, e| { + e.signed_node_info(routing_domain) + .map(|sni| sni.has_valid_signature()) + .unwrap_or(false) + }) } pub fn has_seen_our_node_info(&self, routing_domain: RoutingDomain) -> bool { self.operate(|_rti, e| e.has_seen_our_node_info(routing_domain)) @@ -236,6 +246,7 @@ impl NodeRef { // Register relay node and return noderef self.routing_table.register_node_with_signed_node_info( + routing_domain, t.node_id.key, t.signed_node_info, false, @@ -246,15 +257,12 @@ impl NodeRef { // Filtered accessors pub fn first_filtered_dial_info_detail(&self) -> Option { let routing_domain_set = self.routing_domain_set(); + let dial_info_filter = self.dial_info_filter(); + self.operate(|_rt, e| { for routing_domain in routing_domain_set { if let Some(ni) = e.node_info(routing_domain) { - let filter = |did: &DialInfoDetail| { - self.filter - .as_ref() - .map(|f| did.matches_filter(f)) - .unwrap_or(true) - }; + let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter); if let Some(did) = ni.first_filtered_dial_info_detail(filter) { return Some(did); } @@ -266,16 +274,13 @@ impl NodeRef { pub fn all_filtered_dial_info_details(&self) -> Vec { let routing_domain_set = self.routing_domain_set(); + let dial_info_filter = self.dial_info_filter(); + let mut out = Vec::new(); self.operate(|_rt, e| { for routing_domain in routing_domain_set { if let Some(ni) = e.node_info(routing_domain) { - let filter = |did: &DialInfoDetail| { - self.filter - .as_ref() - .map(|f| did.matches_filter(f)) - .unwrap_or(true) - }; + let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter); if let Some(did) = ni.first_filtered_dial_info_detail(filter) { out.push(did); } @@ -288,12 +293,8 @@ impl NodeRef { pub async fn last_connection(&self) -> Option { // Get the last connection and the last time we saw anything with this connection - let (last_connection, last_seen) = self.operate(|_rti, e| { - e.last_connection( - self.filter.routing_domain_set, - self.filter.dial_info_filter.clone(), - ) - })?; + let (last_connection, last_seen) = + self.operate(|rti, e| e.last_connection(rti, &self.filter))?; // Should we check the connection table? if last_connection.protocol_type().is_connection_oriented() { diff --git a/veilid-core/src/routing_table/routing_domain_editor.rs b/veilid-core/src/routing_table/routing_domain_editor.rs new file mode 100644 index 00000000..53cbb395 --- /dev/null +++ b/veilid-core/src/routing_table/routing_domain_editor.rs @@ -0,0 +1,21 @@ +use super::*; + +enum RoutingDomainChange {} + +pub struct RoutingDomainEditor { + routing_table: RoutingTable, + routing_domain: RoutingDomain, + changes: Vec, +} + +impl RoutingDomainEditor { + pub(super) fn new(routing_table: RoutingTable, routing_domain: RoutingDomain) -> Self { + Self { + routing_table, + routing_domain, + changes: Vec::new(), + } + } + + pub fn commit(self) {} +} diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs index e23966aa..ea2ea1a7 100644 --- a/veilid-core/src/routing_table/routing_domains.rs +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -27,8 +27,11 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { self.relay_node.clone() } fn set_relay_node(&mut self, opt_relay_node: Option) { - self.relay_node = opt_relay_node - .map(|nr| nr.filtered_clone(NodeRefFilter::new().with_routing_domain(PublicInternet))) + self.relay_node = opt_relay_node.map(|nr| { + nr.filtered_clone( + NodeRefFilter::new().with_routing_domain(RoutingDomain::PublicInternet), + ) + }) } fn dial_info_details(&self) -> &Vec { &self.dial_info_details @@ -78,8 +81,9 @@ impl RoutingDomainDetail for LocalInternetRoutingDomainDetail { self.relay_node.clone() } fn set_relay_node(&mut self, opt_relay_node: Option) { - self.relay_node = opt_relay_node - .map(|nr| nr.filtered_clone(NodeRefFilter::new().with_routing_domain(LocalNetwork))); + self.relay_node = opt_relay_node.map(|nr| { + nr.filtered_clone(NodeRefFilter::new().with_routing_domain(RoutingDomain::LocalNetwork)) + }); } fn dial_info_details(&self) -> &Vec { &self.dial_info_details diff --git a/veilid-core/src/rpc_processor/coders/node_status.rs b/veilid-core/src/rpc_processor/coders/node_status.rs index 3948c7c6..a07e977f 100644 --- a/veilid-core/src/rpc_processor/coders/node_status.rs +++ b/veilid-core/src/rpc_processor/coders/node_status.rs @@ -5,11 +5,11 @@ pub fn encode_public_internet_node_status( public_internet_node_status: &PublicInternetNodeStatus, builder: &mut veilid_capnp::public_internet_node_status::Builder, ) -> Result<(), RPCError> { - builder.set_will_route(node_status.will_route); - builder.set_will_tunnel(node_status.will_tunnel); - builder.set_will_signal(node_status.will_signal); - builder.set_will_relay(node_status.will_relay); - builder.set_will_validate_dial_info(node_status.will_validate_dial_info); + builder.set_will_route(public_internet_node_status.will_route); + builder.set_will_tunnel(public_internet_node_status.will_tunnel); + builder.set_will_signal(public_internet_node_status.will_signal); + builder.set_will_relay(public_internet_node_status.will_relay); + builder.set_will_validate_dial_info(public_internet_node_status.will_validate_dial_info); Ok(()) } @@ -30,8 +30,8 @@ pub fn encode_local_network_node_status( local_network_node_status: &LocalNetworkNodeStatus, builder: &mut veilid_capnp::local_network_node_status::Builder, ) -> Result<(), RPCError> { - builder.set_will_relay(node_status.will_relay); - builder.set_will_validate_dial_info(node_status.will_validate_dial_info); + builder.set_will_relay(local_network_node_status.will_relay); + builder.set_will_validate_dial_info(local_network_node_status.will_validate_dial_info); Ok(()) } @@ -39,7 +39,7 @@ pub fn encode_local_network_node_status( pub fn decode_local_network_node_status( reader: &veilid_capnp::local_network_node_status::Reader, ) -> Result { - Ok(NodeStatus { + Ok(LocalNetworkNodeStatus { will_relay: reader.reborrow().get_will_relay(), will_validate_dial_info: reader.reborrow().get_will_validate_dial_info(), }) @@ -50,17 +50,15 @@ pub fn encode_node_status( builder: &mut veilid_capnp::node_status::Builder, ) -> Result<(), RPCError> { match node_status { - NodeStatus::PublicInternetNodeStatus(ns) => { + NodeStatus::PublicInternet(ns) => { let mut pi_builder = builder.reborrow().init_public_internet(); encode_public_internet_node_status(&ns, &mut pi_builder) } - NodeStatus::LocalNetworkNodeStatus(ns) => { + NodeStatus::LocalNetwork(ns) => { let mut ln_builder = builder.reborrow().init_local_network(); encode_local_network_node_status(&ns, &mut ln_builder) } } - - Ok(()) } pub fn decode_node_status( @@ -72,7 +70,7 @@ pub fn decode_node_status( .map_err(RPCError::map_internal("invalid node status"))? { veilid_capnp::node_status::PublicInternet(pi) => { - let r = r.map_err(RPCError::protocol)?; + let r = pi.map_err(RPCError::protocol)?; let pins = decode_public_internet_node_status(&r)?; NodeStatus::PublicInternet(pins) } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs index 11491693..aa7185a4 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -58,22 +58,25 @@ impl RPCOperationKind { #[derive(Debug, Clone)] pub struct RPCOperation { op_id: u64, - sender_info: Option, + sender_node_info: Option, kind: RPCOperationKind, } impl RPCOperation { - pub fn new_question(question: RPCQuestion, sender_info: Option) -> Self { + pub fn new_question(question: RPCQuestion, sender_node_info: Option) -> Self { Self { op_id: intf::get_random_u64(), - sender_info, + sender_node_info, kind: RPCOperationKind::Question(question), } } - pub fn new_statement(statement: RPCStatement, sender_info: Option) -> Self { + pub fn new_statement( + statement: RPCStatement, + sender_node_info: Option, + ) -> Self { Self { op_id: intf::get_random_u64(), - sender_info, + sender_node_info, kind: RPCOperationKind::Statement(statement), } } @@ -81,11 +84,11 @@ impl RPCOperation { pub fn new_answer( request: &RPCOperation, answer: RPCAnswer, - sender_info: Option, + sender_node_info: Option, ) -> Self { Self { op_id: request.op_id, - sender_info, + sender_node_info, kind: RPCOperationKind::Answer(answer), } } @@ -94,8 +97,8 @@ impl RPCOperation { self.op_id } - pub fn sender_info(&self) -> Option<&SignedNodeInfo> { - self.sender_info.as_ref() + pub fn sender_node_info(&self) -> Option<&SignedNodeInfo> { + self.sender_node_info.as_ref() } pub fn kind(&self) -> &RPCOperationKind { @@ -112,8 +115,10 @@ impl RPCOperation { ) -> Result { let op_id = operation_reader.get_op_id(); - let sender_info = if operation_reader.has_sender_info() { - let sni_reader = operation_reader.get_sender_info(); + let sender_node_info = if operation_reader.has_sender_node_info() { + let sni_reader = operation_reader + .get_sender_node_info() + .map_err(RPCError::protocol)?; let sni = decode_signed_node_info(&sni_reader, sender_node_id, true)?; Some(sni) } else { @@ -125,7 +130,7 @@ impl RPCOperation { Ok(RPCOperation { op_id, - sender_info, + sender_node_info, kind, }) } @@ -134,9 +139,9 @@ impl RPCOperation { builder.set_op_id(self.op_id); let mut k_builder = builder.reborrow().init_kind(); self.kind.encode(&mut k_builder)?; - if let Some(sender_info) = self.sender_info { - let si_builder = builder.reborrow().init_sender_info(); - encode_signed_node_info(&self.sender_info, &mut si_builder)?; + if let Some(sender_info) = self.sender_node_info { + let si_builder = builder.reborrow().init_sender_node_info(); + encode_signed_node_info(&sender_info, &mut si_builder)?; } Ok(()) } diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index 995f17c0..bbbd84c5 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -48,6 +48,57 @@ impl Destination { safety_route_spec: None, } } + // pub fn target_id(&self) -> DHTKey { + // match self { + // Destination::Direct { + // target, + // safety_route_spec, + // } => target.node_id(), + // Destination::Relay { + // relay, + // target, + // safety_route_spec, + // } => *target, + // Destination::PrivateRoute { + // private_route, + // safety_route_spec, + // } => {} + // } + // } + + // pub fn best_routing_domain(&self) -> RoutingDomain { + // match self { + // Destination::Direct { + // target, + // safety_route_spec, + // } => { + // if safety_route_spec.is_some() { + // RoutingDomain::PublicInternet + // } else { + // target + // .best_routing_domain() + // .unwrap_or(RoutingDomain::PublicInternet) + // } + // } + // Destination::Relay { + // relay, + // target, + // safety_route_spec, + // } => { + // if safety_route_spec.is_some() { + // RoutingDomain::PublicInternet + // } else { + // relay + // .best_routing_domain() + // .unwrap_or(RoutingDomain::PublicInternet) + // } + // } + // Destination::PrivateRoute { + // private_route: _, + // safety_route_spec: _, + // } => RoutingDomain::PublicInternet, + // } + // } pub fn safety_route_spec(&self) -> Option> { match self { @@ -100,7 +151,6 @@ impl fmt::Display for Destination { match self { Destination::Direct { target, - routing_domain, safety_route_spec, } => { let sr = safety_route_spec diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 3366d7a1..cb38b124 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -200,22 +200,8 @@ impl RPCProcessor { /// Determine if a NodeInfo can be placed into the specified routing domain fn filter_node_info(&self, routing_domain: RoutingDomain, node_info: &NodeInfo) -> bool { - // reject attempts to include non-public addresses in results - for did in &node_info.dial_info_detail_list { - if !did.dial_info.is_global() { - // non-public address causes rejection - return false; - } - } - if let Some(rpi) = &node_info.relay_peer_info { - for did in &rpi.signed_node_info.node_info.dial_info_detail_list { - if !did.dial_info.is_global() { - // non-public address causes rejection - return false; - } - } - } - true + let routing_table = self.routing_table(); + routing_table.node_info_is_valid_in_routing_domain(routing_domain, &node_info) } ////////////////////////////////////////////////////////////////////// @@ -360,9 +346,6 @@ impl RPCProcessor { waitable_reply.node_ref.stats_question_lost(); } Ok(TimeoutOr::Value((rpcreader, _))) => { - // Note that the remote node definitely received this node info since we got a reply - waitable_reply.node_ref.set_seen_our_node_info(); - // Reply received let recv_ts = intf::get_timestamp(); waitable_reply.node_ref.stats_answer_rcvd( @@ -403,13 +386,11 @@ impl RPCProcessor { match dest { Destination::Direct { target: node_ref, - routing_domain, safety_route_spec, } | Destination::Relay { relay: node_ref, target: _, - routing_domain, safety_route_spec, } => { // Send to a node without a private route @@ -419,7 +400,6 @@ impl RPCProcessor { let (node_ref, node_id) = if let Destination::Relay { relay: _, target: dht_key, - routing_domain: _, safety_route_spec: _, } = dest { @@ -508,6 +488,53 @@ impl RPCProcessor { }) } + // Get signed node info to package with RPC messages to improve + // routing table caching when it is okay to do so + // This is only done in the PublicInternet routing domain because + // as far as we can tell this is the only domain that will really benefit + fn get_sender_signed_node_info(&self, dest: &Destination) -> Option { + // Don't do this if the sender is to remain private + if dest.safety_route_spec().is_some() { + return None; + } + // Don't do this if our own signed node info isn't valid yet + let routing_table = self.routing_table(); + if !routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet) { + return None; + } + + match dest { + Destination::Direct { + target, + safety_route_spec: _, + } => { + // If the target has seen our node info already don't do this + if target.has_seen_our_node_info(RoutingDomain::PublicInternet) { + return None; + } + Some(routing_table.get_own_signed_node_info(RoutingDomain::PublicInternet)) + } + Destination::Relay { + relay: _, + target, + safety_route_spec: _, + } => { + if let Some(target) = routing_table.lookup_node_ref(*target) { + if target.has_seen_our_node_info(RoutingDomain::PublicInternet) { + return None; + } + Some(routing_table.get_own_signed_node_info(RoutingDomain::PublicInternet)) + } else { + None + } + } + Destination::PrivateRoute { + private_route: _, + safety_route_spec: _, + } => None, + } + } + // Issue a question over the network, possibly using an anonymized route #[instrument(level = "debug", skip(self, question), err)] async fn question( @@ -515,18 +542,11 @@ impl RPCProcessor { dest: Destination, question: RPCQuestion, ) -> Result, RPCError> { - // Get sender info if we should send that - let opt_sender_info = if dest.safety_route_spec().is_none() && matches!(question.respond_to(), RespondTo::Sender) { - // Sender is not private, send sender info if needed - // Get the noderef of the eventual destination or first route hop - if let Some(target_nr) = self.routing_table().lookup_node_ref(dest.get_target_id()) { - if target_nr.has_seen_our_node_info(R) - } - } + let opt_sender_info = self.get_sender_signed_node_info(&dest); // Wrap question in operation - let operation = RPCOperation::new_question(question); + let operation = RPCOperation::new_question(question, opt_sender_info); let op_id = operation.op_id(); // Log rpc send @@ -538,7 +558,7 @@ impl RPCProcessor { node_id, node_ref, hop_count, - } = self.render_operation(dest, &operation, safety_route_spec)?; + } = self.render_operation(dest, &operation)?; // If we need to resolve the first hop, do it let node_ref = match node_ref { @@ -594,14 +614,17 @@ impl RPCProcessor { } // Issue a statement over the network, possibly using an anonymized route - #[instrument(level = "debug", skip(self, statement, safety_route_spec), err)] + #[instrument(level = "debug", skip(self, statement), err)] async fn statement( &self, dest: Destination, statement: RPCStatement, ) -> Result, RPCError> { + // Get sender info if we should send that + let opt_sender_info = self.get_sender_signed_node_info(&dest); + // Wrap statement in operation - let operation = RPCOperation::new_statement(statement); + let operation = RPCOperation::new_statement(statement, opt_sender_info); // Log rpc send debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); @@ -612,7 +635,7 @@ impl RPCProcessor { node_id, node_ref, hop_count: _, - } = self.render_operation(dest, &operation, safety_route_spec)?; + } = self.render_operation(dest, &operation)?; // If we need to resolve the first hop, do it let node_ref = match node_ref { @@ -662,7 +685,7 @@ impl RPCProcessor { // To where should we respond? match respond_to { - RespondTo::Sender(_) => { + RespondTo::Sender => { // Reply directly to the request's source let sender_id = request.header.envelope.get_sender_id(); @@ -672,29 +695,32 @@ impl RPCProcessor { // If the sender_id is that of the peer, then this is a direct reply // else it is a relayed reply through the peer if peer_noderef.node_id() == sender_id { - Destination::Direct(peer_noderef) + Destination::direct(peer_noderef) } else { - Destination::Relay(peer_noderef, sender_id) + Destination::relay(peer_noderef, sender_id) } } - RespondTo::PrivateRoute(pr) => Destination::PrivateRoute(pr.clone()), + RespondTo::PrivateRoute(pr) => Destination::private_route(pr.clone()), } } // Issue a reply over the network, possibly using an anonymized route // The request must want a response, or this routine fails - #[instrument(level = "debug", skip(self, request, answer, safety_route_spec), err)] + #[instrument(level = "debug", skip(self, request, answer), err)] async fn answer( &self, request: RPCMessage, answer: RPCAnswer, ) -> Result, RPCError> { - // Wrap answer in operation - let operation = RPCOperation::new_answer(&request.operation, answer); - // Extract destination from respond_to let dest = self.get_respond_to_destination(&request); + // Get sender info if we should send that + let opt_sender_info = self.get_sender_signed_node_info(&dest); + + // Wrap answer in operation + let operation = RPCOperation::new_answer(&request.operation, answer, opt_sender_info); + // Log rpc send debug!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); @@ -704,7 +730,7 @@ impl RPCProcessor { node_id, node_ref, hop_count: _, - } = self.render_operation(dest, &operation, safety_route_spec)?; + } = self.render_operation(dest, &operation)?; // If we need to resolve the first hop, do it let node_ref = match node_ref { @@ -747,8 +773,7 @@ impl RPCProcessor { &self, encoded_msg: RPCMessageEncoded, ) -> Result<(), RPCError> { - - // Get the routing domain + // Get the routing domain this message came over let routing_domain = encoded_msg.header.routing_domain; // Decode the operation @@ -764,35 +789,34 @@ impl RPCProcessor { RPCOperation::decode(&op_reader, &sender_node_id)? }; - // Get the sender noderef, incorporating and 'sender node info' we have from a question + // Get the sender noderef, incorporating and 'sender node info' let mut opt_sender_nr: Option = None; - match operation.kind() { - RPCOperationKind::Question(q) => { - match q.respond_to() { - RespondTo::Sender(Some(sender_ni)) => { - // Sender NodeInfo was specified, update our routing table with it - if !self.filter_node_info(&sender_ni.node_info) { - return Err(RPCError::invalid_format( - "respond_to_sender_signed_node_info has invalid peer scope", - )); - } - opt_sender_nr = self.routing_table().register_node_with_signed_node_info( - routing_domain, - sender_node_id, - sender_ni.clone(), - false, - ); - } - _ => {} - } + if let Some(sender_node_info) = operation.sender_node_info() { + // Sender NodeInfo was specified, update our routing table with it + if !self.filter_node_info(RoutingDomain::PublicInternet, &sender_node_info.node_info) { + return Err(RPCError::invalid_format( + "sender signednodeinfo has invalid peer scope", + )); } - _ => {} - }; + opt_sender_nr = self.routing_table().register_node_with_signed_node_info( + routing_domain, + sender_node_id, + sender_node_info.clone(), + false, + ); + } + + // look up sender node, in case it's different than our peer due to relaying if opt_sender_nr.is_none() { - // look up sender node, in case it's different than our peer due to relaying opt_sender_nr = self.routing_table().lookup_node_ref(sender_node_id) } + // Mark this sender as having seen our node info over this routing domain + // because it managed to reach us over that routing domain + if let Some(sender_nr) = &opt_sender_nr { + sender_nr.set_seen_our_node_info(routing_domain); + } + // Make the RPC message let msg = RPCMessage { header: encoded_msg.header, diff --git a/veilid-core/src/rpc_processor/private_route.rs b/veilid-core/src/rpc_processor/private_route.rs index 179b4052..dba50d60 100644 --- a/veilid-core/src/rpc_processor/private_route.rs +++ b/veilid-core/src/rpc_processor/private_route.rs @@ -139,7 +139,7 @@ impl RPCProcessor { operation, }; let operation = - RPCOperation::new_statement(RPCStatement::new(RPCStatementDetail::Route(route))); + RPCOperation::new_statement(RPCStatement::new(RPCStatementDetail::Route(route)), None); // Convert message to bytes and return it let mut route_msg = ::capnp::message::Builder::new_default(); diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index e1d633e9..60eea998 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -9,7 +9,9 @@ impl RPCProcessor { dest: Destination, key: DHTKey, ) -> Result>>, RPCError> { - let find_node_q = RPCQuestionDetail::FindNodeQ(RPCOperationFindNodeQ { node_id: key }); + let find_node_q_detail = + RPCQuestionDetail::FindNodeQ(RPCOperationFindNodeQ { node_id: key }); + let find_node_q = RPCQuestion::new(RespondTo::Sender, find_node_q_detail); // Send the find_node request let waitable_reply = network_result_try!(self.question(dest, find_node_q).await?); @@ -31,7 +33,10 @@ impl RPCProcessor { // Verify peers are in the correct peer scope for peer_info in &find_node_a.peers { - if !self.filter_node_info(&peer_info.signed_node_info.node_info) { + if !self.filter_node_info( + RoutingDomain::PublicInternet, + &peer_info.signed_node_info.node_info, + ) { return Err(RPCError::invalid_format( "find_node response has invalid peer scope", )); @@ -61,23 +66,12 @@ impl RPCProcessor { let rt3 = routing_table.clone(); // find N nodes closest to the target node in our routing table - let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet); - let own_peer_info_is_valid = own_peer_info.signed_node_info.is_valid(); - let closest_nodes = routing_table.find_closest_nodes( find_node_q.node_id, // filter - Some(move |_k, v| { - rt2.filter_has_valid_signed_node_info( - v, - own_peer_info_is_valid, - Some(RoutingDomain::PublicInternet), - ) - }), + move |_k, v| rt2.filter_has_valid_signed_node_info(RoutingDomain::PublicInternet, v), // transform - move |k, v| { - rt3.transform_to_peer_info(RoutingDomain::PublicInternet, k, v, &own_peer_info) - }, + move |k, v| rt3.transform_to_peer_info(RoutingDomain::PublicInternet, k, v), ); // Make status answer @@ -87,11 +81,7 @@ impl RPCProcessor { // Send status answer let res = self - .answer( - msg, - RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)), - None, - ) + .answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a))) .await?; tracing::Span::current().record("res", &tracing::field::display(res)); Ok(()) diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index aa721d3d..6e61c754 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -18,7 +18,9 @@ impl RPCProcessor { // Send the node_info_update request to the specific routing domain requested network_result_try!( self.statement( - Destination::direct(target).with_routing_domain(routing_domain), + Destination::direct( + target.filtered_clone(NodeRefFilter::new().with_routing_domain(routing_domain)) + ), statement, ) .await? @@ -30,6 +32,7 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_node_info_update(&self, msg: RPCMessage) -> Result<(), RPCError> { let sender_node_id = msg.header.envelope.get_sender_id(); + let routing_domain = msg.header.routing_domain; // Get the statement let node_info_update = match msg.operation.into_kind() { @@ -41,14 +44,13 @@ impl RPCProcessor { }; // Update our routing table with signed node info - if !self.filter_node_info(&node_info_update.signed_node_info.node_info) { - log_rpc!(debug - "node_info_update has invalid peer scope from {}", sender_node_id - ); + if !self.filter_node_info(routing_domain, &node_info_update.signed_node_info.node_info) { + log_rpc!(debug "node info doesn't belong in {:?} routing domain: {}", routing_domain, sender_node_id); return Ok(()); } self.routing_table().register_node_with_signed_node_info( + routing_domain, sender_node_id, node_info_update.signed_node_info, false, diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index 838020ae..31c19023 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -7,7 +7,6 @@ impl RPCProcessor { pub async fn rpc_call_return_receipt>( self, dest: Destination, - safety_route: Option<&SafetyRouteSpec>, receipt: D, ) -> Result, RPCError> { let receipt = receipt.as_ref().to_vec(); @@ -16,7 +15,7 @@ impl RPCProcessor { let statement = RPCStatement::new(RPCStatementDetail::ReturnReceipt(return_receipt)); // Send the return_receipt request - network_result_try!(self.statement(dest, statement, safety_route).await?); + network_result_try!(self.statement(dest, statement).await?); Ok(NetworkResult::value(())) } diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 51d045cb..3773e1bc 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -7,15 +7,13 @@ impl RPCProcessor { pub async fn rpc_call_signal( self, dest: Destination, - safety_route: Option<&SafetyRouteSpec>, signal_info: SignalInfo, ) -> Result, RPCError> { - //let signed_node_info = self.routing_table().get_own_signed_node_info(); let signal = RPCOperationSignal { signal_info }; let statement = RPCStatement::new(RPCStatementDetail::Signal(signal)); // Send the signal request - network_result_try!(self.statement(dest, statement, safety_route).await?); + network_result_try!(self.statement(dest, statement).await?); Ok(NetworkResult::value(())) } diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index b7285701..079665d4 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -118,17 +118,15 @@ impl RPCProcessor { // Ensure the node status from the question is the kind for the routing domain we received the request in match routing_domain { RoutingDomain::PublicInternet => { - if !matches!(status_a.node_status, NodeStatus::PublicInternet(_)) { - return Ok(NetworkResult::invalid_message( - "node status doesn't match PublicInternet routing domain", - )); + if !matches!(status_q.node_status, NodeStatus::PublicInternet(_)) { + log_rpc!(debug "node status doesn't match PublicInternet routing domain"); + return Ok(()); } } RoutingDomain::LocalNetwork => { - if !matches!(status_a.node_status, NodeStatus::LocalNetwork(_)) { - return Ok(NetworkResult::invalid_message( - "node status doesn't match LocalNetwork routing domain", - )); + if !matches!(status_q.node_status, NodeStatus::LocalNetwork(_)) { + log_rpc!(debug "node status doesn't match LocalNetwork routing domain"); + return Ok(()); } } } diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 85e74703..3ff0f693 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -32,7 +32,7 @@ impl RPCProcessor { // Send the validate_dial_info request // This can only be sent directly, as relays can not validate dial info - network_result_value_or_log!(debug self.statement(Destination::Direct(peer), statement, None) + network_result_value_or_log!(debug self.statement(Destination::direct(peer), statement) .await? => { return Ok(false); } @@ -81,6 +81,7 @@ impl RPCProcessor { // an ipv6 address let routing_table = self.routing_table(); let sender_id = msg.header.envelope.get_sender_id(); + let routing_domain = msg.header.routing_domain; let node_count = { let c = self.config.get(); c.network.dht.max_find_node_count as usize @@ -88,15 +89,18 @@ impl RPCProcessor { // Filter on nodes that can validate dial info, and can reach a specific dial info let outbound_dial_info_entry_filter = - RoutingTable::make_outbound_dial_info_entry_filter(dial_info.clone()); + RoutingTable::make_outbound_dial_info_entry_filter( + routing_domain, + dial_info.clone(), + ); let will_validate_dial_info_filter = |e: &BucketEntryInner| { - if let Some(status) = &e.peer_stats().status { - status.will_validate_dial_info + if let Some(status) = &e.node_status(routing_domain) { + status.will_validate_dial_info() } else { true } }; - let filter = RoutingTable::combine_filters( + let filter = RoutingTable::combine_entry_filters( outbound_dial_info_entry_filter, will_validate_dial_info_filter, ); @@ -126,7 +130,7 @@ impl RPCProcessor { // Send the validate_dial_info request // This can only be sent directly, as relays can not validate dial info - network_result_value_or_log!(debug self.statement(Destination::Direct(peer), statement, None) + network_result_value_or_log!(debug self.statement(Destination::direct(peer), statement) .await? => { return Ok(()); } diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index b4fb3423..39b128ab 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -304,25 +304,41 @@ impl VeilidAPI { None => return Ok("Node id not found in routing table".to_owned()), }; - if args.len() >= 2 { - let pt = get_debug_argument_at( + let mut ai = 1; + let mut routing_domain = None; + while ai < args.len() { + if let Ok(pt) = get_debug_argument_at( &args, - 1, + ai, "debug_contact", "protocol_type", get_protocol_type, - )?; - nr.merge_filter(DialInfoFilter::all().with_protocol_type(pt)); - if args.len() >= 3 { - let at = get_debug_argument_at( - &args, - 2, - "debug_contact", - "address_type", - get_address_type, - )?; - nr.merge_filter(DialInfoFilter::all().with_address_type(at)); + ) { + nr.merge_filter(NodeRefFilter::new().with_protocol_type(pt)); + } else if let Ok(at) = + get_debug_argument_at(&args, ai, "debug_contact", "address_type", get_address_type) + { + nr.merge_filter(NodeRefFilter::new().with_address_type(at)); + } else if let Ok(rd) = get_debug_argument_at( + &args, + ai, + "debug_contact", + "routing_domain", + get_routing_domain, + ) { + if routing_domain.is_none() { + routing_domain = Some(rd); + } else { + return Ok("Multiple routing domains specified".to_owned()); + } + } else { + return Ok(format!("Invalid argument specified: {}", args[ai])); } + ai += 1; + } + + if let Some(routing_domain) = routing_domain { + nr.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain)) } let cm = network_manager.get_contact_method(nr); @@ -331,11 +347,14 @@ impl VeilidAPI { } async fn debug_ping(&self, args: String) -> Result { + let netman = self.network_manager()?; + let routing_table = netman.routing_table(); + let rpc = netman.rpc_processor(); + let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); let node_id = get_debug_argument_at(&args, 0, "debug_ping", "node_id", get_dht_key)?; - let routing_table = self.network_manager()?.routing_table(); let mut nr = match routing_table.lookup_node_ref(node_id) { Some(nr) => nr, None => return Ok("Node id not found in routing table".to_owned()), @@ -347,11 +366,11 @@ impl VeilidAPI { if let Ok(pt) = get_debug_argument_at(&args, ai, "debug_ping", "protocol_type", get_protocol_type) { - nr.merge_filter(DialInfoFilter::all().with_protocol_type(pt)); + nr.merge_filter(NodeRefFilter::new().with_protocol_type(pt)); } else if let Ok(at) = get_debug_argument_at(&args, ai, "debug_ping", "address_type", get_address_type) { - nr.merge_filter(DialInfoFilter::all().with_address_type(at)); + nr.merge_filter(NodeRefFilter::new().with_address_type(at)); } else if let Ok(rd) = get_debug_argument_at( &args, ai, @@ -370,11 +389,13 @@ impl VeilidAPI { ai += 1; } - let rpc = self.network_manager()?.rpc_processor(); + if let Some(routing_domain) = routing_domain { + nr.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain)) + } // Dump routing table entry let out = match rpc - .rpc_call_status(routing_domain, nr) + .rpc_call_status(nr) .await .map_err(VeilidAPIError::internal)? { diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 7f90eda9..dc7f9078 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -460,31 +460,6 @@ pub struct NodeInfo { } impl NodeInfo { - pub fn is_valid_in_routing_domain( - &self, - routing_table: RoutingTable, - routing_domain: RoutingDomain, - ) -> bool { - // Should not be passing around nodeinfo with an invalid network class - if matches!(self.network_class, NetworkClass::Invalid) { - return false; - } - // Ensure all of the dial info works in this routing domain - for did in self.dial_info_detail_list { - if !routing_table.ensure_dial_info_is_valid(routing_domain, &did.dial_info) { - return false; - } - } - // Ensure the relay is also valid in this routing domain if it is provided - if let Some(relay_peer_info) = self.relay_peer_info { - let relay_ni = &relay_peer_info.signed_node_info.node_info; - if !relay_ni.is_valid_in_routing_domain(routing_table, routing_domain) { - return false; - } - } - true - } - pub fn first_filtered_dial_info_detail(&self, filter: F) -> Option where F: Fn(&DialInfoDetail) -> bool, @@ -805,7 +780,7 @@ impl FromStr for SocketAddress { ////////////////////////////////////////////////////////////////// -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct DialInfoFilter { pub protocol_type_set: ProtocolTypeSet, pub address_type_set: AddressTypeSet, @@ -1449,8 +1424,8 @@ impl SignedNodeInfo { } } - pub fn is_valid(&self) -> bool { - self.signature.valid && self.node_info.is_valid() + pub fn has_valid_signature(&self) -> bool { + self.signature.valid } }