new resolve node work

This commit is contained in:
Christien Rioux 2023-09-16 16:37:40 -04:00
parent 3444cb58d9
commit ad4f1a9051
4 changed files with 11 additions and 87 deletions

View File

@ -1008,7 +1008,10 @@ impl NetworkManager {
let some_relay_nr = if self.check_client_whitelist(sender_id) { let some_relay_nr = if self.check_client_whitelist(sender_id) {
// Full relay allowed, do a full resolve_node // Full relay allowed, do a full resolve_node
match rpc match rpc
.resolve_node(recipient_id, SafetySelection::Unsafe(Sequencing::default())) .lookup_or_resolve_node(
recipient_id,
SafetySelection::Unsafe(Sequencing::default()),
)
.await .await
{ {
Ok(v) => v, Ok(v) => v,

View File

@ -16,6 +16,7 @@ mod rpc_status;
mod rpc_validate_dial_info; mod rpc_validate_dial_info;
mod rpc_value_changed; mod rpc_value_changed;
mod rpc_watch_value; mod rpc_watch_value;
mod resolve_node;
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
mod rpc_find_block; mod rpc_find_block;
@ -46,6 +47,7 @@ use receipt_manager::*;
use routing_table::*; use routing_table::*;
use stop_token::future::FutureExt; use stop_token::future::FutureExt;
use storage_manager::*; use storage_manager::*;
use resolve_node::*;
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
@ -453,79 +455,9 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
/// Search the network for a single node and add it to the routing table and return the node reference
/// If no node was found in the timeout, this returns None
async fn search_for_node_id(
&self,
node_id: TypedKey,
count: usize,
fanout: usize,
timeout_us: TimestampDuration,
safety_selection: SafetySelection,
) -> TimeoutOr<Result<Option<NodeRef>, RPCError>> {
let routing_table = self.routing_table();
// Routine to call to generate fanout
let call_routine = |next_node: NodeRef| {
let this = self.clone();
async move {
match this
.clone()
.rpc_call_find_node(
Destination::direct(next_node).with_safety(safety_selection),
node_id,
vec![],
)
.await
{
Ok(v) => {
let v = network_result_value_or_log!(v => [ format!(": node_id={} count={} fanout={} fanout={} safety_selection={:?}", node_id, count, fanout, timeout_us, safety_selection) ] {
// Any other failures, just try the next node
return Ok(None);
});
Ok(Some(v.answer))
}
Err(e) => Err(e),
}
}
};
// Routine to call to check if we're done at each step
let check_done = |_:&[NodeRef]| {
let Ok(Some(nr)) = routing_table
.lookup_node_ref(node_id) else {
return None;
};
// ensure we have some dial info for the entry already,
// and that the node is still alive
// if not, we should keep looking for better info
if !matches!(nr.state(get_aligned_timestamp()),BucketEntryState::Dead) &&
nr.has_any_dial_info() {
return Some(nr);
}
None
};
// Call the fanout
let fanout_call = FanoutCall::new(
routing_table.clone(),
node_id,
count,
fanout,
timeout_us,
empty_fanout_node_info_filter(),
call_routine,
check_done,
);
fanout_call.run().await
}
/// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
/// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form /// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form
pub fn resolve_node( pub fn lookup_or_resolve_node(
&self, &self,
node_id: TypedKey, node_id: TypedKey,
safety_selection: SafetySelection, safety_selection: SafetySelection,
@ -548,20 +480,9 @@ impl RPCProcessor {
} }
} }
// If nobody knows where this node is, ask the DHT for it
let (node_count, _consensus_count, fanout, timeout) = {
let c = this.config.get();
(
c.network.dht.max_find_node_count as usize,
c.network.dht.resolve_node_count as usize,
c.network.dht.resolve_node_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)),
)
};
// Search in preferred cryptosystem order // Search in preferred cryptosystem order
let nr = match this let nr = match this
.search_for_node_id(node_id, node_count, fanout, timeout, safety_selection) .resolve_node(node_id, safety_selection)
.await .await
{ {
TimeoutOr::Timeout => None, TimeoutOr::Timeout => None,

View File

@ -344,14 +344,14 @@ fn resolve_node_ref(
let node_id = TypedKey::new(best_crypto_kind(), key); let node_id = TypedKey::new(best_crypto_kind(), key);
routing_table routing_table
.rpc_processor() .rpc_processor()
.resolve_node(node_id, safety_selection) .lookup_or_resolve_node(node_id, safety_selection)
.await .await
.ok() .ok()
.flatten()? .flatten()?
} else if let Some(node_id) = get_typed_key(text) { } else if let Some(node_id) = get_typed_key(text) {
routing_table routing_table
.rpc_processor() .rpc_processor()
.resolve_node(node_id, safety_selection) .lookup_or_resolve_node(node_id, safety_selection)
.await .await
.ok() .ok()
.flatten()? .flatten()?

View File

@ -122,7 +122,7 @@ impl RoutingContext {
Target::NodeId(node_id) => { Target::NodeId(node_id) => {
// Resolve node // Resolve node
let mut nr = match rpc_processor let mut nr = match rpc_processor
.resolve_node(node_id, self.unlocked_inner.safety_selection) .lookup_or_resolve_node(node_id, self.unlocked_inner.safety_selection)
.await .await
{ {
Ok(Some(nr)) => nr, Ok(Some(nr)) => nr,