From 82d107f4467698a885b33924b651cccc595aafc1 Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Wed, 24 Apr 2024 22:43:48 -0400
Subject: [PATCH] watchvalue debugging and improved timeout

---
 .../src/storage_manager/watch_value.rs | 23 +++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index abbd1a50..c8b3a77e 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -22,6 +22,7 @@ pub(super) struct OutboundWatchValueResult {
 impl StorageManager {
     /// Perform a 'watch value cancel' on the network without fanout
     #[allow(clippy::too_many_arguments)]
+    #[instrument(target = "dht", level = "debug", skip_all, err)]
     pub(super) async fn outbound_watch_value_cancel(
         &self,
         rpc_processor: RPCProcessor,
@@ -58,6 +59,7 @@ impl StorageManager {
         )?;
 
         if wva.answer.accepted {
+            log_dht!(debug "WatchValue canceled: id={} expiration_ts={} ({})", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()), watch_node);
             Ok(Some(OutboundWatchValueResult {
                 expiration_ts: wva.answer.expiration_ts,
                 watch_id: wva.answer.watch_id,
@@ -65,12 +67,14 @@ impl StorageManager {
                 opt_value_changed_route: wva.reply_private_route,
             }))
         } else {
+            log_dht!(debug "WatchValue not canceled: id={} ({})", watch_id, watch_node);
             Ok(None)
         }
     }
 
     /// Perform a 'watch value change' on the network without fanout
     #[allow(clippy::too_many_arguments)]
+    #[instrument(target = "dht", level = "debug", skip_all, err)]
     pub(super) async fn outbound_watch_value_change(
         &self,
         rpc_processor: RPCProcessor,
@@ -116,6 +120,12 @@ impl StorageManager {
         )?;
 
         if wva.answer.accepted {
+            if watch_id != wva.answer.watch_id {
+                log_dht!(debug "WatchValue changed: id={}->{} expiration_ts={} ({})", watch_id, wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()), watch_node);
+            } else {
+                log_dht!(debug "WatchValue renewed: id={} expiration_ts={} ({})", watch_id, debug_ts(wva.answer.expiration_ts.as_u64()), watch_node);
+            }
+
             Ok(Some(OutboundWatchValueResult {
                 expiration_ts: wva.answer.expiration_ts,
                 watch_id: wva.answer.watch_id,
@@ -123,12 +133,14 @@ impl StorageManager {
                 opt_value_changed_route: wva.reply_private_route,
             }))
         } else {
+            log_dht!(debug "WatchValue change failed: id={} ({})", wva.answer.watch_id, watch_node);
             Ok(None)
         }
     }
 
     /// Perform a 'watch value' query on the network using fanout
     #[allow(clippy::too_many_arguments)]
+    #[instrument(target = "dht", level = "debug", skip_all, err)]
     pub(super) async fn outbound_watch_value(
         &self,
         rpc_processor: RPCProcessor,
@@ -194,11 +206,12 @@ impl StorageManager {
         let routing_table = rpc_processor.routing_table();
 
         // Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
-        let (key_count, timeout_us) = {
+        let (key_count, timeout_us, set_value_count) = {
             let c = self.unlocked_inner.config.get();
             (
                 c.network.dht.max_find_node_count as usize,
                 TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
+                c.network.dht.set_value_count as usize,
             )
         };
 
@@ -260,7 +273,7 @@ impl StorageManager {
                     let mut done = false;
                     if wva.answer.expiration_ts.as_u64() > 0 {
                         // If the expiration time is greater than zero this watch is active
-                        log_dht!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
+                        log_dht!(debug "Watch created: id={} expiration_ts={} ({})", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()), next_node);
                         done = true;
                     } else {
                         // If the returned expiration time is zero, this watch was cancelled or rejected
@@ -278,7 +291,7 @@ impl StorageManager {
                 }
 
                 // Return peers if we have some
-                log_network_result!(debug "WatchValue fanout call returned peers {}", wva.answer.peers.len());
+                log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node);
                 Ok(NetworkResult::value(wva.answer.peers))
             }
         };
@@ -296,12 +309,14 @@ impl StorageManager {
 
         // Call the fanout
         // Use a fixed fanout concurrency of 1 because we only want one watch
+        // Use a longer timeout (timeout_us * set_value_count) because we may need to try multiple nodes
+        // and each one might take timeout_us time.
         let fanout_call = FanoutCall::new(
             routing_table.clone(),
             key,
             key_count,
             1,
-            timeout_us,
+            TimestampDuration::new(timeout_us.as_u64() * (set_value_count as u64)),
             capability_fanout_node_info_filter(vec![CAP_DHT, CAP_DHT_WATCH]),
             call_routine,
             check_done,
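
The timeout change above multiplies the per-node 'SetValue' timeout by set_value_count so that
the single-concurrency fanout has enough time budget to try several candidate nodes in sequence.
A minimal standalone sketch of that arithmetic follows, using a hypothetical helper name and
example values rather than Veilid's real types:

    // Hypothetical helper: the overall fanout budget is the per-node timeout
    // multiplied by the number of nodes that may be tried one after another.
    fn overall_watch_timeout_us(timeout_us: u64, set_value_count: u64) -> u64 {
        timeout_us * set_value_count
    }

    fn main() {
        // Example values only: a 10s per-node timeout and 5 candidate nodes
        // gives a 50s overall fanout budget.
        assert_eq!(overall_watch_timeout_us(10_000_000, 5), 50_000_000);
        println!("ok");
    }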