diff --git a/veilid-core/src/crypto/envelope.rs b/veilid-core/src/crypto/envelope.rs index 3874397b..fe22f2fc 100644 --- a/veilid-core/src/crypto/envelope.rs +++ b/veilid-core/src/crypto/envelope.rs @@ -266,6 +266,7 @@ impl Envelope { pub fn get_sender_id(&self) -> PublicKey { self.sender_id } + pub fn get_recipient_id(&self) -> PublicKey { self.recipient_id } diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 0b0bbb96..11456ca9 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -12,8 +12,6 @@ pub struct Bucket { routing_table: RoutingTable, /// Map of keys to entries for this bucket entries: BTreeMap>, - /// The most recent entry in this bucket - newest_entry: Option, /// The crypto kind in use for the public keys in this bucket kind: CryptoKind, } @@ -31,7 +29,6 @@ struct SerializedBucketEntryData { #[archive_attr(repr(C), derive(CheckBytes))] struct SerializedBucketData { entries: Vec, - newest_entry: Option, } fn state_ordering(state: BucketEntryState) -> usize { @@ -47,7 +44,6 @@ impl Bucket { Self { routing_table, entries: BTreeMap::new(), - newest_entry: None, kind, } } @@ -64,8 +60,6 @@ impl Bucket { .insert(e.key, all_entries[e.value as usize].clone()); } - self.newest_entry = bucket_data.newest_entry; - Ok(()) } @@ -86,27 +80,21 @@ impl Bucket { value: *entry_index, }); } - let bucket_data = SerializedBucketData { - entries, - newest_entry: self.newest_entry.clone(), - }; + let bucket_data = SerializedBucketData { entries }; let out = to_rkyv(&bucket_data)?; Ok(out) } /// Create a new entry with a node_id of this crypto kind and return it - pub(super) fn add_entry(&mut self, node_id_key: PublicKey) -> NodeRef { + pub(super) fn add_new_entry(&mut self, node_id_key: PublicKey) -> Arc { log_rtab!("Node added: {}:{}", self.kind, node_id_key); // Add new entry let entry = Arc::new(BucketEntry::new(TypedKey::new(self.kind, node_id_key))); self.entries.insert(node_id_key, entry.clone()); - // This is now the newest bucket entry - self.newest_entry = Some(node_id_key); - - // Get a node ref to return since this is new - NodeRef::new(self.routing_table.clone(), entry, None) + // Return the new entry + entry } /// Add an existing entry with a new node_id for this crypto kind @@ -114,23 +102,15 @@ impl Bucket { log_rtab!("Existing node added: {}:{}", self.kind, node_id_key); // Add existing entry - entry.with_mut_inner(|e| e.add_node_id(TypedKey::new(self.kind, node_id_key))); self.entries.insert(node_id_key, entry); - - // This is now the newest bucket entry - self.newest_entry = Some(node_id_key); - - // No need to return a noderef here because the noderef will already exist in the caller } /// Remove an entry with a node_id for this crypto kind from the bucket - fn remove_entry(&mut self, node_id_key: &PublicKey) { + pub(super) fn remove_entry(&mut self, node_id_key: &PublicKey) { log_rtab!("Node removed: {}:{}", self.kind, node_id_key); // Remove the entry self.entries.remove(node_id_key); - - // newest_entry is updated by kick_bucket() } pub(super) fn entry(&self, key: &PublicKey) -> Option> { @@ -184,24 +164,15 @@ impl Bucket { }) }); - self.newest_entry = None; for entry in sorted_entries { // If we're not evicting more entries, exit, noting this may be the newest entry if extra_entries == 0 { - // The first 'live' entry we find is our newest entry - if self.newest_entry.is_none() { - self.newest_entry = Some(entry.0); - } break; } extra_entries -= 1; // if this entry has references we can't drop it yet if entry.1.ref_count.load(Ordering::Acquire) > 0 { - // The first 'live' entry we fine is our newest entry - if self.newest_entry.is_none() { - self.newest_entry = Some(entry.0); - } continue; } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 35a3c503..2058fb24 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -122,21 +122,33 @@ impl BucketEntryInner { self.node_ref_tracks.remove(&track_id); } - // Node ids + /// Get node ids pub fn node_ids(&self) -> TypedKeySet { self.node_ids.clone() } - pub fn add_node_id(&mut self, node_id: TypedKey) { + /// Add a node id for a particular crypto kind. + /// Returns any previous existing node id associated with that crypto kind + pub fn add_node_id(&mut self, node_id: TypedKey) -> Option { + if let Some(old_node_id) = self.node_ids.get(node_id.kind) { + // If this was already there we do nothing + if old_node_id == node_id { + return None; + } + self.node_ids.add(node_id); + return Some(old_node_id); + } self.node_ids.add(node_id); + None } pub fn best_node_id(&self) -> TypedKey { self.node_ids.best().unwrap() } - // Crypto kinds + /// Get crypto kinds pub fn crypto_kinds(&self) -> Vec { self.node_ids.kinds() } + /// Compare sets of crypto kinds pub fn common_crypto_kinds(&self, other: &[CryptoKind]) -> Vec { common_crypto_kinds(&self.node_ids.kinds(), other) } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 435346a0..48c17100 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -137,14 +137,17 @@ impl RoutingTableUnlockedInner { false } - pub fn find_bucket_index(&self, node_id: TypedKey) -> Option<(CryptoKind, usize)> { + pub fn calculate_bucket_index(&self, node_id: &TypedKey) -> (CryptoKind, usize) { let crypto = self.crypto(); - let self_node_id = self.node_id_keypairs.get(&node_id.kind)?.key; - let vcrypto = crypto.get(node_id.kind)?; - vcrypto - .distance(&node_id.key, &self_node_id) - .first_nonzero_bit() - .map(|x| (node_id.kind, x)) + let self_node_id = self.node_id_keypairs.get(&node_id.kind).unwrap().key; + let vcrypto = crypto.get(node_id.kind).unwrap(); + ( + node_id.kind, + vcrypto + .distance(&node_id.key, &self_node_id) + .first_nonzero_bit() + .unwrap(), + ) } } @@ -560,10 +563,7 @@ impl RoutingTable { fn queue_bucket_kicks(&self, node_ids: TypedKeySet) { for node_id in node_ids.iter() { - let Some(x) = self.unlocked_inner.find_bucket_index(*node_id) else { - log_rtab!(error "find bucket index failed for nodeid {}", node_id); - continue; - }; + let x = self.unlocked_inner.calculate_bucket_index(node_id); self.unlocked_inner.kick_queue.lock().insert(x); } } diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index ab81c0b2..ab3280ca 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -1669,7 +1669,7 @@ impl RouteSpecStore { #[instrument(level = "trace", skip(self), err)] pub fn assemble_private_route( &self, - key: &PublicKey, + key: &TypedKey, optimized: Option, ) -> EyreResult { let inner = &*self.inner.lock(); diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 377f1e67..fb895efe 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -335,7 +335,7 @@ impl RoutingTableInner { // If the local network topology has changed, nuke the existing local node info and let new local discovery happen if changed { let cur_ts = get_aligned_timestamp(); - self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, _, e| { + self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| { e.with_mut(rti, |_rti, e| { e.clear_signed_node_info(RoutingDomain::LocalNetwork); e.set_updated_since_last_network_change(false); @@ -417,7 +417,7 @@ impl RoutingTableInner { let mut count = 0usize; let cur_ts = get_aligned_timestamp(); self.with_entries(cur_ts, min_state, |rti, e| { - if e.with(rti, |rti, e| e.best_routing_domain(rti, routing_domain_set)) + if e.with_inner(|e| e.best_routing_domain(rti, routing_domain_set)) .is_some() { count += 1; @@ -434,7 +434,7 @@ impl RoutingTableInner { mut f: F, ) -> Option { for entry in self.all_entries { - if entry.with(self, |_rti, e| e.state(cur_ts) >= min_state) { + if entry.with_inner(|e| e.state(cur_ts) >= min_state) { if let Some(out) = f(self, entry) { return Some(out); } @@ -468,21 +468,23 @@ impl RoutingTableInner { cur_ts: Timestamp, ) -> Vec { // Collect relay nodes - let opt_relay_id = self.with_routing_domain(routing_domain, |rd| { - rd.common().relay_node().map(|rn| rn.node_id()) - }); + let opt_relay = self.with_routing_domain(routing_domain, |rd| rd.common().relay_node()); let own_node_info_ts = self.get_own_node_info_ts(routing_domain); // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow - let mut node_refs = Vec::::with_capacity(self.bucket_entry_count); - self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { - if v.with(rti, |rti, e| { + let mut node_refs = Vec::::with_capacity(self.bucket_entry_count()); + self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { + if entry.with(rti, |rti, e| { // If this isn't in the routing domain we are checking, don't include it if !e.exists_in_routing_domain(rti, routing_domain) { return false; } // If we need a ping via the normal timing mechanism, then do it - if e.needs_ping(cur_ts, opt_relay_id == Some(k)) { + // or if this node is our own relay, then we keep it alive + let is_our_relay = opt_relay + .map(|nr| nr.same_bucket_entry(&entry)) + .unwrap_or(false); + if e.needs_ping(cur_ts, is_our_relay) { return true; } // If we need a ping because this node hasn't seen our latest node info, then do it @@ -495,8 +497,7 @@ impl RoutingTableInner { }) { node_refs.push(NodeRef::new( outer_self.clone(), - k, - v, + entry, Some(NodeRefFilter::new().with_routing_domain(routing_domain)), )); } @@ -514,6 +515,33 @@ impl RoutingTableInner { node_refs } + // Update buckets with new node ids we may have learned belong to this entry + fn update_bucket_entries(&self, entry: Arc, node_ids: &[TypedKey]) { + entry.with_mut_inner(|e| { + let existing_node_ids = e.node_ids(); + for node_id in node_ids { + if !existing_node_ids.contains(node_id) { + // Add new node id to entry + if let Some(old_node_id) = e.add_node_id(*node_id) { + // Remove any old node id for this crypto kind + let (kind, idx) = self.unlocked_inner.calculate_bucket_index(&old_node_id); + let bucket = &mut self.buckets[&kind][idx]; + bucket.remove_entry(&old_node_id.key); + self.unlocked_inner.kick_queue.lock().insert((kind, idx)); + } + + // Bucket the entry appropriately + let (kind, idx) = self.unlocked_inner.calculate_bucket_index(node_id); + let bucket = &mut self.buckets[&kind][idx]; + bucket.add_existing_entry(node_id.key, entry.clone()); + + // Kick bucket + self.unlocked_inner.kick_queue.lock().insert((kind, idx)); + } + } + }) + } + /// Create a node reference, possibly creating a bucket entry /// the 'update_func' closure is called on the node, and, if created, /// in a locked fashion as to ensure the bucket entry state is always valid @@ -525,7 +553,7 @@ impl RoutingTableInner { ) -> Option where F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner), - {xxx continue here + { // Ensure someone isn't trying register this node itself if self.unlocked_inner.matches_own_node_id(node_ids) { log_rtab!(debug "can't register own node"); @@ -536,63 +564,59 @@ impl RoutingTableInner { // If we have more than one, pick the one with the best cryptokind to add node ids to let mut best_entry: Option> = None; for node_id in node_ids { - if let Some((kind, idx)) = self.unlocked_inner.find_bucket_index(*node_id) { - let bucket = &self.buckets[&kind][idx]; - if let Some(entry) = bucket.entry(&node_id.key) { - // Best entry is the first one in sorted order that exists from the node id list - // Everything else that matches will be overwritten in the bucket and the - // existing noderefs will eventually unref and drop the old unindexed bucketentry - // We do this instead of merging for now. We could 'kill' entries and have node_refs - // rewrite themselves to point to the merged entry upon dereference. The use case for this - // may not be worth the effort. - best_entry = Some(entry); - break; - } + if !VALID_CRYPTO_KINDS.contains(&node_id.kind) { + log_rtab!(error "can't look up node id with invalid crypto kind"); + return None; + } + let (kind, idx) = self.unlocked_inner.calculate_bucket_index(node_id); + let bucket = &self.buckets[&kind][idx]; + if let Some(entry) = bucket.entry(&node_id.key) { + // Best entry is the first one in sorted order that exists from the node id list + // Everything else that matches will be overwritten in the bucket and the + // existing noderefs will eventually unref and drop the old unindexed bucketentry + // We do this instead of merging for now. We could 'kill' entries and have node_refs + // rewrite themselves to point to the merged entry upon dereference. The use case for this + // may not be worth the effort. + best_entry = Some(entry); + break; }; } // If the entry does exist already, update it if let Some(best_entry) = best_entry { - let nr = best_entry.map(|e| NodeRef::new(outer_self.clone(), best_entry, None)); - // Update the entry with all of the node ids - nr.update_node_ids(node_ids); + self.update_bucket_entries(best_entry, node_ids); + + // Make a noderef to return + let nr = NodeRef::new(outer_self.clone(), best_entry, None); // Update the entry with the update func - nr.operate_mut(|rti, e| update_func(rti, e)); + best_entry.with_mut_inner(|e| update_func(self, e)); + // Return the noderef return Some(nr); } - // Find a bucket for the first node id crypto kind we can handle - let (node_id, kind, idx) = node_ids.iter().find_map(|x| { - self.unlocked_inner - .find_bucket_index(*x) - .map(|v| (*x, v.0, v.1)) - })?; + // If no entry exists yet, add the first entry to a bucket, possibly evicting a bucket member + let first_node_id = node_ids[0]; + let (kind, idx) = self.unlocked_inner.calculate_bucket_index(&first_node_id); + let bucket = &mut self.buckets[&kind][idx]; + let new_entry = bucket.add_new_entry(first_node_id.key); + self.unlocked_inner.kick_queue.lock().insert((kind, idx)); - // Look up existing entry - let noderef = { - let bucket = &self.buckets[&kind][idx]; - let entry = bucket.entry(&node_id.key); - entry.map(|e| NodeRef::new(outer_self.clone(), e, None)) - }; + // Update the other bucket entries with the remaining node ids + self.update_bucket_entries(new_entry, node_ids); - // If one doesn't exist, insert into bucket, possibly evicting a bucket member - self.bucket_entry_count += 1; - let cnt = self.bucket_entry_count; - let bucket = &mut self.buckets[idx]; - let nr = bucket.add_entry(node_id); + // Make node ref to return + let nr = NodeRef::new(outer_self.clone(), new_entry, None); - // Update the entry - let entry = bucket.entry(&node_id).unwrap(); - entry.with_mut(self, update_func); + // Update the entry with the update func + new_entry.with_mut_inner(|e| update_func(self, e)); // Kick the bucket - self.unlocked_inner.kick_queue.lock().insert(idx); - log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable)); + log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable)); - Some(NodeRef::new(outer_self.clone(), e, None)) + Some(nr) } /// Resolve an existing routing table entry and return a reference to it @@ -601,7 +625,12 @@ impl RoutingTableInner { log_rtab!(error "can't look up own node id in routing table"); return None; } - let (kind, idx) = self.unlocked_inner.find_bucket_index(node_id)?; + if !VALID_CRYPTO_KINDS.contains(&node_id.kind) { + log_rtab!(error "can't look up node id with invalid crypto kind"); + return None; + } + + let (kind, idx) = self.unlocked_inner.calculate_bucket_index(&node_id); let bucket = &self.buckets[&kind][idx]; bucket .entry(&node_id.key) @@ -631,13 +660,17 @@ impl RoutingTableInner { where F: FnOnce(Arc) -> R, { - if node_id == self.unlocked_inner.node_id { + if self.unlocked_inner.matches_own_node_id(&[node_id]) { log_rtab!(error "can't look up own node id in routing table"); return None; } - let idx = self.unlocked_inner.find_bucket_index(node_id); - let bucket = &self.buckets[idx]; - if let Some(e) = bucket.entry(&node_id) { + if !VALID_CRYPTO_KINDS.contains(&node_id.kind) { + log_rtab!(error "can't look up node id with invalid crypto kind"); + return None; + } + let (kind, idx) = self.unlocked_inner.calculate_bucket_index(&node_id); + let bucket = &self.buckets[&kind][idx]; + if let Some(e) = bucket.entry(&node_id.key) { return Some(f(e)); } None diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index 9a692fef..c33e3592 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -300,13 +300,14 @@ impl RPCProcessor { }; // Reply directly to the request's source - let sender_id = detail.envelope.get_sender_id(); + let sender_id = TypedKey::new(detail.envelope.get_crypto_kind(), detail.envelope.get_sender_id(); // This may be a different node's reference than the 'sender' in the case of a relay let peer_noderef = detail.peer_noderef.clone(); // If the sender_id is that of the peer, then this is a direct reply // else it is a relayed reply through the peer + xxx continue here, make sure respond to semantics are correct if peer_noderef.node_id() == sender_id { NetworkResult::value(Destination::direct(peer_noderef)) } else { diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 13b95c95..566f1e7a 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -107,8 +107,7 @@ impl RPCProcessor { let filters = VecDeque::from([filter]); let node_count = { - let config = self.config(); - let c = config.get(); + let c = self.config.get(); c.network.dht.max_find_node_count as usize }; diff --git a/veilid-core/src/veilid_api/api.rs b/veilid-core/src/veilid_api/api.rs index ef76f6e1..a6f1cd96 100644 --- a/veilid-core/src/veilid_api/api.rs +++ b/veilid-core/src/veilid_api/api.rs @@ -204,7 +204,7 @@ impl VeilidAPI { apibail_generic!("unable to allocate route"); }; if !rss - .test_route(&pr_pubkey) + .test_route(&pr_keys) .await .map_err(VeilidAPIError::no_connection)? {