From abc67f9606cbeb1af3d46ac9c8931aef8c291990 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Wed, 24 Jul 2024 13:52:47 -0400 Subject: [PATCH] checkpoint --- veilid-core/src/network_manager/mod.rs | 12 +- veilid-core/src/network_manager/send_data.rs | 45 +++- veilid-core/src/routing_table/bucket_entry.rs | 13 + veilid-core/src/routing_table/find_peers.rs | 4 +- veilid-core/src/routing_table/mod.rs | 58 +++-- veilid-core/src/routing_table/node_ref.rs | 18 +- veilid-core/src/routing_table/privacy.rs | 10 +- .../src/routing_table/route_spec_store/mod.rs | 25 +- .../src/routing_table/routing_table_inner.rs | 233 ++++++++++-------- .../src/routing_table/tasks/bootstrap.rs | 2 +- .../tasks/closest_peers_refresh.rs | 2 +- .../tasks/peer_minimum_refresh.rs | 2 +- .../routing_table/tasks/relay_management.rs | 55 +++-- veilid-core/src/routing_table/types/mod.rs | 2 + .../src/routing_table/types/peer_info.rs | 6 + .../src/routing_table/types/safety_domain.rs | 40 +++ .../coders/private_safety_route.rs | 5 + veilid-core/src/rpc_processor/destination.rs | 157 +++++++++++- veilid-core/src/rpc_processor/fanout_call.rs | 22 +- veilid-core/src/rpc_processor/mod.rs | 56 ++++- .../src/rpc_processor/rpc_find_node.rs | 27 +- .../src/rpc_processor/rpc_get_value.rs | 20 +- .../src/rpc_processor/rpc_inspect_value.rs | 18 +- .../src/rpc_processor/rpc_set_value.rs | 18 +- veilid-core/src/rpc_processor/rpc_signal.rs | 15 +- veilid-core/src/rpc_processor/rpc_status.rs | 4 + .../src/rpc_processor/rpc_value_changed.rs | 2 +- .../src/rpc_processor/rpc_watch_value.rs | 28 ++- veilid-core/src/storage_manager/get_value.rs | 4 +- .../src/storage_manager/inspect_value.rs | 2 +- veilid-core/src/storage_manager/set_value.rs | 4 +- .../src/storage_manager/watch_value.rs | 2 +- veilid-core/src/veilid_api/debug.rs | 39 ++- 33 files changed, 689 insertions(+), 261 deletions(-) create mode 100644 veilid-core/src/routing_table/types/safety_domain.rs diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index ba956405..e09b5569 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -781,6 +781,7 @@ impl NetworkManager { // Add the peer info to our routing table let mut peer_nr = match routing_table.register_node_with_peer_info( RoutingDomain::PublicInternet, + SafetyDomainSet::all(), peer_info, false, ) { @@ -810,6 +811,7 @@ impl NetworkManager { // Add the peer info to our routing table let mut peer_nr = match routing_table.register_node_with_peer_info( RoutingDomain::PublicInternet, + SafetyDomainSet::all(), peer_info, false, ) { @@ -906,12 +908,15 @@ impl NetworkManager { } /// Called by the RPC handler when we want to issue an RPC request or response + /// safety_domain is used to determine if this is being sent in an unsafe context + /// and should reject attempts to send to safety-only nodes /// node_ref is the direct destination to which the envelope will be sent /// If 'destination_node_ref' is specified, it can be different than the node_ref being sent to /// which will cause the envelope to be relayed #[instrument(level = "trace", target = "net", skip_all)] pub async fn send_envelope>( &self, + safety_domain: SafetyDomain, node_ref: NodeRef, destination_node_ref: Option, body: B, @@ -947,7 +952,7 @@ impl NetworkManager { } // Send the envelope via whatever means necessary - self.send_data(node_ref, out).await + self.send_data(safety_domain, node_ref, out).await } /// Called by the RPC handler when we want to issue an direct receipt @@ -1141,9 +1146,10 @@ impl NetworkManager { }; // Relay the packet to the desired destination + // Relayed packets are never received over a safety route so they are implicitly + // in the SafetyDomain::Unsafe log_net!("relaying {} bytes to {}", data.len(), relay_nr); - - network_result_value_or_log!(match self.send_data(relay_nr, data.to_vec()) + network_result_value_or_log!(match self.send_data(SafetyDomain::Unsafe, relay_nr, data.to_vec()) .await { Ok(v) => v, Err(e) => { diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 722610a4..e503bc1d 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -15,10 +15,14 @@ impl NetworkManager { #[instrument(level="trace", target="net", skip_all, err)] pub(crate) async fn send_data( &self, + safety_domain: SafetyDomain, destination_node_ref: NodeRef, data: Vec, ) -> EyreResult> { + // First try to send data to the last flow we've seen this peer on + // If we have an existing flow, then we do not need to check the safety domain + // as a connection has already been established let data = if let Some(flow) = destination_node_ref.last_flow() { match self .net() @@ -52,11 +56,12 @@ impl NetworkManager { // Get the best way to contact this node let possibly_relayed_contact_method = self.get_node_contact_method(destination_node_ref.clone())?; - self.try_possibly_relayed_contact_method(possibly_relayed_contact_method, destination_node_ref, data).await + self.try_possibly_relayed_contact_method(safety_domain, possibly_relayed_contact_method, destination_node_ref, data).await } #[instrument(level="trace", target="net", skip_all)] pub(crate) fn try_possibly_relayed_contact_method(&self, + safety_domain: SafetyDomain, possibly_relayed_contact_method: NodeContactMethod, destination_node_ref: NodeRef, data: Vec, @@ -70,6 +75,7 @@ impl NetworkManager { NodeContactMethod::OutboundRelay(relay_nr) | NodeContactMethod::InboundRelay(relay_nr) => { let cm = this.get_node_contact_method(relay_nr.clone())?; + (cm, relay_nr, Some(possibly_relayed_contact_method)) } cm => (cm, destination_node_ref.clone(), None), @@ -103,40 +109,61 @@ impl NetworkManager { ); } NodeContactMethod::Direct(dial_info) => { + // Ensure we're not sending in an unsafe context to a safety-only node + if !target_node_ref.safety_domains().contains(safety_domain) { + bail!("should not be sending direct to invalid safety domain: target={}", target_node_ref); + } + network_result_try!( this.send_data_ncm_direct(target_node_ref, dial_info, data).await? ) } - NodeContactMethod::SignalReverse(relay_nr, target_node_ref) => { + NodeContactMethod::SignalReverse(relay_node_ref, target_node_ref) => { + + // Ensure we're not sending in an unsafe context to a safety-only node + if !target_node_ref.safety_domains().contains(safety_domain) || !relay_node_ref.safety_domains().contains(safety_domain) { + bail!("should not be sending signal reverse to invalid safety domain: target={}, relay={}", target_node_ref, relay_node_ref); + } + let nres = - this.send_data_ncm_signal_reverse(relay_nr.clone(), target_node_ref.clone(), data.clone()) + this.send_data_ncm_signal_reverse(relay_node_ref.clone(), target_node_ref.clone(), data.clone()) .await?; if matches!(nres, NetworkResult::Timeout) { // Failed to holepunch, fallback to inbound relay - log_network_result!(debug "Reverse connection failed to {}, falling back to inbound relay via {}", target_node_ref, relay_nr); - network_result_try!(this.try_possibly_relayed_contact_method(NodeContactMethod::InboundRelay(relay_nr), destination_node_ref, data).await?) + log_network_result!(debug "Reverse connection failed to {}, falling back to inbound relay via {}", target_node_ref, relay_node_ref); + network_result_try!(this.try_possibly_relayed_contact_method(safety_domain, NodeContactMethod::InboundRelay(relay_node_ref), destination_node_ref, data).await?) } else { network_result_try!(nres) } } - NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { + NodeContactMethod::SignalHolePunch(relay_node_ref, target_node_ref) => { + + // Ensure we're not sending in an unsafe context to a safety-only node + if !target_node_ref.safety_domains().contains(safety_domain) || !relay_node_ref.safety_domains().contains(safety_domain) { + bail!("should not be sending signal hole punch to invalid safety domain: target={}, relay={}", target_node_ref, relay_node_ref); + } + let nres = - this.send_data_ncm_signal_hole_punch(relay_nr.clone(), target_node_ref.clone(), data.clone()) + this.send_data_ncm_signal_hole_punch(relay_node_ref.clone(), target_node_ref.clone(), data.clone()) .await?; if matches!(nres, NetworkResult::Timeout) { // Failed to holepunch, fallback to inbound relay - log_network_result!(debug "Hole punch failed to {}, falling back to inbound relay via {}", target_node_ref, relay_nr); - network_result_try!(this.try_possibly_relayed_contact_method(NodeContactMethod::InboundRelay(relay_nr), destination_node_ref, data).await?) + log_network_result!(debug "Hole punch failed to {}, falling back to inbound relay via {}", target_node_ref, relay_node_ref); + network_result_try!(this.try_possibly_relayed_contact_method(safety_domain, NodeContactMethod::InboundRelay(relay_node_ref), destination_node_ref, data).await?) } else { network_result_try!(nres) } } NodeContactMethod::Existing => { + // If we have an existing flow, then we do not need to check the safety domain + // as a connection has already been established network_result_try!( this.send_data_ncm_existing(target_node_ref, data).await? ) } NodeContactMethod::Unreachable => { + // If we have no way of reaching this node, try a last ditch effort to reach if over an existing + // incoming connection network_result_try!( this.send_data_ncm_unreachable(target_node_ref, data) .await? diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 7dbc6464..b69195db 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -149,6 +149,10 @@ pub(crate) struct BucketEntryInner { /// If the entry is being punished and should be considered dead #[serde(skip)] punishment: Option, + /// If this node is seen in an unsafe context it is safe to contact + /// unsafely and use in the construction of safety routes + /// This tracks what domains this peer is safe to be contacted in + safety_domains: SafetyDomainSet, /// Tracking identifier for NodeRef debugging #[cfg(feature = "tracking")] #[serde(skip)] @@ -406,6 +410,14 @@ impl BucketEntryInner { !last_flows.is_empty() } + pub fn safety_domains(&self) -> SafetyDomainSet { + self.safety_domains + } + + pub fn set_safety_domains>(&mut self, s: S) { + self.safety_domains = s.into(); + } + pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> { let opt_current_sni = match routing_domain { RoutingDomain::LocalNetwork => &self.local_network.signed_node_info, @@ -972,6 +984,7 @@ impl BucketEntry { latency_stats_accounting: LatencyStatsAccounting::new(), transfer_stats_accounting: TransferStatsAccounting::new(), punishment: None, + safety_domains: SafetyDomainSet::empty(), #[cfg(feature = "tracking")] next_track_id: 0, #[cfg(feature = "tracking")] diff --git a/veilid-core/src/routing_table/find_peers.rs b/veilid-core/src/routing_table/find_peers.rs index 6a7f6377..fbc451a3 100644 --- a/veilid-core/src/routing_table/find_peers.rs +++ b/veilid-core/src/routing_table/find_peers.rs @@ -51,7 +51,7 @@ impl RoutingTable { c.network.dht.max_find_node_count as usize }; - let closest_nodes = match self.find_preferred_closest_nodes( + let closest_nodes = match self.find_preferred_closest_unsafe_nodes( node_count, key, filters, @@ -132,7 +132,7 @@ impl RoutingTable { }; // - let closest_nodes = match self.find_preferred_closest_nodes( + let closest_nodes = match self.find_preferred_closest_unsafe_nodes( node_count, key, filters, diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index fc8acc10..8e92493c 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -653,27 +653,39 @@ impl RoutingTable { } /// Resolve an existing routing table entry using any crypto kind and return a reference to it - pub fn lookup_any_node_ref(&self, node_id_key: PublicKey) -> EyreResult> { + pub fn lookup_any_node_ref( + &self, + safety_domain: SafetyDomain, + node_id_key: PublicKey, + ) -> EyreResult> { self.inner .read() - .lookup_any_node_ref(self.clone(), node_id_key) + .lookup_any_node_ref(self.clone(), safety_domain, node_id_key) } /// Resolve an existing routing table entry and return a reference to it - pub fn lookup_node_ref(&self, node_id: TypedKey) -> EyreResult> { - self.inner.read().lookup_node_ref(self.clone(), node_id) + pub fn lookup_node_ref( + &self, + safety_domain: SafetyDomain, + node_id: TypedKey, + ) -> EyreResult> { + self.inner + .read() + .lookup_node_ref(self.clone(), safety_domain, node_id) } /// Resolve an existing routing table entry and return a filtered reference to it #[instrument(level = "trace", skip_all)] pub fn lookup_and_filter_noderef( &self, + safety_domain: SafetyDomain, node_id: TypedKey, routing_domain_set: RoutingDomainSet, dial_info_filter: DialInfoFilter, ) -> EyreResult> { self.inner.read().lookup_and_filter_noderef( self.clone(), + safety_domain, node_id, routing_domain_set, dial_info_filter, @@ -687,12 +699,14 @@ impl RoutingTable { pub fn register_node_with_peer_info( &self, routing_domain: RoutingDomain, + safety_domain: SafetyDomain, peer_info: PeerInfo, allow_invalid: bool, ) -> EyreResult { self.inner.write().register_node_with_peer_info( self.clone(), routing_domain, + safety_domain, peer_info, allow_invalid, ) @@ -700,6 +714,8 @@ impl RoutingTable { /// Shortcut function to add a node to our routing table if it doesn't exist /// and add the last peer address we have for it, since that's pretty common + /// This always gets added to the SafetyDomain::Unsafe because direct connections + /// are inherently Unsafe. #[instrument(level = "trace", skip_all, err)] pub fn register_node_with_existing_connection( &self, @@ -764,12 +780,15 @@ impl RoutingTable { pub fn clear_punishments(&self) { let cur_ts = get_aligned_timestamp(); - self.inner - .write() - .with_entries_mut(cur_ts, BucketEntryState::Punished, |rti, e| { + self.inner.write().with_entries_mut( + cur_ts, + SafetyDomainSet::all(), + BucketEntryState::Punished, + |rti, e| { e.with_mut(rti, |_rti, ei| ei.set_punished(None)); Option::<()>::None - }); + }, + ); } ////////////////////////////////////////////////////////////////////// @@ -949,7 +968,7 @@ impl RoutingTable { let filters = VecDeque::from([filter]); - self.find_preferred_fastest_nodes( + self.find_preferred_fastest_unsafe_nodes( protocol_types_len * 2 * max_per_type, filters, |_rti, entry: Option>| { @@ -979,7 +998,7 @@ impl RoutingTable { out } - pub fn find_preferred_fastest_nodes<'a, T, O>( + pub fn find_preferred_fastest_unsafe_nodes<'a, T, O>( &self, node_count: usize, filters: VecDeque, @@ -990,10 +1009,10 @@ impl RoutingTable { { self.inner .read() - .find_preferred_fastest_nodes(node_count, filters, transform) + .find_preferred_fastest_unsafe_nodes(node_count, filters, transform) } - pub fn find_preferred_closest_nodes<'a, T, O>( + pub fn find_preferred_closest_unsafe_nodes<'a, T, O>( &self, node_count: usize, node_id: TypedKey, @@ -1005,7 +1024,7 @@ impl RoutingTable { { self.inner .read() - .find_preferred_closest_nodes(node_count, node_id, filters, transform) + .find_preferred_closest_unsafe_nodes(node_count, node_id, filters, transform) } pub fn sort_and_clean_closest_noderefs( @@ -1022,11 +1041,11 @@ impl RoutingTable { pub fn register_find_node_answer( &self, crypto_kind: CryptoKind, - peers: Vec, + peers: PeerInfoResponse, ) -> Vec { // Register nodes we'd found - let mut out = Vec::::with_capacity(peers.len()); - for p in peers { + let mut out = Vec::::with_capacity(peers.peer_info_list.len()); + for p in peers.peer_info_list { // Ensure we're getting back nodes we asked for if !p.node_ids().kinds().contains(&crypto_kind) { continue; @@ -1038,7 +1057,12 @@ impl RoutingTable { } // Register the node if it's new - match self.register_node_with_peer_info(RoutingDomain::PublicInternet, p, false) { + match self.register_node_with_peer_info( + RoutingDomain::PublicInternet, + peers.safety_domain_set, + p, + false, + ) { Ok(nr) => out.push(nr), Err(e) => { log_rtab!(debug "failed to register node with peer info from find node answer: {}", e); diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 3fb31010..7bcd1115 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -170,6 +170,15 @@ pub(crate) trait NodeRefBase: Sized { fn set_seen_our_node_info_ts(&self, routing_domain: RoutingDomain, seen_ts: Timestamp) { self.operate_mut(|_rti, e| e.set_seen_our_node_info_ts(routing_domain, seen_ts)); } + + fn safety_domains(&self) -> SafetyDomainSet { + self.operate(|_rti, e| e.safety_domains()) + } + + fn set_safety_domains>(&mut self, s: S) { + self.operate_mut(|_rti, e| e.set_safety_domains(s)) + } + // fn network_class(&self, routing_domain: RoutingDomain) -> Option { // self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.network_class())) // } @@ -204,8 +213,13 @@ pub(crate) trait NodeRefBase: Sized { } // Register relay node and return noderef - let nr = - rti.register_node_with_peer_info(self.routing_table(), routing_domain, rpi, false)?; + let nr = rti.register_node_with_peer_info( + self.routing_table(), + routing_domain, + e.safety_domains(), + rpi, + false, + )?; Ok(Some(nr)) }) } diff --git a/veilid-core/src/routing_table/privacy.rs b/veilid-core/src/routing_table/privacy.rs index e2926823..b66439ab 100644 --- a/veilid-core/src/routing_table/privacy.rs +++ b/veilid-core/src/routing_table/privacy.rs @@ -33,6 +33,7 @@ impl RouteNode { &self, routing_table: RoutingTable, crypto_kind: CryptoKind, + safety_domain_set: SafetyDomainSet, ) -> Option { match self { RouteNode::NodeId(id) => { @@ -49,6 +50,7 @@ impl RouteNode { // match routing_table.register_node_with_peer_info( RoutingDomain::PublicInternet, + safety_domain_set, *pi.clone(), false, ) { @@ -116,7 +118,9 @@ impl PrivateRouteHops { pub(crate) struct PrivateRoute { /// The public key used for the entire route pub public_key: TypedKey, + /// The number of hops in the 'hops' structure pub hop_count: u8, + /// The encoded hops structure pub hops: PrivateRouteHops, } @@ -130,6 +134,7 @@ impl PrivateRoute { node, next_hop: None, })), + safety_domain_set: SafetyDomainSet::all(), } } @@ -193,7 +198,7 @@ impl fmt::Display for PrivateRoute { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, - "PR({:?}+{}{})", + "PR({:?}+{}{}{})", self.public_key, self.hop_count, match &self.hops { @@ -211,7 +216,8 @@ impl fmt::Display for PrivateRoute { PrivateRouteHops::Empty => { "".to_owned() } - } + }, + SafetyDomain::print_set(self.safety_domain_set), ) } } diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs index 2ab4ea52..7fcc8376 100644 --- a/veilid-core/src/routing_table/route_spec_store/mod.rs +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -421,9 +421,15 @@ impl RouteSpecStore { NodeRef::new(routing_table.clone(), entry.unwrap(), None) }; - // Pull the whole routing table in sorted order - let nodes: Vec = - rti.find_peers_with_sort_and_filter(usize::MAX, cur_ts, filters, compare, transform); + // Pull the unsafe domain peers in sorted order + let nodes: Vec = rti.find_peers_with_sort_and_filter( + usize::MAX, + cur_ts, + SafetyDomain::Unsafe.into(), + filters, + compare, + transform, + ); // If we couldn't find enough nodes, wait until we have more nodes in the routing table if nodes.len() < hop_count { @@ -749,10 +755,7 @@ impl RouteSpecStore { let safety_selection = SafetySelection::Safe(safety_spec); ( - Destination::PrivateRoute { - private_route, - safety_selection, - }, + Destination::private_route(private_route, safety_selection), hops, ) }; @@ -799,10 +802,7 @@ impl RouteSpecStore { let safety_selection = SafetySelection::Safe(safety_spec); - Destination::PrivateRoute { - private_route, - safety_selection, - } + Destination::private_route(private_route, safety_selection) }; // Test with double-round trip ping to self @@ -1042,6 +1042,7 @@ impl RouteSpecStore { rti.register_node_with_peer_info( routing_table.clone(), RoutingDomain::PublicInternet, + private_route.safety_domain_set, *pi, false, ) @@ -1459,6 +1460,8 @@ impl RouteSpecStore { // add hop for 'FirstHop' hop_count: (hop_count + 1).try_into().unwrap(), hops: PrivateRouteHops::FirstHop(Box::new(route_hop)), + // routes we allocate ourselves are safe in all domains + safety_domain_set: SafetyDomainSet::all(), }; Ok(private_route) } diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 90a068ec..c54c4dd1 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -236,12 +236,17 @@ impl RoutingTableInner { pub fn reset_all_updated_since_last_network_change(&mut self) { let cur_ts = get_aligned_timestamp(); - self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, v| { - v.with_mut(rti, |_rti, e| { - e.reset_updated_since_last_network_change(); - }); - Option::<()>::None - }); + self.with_entries_mut( + cur_ts, + SafetyDomainSet::all(), + BucketEntryState::Dead, + |rti, v| { + v.with_mut(rti, |_rti, e| { + e.reset_updated_since_last_network_change(); + }); + Option::<()>::None + }, + ); } /// Return if this routing domain has a valid network class @@ -341,13 +346,18 @@ impl RoutingTableInner { // If the local network topology has changed, nuke the existing local node info and let new local discovery happen if changed { let cur_ts = get_aligned_timestamp(); - self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| { - e.with_mut(rti, |_rti, e| { - e.clear_signed_node_info(RoutingDomain::LocalNetwork); - e.reset_updated_since_last_network_change(); - }); - Option::<()>::None - }); + self.with_entries_mut( + cur_ts, + SafetyDomainSet::all(), + BucketEntryState::Dead, + |rti, e| { + e.with_mut(rti, |_rti, e| { + e.clear_signed_node_info(RoutingDomain::LocalNetwork); + e.reset_updated_since_last_network_change(); + }); + Option::<()>::None + }, + ); } } @@ -421,26 +431,31 @@ impl RoutingTableInner { pub fn refresh_cached_entry_counts(&mut self) -> EntryCounts { self.live_entry_count.clear(); let cur_ts = get_aligned_timestamp(); - self.with_entries_mut(cur_ts, BucketEntryState::Unreliable, |rti, entry| { - entry.with_inner(|e| { - // Tally per routing domain and crypto kind - for rd in RoutingDomain::all() { - if let Some(sni) = e.signed_node_info(rd) { - // Only consider entries that have valid signed node info in this domain - if sni.has_any_signature() { - // Tally - for crypto_kind in e.crypto_kinds() { - rti.live_entry_count - .entry((rd, crypto_kind)) - .and_modify(|x| *x += 1) - .or_insert(1); + self.with_entries_mut( + cur_ts, + SafetyDomainSet::all(), + BucketEntryState::Unreliable, + |rti, entry| { + entry.with_inner(|e| { + // Tally per routing domain and crypto kind + for rd in RoutingDomain::all() { + if let Some(sni) = e.signed_node_info(rd) { + // Only consider entries that have valid signed node info in this domain + if sni.has_any_signature() { + // Tally + for crypto_kind in e.crypto_kinds() { + rti.live_entry_count + .entry((rd, crypto_kind)) + .and_modify(|x| *x += 1) + .or_insert(1); + } } } } - } - }); - Option::<()>::None - }); + }); + Option::<()>::None + }, + ); self.live_entry_count.clone() } @@ -454,12 +469,13 @@ impl RoutingTableInner { pub fn get_entry_count( &self, routing_domain_set: RoutingDomainSet, + safety_domain_set: SafetyDomainSet, min_state: BucketEntryState, crypto_kinds: &[CryptoKind], ) -> usize { let mut count = 0usize; let cur_ts = get_aligned_timestamp(); - self.with_entries(cur_ts, min_state, |rti, e| { + self.with_entries(cur_ts, safety_domain_set, min_state, |rti, e| { if e.with_inner(|e| { e.best_routing_domain(rti, routing_domain_set).is_some() && !common_crypto_kinds(&e.crypto_kinds(), crypto_kinds).is_empty() @@ -475,11 +491,14 @@ impl RoutingTableInner { pub fn with_entries) -> Option>( &self, cur_ts: Timestamp, + safety_domain_set: SafetyDomainSet, min_state: BucketEntryState, mut f: F, ) -> Option { for entry in &self.all_entries { - if entry.with_inner(|e| e.state(cur_ts) >= min_state) { + if entry.with_inner(|e| { + e.state(cur_ts) >= min_state && !e.safety_domains().is_disjoint(safety_domain_set) + }) { if let Some(out) = f(self, entry) { return Some(out); } @@ -493,12 +512,15 @@ impl RoutingTableInner { pub fn with_entries_mut) -> Option>( &mut self, cur_ts: Timestamp, + safety_domain_set: SafetyDomainSet, min_state: BucketEntryState, mut f: F, ) -> Option { let mut entries = Vec::with_capacity(self.all_entries.len()); for entry in self.all_entries.iter() { - if entry.with_inner(|e| e.state(cur_ts) >= min_state) { + if entry.with_inner(|e| { + e.state(cur_ts) >= min_state && !e.safety_domains().is_disjoint(safety_domain_set) + }) { entries.push(entry); } } @@ -520,51 +542,47 @@ impl RoutingTableInner { // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow let mut node_refs = Vec::::with_capacity(self.bucket_entry_count()); - self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { - let entry_needs_ping = |e: &BucketEntryInner| { - // If this entry isn't in the routing domain we are checking, don't include it - if !e.exists_in_routing_domain(rti, routing_domain) { - return false; + self.with_entries( + cur_ts, + SafetyDomain::Unsafe.into(), + BucketEntryState::Unreliable, + |rti, entry| { + let entry_needs_ping = |e: &BucketEntryInner| { + // If this entry isn't in the routing domain we are checking, don't include it + if !e.exists_in_routing_domain(rti, routing_domain) { + return false; + } + + // If we don't have node status for this node, then we should ping it to get some node status + if e.has_node_info(routing_domain.into()) + && e.node_status(routing_domain).is_none() + { + return true; + } + + // If this entry needs a ping because this node hasn't seen our latest node info, then do it + if !e.has_seen_our_node_info_ts(routing_domain, own_node_info_ts) { + return true; + } + + // If this entry needs need a ping by non-routing-domain-specific metrics then do it + if e.needs_ping(cur_ts) { + return true; + } + + false + }; + + if entry.with_inner(entry_needs_ping) { + node_refs.push(NodeRef::new( + outer_self.clone(), + entry, + Some(NodeRefFilter::new().with_routing_domain(routing_domain)), + )); } - - // If we don't have node status for this node, then we should ping it to get some node status - if e.has_node_info(routing_domain.into()) && e.node_status(routing_domain).is_none() - { - return true; - } - - // If this entry needs a ping because this node hasn't seen our latest node info, then do it - if !e.has_seen_our_node_info_ts(routing_domain, own_node_info_ts) { - return true; - } - - // If this entry needs need a ping by non-routing-domain-specific metrics then do it - if e.needs_ping(cur_ts) { - return true; - } - - false - }; - - if entry.with_inner(entry_needs_ping) { - node_refs.push(NodeRef::new( - outer_self.clone(), - entry, - Some(NodeRefFilter::new().with_routing_domain(routing_domain)), - )); - } - Option::<()>::None - }); - node_refs - } - - #[allow(dead_code)] - pub fn get_all_alive_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec { - let mut node_refs = Vec::::with_capacity(self.bucket_entry_count()); - self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, entry| { - node_refs.push(NodeRef::new(outer_self.clone(), entry, None)); - Option::<()>::None - }); + Option::<()>::None + }, + ); node_refs } @@ -702,7 +720,7 @@ impl RoutingTableInner { new_entry.with_mut_inner(|e| update_func(self, e)); // Kick the bucket - log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable, &VALID_CRYPTO_KINDS)); + log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), SafetyDomainSet::all(), BucketEntryState::Unreliable, &VALID_CRYPTO_KINDS)); Ok(nr) } @@ -729,6 +747,7 @@ impl RoutingTableInner { pub fn lookup_node_ref( &self, outer_self: RoutingTable, + safety_domain: SafetyDomain, node_id: TypedKey, ) -> EyreResult> { if self.unlocked_inner.matches_own_node_id(&[node_id]) { @@ -750,11 +769,12 @@ impl RoutingTableInner { pub fn lookup_and_filter_noderef( &self, outer_self: RoutingTable, + safety_domain: SafetyDomain, node_id: TypedKey, routing_domain_set: RoutingDomainSet, dial_info_filter: DialInfoFilter, ) -> EyreResult> { - let nr = self.lookup_node_ref(outer_self, node_id)?; + let nr = self.lookup_node_ref(outer_self, safety_domain, node_id)?; Ok(nr.map(|nr| { nr.filtered_clone( NodeRefFilter::new() @@ -790,6 +810,7 @@ impl RoutingTableInner { &mut self, outer_self: RoutingTable, routing_domain: RoutingDomain, + safety_domain: SafetyDomain, peer_info: PeerInfo, allow_invalid: bool, ) -> EyreResult { @@ -838,6 +859,7 @@ impl RoutingTableInner { self.register_node_with_peer_info( outer_self.clone(), routing_domain, + safety_domain, relay_peer_info, false, )?; @@ -846,7 +868,7 @@ impl RoutingTableInner { let (node_ids, signed_node_info) = peer_info.destructure(); let mut nr = self.create_node_ref(outer_self, &node_ids, |_rti, e| { - e.update_signed_node_info(routing_domain, signed_node_info); + e.update_signed_node_info(routing_domain, safety_domain_set, signed_node_info); })?; nr.set_filter(Some( @@ -866,10 +888,15 @@ impl RoutingTableInner { flow: Flow, timestamp: Timestamp, ) -> EyreResult { - let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| { - //e.make_not_dead(timestamp); - e.touch_last_seen(timestamp); - })?; + let nr = self.create_node_ref( + outer_self, + SafetyDomain::Unsafe, + &TypedKeyGroup::from(node_id), + |_rti, e| { + //e.make_not_dead(timestamp); + e.touch_last_seen(timestamp); + }, + )?; // set the most recent node address for connection finding and udp replies nr.locked_mut(self).set_last_flow(flow, timestamp); Ok(nr) @@ -963,7 +990,7 @@ impl RoutingTableInner { }) as RoutingTableEntryFilter; filters.push_front(public_node_filter); - self.find_preferred_fastest_nodes( + self.find_preferred_fastest_unsafe_nodes( node_count, filters, |_rti: &RoutingTableInner, v: Option>| { @@ -1010,6 +1037,7 @@ impl RoutingTableInner { &self, node_count: usize, cur_ts: Timestamp, + safety_domain_set: SafetyDomainSet, mut filters: VecDeque, mut compare: C, mut transform: T, @@ -1039,20 +1067,25 @@ impl RoutingTableInner { } // add all nodes that match filter - self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, v| { - // Apply filter - let mut filtered = false; - for filter in &mut filters { - if !filter(rti, Some(v.clone())) { - filtered = true; - break; + self.with_entries( + cur_ts, + safety_domain_set, + BucketEntryState::Unreliable, + |rti, v| { + // Apply filter + let mut filtered = false; + for filter in &mut filters { + if !filter(rti, Some(v.clone())) { + filtered = true; + break; + } } - } - if !filtered { - nodes.push(Some(v.clone())); - } - Option::<()>::None - }); + if !filtered { + nodes.push(Some(v.clone())); + } + Option::<()>::None + }, + ); // sort by preference for returning nodes nodes.sort_by(|a, b| compare(self, a, b)); @@ -1069,7 +1102,7 @@ impl RoutingTableInner { } #[instrument(level = "trace", skip_all)] - pub fn find_preferred_fastest_nodes( + pub fn find_preferred_fastest_unsafe_nodes( &self, node_count: usize, mut filters: VecDeque, @@ -1148,7 +1181,7 @@ impl RoutingTableInner { } #[instrument(level = "trace", skip_all)] - pub fn find_preferred_closest_nodes( + pub fn find_preferred_closest_unsafe_nodes( &self, node_count: usize, node_id: TypedKey, diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index a195a890..914f649e 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -263,7 +263,7 @@ impl RoutingTable { ); let nr = - match self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, true) { + match self.register_node_with_peer_info(RoutingDomain::PublicInternet, SafetyDomainSet::all(), pi, true) { Ok(nr) => nr, Err(e) => { log_rtab!(error "failed to register bootstrap peer info: {}", e); diff --git a/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs b/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs index 22f4e92b..70c4f0cf 100644 --- a/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs +++ b/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs @@ -51,7 +51,7 @@ impl RoutingTable { filters.push_front(filter); let noderefs = routing_table - .find_preferred_closest_nodes( + .find_preferred_closest_unsafe_nodes( CLOSEST_PEERS_REQUEST_COUNT, self_node_id, filters, diff --git a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs index 34ebfc25..d199ba8a 100644 --- a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs +++ b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs @@ -67,7 +67,7 @@ impl RoutingTable { ) as RoutingTableEntryFilter; filters.push_front(filter); - let noderefs = routing_table.find_preferred_fastest_nodes( + let noderefs = routing_table.find_preferred_fastest_unsafe_nodes( min_peer_count, filters, |_rti, entry: Option>| { diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs index 1577d8bd..81984ff8 100644 --- a/veilid-core/src/routing_table/tasks/relay_management.rs +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -111,6 +111,7 @@ impl RoutingTable { // Register new outbound relay match self.register_node_with_peer_info( RoutingDomain::PublicInternet, + SafetyDomainSet::all(), outbound_relay_peerinfo, false, ) { @@ -247,32 +248,38 @@ impl RoutingTable { let mut best_inbound_relay: Option> = None; // Iterate all known nodes for candidates - inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { - let entry2 = entry.clone(); - entry.with(rti, |rti, e| { - // Filter this node - if relay_node_filter(e) { - // Compare against previous candidate - if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { - // Less is faster - let better = best_inbound_relay.with(rti, |_rti, best| { - // choose low latency stability for relays - BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) - == std::cmp::Ordering::Less - }); - // Now apply filter function and see if this node should be included - if better { - *best_inbound_relay = entry2; + // Only consider nodes that are in the SafetyDomain::Unsafe + inner.with_entries( + cur_ts, + SafetyDomain::Unsafe.into(), + BucketEntryState::Unreliable, + |rti, entry| { + let entry2 = entry.clone(); + entry.with(rti, |rti, e| { + // Filter this node + if relay_node_filter(e) { + // Compare against previous candidate + if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { + // Less is faster + let better = best_inbound_relay.with(rti, |_rti, best| { + // choose low latency stability for relays + BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) + == std::cmp::Ordering::Less + }); + // Now apply filter function and see if this node should be included + if better { + *best_inbound_relay = entry2; + } + } else { + // Always store the first candidate + best_inbound_relay = Some(entry2); } - } else { - // Always store the first candidate - best_inbound_relay = Some(entry2); } - } - }); - // Don't end early, iterate through all entries - Option::<()>::None - }); + }); + // Don't end early, iterate through all entries + Option::<()>::None + }, + ); // Return the best inbound relay noderef best_inbound_relay.map(|e| NodeRef::new(self.clone(), e, None)) } diff --git a/veilid-core/src/routing_table/types/mod.rs b/veilid-core/src/routing_table/types/mod.rs index 217c5d48..ade778b3 100644 --- a/veilid-core/src/routing_table/types/mod.rs +++ b/veilid-core/src/routing_table/types/mod.rs @@ -4,6 +4,7 @@ mod node_info; mod node_status; mod peer_info; mod routing_domain; +mod safety_domain; mod signed_direct_node_info; mod signed_node_info; mod signed_relayed_node_info; @@ -16,6 +17,7 @@ pub use node_info::*; pub use node_status::*; pub use peer_info::*; pub use routing_domain::*; +pub use safety_domain::*; pub use signed_direct_node_info::*; pub use signed_node_info::*; pub use signed_relayed_node_info::*; diff --git a/veilid-core/src/routing_table/types/peer_info.rs b/veilid-core/src/routing_table/types/peer_info.rs index 7a143790..0f0ed99c 100644 --- a/veilid-core/src/routing_table/types/peer_info.rs +++ b/veilid-core/src/routing_table/types/peer_info.rs @@ -46,3 +46,9 @@ impl PeerInfo { } } } + +#[derive(Clone, Debug)] +pub struct PeerInfoResponse { + pub safety_domain_set: SafetyDomainSet, + pub peer_info_list: Vec, +} diff --git a/veilid-core/src/routing_table/types/safety_domain.rs b/veilid-core/src/routing_table/types/safety_domain.rs new file mode 100644 index 00000000..8bca6e04 --- /dev/null +++ b/veilid-core/src/routing_table/types/safety_domain.rs @@ -0,0 +1,40 @@ +use super::*; + +#[allow(clippy::derived_hash_with_manual_eq)] +#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] +#[enumset(repr = "u8")] +pub enum SafetyDomain { + Unsafe = 0, + Safe = 1, +} +//pub type SafetyDomainSet = EnumSet; + +impl From for SafetyDomain { + fn from(value: SafetySelection) -> Self { + match value { + SafetySelection::Unsafe(_) => SafetyDomain::Unsafe, + SafetySelection::Safe(_) => SafetyDomain::Safe, + } + } +} + +impl SafetyDomain { + pub fn print(&self) -> String { + if *self == SafetyDomain::Unsafe.into() { + "*UNSAFE".to_string() + } else { + "*SAFE".to_string() + } + } + // pub fn print_set(set: SafetyDomainSet) -> String { + // if *set == SafetyDomainSet::all() { + // "*ALL".to_string() + // } else if *set == SafetyDomain::Unsafe.into() { + // "*UNSAFE".to_string() + // } else if *set == SafetyDomain::Safe.into() { + // "*SAFE".to_string() + // } else { + // "*NONE".to_string() + // } + // } +} diff --git a/veilid-core/src/rpc_processor/coders/private_safety_route.rs b/veilid-core/src/rpc_processor/coders/private_safety_route.rs index 8340f132..6b06afda 100644 --- a/veilid-core/src/rpc_processor/coders/private_safety_route.rs +++ b/veilid-core/src/rpc_processor/coders/private_safety_route.rs @@ -122,11 +122,15 @@ pub(crate) fn encode_private_route( h_builder.set_empty(()); } }; + + // We don't encode safety domain set, it will be set by the decoder based on how it was received + Ok(()) } pub(crate) fn decode_private_route( reader: &veilid_capnp::private_route::Reader, + safety_domain_set: SafetyDomainSet, ) -> Result { let public_key = decode_typed_key(&reader.get_public_key().map_err( RPCError::map_protocol("invalid public key in private route"), @@ -149,6 +153,7 @@ pub(crate) fn decode_private_route( public_key, hop_count, hops, + safety_domain_set, }) } diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index c6c18baa..8eb32db3 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -9,6 +9,8 @@ pub(crate) enum Destination { node: NodeRef, /// Require safety route or not safety_selection: SafetySelection, + /// Override safety domain + opt_override_safety_domain: Option, }, /// Send to node for relay purposes Relay { @@ -18,6 +20,8 @@ pub(crate) enum Destination { node: NodeRef, /// Require safety route or not safety_selection: SafetySelection, + /// Override safety domain + opt_override_safety_domain: Option, }, /// Send to private route PrivateRoute { @@ -25,6 +29,8 @@ pub(crate) enum Destination { private_route: PrivateRoute, /// Require safety route or not safety_selection: SafetySelection, + /// Override safety domain + opt_override_safety_domain: Option, }, } @@ -34,6 +40,7 @@ pub struct UnsafeRoutingInfo { pub opt_node: Option, pub opt_relay: Option, pub opt_routing_domain: Option, + pub opt_override_safety_domain: Option, } impl Destination { @@ -42,15 +49,18 @@ impl Destination { Destination::Direct { node: target, safety_selection: _, + opt_override_safety_domain: _, } => Some(target.clone()), Destination::Relay { relay: _, node: target, safety_selection: _, + opt_override_safety_domain: _, } => Some(target.clone()), Destination::PrivateRoute { private_route: _, safety_selection: _, + opt_override_safety_domain: _, } => None, } } @@ -59,6 +69,7 @@ impl Destination { Self::Direct { node, safety_selection: SafetySelection::Unsafe(sequencing), + opt_override_safety_domain: None, } } pub fn relay(relay: NodeRef, node: NodeRef) -> Self { @@ -67,12 +78,14 @@ impl Destination { relay, node, safety_selection: SafetySelection::Unsafe(sequencing), + opt_override_safety_domain: None, } } pub fn private_route(private_route: PrivateRoute, safety_selection: SafetySelection) -> Self { Self::PrivateRoute { private_route, safety_selection, + opt_override_safety_domain: None, } } @@ -81,43 +94,90 @@ impl Destination { Destination::Direct { node, safety_selection: _, + opt_override_safety_domain, } => Self::Direct { node, safety_selection, + opt_override_safety_domain, }, Destination::Relay { relay, node, safety_selection: _, + opt_override_safety_domain, } => Self::Relay { relay, node, safety_selection, + opt_override_safety_domain, }, Destination::PrivateRoute { private_route, safety_selection: _, + opt_override_safety_domain, } => Self::PrivateRoute { private_route, safety_selection, + opt_override_safety_domain, }, } } + pub fn is_direct(&self) -> bool { + matches!( + self, + Destination::Direct { + node: _, + safety_selection: _, + opt_override_safety_domain: _ + } + ) + } + + pub fn is_relay(&self) -> bool { + matches!( + self, + Destination::Relay { + relay: _, + node: _, + safety_selection: _, + opt_override_safety_domain: _ + } + ) + } + + pub fn is_private_route(&self) -> bool { + matches!( + self, + Destination::PrivateRoute { + private_route: _, + safety_selection: _, + opt_override_safety_domain: _, + } + ) + } + + pub fn has_safety_route(&self) -> bool { + matches!(self.get_safety_selection(), SafetySelection::Safe(_)) + } + pub fn get_safety_selection(&self) -> &SafetySelection { match self { Destination::Direct { node: _, safety_selection, + opt_override_safety_domain: _, } => safety_selection, Destination::Relay { relay: _, node: _, safety_selection, + opt_override_safety_domain: _, } => safety_selection, Destination::PrivateRoute { private_route: _, safety_selection, + opt_override_safety_domain: _, } => safety_selection, } } @@ -127,15 +187,18 @@ impl Destination { Destination::Direct { node, safety_selection: _, + opt_override_safety_domain: _, } | Destination::Relay { relay: _, node, safety_selection: _, + opt_override_safety_domain: _, } => Ok(Target::NodeId(node.best_node_id())), Destination::PrivateRoute { private_route, safety_selection: _, + opt_override_safety_domain: _, } => { // Add the remote private route if we're going to keep the id let route_id = rss @@ -147,6 +210,40 @@ impl Destination { } } + pub fn with_override_safety_domain(self, safety_domain: SafetyDomain) -> Self { + match self { + Destination::Direct { + node, + safety_selection, + opt_override_safety_domain: _, + } => Self::Direct { + node, + safety_selection, + opt_override_safety_domain: Some(safety_domain), + }, + Destination::Relay { + relay, + node, + safety_selection, + opt_override_safety_domain: _, + } => Self::Relay { + relay, + node, + safety_selection, + opt_override_safety_domain: Some(safety_domain), + }, + Destination::PrivateRoute { + private_route, + safety_selection, + opt_override_safety_domain: _, + } => Self::PrivateRoute { + private_route, + safety_selection, + opt_override_safety_domain: Some(safety_domain), + }, + } + } + pub fn get_unsafe_routing_info( &self, routing_table: RoutingTable, @@ -162,10 +259,12 @@ impl Destination { // Get: // * The target node (possibly relayed) // * The routing domain we are sending to if we can determine it - let (opt_node, opt_relay, opt_routing_domain) = match self { + // * The safety domain override if one was specified + let (opt_node, opt_relay, opt_routing_domain, opt_override_safety_domain) = match self { Destination::Direct { node, safety_selection: _, + opt_override_safety_domain: override_safety_domain, } => { let opt_routing_domain = node.best_routing_domain(); if opt_routing_domain.is_none() { @@ -173,12 +272,18 @@ impl Destination { // Only a stale connection or no connection exists log_rpc!(debug "No routing domain for node: node={}", node); }; - (Some(node.clone()), None, opt_routing_domain) + ( + Some(node.clone()), + None, + opt_routing_domain, + *override_safety_domain, + ) } Destination::Relay { relay, node, safety_selection: _, + opt_override_safety_domain: override_safety_domain, } => { // Outbound relays are defined as routing to and from PublicInternet only right now @@ -207,18 +312,30 @@ impl Destination { log_rpc!(debug "Unexpected relay used for node: relay={}, node={}", relay, node); }; - (Some(node.clone()), Some(relay.clone()), opt_routing_domain) + ( + Some(node.clone()), + Some(relay.clone()), + opt_routing_domain, + *override_safety_domain, + ) } Destination::PrivateRoute { private_route: _, safety_selection: _, - } => (None, None, Some(RoutingDomain::PublicInternet)), + opt_override_safety_domain: override_safety_domain, + } => ( + None, + None, + Some(RoutingDomain::PublicInternet), + *override_safety_domain, + ), }; Some(UnsafeRoutingInfo { opt_node, opt_relay, opt_routing_domain, + opt_override_safety_domain, }) } } @@ -229,6 +346,7 @@ impl fmt::Display for Destination { Destination::Direct { node, safety_selection, + opt_override_safety_domain, } => { let sr = if matches!(safety_selection, SafetySelection::Safe(_)) { "+SR" @@ -236,12 +354,19 @@ impl fmt::Display for Destination { "" }; - write!(f, "{}{}", node, sr) + let osd = if let Some(override_safety_domain) = opt_override_safety_domain { + format!("*{:?}", override_safety_domain) + } else { + "".to_string() + }; + + write!(f, "{}{}{}", node, sr, osd) } Destination::Relay { relay, node, safety_selection, + opt_override_safety_domain, } => { let sr = if matches!(safety_selection, SafetySelection::Safe(_)) { "+SR" @@ -249,11 +374,18 @@ impl fmt::Display for Destination { "" }; - write!(f, "{}@{}{}", node, relay, sr) + let osd = if let Some(override_safety_domain) = opt_override_safety_domain { + format!("*{:?}", override_safety_domain) + } else { + "".to_string() + }; + + write!(f, "{}@{}{}{}", node, relay, sr, osd) } Destination::PrivateRoute { private_route, safety_selection, + opt_override_safety_domain, } => { let sr = if matches!(safety_selection, SafetySelection::Safe(_)) { "+SR" @@ -261,7 +393,13 @@ impl fmt::Display for Destination { "" }; - write!(f, "{}{}", private_route.public_key, sr) + let osd = if let Some(override_safety_domain) = opt_override_safety_domain { + format!("*{:?}", override_safety_domain) + } else { + "".to_string() + }; + + write!(f, "{}{}{}", private_route.public_key, sr, osd) } } } @@ -289,6 +427,7 @@ impl RPCProcessor { Ok(rpc_processor::Destination::Direct { node: nr, safety_selection, + opt_override_safety_domain: None, }) } Target::PrivateRoute(rsid) => { @@ -302,6 +441,7 @@ impl RPCProcessor { Ok(rpc_processor::Destination::PrivateRoute { private_route, safety_selection, + opt_override_safety_domain: None, }) } } @@ -319,6 +459,7 @@ impl RPCProcessor { Destination::Direct { node: target, safety_selection, + opt_override_safety_domain: _, } => match safety_selection { SafetySelection::Unsafe(_) => { // Sent directly with no safety route, can respond directly @@ -347,6 +488,7 @@ impl RPCProcessor { relay, node: target, safety_selection, + opt_override_safety_domain: _, } => match safety_selection { SafetySelection::Unsafe(_) => { // Sent via a relay with no safety route, can respond directly @@ -373,6 +515,7 @@ impl RPCProcessor { Destination::PrivateRoute { private_route, safety_selection, + opt_override_safety_domain: _, } => { let Some(avoid_node_id) = private_route.first_hop_node_id() else { return Err(RPCError::internal( diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index 0b9bc509..a6cbbb59 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -58,7 +58,7 @@ pub(crate) fn debug_fanout_results(results: &[FanoutResult]) -> String { out } -pub(crate) type FanoutCallReturnType = RPCNetworkResult>; +pub(crate) type FanoutCallReturnType = RPCNetworkResult; pub(crate) type FanoutNodeInfoFilter = Arc bool + Send + Sync>; pub(crate) fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter { @@ -196,7 +196,8 @@ where match (self.call_routine)(next_node.clone()).await { Ok(NetworkResult::Value(v)) => { // Filter returned nodes - let filtered_v: Vec = v + let filtered_peer_info_list: Vec = v + .peer_info_list .into_iter() .filter(|pi| { let node_ids = pi.node_ids().to_vec(); @@ -212,9 +213,13 @@ where // Call succeeded // Register the returned nodes and add them to the fanout queue in sorted order - let new_nodes = self - .routing_table - .register_find_node_answer(self.crypto_kind, filtered_v); + let new_nodes = self.routing_table.register_find_node_answer( + self.crypto_kind, + PeerInfoResponse { + safety_domain_set: v.safety_domain_set, + peer_info_list: filtered_peer_info_list, + }, + ); self.clone().add_to_fanout_queue(&new_nodes); } #[allow(unused_variables)] @@ -275,7 +280,12 @@ where }; routing_table - .find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform) + .find_preferred_closest_unsafe_nodes( + self.node_count, + self.node_id, + filters, + transform, + ) .map_err(RPCError::invalid_format)? }; self.clone().add_to_fanout_queue(&closest_nodes); diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 88e6677a..67ea0fe3 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -228,6 +228,8 @@ struct RenderedOperation { remote_private_route: Option, /// The private route requested to receive the reply reply_private_route: Option, + /// The safety domain we are sending in + safety_domain: SafetyDomain, } impl fmt::Debug for RenderedOperation { @@ -240,6 +242,7 @@ impl fmt::Debug for RenderedOperation { .field("safety_route", &self.safety_route) .field("remote_private_route", &self.remote_private_route) .field("reply_private_route", &self.reply_private_route) + .field("safety_domain", &self.safety_domain) .finish() } } @@ -732,6 +735,10 @@ impl RPCProcessor { safety_route: if sr_is_stub { None } else { Some(sr_pubkey) }, remote_private_route: if pr_is_stub { None } else { Some(pr_pubkey) }, reply_private_route, + // If we are choosing to send without a safety route, then we are in the unsafe domain + // If we are sending with a safety route, then our first hop should always be + // to a node in the unsafe domain since we allocated the safety route ourselves + safety_domain: SafetyDomain::Unsafe, }; Ok(NetworkResult::value(out)) @@ -745,8 +752,7 @@ impl RPCProcessor { &self, dest: Destination, operation: &RPCOperation, - ) ->RPCNetworkResult { - let out: NetworkResult; + ) -> RPCNetworkResult { // Encode message to a builder and make a message reader for it // Then produce the message as an unencrypted byte buffer @@ -771,11 +777,13 @@ impl RPCProcessor { Destination::Direct { node: ref node_ref, safety_selection, + opt_override_safety_domain, } | Destination::Relay { relay: ref node_ref, node: _, safety_selection, + opt_override_safety_domain, } => { // Send to a node without a private route // -------------------------------------- @@ -785,6 +793,7 @@ impl RPCProcessor { relay: _, node: ref target, safety_selection: _, + opt_override_safety_domain: _, } = dest { (node_ref.clone(), target.clone()) @@ -810,7 +819,7 @@ impl RPCProcessor { // If no safety route is being used, and we're not sending to a private // route, we can use a direct envelope instead of routing - out = NetworkResult::value(RenderedOperation { + Ok(NetworkResult::value(RenderedOperation { message, destination_node_ref, node_ref, @@ -818,7 +827,8 @@ impl RPCProcessor { safety_route: None, remote_private_route: None, reply_private_route: None, - }); + safety_domain: opt_override_safety_domain.unwrap_or(SafetyDomain::Unsafe), + })) } SafetySelection::Safe(_) => { // No private route was specified for the request @@ -840,32 +850,45 @@ impl RPCProcessor { ); // Wrap with safety route - out = self.wrap_with_route( + let mut rendered_operation = network_result_try!(self.wrap_with_route( safety_selection, private_route, reply_private_route, message, - )?; + )?); + + // Override safety domain if we requested it + if let Some(override_safety_domain) = opt_override_safety_domain { + rendered_operation.safety_domain = override_safety_domain; + } + + Ok(NetworkResult::value(rendered_operation)) } - }; + } } Destination::PrivateRoute { private_route, safety_selection, + opt_override_safety_domain, } => { // Send to private route // --------------------- // Reply with 'route' operation - out = self.wrap_with_route( + let mut rendered_operation = network_result_try!(self.wrap_with_route( safety_selection, private_route, reply_private_route, message, - )?; + )?); + + // Override safety domain if we requested it + if let Some(override_safety_domain) = opt_override_safety_domain { + rendered_operation.safety_domain = override_safety_domain; + } + + Ok(NetworkResult::value(rendered_operation)) } } - - Ok(out) } /// Get signed node info to package with RPC messages to improve @@ -878,7 +901,7 @@ impl RPCProcessor { // Otherwise we would be attaching the original sender's identity to the final destination, // thus defeating the purpose of the safety route entirely :P let Some(UnsafeRoutingInfo { - opt_node, opt_relay: _, opt_routing_domain + opt_node, opt_relay: _, opt_routing_domain, opt_override_safety_domain:_ }) = dest.get_unsafe_routing_info(self.routing_table.clone()) else { return SenderPeerInfo::default(); }; @@ -1194,6 +1217,7 @@ impl RPCProcessor { safety_route, remote_private_route, reply_private_route, + safety_domain, } = network_result_try!(self.render_operation(dest.clone(), &operation)?); // Calculate answer timeout @@ -1214,6 +1238,7 @@ impl RPCProcessor { let res = self .network_manager() .send_envelope( + safety_domain, node_ref.clone(), Some(destination_node_ref.clone()), message, @@ -1283,7 +1308,7 @@ impl RPCProcessor { // Log rpc send #[cfg(feature = "verbose-tracing")] - debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest); + debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest, override_safety_domain = override_safety_domain); // Produce rendered operation let RenderedOperation { @@ -1294,6 +1319,7 @@ impl RPCProcessor { safety_route, remote_private_route, reply_private_route: _, + safety_domain, } = network_result_try!(self.render_operation(dest, &operation)?); // Send statement @@ -1304,6 +1330,7 @@ impl RPCProcessor { let res = self .network_manager() .send_envelope( + safety_domain, node_ref.clone(), Some(destination_node_ref.clone()), message, @@ -1370,6 +1397,7 @@ impl RPCProcessor { safety_route, remote_private_route, reply_private_route: _, + safety_domain, } = network_result_try!(self.render_operation(dest, &operation)?); // Send the reply @@ -1380,6 +1408,7 @@ impl RPCProcessor { let res = self .network_manager() .send_envelope( + safety_domain, node_ref.clone(), Some(destination_node_ref.clone()), message, @@ -1539,6 +1568,7 @@ impl RPCProcessor { } opt_sender_nr = match self.routing_table().register_node_with_peer_info( routing_domain, + SafetyDomainSet::all(), sender_peer_info.clone(), false, ) { diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index e6f8c376..4d5d48d8 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -13,7 +13,7 @@ impl RPCProcessor { dest: Destination, node_id: TypedKey, capabilities: Vec, - ) -> RPCNetworkResult>> { + ) -> RPCNetworkResult> { let _guard = self .unlocked_inner .startup_lock @@ -21,13 +21,7 @@ impl RPCProcessor { .map_err(RPCError::map_try_again("not started up"))?; // Ensure destination never has a private route - if matches!( - dest, - Destination::PrivateRoute { - private_route: _, - safety_selection: _ - } - ) { + if dest.is_private_route() { return Err(RPCError::internal( "Never send find node requests over private routes", )); @@ -43,6 +37,12 @@ impl RPCProcessor { let debug_string = format!("FindNode(node_id={}) => {}", node_id, dest); + let safety_domain_set = if dest.has_safety_route() { + SafetyDomain::Safe.into() + } else { + SafetyDomainSet::all() + }; + // Send the find_node request let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?); @@ -66,9 +66,9 @@ impl RPCProcessor { }; // Verify peers are in the correct peer scope - let peers = find_node_a.destructure(); + let peer_info_list = find_node_a.destructure(); - for peer_info in &peers { + for peer_info in &peer_info_list { if !self.verify_node_info( RoutingDomain::PublicInternet, peer_info.signed_node_info(), @@ -80,10 +80,15 @@ impl RPCProcessor { } } + let peer_info_response = PeerInfoResponse { + safety_domain_set, + peer_info_list, + }; + Ok(NetworkResult::value(Answer::new( latency, reply_private_route, - peers, + peer_info_response, ))) } diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 36aad09c..575a9b74 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -4,7 +4,7 @@ use crate::storage_manager::{SignedValueData, SignedValueDescriptor}; #[derive(Clone, Debug)] pub struct GetValueAnswer { pub value: Option, - pub peers: Vec, + pub peers: PeerInfoResponse, pub descriptor: Option, } @@ -77,6 +77,12 @@ impl RPCProcessor { vcrypto: vcrypto.clone(), }); + let safety_domain_set = if dest.has_safety_route() { + SafetyDomain::Safe.into() + } else { + SafetyDomainSet::all() + }; + log_dht!(debug "{}", debug_string); let waitable_reply = network_result_try!( @@ -103,7 +109,7 @@ impl RPCProcessor { _ => return Ok(NetworkResult::invalid_message("not an answer")), }; - let (value, peers, descriptor) = get_value_a.destructure(); + let (value, peer_info_list, descriptor) = get_value_a.destructure(); if debug_target_enabled!("dht") { let debug_string_value = value.as_ref().map(|v| { format!(" len={} seq={} writer={}", @@ -123,18 +129,18 @@ impl RPCProcessor { } else { "" }, - peers.len(), + peer_info_list.len(), dest ); log_dht!(debug "{}", debug_string_answer); - let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); + let peer_ids:Vec = peer_info_list.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); log_dht!(debug "Peers: {:#?}", peer_ids); } // Validate peers returned are, in fact, closer to the key than the node we sent this to - let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { + let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peer_info_list) { Ok(v) => v, Err(e) => { return Ok(NetworkResult::invalid_message(format!( @@ -156,14 +162,14 @@ impl RPCProcessor { tracing::Span::current().record("ret.value.data.writer", value.value_data().writer().to_string()); } #[cfg(feature = "verbose-tracing")] - tracing::Span::current().record("ret.peers.len", peers.len()); + tracing::Span::current().record("ret.peers.len", peer_info_list.len()); Ok(NetworkResult::value(Answer::new( latency, reply_private_route, GetValueAnswer { value, - peers, + peers: PeerInfoResponse{ safety_domain_set, peer_info_list }, descriptor, }, ))) diff --git a/veilid-core/src/rpc_processor/rpc_inspect_value.rs b/veilid-core/src/rpc_processor/rpc_inspect_value.rs index bebb5a11..e0f6b388 100644 --- a/veilid-core/src/rpc_processor/rpc_inspect_value.rs +++ b/veilid-core/src/rpc_processor/rpc_inspect_value.rs @@ -4,7 +4,7 @@ use crate::storage_manager::SignedValueDescriptor; #[derive(Clone, Debug)] pub struct InspectValueAnswer { pub seqs: Vec, - pub peers: Vec, + pub peers: PeerInfoResponse, pub descriptor: Option, } @@ -80,6 +80,12 @@ impl RPCProcessor { vcrypto: vcrypto.clone(), }); + let safety_domain_set = if dest.has_safety_route() { + SafetyDomain::Safe.into() + } else { + SafetyDomainSet::all() + }; + log_dht!(debug "{}", debug_string); let waitable_reply = network_result_try!( @@ -106,20 +112,20 @@ impl RPCProcessor { _ => return Ok(NetworkResult::invalid_message("not an answer")), }; - let (seqs, peers, descriptor) = inspect_value_a.destructure(); + let (seqs, peer_info_list, descriptor) = inspect_value_a.destructure(); if debug_target_enabled!("dht") { let debug_string_answer = format!( "OUT <== InspectValueA({} {} peers={}) <= {} seqs:\n{}", key, if descriptor.is_some() { " +desc" } else { "" }, - peers.len(), + peer_info_list.len(), dest, debug_seqs(&seqs) ); log_dht!(debug "{}", debug_string_answer); - let peer_ids: Vec = peers + let peer_ids: Vec = peer_info_list .iter() .filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())) .collect(); @@ -127,7 +133,7 @@ impl RPCProcessor { } // Validate peers returned are, in fact, closer to the key than the node we sent this to - let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { + let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peer_info_list) { Ok(v) => v, Err(e) => { return Ok(NetworkResult::invalid_message(format!( @@ -150,7 +156,7 @@ impl RPCProcessor { reply_private_route, InspectValueAnswer { seqs, - peers, + peers: PeerInfoResponse{ safety_domain_set, peer_info_list }, descriptor, }, ))) diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 7ba3cb1f..f7d17538 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -4,7 +4,7 @@ use super::*; pub struct SetValueAnswer { pub set: bool, pub value: Option, - pub peers: Vec, + pub peers: PeerInfoResponse, } impl RPCProcessor { @@ -95,6 +95,12 @@ impl RPCProcessor { log_dht!(debug "{}", debug_string); } + let safety_domain_set = if dest.has_safety_route() { + SafetyDomain::Safe.into() + } else { + SafetyDomainSet::all() + }; + let waitable_reply = network_result_try!( self.question(dest.clone(), question, Some(question_context)) .await? @@ -119,7 +125,7 @@ impl RPCProcessor { _ => return Ok(NetworkResult::invalid_message("not an answer")), }; - let (set, value, peers) = set_value_a.destructure(); + let (set, value, peer_info_list) = set_value_a.destructure(); if debug_target_enabled!("dht") { let debug_string_value = value.as_ref().map(|v| { @@ -140,18 +146,18 @@ impl RPCProcessor { "" }, debug_string_value, - peers.len(), + peer_info_list.len(), dest, ); log_dht!(debug "{}", debug_string_answer); - let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); + let peer_ids:Vec = peer_info_list.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); log_dht!(debug "Peers: {:#?}", peer_ids); } // Validate peers returned are, in fact, closer to the key than the node we sent this to - let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { + let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peer_info_list) { Ok(v) => v, Err(e) => { return Ok(NetworkResult::invalid_message(format!( @@ -180,7 +186,7 @@ impl RPCProcessor { Ok(NetworkResult::value(Answer::new( latency, reply_private_route, - SetValueAnswer { set, value, peers }, + SetValueAnswer { set, value, peers: PeerInfoResponse { safety_domain_set, peer_info_list } } ))) } diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index b5c95c35..7c9e8f21 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -15,14 +15,13 @@ impl RPCProcessor { .enter() .map_err(RPCError::map_try_again("not started up"))?; - // Ensure destination never has a private route - if matches!( - dest, - Destination::PrivateRoute { - private_route: _, - safety_selection: _ - } - ) { + // Ensure destination is direct + if dest.has_safety_route() { + return Err(RPCError::internal( + "Never send signal requests over safety routes", + )); + } + if dest.is_private_route() { return Err(RPCError::internal( "Never send signal requests over private routes", )); diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 66125305..2b235d9a 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -31,6 +31,7 @@ impl RPCProcessor { opt_node, opt_relay, opt_routing_domain, + opt_override_safety_domain: _, }) = dest.get_unsafe_routing_info(self.routing_table.clone()) { @@ -109,6 +110,7 @@ impl RPCProcessor { Destination::Direct { node: target, safety_selection, + opt_override_safety_domain: _, } => { if matches!(safety_selection, SafetySelection::Unsafe(_)) { if let Some(sender_info) = sender_info { @@ -145,10 +147,12 @@ impl RPCProcessor { relay: _, node: _, safety_selection: _, + opt_override_safety_domain: _, } | Destination::PrivateRoute { private_route: _, safety_selection: _, + opt_override_safety_domain: _, } => { // sender info is irrelevant over relays and routes } diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index e89cfeb9..2cc15f3b 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -20,7 +20,7 @@ impl RPCProcessor { .map_err(RPCError::map_try_again("not started up"))?; // Ensure destination is never using a safety route - if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) { + if dest.has_safety_route() { return Err(RPCError::internal( "Never send value changes over safety routes", )); diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index 76fcc915..ba15a9ad 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -4,7 +4,7 @@ use super::*; pub struct WatchValueAnswer { pub accepted: bool, pub expiration_ts: Timestamp, - pub peers: Vec, + pub peers: PeerInfoResponse, pub watch_id: u64, } @@ -86,6 +86,12 @@ impl RPCProcessor { log_dht!(debug "{}", debug_string); + let safety_domain_set = if dest.has_safety_route() { + SafetyDomain::Safe.into() + } else { + SafetyDomainSet::all() + }; + let waitable_reply = network_result_try!(self.question(dest.clone(), question, None).await?); @@ -108,7 +114,7 @@ impl RPCProcessor { _ => return Ok(NetworkResult::invalid_message("not an answer")), }; let question_watch_id = watch_id; - let (accepted, expiration, peers, watch_id) = watch_value_a.destructure(); + let (accepted, expiration, peer_info_list, watch_id) = watch_value_a.destructure(); if debug_target_enabled!("dht") { let debug_string_answer = format!( "OUT <== WatchValueA({}id={} {} #{:?}@{} peers={}) <= {}", @@ -117,13 +123,13 @@ impl RPCProcessor { key, subkeys, expiration, - peers.len(), + peer_info_list.len(), dest ); log_dht!(debug "{}", debug_string_answer); - let peer_ids: Vec = peers + let peer_ids: Vec = peer_info_list .iter() .filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())) .collect(); @@ -150,7 +156,12 @@ impl RPCProcessor { } // Validate peers returned are, in fact, closer to the key than the node we sent this to - let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { + let valid = match RoutingTable::verify_peers_closer( + vcrypto, + target_node_id, + key, + &peer_info_list, + ) { Ok(v) => v, Err(e) => { return Ok(NetworkResult::invalid_message(format!( @@ -168,7 +179,7 @@ impl RPCProcessor { #[cfg(feature = "verbose-tracing")] tracing::Span::current().record("ret.expiration", latency.as_u64()); #[cfg(feature = "verbose-tracing")] - tracing::Span::current().record("ret.peers.len", peers.len()); + tracing::Span::current().record("ret.peers.len", peer_info_list.len()); Ok(NetworkResult::value(Answer::new( latency, @@ -176,7 +187,10 @@ impl RPCProcessor { WatchValueAnswer { accepted, expiration_ts: Timestamp::new(expiration), - peers, + peers: PeerInfoResponse { + safety_domain_set, + peer_info_list, + }, watch_id, }, ))) diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 675aef6f..8a0a2916 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -113,7 +113,7 @@ impl StorageManager { // Keep the value if we got one and it is newer and it passes schema validation let Some(value) = gva.answer.value else { // Return peers if we have some - log_network_result!(debug "GetValue returned no value, fanout call returned peers {}", gva.answer.peers.len()); + log_network_result!(debug "GetValue returned no value, fanout call returned peers {}", gva.answer.peers.peer_info_list.len()); return Ok(NetworkResult::value(gva.answer.peers)) }; @@ -179,7 +179,7 @@ impl StorageManager { } // Return peers if we have some - log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); + log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.peer_info_list.len()); Ok(NetworkResult::value(gva.answer.peers)) }.instrument(tracing::trace_span!("outbound_get_value fanout routine")) diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index 7483caaf..9c02b9db 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -225,7 +225,7 @@ impl StorageManager { } // Return peers if we have some - log_network_result!(debug "InspectValue fanout call returned peers {}", answer.peers.len()); + log_network_result!(debug "InspectValue fanout call returned peers {}", answer.peers.peer_info_list.len()); Ok(NetworkResult::value(answer.peers)) }.instrument(tracing::trace_span!("outbound_inspect_value fanout call")) diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index d673c3c5..5e182100 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -107,7 +107,7 @@ impl StorageManager { ctx.missed_since_last_set += 1; // Return peers if we have some - log_network_result!(debug "SetValue missed: {}, fanout call returned peers {}", ctx.missed_since_last_set, sva.answer.peers.len()); + log_network_result!(debug "SetValue missed: {}, fanout call returned peers {}", ctx.missed_since_last_set, sva.answer.peers.peer_info_list.len()); return Ok(NetworkResult::value(sva.answer.peers)); } @@ -122,7 +122,7 @@ impl StorageManager { } // Return peers if we have some - log_network_result!(debug "SetValue returned no value, fanout call returned peers {}", sva.answer.peers.len()); + log_network_result!(debug "SetValue returned no value, fanout call returned peers {}", sva.answer.peers.peer_info_list.len()); return Ok(NetworkResult::value(sva.answer.peers)); }; diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index e7a7f5aa..8587f365 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -291,7 +291,7 @@ impl StorageManager { } // Return peers if we have some - log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node); + log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.peer_info_list.len(), next_node); Ok(NetworkResult::value(wva.answer.peers)) }.instrument(tracing::trace_span!("outbound_watch_value call routine")) diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index d862a594..cc3c8439 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -224,7 +224,7 @@ fn get_destination( let text = text.to_owned(); Box::pin(async move { // Safety selection - let (text, ss) = if let Some((first, second)) = text.split_once('+') { + let (text, oss) = if let Some((first, second)) = text.split_once('+') { let ss = get_safety_selection(routing_table.clone())(second)?; (first, Some(ss)) } else { @@ -233,6 +233,15 @@ fn get_destination( if text.is_empty() { return None; } + // Override safety domain + let (text, osd) = if let Some(first) = text.strip_suffix("*SAFE") { + (first, Some(SafetyDomain::Safe)) + } else { + (text, None) + }; + if text.is_empty() { + return None; + } if &text[0..1] == "#" { let rss = routing_table.route_spec_store(); @@ -254,11 +263,14 @@ fn get_destination( }; private_route }; - - Some(Destination::private_route( + let mut d = Destination::private_route( private_route, - ss.unwrap_or(SafetySelection::Unsafe(Sequencing::default())), - )) + oss.unwrap_or(SafetySelection::Unsafe(Sequencing::default())), + ); + if let Some(sd) = osd { + d = d.with_override_safety_domain(sd); + } + Some(d) } else { let (text, mods) = text .split_once('/') @@ -274,25 +286,29 @@ fn get_destination( } let mut d = Destination::relay(relay_nr, target_nr); - if let Some(ss) = ss { + if let Some(ss) = oss { d = d.with_safety(ss) } - + if let Some(sd) = osd { + d = d.with_override_safety_domain(sd); + } Some(d) } else { // Direct let mut target_nr = - resolve_node_ref(routing_table, ss.unwrap_or_default())(text).await?; + resolve_node_ref(routing_table, oss.unwrap_or_default())(text).await?; if let Some(mods) = mods { target_nr = get_node_ref_modifiers(target_nr)(mods)?; } let mut d = Destination::direct(target_nr); - if let Some(ss) = ss { + if let Some(ss) = oss { d = d.with_safety(ss) } - + if let Some(sd) = osd { + d = d.with_override_safety_domain(sd); + } Some(d) } } @@ -968,6 +984,7 @@ impl VeilidAPI { Destination::Direct { node: target, safety_selection: _, + opt_override_safety_domain: _, } => Ok(format!( "Destination: {:#?}\nTarget Entry:\n{}\n", &dest, @@ -977,6 +994,7 @@ impl VeilidAPI { relay, node: target, safety_selection: _, + opt_override_safety_domain: _, } => Ok(format!( "Destination: {:#?}\nTarget Entry:\n{}\nRelay Entry:\n{}\n", &dest, @@ -986,6 +1004,7 @@ impl VeilidAPI { Destination::PrivateRoute { private_route: _, safety_selection: _, + opt_override_safety_domain: _, } => Ok(format!("Destination: {:#?}", &dest)), } }