From 3224a315c36fde433678fd430a1b0cc266206498 Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Thu, 20 Jul 2023 17:52:45 -0400
Subject: [PATCH] proper node info filter for fanout

---
 veilid-core/src/rpc_processor/fanout_call.rs | 56 +++++++++++++++++---
 veilid-core/src/rpc_processor/mod.rs         |  1 +
 veilid-core/src/storage_manager/get_value.rs |  5 +-
 veilid-core/src/storage_manager/mod.rs       |  3 +-
 veilid-core/src/storage_manager/set_value.rs |  4 ++
 5 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs
index 32eb63f8..215378aa 100644
--- a/veilid-core/src/rpc_processor/fanout_call.rs
+++ b/veilid-core/src/rpc_processor/fanout_call.rs
@@ -10,6 +10,15 @@ where
 }
 
 pub type FanoutCallReturnType = Result<Option<Vec<PeerInfo>>, RPCError>;
+pub type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
+
+pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
+    Arc::new(|_, _| true)
+}
+
+pub fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter {
+    Arc::new(move |_, ni| ni.has_capabilities(&caps))
+}
 
 /// Contains the logic for generically searching the Veilid routing table for a set of nodes and applying an
 /// RPC operation that eventually converges on a satisfactory result, or times out and returns some
@@ -40,6 +49,7 @@ where
     node_count: usize,
     fanout: usize,
     timeout_us: TimestampDuration,
+    node_info_filter: FanoutNodeInfoFilter,
     call_routine: C,
     check_done: D,
 }
@@ -57,6 +67,7 @@ where
         node_count: usize,
         fanout: usize,
         timeout_us: TimestampDuration,
+        node_info_filter: FanoutNodeInfoFilter,
        call_routine: C,
        check_done: D,
     ) -> Arc<Self> {
@@ -74,6 +85,7 @@
             node_count,
             fanout,
             timeout_us,
+            node_info_filter,
             call_routine,
             check_done,
         })
@@ -160,11 +172,26 @@
         // Do the call for this node
         match (self.call_routine)(next_node.clone()).await {
             Ok(Some(v)) => {
+                // Filter returned nodes
+                let filtered_v: Vec<PeerInfo> = v
+                    .into_iter()
+                    .filter(|pi| {
+                        let node_ids = pi.node_ids().to_vec();
+                        if !(self.node_info_filter)(
+                            &node_ids,
+                            pi.signed_node_info().node_info(),
+                        ) {
+                            return false;
+                        }
+                        true
+                    })
+                    .collect();
+
                 // Call succeeded
                 // Register the returned nodes and add them to the closest nodes list in sorted order
                 let new_nodes = self
                     .routing_table
-                    .register_find_node_answer(self.crypto_kind, v);
+                    .register_find_node_answer(self.crypto_kind, filtered_v);
                 self.clone().add_new_nodes(new_nodes);
             }
             Ok(None) => {
@@ -185,20 +212,33 @@
         // Get the 'node_count' closest nodes to the key out of our routing table
         let closest_nodes = {
             let routing_table = self.routing_table.clone();
-
+            let node_info_filter = self.node_info_filter.clone();
             let filter = Box::new(
                 move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
                     // Exclude our own node
                     if opt_entry.is_none() {
                         return false;
                     }
+                    let entry = opt_entry.unwrap();
 
-                    // Ensure only things that are valid/signed in the PublicInternet domain are returned
-                    rti.filter_has_valid_signed_node_info(
-                        RoutingDomain::PublicInternet,
-                        true,
-                        opt_entry,
-                    )
+                    // Filter entries
+                    entry.with(rti, |_rti, e| {
+                        let Some(signed_node_info) = e.signed_node_info(RoutingDomain::PublicInternet) else {
+                            return false;
+                        };
+                        // Ensure only things that are valid/signed in the PublicInternet domain are returned
+                        if !signed_node_info.has_any_signature() {
+                            return false;
+                        }
+
+                        // Check our node info filter
+                        let node_ids = e.node_ids().to_vec();
+                        if !(node_info_filter)(&node_ids, signed_node_info.node_info()) {
+                            return false;
+                        }
+
+                        true
+                    })
                 },
             ) as RoutingTableEntryFilter;
             let filters = VecDeque::from([filter]);
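To make the new filter plumbing concrete, here is a minimal, runnable sketch of how the two FanoutNodeInfoFilter constructors added in fanout_call.rs behave. The Capability, TypedKey, and NodeInfo definitions below, and the CAP_DHT byte value, are simplified stand-ins invented for illustration; they are not the real veilid-core types.

    use std::sync::Arc;

    // Simplified stand-ins for veilid-core types; the real definitions differ.
    type Capability = [u8; 4];
    const CAP_DHT: Capability = *b"DHTV"; // illustrative value, not the real constant

    #[derive(Clone, Copy)]
    struct TypedKey(u64);

    struct NodeInfo {
        capabilities: Vec<Capability>,
    }

    impl NodeInfo {
        // Mirrors the has_capabilities check used by the patch: every
        // requested capability must be present on the node.
        fn has_capabilities(&self, caps: &[Capability]) -> bool {
            caps.iter().all(|c| self.capabilities.contains(c))
        }
    }

    // Same shape as the patch's FanoutNodeInfoFilter type alias.
    type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;

    fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
        Arc::new(|_, _| true)
    }

    fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter {
        Arc::new(move |_, ni| ni.has_capabilities(&caps))
    }

    fn main() {
        let dht_node = NodeInfo { capabilities: vec![CAP_DHT] };
        let bare_node = NodeInfo { capabilities: vec![] };
        let ids = [TypedKey(1)];

        let accept_all = empty_fanout_node_info_filter();
        let dht_only = capability_fanout_node_info_filter(vec![CAP_DHT]);

        assert!(accept_all(&ids, &bare_node)); // empty filter passes everything
        assert!(dht_only(&ids, &dht_node));    // DHT-capable node passes
        assert!(!dht_only(&ids, &bare_node));  // node without CAP_DHT is rejected
    }

Because the filter is an Arc'd closure, FanoutCall can clone a single predicate into both the routing-table pre-filter and the per-answer peer filter, which is exactly what the hunks above do.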
diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs
index bcdf9aa2..44c07ba8 100644
--- a/veilid-core/src/rpc_processor/mod.rs
+++ b/veilid-core/src/rpc_processor/mod.rs
@@ -510,6 +510,7 @@ impl RPCProcessor {
             count,
             fanout,
             timeout_us,
+            empty_fanout_node_info_filter(),
             call_routine,
             check_done,
         );
diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs
index 8c889595..47998eff 100644
--- a/veilid-core/src/storage_manager/get_value.rs
+++ b/veilid-core/src/storage_manager/get_value.rs
@@ -136,7 +136,9 @@ impl StorageManager {
                 }
 
                 // Return peers if we have some
-                log_stor!(debug "Fanout call returned peers {}", gva.answer.peers.len());
+                #[cfg(feature="network-result-extra")]
+                log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
+
                 Ok(Some(gva.answer.peers))
             }
         };
@@ -158,6 +160,7 @@ impl StorageManager {
             key_count,
             fanout,
             timeout_us,
+            capability_fanout_node_info_filter(vec![CAP_DHT]),
             call_routine,
             check_done,
         );
diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index 165e8809..cf5222ff 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -18,7 +18,8 @@ use storage_manager_inner::*;
 pub use types::*;
 
 use super::*;
-use crate::rpc_processor::*;
+use routing_table::*;
+use rpc_processor::*;
 
 /// The maximum size of a single subkey
 const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN;
diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs
index a9264ed3..25a08170 100644
--- a/veilid-core/src/storage_manager/set_value.rs
+++ b/veilid-core/src/storage_manager/set_value.rs
@@ -116,6 +116,9 @@ impl StorageManager {
             }
 
             // Return peers if we have some
+            #[cfg(feature="network-result-extra")]
+            log_stor!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len());
+
             Ok(Some(sva.answer.peers))
         }
     };
@@ -137,6 +140,7 @@ impl StorageManager {
             key_count,
             fanout,
             timeout_us,
+            capability_fanout_node_info_filter(vec![CAP_DHT]),
             call_routine,
             check_done,
         );
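The storage manager call sites above pass capability_fanout_node_info_filter(vec![CAP_DHT]) so that GetValue and SetValue fanouts only consider DHT-capable peers, while the generic RPC processor path keeps the permissive empty filter. The sketch below shows the shape of the per-answer filtering this enables, under the same assumptions as the earlier example: filter_returned_peers, supports_dht, and the cut-down types are hypothetical stand-ins, not the veilid-core API.

    use std::sync::Arc;

    // Hypothetical stand-ins, reduced to what this sketch needs.
    struct TypedKey(u64);
    struct NodeInfo {
        supports_dht: bool,
    }
    struct PeerInfo {
        node_ids: Vec<TypedKey>,
        node_info: NodeInfo,
    }

    type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;

    // Mirrors the shape of the filtering the patch adds inside FanoutCall:
    // peers returned by a call are dropped if they fail the node info filter,
    // and only the survivors go on to be registered.
    fn filter_returned_peers(peers: Vec<PeerInfo>, filter: FanoutNodeInfoFilter) -> Vec<PeerInfo> {
        peers
            .into_iter()
            .filter(|pi| filter(&pi.node_ids, &pi.node_info))
            .collect()
    }

    fn main() {
        // Analogous to capability_fanout_node_info_filter(vec![CAP_DHT]).
        let dht_filter: FanoutNodeInfoFilter = Arc::new(|_ids, ni| ni.supports_dht);

        let answer = vec![
            PeerInfo { node_ids: vec![TypedKey(1)], node_info: NodeInfo { supports_dht: true } },
            PeerInfo { node_ids: vec![TypedKey(2)], node_info: NodeInfo { supports_dht: false } },
        ];

        let kept = filter_returned_peers(answer, dht_filter);
        assert_eq!(kept.len(), 1); // only the DHT-capable peer survives
    }

Dropping non-matching peers before register_find_node_answer keeps nodes that cannot serve the operation out of the closest-nodes list, so the fanout converges on capable nodes instead of repeatedly visiting peers that would reject the RPC.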