Merge branch 'veilidchat-work' into 'main'

proper node info filter for fanout

See merge request veilid/veilid!92
This commit is contained in:
Christien Rioux 2023-07-20 23:14:44 +00:00
commit e27f73fa7a
5 changed files with 59 additions and 10 deletions

View File

@ -10,6 +10,15 @@ where
}
pub type FanoutCallReturnType = Result<Option<Vec<PeerInfo>>, RPCError>;
pub type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
Arc::new(|_, _| true)
}
pub fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter {
Arc::new(move |_, ni| ni.has_capabilities(&caps))
}
/// Contains the logic for generically searing the Veilid routing table for a set of nodes and applying an
/// RPC operation that eventually converges on satisfactory result, or times out and returns some
@ -40,6 +49,7 @@ where
node_count: usize,
fanout: usize,
timeout_us: TimestampDuration,
node_info_filter: FanoutNodeInfoFilter,
call_routine: C,
check_done: D,
}
@ -57,6 +67,7 @@ where
node_count: usize,
fanout: usize,
timeout_us: TimestampDuration,
node_info_filter: FanoutNodeInfoFilter,
call_routine: C,
check_done: D,
) -> Arc<Self> {
@ -74,6 +85,7 @@ where
node_count,
fanout,
timeout_us,
node_info_filter,
call_routine,
check_done,
})
@ -160,11 +172,26 @@ where
// Do the call for this node
match (self.call_routine)(next_node.clone()).await {
Ok(Some(v)) => {
// Filter returned nodes
let filtered_v: Vec<PeerInfo> = v
.into_iter()
.filter(|pi| {
let node_ids = pi.node_ids().to_vec();
if !(self.node_info_filter)(
&node_ids,
pi.signed_node_info().node_info(),
) {
return false;
}
true
})
.collect();
// Call succeeded
// Register the returned nodes and add them to the closest nodes list in sorted order
let new_nodes = self
.routing_table
.register_find_node_answer(self.crypto_kind, v);
.register_find_node_answer(self.crypto_kind, filtered_v);
self.clone().add_new_nodes(new_nodes);
}
Ok(None) => {
@ -185,20 +212,33 @@ where
// Get the 'node_count' closest nodes to the key out of our routing table
let closest_nodes = {
let routing_table = self.routing_table.clone();
let node_info_filter = self.node_info_filter.clone();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Exclude our own node
if opt_entry.is_none() {
return false;
}
let entry = opt_entry.unwrap();
// Filter entries
entry.with(rti, |_rti, e| {
let Some(signed_node_info) = e.signed_node_info(RoutingDomain::PublicInternet) else {
return false;
};
// Ensure only things that are valid/signed in the PublicInternet domain are returned
rti.filter_has_valid_signed_node_info(
RoutingDomain::PublicInternet,
true,
opt_entry,
)
if !signed_node_info.has_any_signature() {
return false;
}
// Check our node info ilter
let node_ids = e.node_ids().to_vec();
if !(node_info_filter)(&node_ids, signed_node_info.node_info()) {
return false;
}
true
})
},
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);

View File

@ -510,6 +510,7 @@ impl RPCProcessor {
count,
fanout,
timeout_us,
empty_fanout_node_info_filter(),
call_routine,
check_done,
);

View File

@ -136,7 +136,9 @@ impl StorageManager {
}
// Return peers if we have some
log_stor!(debug "Fanout call returned peers {}", gva.answer.peers.len());
#[cfg(feature="network-result-extra")]
log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
Ok(Some(gva.answer.peers))
}
};
@ -158,6 +160,7 @@ impl StorageManager {
key_count,
fanout,
timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT]),
call_routine,
check_done,
);

View File

@ -18,7 +18,8 @@ use storage_manager_inner::*;
pub use types::*;
use super::*;
use crate::rpc_processor::*;
use routing_table::*;
use rpc_processor::*;
/// The maximum size of a single subkey
const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN;

View File

@ -116,6 +116,9 @@ impl StorageManager {
}
// Return peers if we have some
#[cfg(feature="network-result-extra")]
log_stor!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len());
Ok(Some(sva.answer.peers))
}
};
@ -137,6 +140,7 @@ impl StorageManager {
key_count,
fanout,
timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT]),
call_routine,
check_done,
);