watchvalue debugging and improved timeout

Christien Rioux 2024-04-24 22:43:48 -04:00
parent d3b596a70e
commit 82d107f446


@@ -22,6 +22,7 @@ pub(super) struct OutboundWatchValueResult {
impl StorageManager {
/// Perform a 'watch value cancel' on the network without fanout
#[allow(clippy::too_many_arguments)]
#[instrument(target = "dht", level = "debug", skip_all, err)]
pub(super) async fn outbound_watch_value_cancel(
&self,
rpc_processor: RPCProcessor,
@@ -58,6 +59,7 @@ impl StorageManager {
)?;
if wva.answer.accepted {
log_dht!(debug "WatchValue canceled: id={} expiration_ts={} ({})", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()), watch_node);
Ok(Some(OutboundWatchValueResult {
expiration_ts: wva.answer.expiration_ts,
watch_id: wva.answer.watch_id,
@@ -65,12 +67,14 @@ impl StorageManager {
opt_value_changed_route: wva.reply_private_route,
}))
} else {
log_dht!(debug "WatchValue not canceled: id={} ({})", watch_id, watch_node);
Ok(None)
}
}
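The new #[instrument(target = "dht", level = "debug", skip_all, err)] attributes are the "watchvalue debugging" half of this commit: each call now opens a tracing span on the dht target, keeps the large arguments out of the span fields via skip_all, and automatically records any returned Err. A minimal sketch of the same pattern, assuming the tracing crate; the function and error string below are hypothetical, not code from this repository:

use tracing::instrument;

// `target` routes the span to the "dht" log target, `level` sets the
// span level, `skip_all` omits every argument from the span's fields,
// and `err` emits an event with the Err value when the call fails.
#[instrument(target = "dht", level = "debug", skip_all, err)]
async fn outbound_watch(key: u64) -> Result<(), String> {
    // Hypothetical body; the real methods perform the RPCs shown above.
    Err(format!("no watch holder found for key {key}"))
}
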
/// Perform a 'watch value change' on the network without fanout
#[allow(clippy::too_many_arguments)]
#[instrument(target = "dht", level = "debug", skip_all, err)]
pub(super) async fn outbound_watch_value_change(
&self,
rpc_processor: RPCProcessor,
@@ -116,6 +120,12 @@ impl StorageManager {
)?;
if wva.answer.accepted {
if watch_id != wva.answer.watch_id {
log_dht!(debug "WatchValue changed: id={}->{} expiration_ts={} ({})", watch_id, wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()), watch_node);
} else {
log_dht!(debug "WatchValue renewed: id={} expiration_ts={} ({})", watch_id, debug_ts(wva.answer.expiration_ts.as_u64()), watch_node);
}
Ok(Some(OutboundWatchValueResult {
expiration_ts: wva.answer.expiration_ts,
watch_id: wva.answer.watch_id,
@@ -123,12 +133,14 @@ impl StorageManager {
opt_value_changed_route: wva.reply_private_route,
}))
} else {
log_dht!(debug "WatchValue change failed: id={} ({})", wva.answer.watch_id, watch_node);
Ok(None)
}
}
/// Perform a 'watch value' query on the network using fanout
#[allow(clippy::too_many_arguments)]
#[instrument(target = "dht", level = "debug", skip_all, err)]
pub(super) async fn outbound_watch_value(
&self,
rpc_processor: RPCProcessor,
@@ -194,11 +206,12 @@ impl StorageManager {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
let (key_count, timeout_us) = {
let (key_count, timeout_us, set_value_count) = {
let c = self.unlocked_inner.config.get();
(
c.network.dht.max_find_node_count as usize,
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
c.network.dht.set_value_count as usize,
)
};
@@ -260,7 +273,7 @@ impl StorageManager {
let mut done = false;
if wva.answer.expiration_ts.as_u64() > 0 {
// If the expiration time is greater than zero this watch is active
log_dht!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
log_dht!(debug "Watch created: id={} expiration_ts={} ({})", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()), next_node);
done = true;
} else {
// If the returned expiration time is zero, this watch was cancelled or rejected
@@ -278,7 +291,7 @@ impl StorageManager {
}
// Return peers if we have some
log_network_result!(debug "WatchValue fanout call returned peers {}", wva.answer.peers.len());
log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node);
Ok(NetworkResult::value(wva.answer.peers))
}
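The branch above encodes the WatchValue answer semantics: a nonzero expiration_ts means the queried node accepted and now holds the watch, so the fanout can stop; zero means the watch was cancelled or rejected and more nodes must be tried. A minimal sketch of that decision, using a hypothetical helper name:

// Hypothetical sketch of the acceptance check above: a nonzero
// expiration_ts means the node holds the watch and fanout can stop;
// zero means it was cancelled/rejected and more nodes must be tried.
fn fanout_done(accepted: bool, expiration_ts: u64) -> bool {
    accepted && expiration_ts > 0
}
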
@@ -296,12 +309,14 @@ impl StorageManager {
// Call the fanout
// Use a fixed fanout concurrency of 1 because we only want one watch
// Use a longer timeout (timeout_us * set_value_count) because we may need to try multiple nodes
// and each one might take timeout_us time.
let fanout_call = FanoutCall::new(
routing_table.clone(),
key,
key_count,
1,
timeout_us,
TimestampDuration::new(timeout_us.as_u64() * (set_value_count as u64)),
capability_fanout_node_info_filter(vec![CAP_DHT, CAP_DHT_WATCH]),
call_routine,
check_done,
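
The "improved timeout" half of the commit is the scaled value passed to FanoutCall::new: the per-node set_value_timeout_ms budget multiplied by set_value_count, since with a fanout concurrency of 1 the call may have to wait out that many nodes in sequence. A worked example with illustrative numbers; these are hypothetical, not Veilid's actual config defaults:

// Illustrative arithmetic only; the config values are hypothetical.
let set_value_timeout_ms: u64 = 10_000; // per-node timeout (hypothetical)
let set_value_count: u64 = 5;           // nodes the fanout may try (hypothetical)

let timeout_us = set_value_timeout_ms * 1_000; // equivalent of ms_to_us()
let fanout_timeout_us = timeout_us * set_value_count;

// With concurrency fixed at 1, five sequential 10-second waits can take
// up to 50 seconds before the fanout itself times out.
assert_eq!(fanout_timeout_us, 50_000_000);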