From 0411055aed8aac5d49c7ba0e076e312b3736c288 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 9 Dec 2023 21:41:49 -0500 Subject: [PATCH] fixes for init nodes --- veilid-core/src/rpc_processor/fanout_call.rs | 10 ++++++---- veilid-core/src/rpc_processor/mod.rs | 2 +- veilid-core/src/storage_manager/get_value.rs | 2 +- veilid-core/src/storage_manager/set_value.rs | 2 +- veilid-core/src/storage_manager/watch_value.rs | 8 ++++---- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index 4cc2c64f..70813534 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -221,7 +221,7 @@ where pub async fn run( self: Arc, - opt_init_fanout_queue: Option>, + init_fanout_queue: Vec, ) -> TimeoutOr, RPCError>> { // Get timeout in milliseconds let timeout_ms = match us_to_ms(self.timeout_us.as_u64()).map_err(RPCError::internal) { @@ -232,10 +232,12 @@ where }; // Initialize closest nodes list - if let Some(init_fanout_queue) = opt_init_fanout_queue { + if init_fanout_queue.is_empty() { + if let Err(e) = self.clone().init_closest_nodes() { + return TimeoutOr::value(Err(e)); + } + } else { self.clone().add_to_fanout_queue(&init_fanout_queue); - } else if let Err(e) = self.clone().init_closest_nodes() { - return TimeoutOr::value(Err(e)); } // Do a quick check to see if we're already done diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 3448f3ad..6994f2d1 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -516,7 +516,7 @@ impl RPCProcessor { check_done, ); - fanout_call.run(None).await + fanout_call.run(vec![]).await } /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index e9bdad5a..239fe20f 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -174,7 +174,7 @@ impl StorageManager { check_done, ); - match fanout_call.run(None).await { + match fanout_call.run(vec![]).await { // If we don't finish in the timeout (too much time passed checking for consensus) TimeoutOr::Timeout => { // Return the best answer we've got diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 43970b9f..50aa8dc7 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -165,7 +165,7 @@ impl StorageManager { check_done, ); - match fanout_call.run(None).await { + match fanout_call.run(vec![]).await { // If we don't finish in the timeout (too much time passed checking for consensus) TimeoutOr::Timeout => { // Return the best answer we've got diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index b49aef25..c448ebc2 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -43,11 +43,11 @@ impl StorageManager { }; // Get the nodes we know are caching this value to seed the fanout - let opt_init_fanout_queue = if let Some(watch_node) = opt_watch_node { - Some(vec![watch_node]) + let init_fanout_queue = if let Some(watch_node) = opt_watch_node { + vec![watch_node] } else { let inner = self.inner.lock().await; - inner.get_value_nodes(key)? + inner.get_value_nodes(key)?.unwrap_or_default() }; // Get the appropriate watcher key @@ -132,7 +132,7 @@ impl StorageManager { check_done, ); - match fanout_call.run(opt_init_fanout_queue).await { + match fanout_call.run(init_fanout_queue).await { // If we don't finish in the timeout (too much time passed without a successful watch) TimeoutOr::Timeout => { // Return the best answer we've got