fixes for init nodes

This commit is contained in:
John Smith 2023-12-09 21:41:49 -05:00 committed by Christien Rioux
parent a076082763
commit 0411055aed
5 changed files with 13 additions and 11 deletions

View File

@ -221,7 +221,7 @@ where
pub async fn run( pub async fn run(
self: Arc<Self>, self: Arc<Self>,
opt_init_fanout_queue: Option<Vec<NodeRef>>, init_fanout_queue: Vec<NodeRef>,
) -> TimeoutOr<Result<Option<R>, RPCError>> { ) -> TimeoutOr<Result<Option<R>, RPCError>> {
// Get timeout in milliseconds // Get timeout in milliseconds
let timeout_ms = match us_to_ms(self.timeout_us.as_u64()).map_err(RPCError::internal) { let timeout_ms = match us_to_ms(self.timeout_us.as_u64()).map_err(RPCError::internal) {
@ -232,10 +232,12 @@ where
}; };
// Initialize closest nodes list // Initialize closest nodes list
if let Some(init_fanout_queue) = opt_init_fanout_queue { if init_fanout_queue.is_empty() {
if let Err(e) = self.clone().init_closest_nodes() {
return TimeoutOr::value(Err(e));
}
} else {
self.clone().add_to_fanout_queue(&init_fanout_queue); self.clone().add_to_fanout_queue(&init_fanout_queue);
} else if let Err(e) = self.clone().init_closest_nodes() {
return TimeoutOr::value(Err(e));
} }
// Do a quick check to see if we're already done // Do a quick check to see if we're already done

View File

@ -516,7 +516,7 @@ impl RPCProcessor {
check_done, check_done,
); );
fanout_call.run(None).await fanout_call.run(vec![]).await
} }
/// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference

View File

@ -174,7 +174,7 @@ impl StorageManager {
check_done, check_done,
); );
match fanout_call.run(None).await { match fanout_call.run(vec![]).await {
// If we don't finish in the timeout (too much time passed checking for consensus) // If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => { TimeoutOr::Timeout => {
// Return the best answer we've got // Return the best answer we've got

View File

@ -165,7 +165,7 @@ impl StorageManager {
check_done, check_done,
); );
match fanout_call.run(None).await { match fanout_call.run(vec![]).await {
// If we don't finish in the timeout (too much time passed checking for consensus) // If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => { TimeoutOr::Timeout => {
// Return the best answer we've got // Return the best answer we've got

View File

@ -43,11 +43,11 @@ impl StorageManager {
}; };
// Get the nodes we know are caching this value to seed the fanout // Get the nodes we know are caching this value to seed the fanout
let opt_init_fanout_queue = if let Some(watch_node) = opt_watch_node { let init_fanout_queue = if let Some(watch_node) = opt_watch_node {
Some(vec![watch_node]) vec![watch_node]
} else { } else {
let inner = self.inner.lock().await; let inner = self.inner.lock().await;
inner.get_value_nodes(key)? inner.get_value_nodes(key)?.unwrap_or_default()
}; };
// Get the appropriate watcher key // Get the appropriate watcher key
@ -132,7 +132,7 @@ impl StorageManager {
check_done, check_done,
); );
match fanout_call.run(opt_init_fanout_queue).await { match fanout_call.run(init_fanout_queue).await {
// If we don't finish in the timeout (too much time passed without a successful watch) // If we don't finish in the timeout (too much time passed without a successful watch)
TimeoutOr::Timeout => { TimeoutOr::Timeout => {
// Return the best answer we've got // Return the best answer we've got