From ee040b32b91a884844f9759999ec27815b1dfa56 Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Sun, 21 Apr 2024 18:01:06 -0400
Subject: [PATCH] don't fan out for watch value changes or cancels

fix fanout pop ordering
---
 veilid-core/src/rpc_processor/fanout_queue.rs |   2 +-
 veilid-core/src/storage_manager/mod.rs        |   8 +-
 .../src/storage_manager/watch_value.rs        | 215 +++++++++++++++---
 3 files changed, 192 insertions(+), 33 deletions(-)

diff --git a/veilid-core/src/rpc_processor/fanout_queue.rs b/veilid-core/src/rpc_processor/fanout_queue.rs
index b3d8469d..013f1004 100644
--- a/veilid-core/src/rpc_processor/fanout_queue.rs
+++ b/veilid-core/src/rpc_processor/fanout_queue.rs
@@ -72,7 +72,7 @@ impl FanoutQueue {
     // Return next fanout candidate
     pub fn next(&mut self) -> Option<NodeRef> {
-        let cn = self.current_nodes.pop_front()?;
+        let cn = self.current_nodes.pop_back()?;
         self.current_nodes.make_contiguous();
         let key = cn.node_ids().get(self.crypto_kind).unwrap();
diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index d09d771e..e701d4bd 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -304,16 +304,14 @@ impl StorageManager {
             // Use the safety selection we opened the record with
             // Use the writer we opened with as the 'watcher' as well
             let opt_owvresult = self
-                .outbound_watch_value(
+                .outbound_watch_value_cancel(
                     rpc_processor,
                     key,
                     ValueSubkeyRangeSet::full(),
-                    Timestamp::new(0),
-                    0,
                     opened_record.safety_selection(),
                     opened_record.writer().cloned(),
-                    Some(active_watch.id),
-                    Some(active_watch.watch_node),
+                    active_watch.id,
+                    active_watch.watch_node,
                 )
                 .await?;
             if let Some(owvresult) = opt_owvresult {
diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index 35874fe0..abbd1a50 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -20,6 +20,113 @@ pub(super) struct OutboundWatchValueResult {
 }
 
 impl StorageManager {
+    /// Perform a 'watch value cancel' on the network without fanout
+    #[allow(clippy::too_many_arguments)]
+    pub(super) async fn outbound_watch_value_cancel(
+        &self,
+        rpc_processor: RPCProcessor,
+        key: TypedKey,
+        subkeys: ValueSubkeyRangeSet,
+        safety_selection: SafetySelection,
+        opt_watcher: Option<KeyPair>,
+        watch_id: u64,
+        watch_node: NodeRef,
+    ) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
+        // Get the appropriate watcher key, if anonymous use a static anonymous watch key
+        // which lives for the duration of the app's runtime
+        let watcher = opt_watcher.unwrap_or_else(|| {
+            self.unlocked_inner
+                .anonymous_watch_keys
+                .get(key.kind)
+                .unwrap()
+                .value
+        });
+
+        let wva = VeilidAPIError::from_network_result(
+            rpc_processor
+                .clone()
+                .rpc_call_watch_value(
+                    Destination::direct(watch_node.clone()).with_safety(safety_selection),
+                    key,
+                    subkeys,
+                    Timestamp::default(),
+                    0,
+                    watcher,
+                    Some(watch_id),
+                )
+                .await?,
+        )?;
+
+        if wva.answer.accepted {
+            Ok(Some(OutboundWatchValueResult {
+                expiration_ts: wva.answer.expiration_ts,
+                watch_id: wva.answer.watch_id,
+                watch_node,
+                opt_value_changed_route: wva.reply_private_route,
+            }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Perform a 'watch value change' on the network without fanout
+    #[allow(clippy::too_many_arguments)]
+    pub(super) async fn outbound_watch_value_change(
+        &self,
+        rpc_processor: RPCProcessor,
+        key: TypedKey,
+        subkeys: ValueSubkeyRangeSet,
+        expiration: Timestamp,
+        count: u32,
+        safety_selection: SafetySelection,
+        opt_watcher: Option<KeyPair>,
+        watch_id: u64,
+        watch_node: NodeRef,
+    ) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
+        if count == 0 {
+            apibail_internal!("cancel should be done with outbound_watch_value_cancel");
+        }
+        if watch_id == 0 {
+            apibail_internal!("watch id should not be zero when changing watch");
+        }
+
+        // Get the appropriate watcher key, if anonymous use a static anonymous watch key
+        // which lives for the duration of the app's runtime
+        let watcher = opt_watcher.unwrap_or_else(|| {
+            self.unlocked_inner
+                .anonymous_watch_keys
+                .get(key.kind)
+                .unwrap()
+                .value
+        });
+
+        let wva = VeilidAPIError::from_network_result(
+            rpc_processor
+                .clone()
+                .rpc_call_watch_value(
+                    Destination::direct(watch_node.clone()).with_safety(safety_selection),
+                    key,
+                    subkeys,
+                    expiration,
+                    count,
+                    watcher,
+                    Some(watch_id),
+                )
+                .await?,
+        )?;
+
+        if wva.answer.accepted {
+            Ok(Some(OutboundWatchValueResult {
+                expiration_ts: wva.answer.expiration_ts,
+                watch_id: wva.answer.watch_id,
+                watch_node,
+                opt_value_changed_route: wva.reply_private_route,
+            }))
+        } else {
+            Ok(None)
+        }
+    }
+
     /// Perform a 'watch value' query on the network using fanout
     #[allow(clippy::too_many_arguments)]
     pub(super) async fn outbound_watch_value(
@@ -34,6 +141,56 @@ impl StorageManager {
         opt_watch_id: Option<u64>,
         opt_watch_node: Option<NodeRef>,
     ) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
+        // if the count is zero, we are cancelling
+        if count == 0 {
+            // Ensure watch id is specified
+            let Some(watch_id) = opt_watch_id else {
+                apibail_internal!("Must specify a watch id in order to cancel it");
+            };
+            // Ensure watch node is specified
+            let Some(watch_node) = opt_watch_node else {
+                apibail_internal!("Must specify a watch node in order to cancel it");
+            };
+            return self
+                .outbound_watch_value_cancel(
+                    rpc_processor,
+                    key,
+                    subkeys,
+                    safety_selection,
+                    opt_watcher,
+                    watch_id,
+                    watch_node,
+                )
+                .await;
+        }
+
+        // if the watch id and watch node are specified, then we're trying to change an existing watch
+        // first try to do that, then fall back to fanout for a new watch id
+        if let Some(watch_id) = opt_watch_id {
+            let Some(watch_node) = opt_watch_node else {
+                apibail_internal!("Must specify a watch node in order to change it");
+            };
+            if let Some(res) = self
+                .outbound_watch_value_change(
+                    rpc_processor.clone(),
+                    key,
+                    subkeys.clone(),
+                    expiration,
+                    count,
+                    safety_selection,
+                    opt_watcher,
+                    watch_id,
+                    watch_node,
+                )
+                .await?
+            {
+                // If a change was successful then return immediately
+                return Ok(Some(res));
+            }

+            // Otherwise, treat this like a new watch
+        }
+
         let routing_table = rpc_processor.routing_table();

         // Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
@@ -45,23 +202,6 @@ impl StorageManager {
             )
         };

-        // Get the nodes we know are caching this value to seed the fanout
-        let init_fanout_queue = if let Some(watch_node) = opt_watch_node {
-            vec![watch_node]
-        } else {
-            let inner = self.inner.lock().await;
-            inner
-                .get_value_nodes(key)?
-                .unwrap_or_default()
-                .into_iter()
-                .filter(|x| {
-                    x.node_info(RoutingDomain::PublicInternet)
-                        .map(|ni| ni.has_capability(CAP_DHT_WATCH))
-                        .unwrap_or_default()
-                })
-                .collect()
-        };
-
         // Get the appropriate watcher key, if anonymous use a static anonymous watch key
         // which lives for the duration of the app's runtime
         let watcher = opt_watcher.unwrap_or_else(|| {
@@ -72,6 +212,21 @@ impl StorageManager {
                 .value
         });

+        // Get the nodes we know are caching this value to seed the fanout
+        let init_fanout_queue = {
+            let inner = self.inner.lock().await;
+            inner
+                .get_value_nodes(key)?
+                .unwrap_or_default()
+                .into_iter()
+                .filter(|x| {
+                    x.node_info(RoutingDomain::PublicInternet)
+                        .map(|ni| ni.has_capabilities(&[CAP_DHT, CAP_DHT_WATCH]))
+                        .unwrap_or_default()
+                })
+                .collect()
+        };
+
         // Make do-watch-value answer context
         let context = Arc::new(Mutex::new(OutboundWatchValueContext {
             opt_watch_value_result: None,
@@ -82,6 +237,7 @@ impl StorageManager {
             let rpc_processor = rpc_processor.clone();
             let context = context.clone();
             let subkeys = subkeys.clone();
+
             async move {
                 let wva = network_result_try!(
                     rpc_processor
@@ -93,27 +249,32 @@ impl StorageManager {
                            expiration,
                            count,
                            watcher,
-                            opt_watch_id
+                            None
                        )
                        .await?
                );

                // Keep answer if we got one
+                // (accepted means the node could provide an answer, not that the watch is active)
                if wva.answer.accepted {
+                    let mut done = false;
                    if wva.answer.expiration_ts.as_u64() > 0 {
                        // If the expiration time is greater than zero this watch is active
                        log_dht!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
+                        done = true;
                    } else {
-                        // If the returned expiration time is zero, this watch was cancelled, or inactive
-                        log_dht!(debug "Watch inactive: id={}", wva.answer.watch_id);
+                        // If the returned expiration time is zero, this watch was cancelled or rejected
+                        // If we are asking to cancel then check_done will stop after the first node
+                    }
+                    if done {
+                        let mut ctx = context.lock();
+                        ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
+                            expiration_ts: wva.answer.expiration_ts,
+                            watch_id: wva.answer.watch_id,
+                            watch_node: next_node.clone(),
+                            opt_value_changed_route: wva.reply_private_route,
+                        });
                    }
-                    let mut ctx = context.lock();
-                    ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
-                        expiration_ts: wva.answer.expiration_ts,
-                        watch_id: wva.answer.watch_id,
-                        watch_node: next_node.clone(),
-                        opt_value_changed_route: wva.reply_private_route,
-                    });
                }

                // Return peers if we have some
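
For illustration only, a minimal standalone sketch of the dispatch order the reworked
outbound_watch_value follows: a zero count takes the direct cancel path, a known watch id
takes the direct change path, and only a brand-new watch falls through to the fanout
search. The types and the choose_watch_action name below are invented stand-ins, not
veilid-core APIs, and this sketch is not part of the patch.

// sketch.rs -- simplified stand-ins, not veilid-core types
#[derive(Clone, Debug)]
struct NodeRef(String); // hypothetical stand-in for the real NodeRef handle

#[derive(Debug)]
enum WatchAction {
    Cancel { watch_id: u64, node: NodeRef }, // direct RPC to the watch node, no fanout
    Change { watch_id: u64, node: NodeRef }, // direct RPC to the watch node, no fanout
    NewWatchFanout,                          // fan out to find nodes for a new watch
}

// Mirrors the checks added at the top of outbound_watch_value:
// count == 0 cancels, an existing watch id changes in place, otherwise fan out.
fn choose_watch_action(
    count: u32,
    opt_watch_id: Option<u64>,
    opt_watch_node: Option<NodeRef>,
) -> Result<WatchAction, String> {
    if count == 0 {
        let watch_id = opt_watch_id.ok_or("must specify a watch id in order to cancel it")?;
        let node = opt_watch_node.ok_or("must specify a watch node in order to cancel it")?;
        return Ok(WatchAction::Cancel { watch_id, node });
    }
    if let Some(watch_id) = opt_watch_id {
        let node = opt_watch_node.ok_or("must specify a watch node in order to change it")?;
        // In the patch, a change that is not accepted falls back to a fanout for a new watch id.
        return Ok(WatchAction::Change { watch_id, node });
    }
    Ok(WatchAction::NewWatchFanout)
}

fn main() {
    let node = NodeRef("example-watch-node".to_string());
    println!("{:?}", choose_watch_action(0, Some(42), Some(node.clone())));
    println!("{:?}", choose_watch_action(3, Some(42), Some(node)));
    println!("{:?}", choose_watch_action(3, None, None));
}

The point of the split is that cancels and changes already know which node holds the watch,
so a single direct RPC is enough; only establishing a new watch needs the fanout search,
which is also why the fanout call routine now always passes None for the watch id.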