From ad4f1a9051725e5e02b3e5b7c66b2826d49de5ef Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 16 Sep 2023 16:37:40 -0400 Subject: [PATCH] new resolve node work --- veilid-core/src/network_manager/mod.rs | 5 +- veilid-core/src/rpc_processor/mod.rs | 87 +------------------ veilid-core/src/veilid_api/debug.rs | 4 +- veilid-core/src/veilid_api/routing_context.rs | 2 +- 4 files changed, 11 insertions(+), 87 deletions(-) diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index d1b7bc74..b48e4519 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1008,7 +1008,10 @@ impl NetworkManager { let some_relay_nr = if self.check_client_whitelist(sender_id) { // Full relay allowed, do a full resolve_node match rpc - .resolve_node(recipient_id, SafetySelection::Unsafe(Sequencing::default())) + .lookup_or_resolve_node( + recipient_id, + SafetySelection::Unsafe(Sequencing::default()), + ) .await { Ok(v) => v, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index fbcf2bf2..447045ba 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -16,6 +16,7 @@ mod rpc_status; mod rpc_validate_dial_info; mod rpc_value_changed; mod rpc_watch_value; +mod resolve_node; #[cfg(feature = "unstable-blockstore")] mod rpc_find_block; @@ -46,6 +47,7 @@ use receipt_manager::*; use routing_table::*; use stop_token::future::FutureExt; use storage_manager::*; +use resolve_node::*; ///////////////////////////////////////////////////////////////////// @@ -453,79 +455,9 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// - /// Search the network for a single node and add it to the routing table and return the node reference - /// If no node was found in the timeout, this returns None - async fn search_for_node_id( - &self, - node_id: TypedKey, - count: usize, - fanout: usize, - timeout_us: TimestampDuration, - safety_selection: SafetySelection, - ) -> TimeoutOr, RPCError>> { - let routing_table = self.routing_table(); - - // Routine to call to generate fanout - let call_routine = |next_node: NodeRef| { - let this = self.clone(); - async move { - match this - .clone() - .rpc_call_find_node( - Destination::direct(next_node).with_safety(safety_selection), - node_id, - vec![], - ) - .await - { - Ok(v) => { - let v = network_result_value_or_log!(v => [ format!(": node_id={} count={} fanout={} fanout={} safety_selection={:?}", node_id, count, fanout, timeout_us, safety_selection) ] { - // Any other failures, just try the next node - return Ok(None); - }); - Ok(Some(v.answer)) - } - Err(e) => Err(e), - } - } - }; - - // Routine to call to check if we're done at each step - let check_done = |_:&[NodeRef]| { - let Ok(Some(nr)) = routing_table - .lookup_node_ref(node_id) else { - return None; - }; - - // ensure we have some dial info for the entry already, - // and that the node is still alive - // if not, we should keep looking for better info - if !matches!(nr.state(get_aligned_timestamp()),BucketEntryState::Dead) && - nr.has_any_dial_info() { - return Some(nr); - } - - None - }; - - // Call the fanout - let fanout_call = FanoutCall::new( - routing_table.clone(), - node_id, - count, - fanout, - timeout_us, - empty_fanout_node_info_filter(), - call_routine, - check_done, - ); - - fanout_call.run().await - } - /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference /// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form - pub fn resolve_node( + pub fn lookup_or_resolve_node( &self, node_id: TypedKey, safety_selection: SafetySelection, @@ -548,20 +480,9 @@ impl RPCProcessor { } } - // If nobody knows where this node is, ask the DHT for it - let (node_count, _consensus_count, fanout, timeout) = { - let c = this.config.get(); - ( - c.network.dht.max_find_node_count as usize, - c.network.dht.resolve_node_count as usize, - c.network.dht.resolve_node_fanout as usize, - TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)), - ) - }; - // Search in preferred cryptosystem order let nr = match this - .search_for_node_id(node_id, node_count, fanout, timeout, safety_selection) + .resolve_node(node_id, safety_selection) .await { TimeoutOr::Timeout => None, diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 5d633868..c9c98633 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -344,14 +344,14 @@ fn resolve_node_ref( let node_id = TypedKey::new(best_crypto_kind(), key); routing_table .rpc_processor() - .resolve_node(node_id, safety_selection) + .lookup_or_resolve_node(node_id, safety_selection) .await .ok() .flatten()? } else if let Some(node_id) = get_typed_key(text) { routing_table .rpc_processor() - .resolve_node(node_id, safety_selection) + .lookup_or_resolve_node(node_id, safety_selection) .await .ok() .flatten()? diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 6e7a9a49..fb5ddede 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -122,7 +122,7 @@ impl RoutingContext { Target::NodeId(node_id) => { // Resolve node let mut nr = match rpc_processor - .resolve_node(node_id, self.unlocked_inner.safety_selection) + .lookup_or_resolve_node(node_id, self.unlocked_inner.safety_selection) .await { Ok(Some(nr)) => nr,