Merge branch 'keep-n-closest-nodes' into 'main'

Implement closest peers refresh

See merge request veilid/veilid!286
This commit is contained in:
Christien Rioux 2024-06-27 20:15:08 +00:00
commit 7368e5d5d3
20 changed files with 333 additions and 72 deletions

View File

@ -215,13 +215,14 @@ impl Network {
did.dial_info.protocol_type().low_level_protocol_type(), did.dial_info.protocol_type().low_level_protocol_type(),
dr.local_port, dr.local_port,
); );
for additional_pt in if let Some(ipm) = inbound_protocol_map.get(&ipmkey) {
inbound_protocol_map.get(&ipmkey).unwrap().iter().skip(1) for additional_pt in ipm.iter().skip(1) {
{
// Make dialinfo for additional protocol type // Make dialinfo for additional protocol type
let additional_ddi = DetectedDialInfo::Detected(DialInfoDetail { let additional_ddi = DetectedDialInfo::Detected(DialInfoDetail {
dial_info: self dial_info: self.make_dial_info(
.make_dial_info(did.dial_info.socket_address(), *additional_pt), did.dial_info.socket_address(),
*additional_pt,
),
class: did.class, class: did.class,
}); });
// Add additional dialinfo // Add additional dialinfo
@ -229,6 +230,7 @@ impl Network {
} }
} }
} }
}
Ok(Some(None)) => { Ok(Some(None)) => {
// Found no new dial info for this protocol/address combination // Found no new dial info for this protocol/address combination
} }

View File

@ -268,6 +268,11 @@ impl NetworkManager {
if detect_address_changes { if detect_address_changes {
// Reset the address check cache now so we can start detecting fresh // Reset the address check cache now so we can start detecting fresh
info!("Public address has changed, detecting public dial info"); info!("Public address has changed, detecting public dial info");
log_net!(debug "report_global_socket_address\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer);
log_net!(debug
"public_address_check_cache: {:#?}",
inner.public_address_check_cache
);
inner.public_address_check_cache.clear(); inner.public_address_check_cache.clear();

View File

@ -115,7 +115,11 @@ impl Bucket {
self.entries.iter() self.entries.iter()
} }
pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<PublicKey>> { pub(super) fn kick(
&mut self,
bucket_depth: usize,
exempt_peers: &BTreeSet<PublicKey>,
) -> Option<BTreeSet<PublicKey>> {
// Get number of entries to attempt to purge from bucket // Get number of entries to attempt to purge from bucket
let bucket_len = self.entries.len(); let bucket_len = self.entries.len();
@ -167,6 +171,11 @@ impl Bucket {
continue; continue;
} }
// if this entry is one of our exempt entries, don't drop it
if exempt_peers.contains(&entry.0) {
continue;
}
// if no references, lets evict it // if no references, lets evict it
dead_node_ids.insert(entry.0); dead_node_ids.insert(entry.0);
} }

View File

@ -166,12 +166,20 @@ impl BucketEntryInner {
common_crypto_kinds(&self.validated_node_ids.kinds(), other) common_crypto_kinds(&self.validated_node_ids.kinds(), other)
} }
/// Capability check /// All-of capability check
pub fn has_capabilities(&self, routing_domain: RoutingDomain, capabilities: &[Capability]) -> bool { pub fn has_all_capabilities(&self, routing_domain: RoutingDomain, capabilities: &[Capability]) -> bool {
let Some(ni) = self.node_info(routing_domain) else { let Some(ni) = self.node_info(routing_domain) else {
return false; return false;
}; };
ni.has_capabilities(capabilities) ni.has_all_capabilities(capabilities)
}
/// Any-of capability check
pub fn has_any_capabilities(&self, routing_domain: RoutingDomain, capabilities: &[Capability]) -> bool {
let Some(ni) = self.node_info(routing_domain) else {
return false;
};
ni.has_any_capabilities(capabilities)
} }
// Less is faster // Less is faster

View File

@ -132,7 +132,7 @@ impl RoutingTable {
.entries() .entries()
.filter(|e| { .filter(|e| {
let cap_match = e.1.with(inner, |_rti, e| { let cap_match = e.1.with(inner, |_rti, e| {
e.has_capabilities(RoutingDomain::PublicInternet, &capabilities) e.has_all_capabilities(RoutingDomain::PublicInternet, &capabilities)
}); });
let state = e.1.with(inner, |_rti, e| e.state(cur_ts)); let state = e.1.with(inner, |_rti, e| e.state(cur_ts));
state >= min_state && cap_match state >= min_state && cap_match

View File

@ -34,12 +34,12 @@ impl RoutingTable {
// Ensure capabilities are met // Ensure capabilities are met
match opt_entry { match opt_entry {
Some(entry) => entry.with(rti, |_rti, e| { Some(entry) => entry.with(rti, |_rti, e| {
e.has_capabilities(RoutingDomain::PublicInternet, capabilities) e.has_all_capabilities(RoutingDomain::PublicInternet, capabilities)
}), }),
None => own_peer_info None => own_peer_info
.signed_node_info() .signed_node_info()
.node_info() .node_info()
.has_capabilities(capabilities), .has_all_capabilities(capabilities),
} }
}, },
) as RoutingTableEntryFilter; ) as RoutingTableEntryFilter;
@ -98,7 +98,9 @@ impl RoutingTable {
}; };
// Ensure only things that have a minimum set of capabilities are returned // Ensure only things that have a minimum set of capabilities are returned
entry.with(rti, |rti, e| { entry.with(rti, |rti, e| {
if !e.has_capabilities(RoutingDomain::PublicInternet, &required_capabilities) { if !e
.has_all_capabilities(RoutingDomain::PublicInternet, &required_capabilities)
{
return false; return false;
} }
// Ensure only things that are valid/signed in the PublicInternet domain are returned // Ensure only things that are valid/signed in the PublicInternet domain are returned

View File

@ -115,6 +115,8 @@ pub(crate) struct RoutingTableUnlockedInner {
bootstrap_task: TickTask<EyreReport>, bootstrap_task: TickTask<EyreReport>,
/// Background process to ensure we have enough nodes in our routing table /// Background process to ensure we have enough nodes in our routing table
peer_minimum_refresh_task: TickTask<EyreReport>, peer_minimum_refresh_task: TickTask<EyreReport>,
/// Background process to ensure we have enough nodes close to our own in our routing table
closest_peers_refresh_task: TickTask<EyreReport>,
/// Background process to check nodes to see if they are still alive and for reliability /// Background process to check nodes to see if they are still alive and for reliability
ping_validator_task: TickTask<EyreReport>, ping_validator_task: TickTask<EyreReport>,
/// Background process to keep relays up /// Background process to keep relays up
@ -223,6 +225,7 @@ impl RoutingTable {
kick_buckets_task: TickTask::new(1), kick_buckets_task: TickTask::new(1),
bootstrap_task: TickTask::new(1), bootstrap_task: TickTask::new(1),
peer_minimum_refresh_task: TickTask::new(1), peer_minimum_refresh_task: TickTask::new(1),
closest_peers_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms),
ping_validator_task: TickTask::new(1), ping_validator_task: TickTask::new(1),
relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS),
private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS),
@ -1029,18 +1032,21 @@ impl RoutingTable {
out out
} }
/// Finds nodes near a particular node id
/// Ensures all returned nodes have a set of capabilities enabled
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub async fn find_node( pub async fn find_node(
&self, &self,
node_ref: NodeRef, node_ref: NodeRef,
node_id: TypedKey, node_id: TypedKey,
capabilities: Vec<Capability>,
) -> EyreResult<NetworkResult<Vec<NodeRef>>> { ) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
let rpc_processor = self.rpc_processor(); let rpc_processor = self.rpc_processor();
let res = network_result_try!( let res = network_result_try!(
rpc_processor rpc_processor
.clone() .clone()
.rpc_call_find_node(Destination::direct(node_ref), node_id, vec![]) .rpc_call_find_node(Destination::direct(node_ref), node_id, capabilities)
.await? .await?
); );
@ -1051,36 +1057,45 @@ impl RoutingTable {
} }
/// Ask a remote node to list the nodes it has around the current node /// Ask a remote node to list the nodes it has around the current node
/// Ensures all returned nodes have a set of capabilities enabled
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub async fn find_self( pub async fn find_self(
&self, &self,
crypto_kind: CryptoKind, crypto_kind: CryptoKind,
node_ref: NodeRef, node_ref: NodeRef,
capabilities: Vec<Capability>,
) -> EyreResult<NetworkResult<Vec<NodeRef>>> { ) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
let self_node_id = self.node_id(crypto_kind); let self_node_id = self.node_id(crypto_kind);
self.find_node(node_ref, self_node_id).await self.find_node(node_ref, self_node_id, capabilities).await
} }
/// Ask a remote node to list the nodes it has around itself /// Ask a remote node to list the nodes it has around itself
/// Ensures all returned nodes have a set of capabilities enabled
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub async fn find_target( pub async fn find_target(
&self, &self,
crypto_kind: CryptoKind, crypto_kind: CryptoKind,
node_ref: NodeRef, node_ref: NodeRef,
capabilities: Vec<Capability>,
) -> EyreResult<NetworkResult<Vec<NodeRef>>> { ) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
let Some(target_node_id) = node_ref.node_ids().get(crypto_kind) else { let Some(target_node_id) = node_ref.node_ids().get(crypto_kind) else {
bail!("no target node ids for this crypto kind"); bail!("no target node ids for this crypto kind");
}; };
self.find_node(node_ref, target_node_id).await self.find_node(node_ref, target_node_id, capabilities).await
} }
/// Ask node to 'find node' on own node so we can get some more nodes near ourselves
/// and then contact those nodes to inform -them- that we exist
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", skip(self))]
pub async fn reverse_find_node(&self, crypto_kind: CryptoKind, node_ref: NodeRef, wide: bool) { pub async fn reverse_find_node(
// Ask node to 'find node' on own node so we can get some more nodes near ourselves &self,
// and then contact those nodes to inform -them- that we exist crypto_kind: CryptoKind,
node_ref: NodeRef,
wide: bool,
capabilities: Vec<Capability>,
) {
// Ask node for nodes closest to our own node // Ask node for nodes closest to our own node
let closest_nodes = network_result_value_or_log!(match self.find_self(crypto_kind, node_ref.clone()).await { let closest_nodes = network_result_value_or_log!(match self.find_self(crypto_kind, node_ref.clone(), capabilities.clone()).await {
Err(e) => { Err(e) => {
log_rtab!(error log_rtab!(error
"find_self failed for {:?}: {:?}", "find_self failed for {:?}: {:?}",
@ -1096,7 +1111,7 @@ impl RoutingTable {
// Ask each node near us to find us as well // Ask each node near us to find us as well
if wide { if wide {
for closest_nr in closest_nodes { for closest_nr in closest_nodes {
network_result_value_or_log!(match self.find_self(crypto_kind, closest_nr.clone()).await { network_result_value_or_log!(match self.find_self(crypto_kind, closest_nr.clone(), capabilities.clone()).await {
Err(e) => { Err(e) => {
log_rtab!(error log_rtab!(error
"find_self failed for {:?}: {:?}", "find_self failed for {:?}: {:?}",

View File

@ -358,9 +358,10 @@ impl RoutingTableInner {
"Starting routing table buckets purge. Table currently has {} nodes", "Starting routing table buckets purge. Table currently has {} nodes",
self.bucket_entry_count() self.bucket_entry_count()
); );
let closest_nodes = BTreeSet::new();
for ck in VALID_CRYPTO_KINDS { for ck in VALID_CRYPTO_KINDS {
for bucket in self.buckets.get_mut(&ck).unwrap().iter_mut() { for bucket in self.buckets.get_mut(&ck).unwrap().iter_mut() {
bucket.kick(0); bucket.kick(0, &closest_nodes);
} }
} }
self.all_entries.remove_expired(); self.all_entries.remove_expired();
@ -396,11 +397,11 @@ impl RoutingTableInner {
/// Attempt to settle buckets and remove entries down to the desired number /// Attempt to settle buckets and remove entries down to the desired number
/// which may not be possible due extant NodeRefs /// which may not be possible due extant NodeRefs
pub fn kick_bucket(&mut self, bucket_index: BucketIndex) { pub fn kick_bucket(&mut self, bucket_index: BucketIndex, exempt_peers: &BTreeSet<PublicKey>) {
let bucket = self.get_bucket_mut(bucket_index); let bucket = self.get_bucket_mut(bucket_index);
let bucket_depth = Self::bucket_depth(bucket_index); let bucket_depth = Self::bucket_depth(bucket_index);
if let Some(_dead_node_ids) = bucket.kick(bucket_depth) { if let Some(_dead_node_ids) = bucket.kick(bucket_depth, exempt_peers) {
// Remove expired entries // Remove expired entries
self.all_entries.remove_expired(); self.all_entries.remove_expired();
@ -1252,7 +1253,7 @@ impl RoutingTableInner {
} }
} }
fn make_closest_noderef_sort( pub(crate) fn make_closest_noderef_sort(
crypto: Crypto, crypto: Crypto,
node_id: TypedKey, node_id: TypedKey,
) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering { ) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering {
@ -1280,3 +1281,19 @@ fn make_closest_noderef_sort(
}) })
} }
} }
pub(crate) fn make_closest_node_id_sort(
crypto: Crypto,
node_id: TypedKey,
) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering {
let kind = node_id.kind;
// Get cryptoversion to check distance with
let vcrypto = crypto.get(kind).unwrap();
move |a: &CryptoKey, b: &CryptoKey| -> core::cmp::Ordering {
// distance is the next metric, closer nodes first
let da = vcrypto.distance(a, &node_id.value);
let db = vcrypto.distance(b, &node_id.value);
da.cmp(&db)
}
}

View File

@ -300,7 +300,7 @@ impl RoutingTable {
// Need VALID signed peer info, so ask bootstrap to find_node of itself // Need VALID signed peer info, so ask bootstrap to find_node of itself
// which will ensure it has the bootstrap's signed peer info as part of the response // which will ensure it has the bootstrap's signed peer info as part of the response
let _ = routing_table.find_target(crypto_kind, nr.clone()).await; let _ = routing_table.find_target(crypto_kind, nr.clone(), vec![]).await;
// Ensure we got the signed peer info // Ensure we got the signed peer info
if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) {
@ -311,7 +311,7 @@ impl RoutingTable {
routing_table.network_manager().address_filter().set_dial_info_failed(bsdi); routing_table.network_manager().address_filter().set_dial_info_failed(bsdi);
} else { } else {
// otherwise this bootstrap is valid, lets ask it to find ourselves now // otherwise this bootstrap is valid, lets ask it to find ourselves now
routing_table.reverse_find_node(crypto_kind, nr, true).await routing_table.reverse_find_node(crypto_kind, nr, true, vec![]).await
} }
} }
.instrument(Span::current()), .instrument(Span::current()),

View File

@ -0,0 +1,89 @@
use super::*;
/// How many nodes to consult for closest peers simultaneously
pub const CLOSEST_PEERS_REQUEST_COUNT: usize = 5;
use futures_util::stream::{FuturesUnordered, StreamExt};
use stop_token::future::FutureExt as StopFutureExt;
impl RoutingTable {
/// Ask our closest peers to give us more peers close to ourselves. This will
/// assist with the DHT and other algorithms that utilize the distance metric.
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn closest_peers_refresh_task_routine(
self,
stop_token: StopToken,
) -> EyreResult<()> {
let mut unord = FuturesUnordered::new();
for crypto_kind in VALID_CRYPTO_KINDS {
// Get our node id for this cryptokind
let self_node_id = self.node_id(crypto_kind);
let routing_table = self.clone();
let mut filters = VecDeque::new();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Exclude our own node
let Some(entry) = opt_entry else {
return false;
};
entry.with(rti, |_rti, e| {
// Keep only the entries that contain the crypto kind we're looking for
let compatible_crypto = e.crypto_kinds().contains(&crypto_kind);
if !compatible_crypto {
return false;
}
// Keep only the entries that participate in distance-metric relevant capabilities
// This would be better to be 'has_any_capabilities' but for now until out capnp gets
// this ability, it will do.
if !e.has_all_capabilities(
RoutingDomain::PublicInternet,
DISTANCE_METRIC_CAPABILITIES,
) {
return false;
}
true
})
},
) as RoutingTableEntryFilter;
filters.push_front(filter);
let noderefs = routing_table
.find_preferred_closest_nodes(
CLOSEST_PEERS_REQUEST_COUNT,
self_node_id,
filters,
|_rti, entry: Option<Arc<BucketEntry>>| {
NodeRef::new(routing_table.clone(), entry.unwrap().clone(), None)
},
)
.unwrap();
for nr in noderefs {
let routing_table = self.clone();
unord.push(
async move {
// This would be better if it were 'any' instead of 'all' capabilities
// but that requires extending the capnp to support it.
routing_table
.reverse_find_node(
crypto_kind,
nr,
false,
DISTANCE_METRIC_CAPABILITIES.to_vec(),
)
.await
}
.instrument(Span::current()),
);
}
}
// do closest peers search in parallel
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
Ok(())
}
}

View File

@ -1,5 +1,11 @@
use super::*; use super::*;
/// How many 'reliable' nodes closest to our own node id to keep
const KEEP_N_CLOSEST_RELIABLE_PEERS_COUNT: usize = 16;
/// How many 'unreliable' nodes closest to our own node id to keep
const KEEP_N_CLOSEST_UNRELIABLE_PEERS_COUNT: usize = 8;
impl RoutingTable { impl RoutingTable {
// Kick the queued buckets in the routing table to free dead nodes if necessary // Kick the queued buckets in the routing table to free dead nodes if necessary
// Attempts to keep the size of the routing table down to the bucket depth // Attempts to keep the size of the routing table down to the bucket depth
@ -15,8 +21,70 @@ impl RoutingTable {
.into_iter() .into_iter()
.collect(); .collect();
let mut inner = self.inner.write(); let mut inner = self.inner.write();
// Get our exempt nodes for each crypto kind
let mut exempt_peers_by_kind = BTreeMap::<CryptoKind, BTreeSet<PublicKey>>::new();
for kind in VALID_CRYPTO_KINDS {
let our_node_id = self.node_id(kind);
let Some(buckets) = inner.buckets.get(&kind) else {
continue;
};
let sort = make_closest_node_id_sort(self.crypto(), our_node_id);
let mut closest_peers = BTreeSet::<CryptoKey>::new();
let mut closest_unreliable_count = 0usize;
let mut closest_reliable_count = 0usize;
// Iterate buckets backward, sort entries by closest distance first
'outer: for bucket in buckets.iter().rev() {
let mut entries = bucket.entries().collect::<Vec<_>>();
entries.sort_by(|a, b| sort(a.0, b.0));
for (key, entry) in entries {
// See if this entry is a distance-metric capability node
// If not, disqualify it from this closest_nodes list
if !entry.with(&inner, |_rti, e| {
e.has_any_capabilities(
RoutingDomain::PublicInternet,
DISTANCE_METRIC_CAPABILITIES,
)
}) {
continue;
}
let state = entry.with(&inner, |_rti, e| e.state(cur_ts));
match state {
BucketEntryState::Dead => {
// Do nothing with dead entries
}
BucketEntryState::Unreliable => {
// Add to closest unreliable nodes list
if closest_unreliable_count < KEEP_N_CLOSEST_UNRELIABLE_PEERS_COUNT {
closest_peers.insert(*key);
closest_unreliable_count += 1;
}
}
BucketEntryState::Reliable => {
// Add to closest reliable nodes list
if closest_reliable_count < KEEP_N_CLOSEST_RELIABLE_PEERS_COUNT {
closest_peers.insert(*key);
closest_reliable_count += 1;
}
}
}
if closest_unreliable_count == KEEP_N_CLOSEST_UNRELIABLE_PEERS_COUNT
&& closest_reliable_count == KEEP_N_CLOSEST_RELIABLE_PEERS_COUNT
{
break 'outer;
}
}
}
exempt_peers_by_kind.insert(kind, closest_peers);
}
for bucket_index in kick_queue { for bucket_index in kick_queue {
inner.kick_bucket(bucket_index) inner.kick_bucket(bucket_index, &exempt_peers_by_kind[&bucket_index.0]);
} }
Ok(()) Ok(())
} }

View File

@ -1,4 +1,5 @@
pub mod bootstrap; pub mod bootstrap;
pub mod closest_peers_refresh;
pub mod kick_buckets; pub mod kick_buckets;
pub mod peer_minimum_refresh; pub mod peer_minimum_refresh;
pub mod ping_validator; pub mod ping_validator;
@ -72,6 +73,23 @@ impl RoutingTable {
}); });
} }
// Set closest peers refresh tick task
{
let this = self.clone();
self.unlocked_inner
.closest_peers_refresh_task
.set_routine(move |s, _l, _t| {
Box::pin(
this.clone()
.closest_peers_refresh_task_routine(s)
.instrument(trace_span!(
parent: None,
"closest peers refresh task routine"
)),
)
});
}
// Set ping validator tick task // Set ping validator tick task
{ {
let this = self.clone(); let this = self.clone();
@ -181,9 +199,23 @@ impl RoutingTable {
// Run the relay management task // Run the relay management task
self.unlocked_inner.relay_management_task.tick().await?; self.unlocked_inner.relay_management_task.tick().await?;
// Only perform these operations if we already have a valid network class
// and if we didn't need to bootstrap or perform a peer minimum refresh as these operations
// require having a suitably full routing table and guaranteed ability to contact other nodes
if !needs_bootstrap
&& !needs_peer_minimum_refresh
&& self.has_valid_network_class(RoutingDomain::PublicInternet)
{
// Run closest peers refresh task
// this will also inform other close nodes of -our- existence so we would
// much rather perform this action -after- we have a valid network class
// so our PeerInfo is valid when informing the other nodes of our existence.
self.unlocked_inner
.closest_peers_refresh_task
.tick()
.await?;
// Run the private route management task // Run the private route management task
// If we don't know our network class then don't do this yet
if self.has_valid_network_class(RoutingDomain::PublicInternet) {
self.unlocked_inner self.unlocked_inner
.private_route_management_task .private_route_management_task
.tick() .tick()

View File

@ -80,7 +80,7 @@ impl RoutingTable {
ord.push_back( ord.push_back(
async move { async move {
routing_table routing_table
.reverse_find_node(crypto_kind, nr, false) .reverse_find_node(crypto_kind, nr, false, vec![])
.await .await
} }
.instrument(Span::current()), .instrument(Span::current()),

View File

@ -13,6 +13,8 @@ pub const CAP_APPMESSAGE: Capability = FourCC(*b"APPM");
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
pub const CAP_BLOCKSTORE: Capability = FourCC(*b"BLOC"); pub const CAP_BLOCKSTORE: Capability = FourCC(*b"BLOC");
pub const DISTANCE_METRIC_CAPABILITIES: &[Capability] = &[CAP_DHT, CAP_DHT_WATCH];
#[derive(Clone, Default, PartialEq, Eq, Debug, Serialize, Deserialize)] #[derive(Clone, Default, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct NodeInfo { pub struct NodeInfo {
network_class: NetworkClass, network_class: NetworkClass,
@ -152,7 +154,7 @@ impl NodeInfo {
pub fn has_capability(&self, cap: Capability) -> bool { pub fn has_capability(&self, cap: Capability) -> bool {
self.capabilities.contains(&cap) self.capabilities.contains(&cap)
} }
pub fn has_capabilities(&self, capabilities: &[Capability]) -> bool { pub fn has_all_capabilities(&self, capabilities: &[Capability]) -> bool {
for cap in capabilities { for cap in capabilities {
if !self.has_capability(*cap) { if !self.has_capability(*cap) {
return false; return false;
@ -160,6 +162,17 @@ impl NodeInfo {
} }
true true
} }
pub fn has_any_capabilities(&self, capabilities: &[Capability]) -> bool {
if capabilities.is_empty() {
return true;
}
for cap in capabilities {
if self.has_capability(*cap) {
return true;
}
}
false
}
/// Can direct connections be made /// Can direct connections be made
pub fn is_fully_direct_inbound(&self) -> bool { pub fn is_fully_direct_inbound(&self) -> bool {

View File

@ -66,7 +66,7 @@ pub(crate) fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
} }
pub(crate) fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter { pub(crate) fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter {
Arc::new(move |_, ni| ni.has_capabilities(&caps)) Arc::new(move |_, ni| ni.has_all_capabilities(&caps))
} }
/// Contains the logic for generically searching the Veilid routing table for a set of nodes and applying an /// Contains the logic for generically searching the Veilid routing table for a set of nodes and applying an

View File

@ -456,7 +456,7 @@ impl RPCProcessor {
) -> bool { ) -> bool {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
routing_table.signed_node_info_is_valid_in_routing_domain(routing_domain, signed_node_info) routing_table.signed_node_info_is_valid_in_routing_domain(routing_domain, signed_node_info)
&& signed_node_info.node_info().has_capabilities(capabilities) && signed_node_info.node_info().has_all_capabilities(capabilities)
} }
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////

View File

@ -139,14 +139,19 @@ impl StorageManager {
pub async fn terminate(&self) { pub async fn terminate(&self) {
log_stor!(debug "starting storage manager shutdown"); log_stor!(debug "starting storage manager shutdown");
{
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
inner.terminate().await; inner.terminate().await;
}
// Cancel all tasks // Cancel all tasks
self.cancel_tasks().await; self.cancel_tasks().await;
// Release the storage manager // Release the storage manager
{
let mut inner = self.inner.lock().await;
*inner = Self::new_inner(self.unlocked_inner.clone()); *inner = Self::new_inner(self.unlocked_inner.clone());
}
log_stor!(debug "finished storage manager shutdown"); log_stor!(debug "finished storage manager shutdown");
} }

View File

@ -10,18 +10,13 @@ impl StorageManager {
_last_ts: Timestamp, _last_ts: Timestamp,
_cur_ts: Timestamp, _cur_ts: Timestamp,
) -> EyreResult<()> { ) -> EyreResult<()> {
let (mut offline_subkey_writes, opt_update_callback) = { let mut offline_subkey_writes = {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
let out = ( let out = inner.offline_subkey_writes.clone();
inner.offline_subkey_writes.clone(),
inner.update_callback.clone(),
);
inner.offline_subkey_writes.clear(); inner.offline_subkey_writes.clear();
out out
}; };
let mut fanout_results = vec![];
for (key, osw) in offline_subkey_writes.iter_mut() { for (key, osw) in offline_subkey_writes.iter_mut() {
if poll!(stop_token.clone()).is_ready() { if poll!(stop_token.clone()).is_ready() {
log_stor!(debug "Offline subkey writes cancelled."); log_stor!(debug "Offline subkey writes cancelled.");
@ -32,6 +27,8 @@ impl StorageManager {
break; break;
}; };
let mut fanout_results = vec![];
let mut written_subkeys = ValueSubkeyRangeSet::new(); let mut written_subkeys = ValueSubkeyRangeSet::new();
for subkey in osw.subkeys.iter() { for subkey in osw.subkeys.iter() {
let get_result = { let get_result = {
@ -63,7 +60,7 @@ impl StorageManager {
*key, *key,
subkey, subkey,
osw.safety_selection, osw.safety_selection,
value, value.clone(),
descriptor, descriptor,
) )
.await; .await;
@ -85,24 +82,23 @@ impl StorageManager {
&result.fanout_result, &result.fanout_result,
); );
if !was_offline { if !was_offline {
if let Some(update_callback) = opt_update_callback.clone() {
// Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(
VeilidValueChange {
key: *key,
subkeys: ValueSubkeyRangeSet::single(subkey),
count: u32::MAX,
value: Some(
result
.signed_value_data
.value_data()
.clone(),
),
},
)));
}
written_subkeys.insert(subkey); written_subkeys.insert(subkey);
}; }
// Set the new value if it differs from what was asked to set
if result.signed_value_data.value_data() != value.value_data() {
// Record the newer value and send and update since it is different than what we just set
let mut inner = self.lock().await?;
inner
.handle_set_local_value(
*key,
subkey,
result.signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
}
fanout_results.push((subkey, result.fanout_result)); fanout_results.push((subkey, result.fanout_result));
break; break;
} }

View File

@ -234,7 +234,7 @@ impl StorageManager {
.into_iter() .into_iter()
.filter(|x| { .filter(|x| {
x.node_info(RoutingDomain::PublicInternet) x.node_info(RoutingDomain::PublicInternet)
.map(|ni| ni.has_capabilities(&[CAP_DHT, CAP_DHT_WATCH])) .map(|ni| ni.has_all_capabilities(&[CAP_DHT, CAP_DHT_WATCH]))
.unwrap_or_default() .unwrap_or_default()
}) })
.collect() .collect()

View File

@ -374,7 +374,7 @@ async def test_dht_integration_writer_reader():
rc1 = await api1.new_routing_context() rc1 = await api1.new_routing_context()
async with rc0, rc1: async with rc0, rc1:
COUNT = 10 COUNT = 100
TEST_DATA = b"test data" TEST_DATA = b"test data"
# write dht records on server 0 # write dht records on server 0