diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index 6c75db82..3fbaa148 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + /// Where to send an RPC message #[derive(Debug, Clone)] pub(crate) enum Destination { diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index 4a89ebbc..3e63b968 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { // Sends a high level app request and wait for response // Can be sent via all methods including relays and routes diff --git a/veilid-core/src/rpc_processor/rpc_app_message.rs b/veilid-core/src/rpc_processor/rpc_app_message.rs index 93da6adb..05985366 100644 --- a/veilid-core/src/rpc_processor/rpc_app_message.rs +++ b/veilid-core/src/rpc_processor/rpc_app_message.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { // Sends a high level app message // Can be sent via all methods including relays and routes diff --git a/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs index d3b1c026..2a9f2cd6 100644 --- a/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(super) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { diff --git a/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs index 0179dbcb..a36704e3 100644 --- a/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(super) async fn process_complete_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { diff --git a/veilid-core/src/rpc_processor/rpc_find_block.rs b/veilid-core/src/rpc_processor/rpc_find_block.rs index 8fc8c63c..f08f74f7 100644 --- a/veilid-core/src/rpc_processor/rpc_find_block.rs +++ b/veilid-core/src/rpc_processor/rpc_find_block.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(super) async fn process_find_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 88ee0aed..2d67549a 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { /// Send FindNodeQ RPC request, receive FindNodeA answer /// Can be sent via all methods including relays diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 68717c9b..204990ab 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -1,6 +1,8 @@ use super::*; use crate::storage_manager::{SignedValueData, SignedValueDescriptor}; +impl_veilid_log_facility!("rpc"); + #[derive(Clone, Debug)] pub struct GetValueAnswer { pub value: Option, @@ -78,7 +80,7 @@ impl RPCProcessor { crypto_kind: vcrypto.kind(), }); - veilid_log!(self debug "{}", debug_string); + veilid_log!(self debug target: "dht", "{}", debug_string); let waitable_reply = network_result_try!( self.question(dest.clone(), question, Some(question_context)) @@ -128,13 +130,13 @@ impl RPCProcessor { dest ); - veilid_log!(self debug "{}", debug_string_answer); + veilid_log!(self debug target: "dht", "{}", debug_string_answer); let peer_ids: Vec = peers .iter() .filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())) .collect(); - veilid_log!(self debug "Peers: {:#?}", peer_ids); + veilid_log!(self debug target: "dht", "Peers: {:#?}", peer_ids); } // Validate peers returned are, in fact, closer to the key than the node we sent this to @@ -228,7 +230,7 @@ impl RPCProcessor { msg.header.direct_sender_node_id() ); - veilid_log!(self debug "{}", debug_string); + veilid_log!(self debug target: "dht", "{}", debug_string); } // See if we would have accepted this as a set @@ -278,7 +280,7 @@ impl RPCProcessor { msg.header.direct_sender_node_id() ); - veilid_log!(self debug "{}", debug_string_answer); + veilid_log!(self debug target: "dht", "{}", debug_string_answer); } // Make GetValue answer diff --git a/veilid-core/src/rpc_processor/rpc_inspect_value.rs b/veilid-core/src/rpc_processor/rpc_inspect_value.rs index 3126ad61..2fbe0a98 100644 --- a/veilid-core/src/rpc_processor/rpc_inspect_value.rs +++ b/veilid-core/src/rpc_processor/rpc_inspect_value.rs @@ -1,6 +1,8 @@ use super::*; use crate::storage_manager::SignedValueDescriptor; +impl_veilid_log_facility!("rpc"); + #[derive(Clone, Debug)] pub struct InspectValueAnswer { pub seqs: Vec, @@ -81,7 +83,7 @@ impl RPCProcessor { crypto_kind: vcrypto.kind(), }); - veilid_log!(self debug "{}", debug_string); + veilid_log!(self debug target: "dht", "{}", debug_string); let waitable_reply = network_result_try!( self.question(dest.clone(), question, Some(question_context)) @@ -118,13 +120,13 @@ impl RPCProcessor { debug_seqs(&seqs) ); - veilid_log!(self debug "{}", debug_string_answer); + veilid_log!(self debug target: "dht", "{}", debug_string_answer); let peer_ids: Vec = peers .iter() .filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())) .collect(); - veilid_log!(self debug "Peers: {:#?}", peer_ids); + veilid_log!(self debug target: "dht", "Peers: {:#?}", peer_ids); } // Validate peers returned are, in fact, closer to the key than the node we sent this to @@ -209,7 +211,7 @@ impl RPCProcessor { msg.header.direct_sender_node_id() ); - veilid_log!(self debug "{}", debug_string); + veilid_log!(self debug target: "dht", "{}", debug_string); } // See if we would have accepted this as a set @@ -247,7 +249,7 @@ impl RPCProcessor { msg.header.direct_sender_node_id() ); - veilid_log!(self debug "{}", debug_string_answer); + veilid_log!(self debug target: "dht", "{}", debug_string_answer); } // Make InspectValue answer diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index 3f7ac96e..5bf4ffe8 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { // Sends a unidirectional in-band return receipt // Can be sent via all methods including relays and routes diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index b5b4df97..2726449a 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { #[instrument(level = "trace", target = "rpc", skip_all, err)] async fn process_route_safety_route_hop( diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 36a2a018..c2e20463 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + #[derive(Clone, Debug)] pub struct SetValueAnswer { pub set: bool, @@ -89,7 +91,7 @@ impl RPCProcessor { }); if debug_target_enabled!("dht") { - veilid_log!(self debug "{}", debug_string); + veilid_log!(self debug target: "dht", "{}", debug_string); } let waitable_reply = network_result_try!( @@ -140,13 +142,13 @@ impl RPCProcessor { dest, ); - veilid_log!(self debug "{}", debug_string_answer); + veilid_log!(self debug target: "dht", "{}", debug_string_answer); let peer_ids: Vec = peers .iter() .filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())) .collect(); - veilid_log!(self debug "Peers: {:#?}", peer_ids); + veilid_log!(self debug target: "dht", "Peers: {:#?}", peer_ids); } // Validate peers returned are, in fact, closer to the key than the node we sent this to @@ -244,7 +246,7 @@ impl RPCProcessor { msg.header.direct_sender_node_id() ); - veilid_log!(self debug "{}", debug_string); + veilid_log!(self debug target: "dht", "{}", debug_string); // If there are less than 'set_value_count' peers that are closer, then store here too let set_value_count = self @@ -296,7 +298,7 @@ impl RPCProcessor { msg.header.direct_sender_node_id() ); - veilid_log!(self debug "{}", debug_string_answer); + veilid_log!(self debug target: "dht", "{}", debug_string_answer); } // Make SetValue answer diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 24fa55c0..a14b74d2 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { // Sends a unidirectional signal to a node // Can be sent via relays but not routes. For routed 'signal' like capabilities, use AppMessage. diff --git a/veilid-core/src/rpc_processor/rpc_start_tunnel.rs b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs index fb5012f5..2e4a6db0 100644 --- a/veilid-core/src/rpc_processor/rpc_start_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(super) async fn process_start_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index cdf1ec0d..52e5c5a1 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)] pub struct StatusResult { pub opt_sender_info: Option, diff --git a/veilid-core/src/rpc_processor/rpc_supply_block.rs b/veilid-core/src/rpc_processor/rpc_supply_block.rs index 9a4234be..ea2332e2 100644 --- a/veilid-core/src/rpc_processor/rpc_supply_block.rs +++ b/veilid-core/src/rpc_processor/rpc_supply_block.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(super) async fn process_supply_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 1948ae3c..8c1bd73f 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { // Can only be sent directly, not via relays or routes #[instrument(level = "trace", target = "rpc", skip(self), ret, err)] diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index 619985dc..bd6202b8 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + impl RPCProcessor { // Sends a dht value change notification // Can be sent via all methods including relays and routes but never over a safety route @@ -88,7 +90,7 @@ impl RPCProcessor { msg.header.direct_sender_node_id(), ); - veilid_log!(self debug "{}", debug_string_stmt); + veilid_log!(self debug target: "dht", "{}", debug_string_stmt); } // Save the subkey, creating a new record if necessary diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index 69093248..b5d1c1c2 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("rpc"); + #[derive(Clone, Debug)] pub struct WatchValueAnswer { pub accepted: bool, @@ -85,7 +87,7 @@ impl RPCProcessor { RPCQuestionDetail::WatchValueQ(Box::new(watch_value_q)), ); - veilid_log!(self debug "{}", debug_string); + veilid_log!(self debug target: "dht", "{}", debug_string); let waitable_reply = network_result_try!(self.question(dest.clone(), question, None).await?); @@ -122,13 +124,13 @@ impl RPCProcessor { dest ); - veilid_log!(self debug "{}", debug_string_answer); + veilid_log!(self debug target: "dht", "{}", debug_string_answer); let peer_ids: Vec = peers .iter() .filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())) .collect(); - veilid_log!(self debug "Peers: {:#?}", peer_ids); + veilid_log!(self debug target: "dht", "Peers: {:#?}", peer_ids); } // Validate accepted requests @@ -249,7 +251,7 @@ impl RPCProcessor { watcher ); - veilid_log!(self debug "{}", debug_string); + veilid_log!(self debug target: "dht", "{}", debug_string); } // Get the nodes that we know about that are closer to the the key than our own node @@ -308,7 +310,7 @@ impl RPCProcessor { msg.header.direct_sender_node_id() ); - veilid_log!(self debug "{}", debug_string_answer); + veilid_log!(self debug target: "dht", "{}", debug_string_answer); } // Make WatchValue answer diff --git a/veilid-core/src/rpc_processor/rpc_worker.rs b/veilid-core/src/rpc_processor/rpc_worker.rs index 194a0f2f..7621a2dd 100644 --- a/veilid-core/src/rpc_processor/rpc_worker.rs +++ b/veilid-core/src/rpc_processor/rpc_worker.rs @@ -3,6 +3,8 @@ use stop_token::future::FutureExt as _; use super::*; +impl_veilid_log_facility!("rpc"); + #[derive(Debug)] pub(super) enum RPCWorkerRequestKind { Message { message_encoded: MessageEncoded }, diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 934d283b..06695a57 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1032,6 +1032,8 @@ impl StorageManager { // If we have no subkeys left, then set the count to zero to indicate a full cancellation let count = if new_subkeys.is_empty() { 0 + } else if let Some(state) = outbound_watch.state() { + state.remaining_count() } else { desired.count }; diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs b/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs index 1c996da4..2d03459f 100644 --- a/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs +++ b/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs @@ -18,6 +18,8 @@ pub(in crate::storage_manager) struct OutboundWatchManager { pub outbound_watches: HashMap, /// Last known active watch per node+record pub per_node_states: HashMap, + /// Value changed updates that need inpection to determine if they should be reported + pub needs_change_inspection: HashMap, } impl fmt::Display for OutboundWatchManager { @@ -44,6 +46,21 @@ impl fmt::Display for OutboundWatchManager { } } out += "]\n"; + out += "needs_change_inspection: [\n"; + { + let mut keys = self + .needs_change_inspection + .keys() + .copied() + .collect::>(); + keys.sort(); + + for k in keys { + let v = self.needs_change_inspection.get(&k).unwrap(); + out += &format!(" {}: {}\n", k, v); + } + } + out += "]\n"; write!(f, "{}", out) } @@ -60,6 +77,7 @@ impl OutboundWatchManager { Self { outbound_watches: HashMap::new(), per_node_states: HashMap::new(), + needs_change_inspection: HashMap::new(), } } @@ -157,4 +175,12 @@ impl OutboundWatchManager { self.per_node_states .retain(|k, _| !finished_pnks.contains(k) && !expired_pnks.contains(k)); } + + /// Set a record up to be inspected for changed subkeys + pub fn enqueue_change_inspect(&mut self, record_key: TypedKey, subkeys: ValueSubkeyRangeSet) { + self.needs_change_inspection + .entry(record_key) + .and_modify(|x| *x = x.union(&subkeys)) + .or_insert(subkeys); + } } diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs index c3e48083..c6d45237 100644 --- a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs +++ b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs @@ -112,22 +112,25 @@ impl OutboundWatch { } // If there is no current watch then there is nothing to cancel - let Some(state) = self.state.as_ref() else { + let Some(_state) = self.state.as_ref() else { return false; }; // If the desired parameters is None then cancel - let Some(desired) = self.desired.as_ref() else { + let Some(_desired) = self.desired.as_ref() else { return true; }; - // If the desired parameters is different than the current parameters - // then cancel so we can eventually reconcile to the new parameters - state.params() != desired + false } /// Returns true if this outbound watch can be renewed - pub fn needs_renew(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool { + pub fn needs_renew( + &self, + registry: &VeilidComponentRegistry, + consensus_count: usize, + cur_ts: Timestamp, + ) -> bool { if self.is_dead() || self.needs_cancel(registry) { veilid_log!(registry warn "should have checked for is_dead and needs_cancel first"); return false; @@ -138,11 +141,84 @@ impl OutboundWatch { return false; }; - // If the watch has per node watches that have expired, - // but we can extend our watch then renew. Do this only within RENEW_OUTBOUND_WATCHES_DURATION_SECS - // of the actual expiration. If we're looking at this after the actual expiration, don't try because - // the watch id will have died + // Should have desired parameters here + let Some(desired) = self.desired.as_ref() else { + veilid_log!(registry warn "needs_cancel should have returned true"); + return false; + }; + // If we have a consensus, we can avoid fanout by renewing rather than reconciling + // but if we don't have a consensus, we should defer to fanout to try to improve it + if state.nodes().len() < consensus_count { + return false; + } + + // If we have a consensus but need to renew because some per-node watches + // either expired or had their routes die, do it + if self.wants_per_node_watch_update(registry, state, cur_ts) { + return true; + } + + // If the desired parameters have changed, then we should renew with them + state.params() != desired + } + + /// Returns true if there is work to be done on getting the outbound + /// watch to its desired state + pub fn needs_reconcile( + &self, + registry: &VeilidComponentRegistry, + consensus_count: usize, + cur_ts: Timestamp, + ) -> bool { + if self.is_dead() + || self.needs_cancel(registry) + || self.needs_renew(registry, consensus_count, cur_ts) + { + veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first"); + return false; + } + + // If desired is none, then is_dead() or needs_cancel() should have been true + let Some(desired) = self.desired.as_ref() else { + veilid_log!(registry warn "is_dead() or needs_cancel() should have been true"); + return false; + }; + + // If there is a desired watch but no current state, then reconcile + let Some(state) = self.state() else { + return true; + }; + + // If we are still working on getting the 'current' state to match + // the 'desired' state, then do the reconcile if we are within the timeframe for it + if state.nodes().len() < consensus_count + && cur_ts >= state.next_reconcile_ts().unwrap_or_default() + { + return true; + } + + // If we have a consensus, or are not attempting consensus at this time, + // but need to reconcile because some per-node watches either expired or had their routes die, do it + if self.wants_per_node_watch_update(registry, state, cur_ts) { + return true; + } + + // If the desired parameters have changed, then we should reconcile with them + state.params() != desired + } + + /// Returns true if we need to update our per-node watches due to expiration, + /// or if they are all dead because the route died and needs to be updated + fn wants_per_node_watch_update( + &self, + registry: &VeilidComponentRegistry, + state: &OutboundWatchState, + cur_ts: Timestamp, + ) -> bool { + // If the watch has per node watches that have expired, but we can extend our watch then renew. + // Do this only within RENEW_OUTBOUND_WATCHES_DURATION_SECS of the actual expiration. + // If we're looking at this after the actual expiration, don't try because the watch id will have died. let renew_ts = cur_ts + TimestampDuration::new_secs(RENEW_OUTBOUND_WATCHES_DURATION_SECS); if renew_ts >= state.min_expiration_ts() && cur_ts < state.min_expiration_ts() @@ -166,44 +242,4 @@ impl OutboundWatch { false } - - /// Returns true if there is work to be done on getting the outbound - /// watch to its desired state - pub fn needs_reconcile( - &self, - registry: &VeilidComponentRegistry, - consensus_count: usize, - cur_ts: Timestamp, - ) -> bool { - if self.is_dead() || self.needs_cancel(registry) || self.needs_renew(registry, cur_ts) { - veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first"); - return false; - } - - // If desired is none, then is_dead() or needs_cancel() should have been true - let Some(desired) = self.desired.as_ref() else { - veilid_log!(registry warn "is_dead() or needs_cancel() should have been true"); - return false; - }; - - // If there is a desired watch but no current state, then reconcile - let Some(state) = self.state() else { - return true; - }; - - // If the params are different, then needs_cancel() should have returned true - if state.params() != desired { - veilid_log!(registry warn "needs_cancel() should have returned true"); - return false; - } - // If we are still working on getting the 'current' state to match - // the 'desired' state, then do the reconcile if we are within the timeframe for it - if state.nodes().len() < consensus_count - && cur_ts >= state.next_reconcile_ts().unwrap_or_default() - { - return true; - } - // No work to do on this watch at this time - false - } } diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_state.rs b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_state.rs index dce18d4d..f9cb4494 100644 --- a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_state.rs +++ b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_state.rs @@ -52,7 +52,6 @@ pub(in crate::storage_manager) struct OutboundWatchStateEditor<'a> { } impl OutboundWatchStateEditor<'_> { - #[expect(dead_code)] pub fn set_params(&mut self, params: OutboundWatchParameters) { self.state.params = params; } @@ -108,6 +107,24 @@ impl OutboundWatchState { &self.value_changed_routes } + /// Get the parameters we use if we're updating this state's per node watches + pub fn get_per_node_params( + &self, + desired: &OutboundWatchParameters, + ) -> OutboundWatchParameters { + // Change the params to update count + if self.params() != desired { + // If parameters are changing, just use the desired parameters + desired.clone() + } else { + // If this is a renewal of the same parameters, + // use the current remaining update count for the rpc + let mut renew_params = desired.clone(); + renew_params.count = self.remaining_count(); + renew_params + } + } + pub fn edit R>( &mut self, per_node_state: &HashMap, diff --git a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs index aac170fc..96028920 100644 --- a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs @@ -25,6 +25,13 @@ impl StorageManager { }; } + // Iterate all queued change inspections and do them + for (k, v) in inner.outbound_watch_manager.needs_change_inspection.drain() { + // Get next work on watch and queue it if we have something to do + let op_fut = self.get_change_inspection_operation(k, v); + self.background_operation_processor.add_future(op_fut); + } + Ok(()) } } diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index a3a976ea..b23eaf72 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -34,6 +34,14 @@ pub(super) struct OutboundWatchValueResult { pub ignored: Vec, } +impl OutboundWatchValueResult { + pub fn merge(&mut self, other: OutboundWatchValueResult) { + self.accepted.extend(other.accepted); + self.rejected.extend(other.rejected); + self.ignored.extend(other.ignored); + } +} + impl StorageManager { /// Perform a 'watch value cancel' on the network without fanout #[instrument(level = "trace", target = "dht", skip_all, err)] @@ -365,6 +373,7 @@ impl StorageManager { } // Send valuechange with dead count and no subkeys to inform the api that this watch is now gone completely + drop(watch_lock); self.update_callback_value_change(record_key, ValueSubkeyRangeSet::new(), 0, None); } @@ -497,7 +506,7 @@ impl StorageManager { return; } - let (per_node_states, renew_params) = { + let (per_node_states, per_node_params) = { let inner = &mut *self.inner.lock().await; let Some(outbound_watch) = inner .outbound_watch_manager @@ -507,10 +516,17 @@ impl StorageManager { veilid_log!(self warn "watch being renewed should have still been in the table"); return; }; + + let Some(desired) = outbound_watch.desired() else { + veilid_log!(self warn "watch being renewed should have desired parameters"); + return; + }; + let Some(state) = outbound_watch.state_mut() else { veilid_log!(self warn "watch being renewed should have current state"); return; }; + let mut per_node_states = vec![]; let mut missing_pnks = BTreeSet::new(); for pnk in state.nodes() { @@ -530,17 +546,15 @@ impl StorageManager { editor.retain_nodes(|x| !missing_pnks.contains(x)); }); - // Change the params to update count - let mut renew_params = state.params().clone(); - renew_params.count = state.remaining_count(); + let per_node_params = state.get_per_node_params(&desired); - (per_node_states, renew_params) + (per_node_states, per_node_params) }; // Now reach out to each node and renew their watches let mut unord = FuturesUnordered::new(); for (_pnk, pns) in per_node_states { - let params = renew_params.clone(); + let params = per_node_params.clone(); let watch_lock = watch_lock.clone(); unord.push(async move { self.outbound_watch_value_change( @@ -552,27 +566,34 @@ impl StorageManager { .await }); } - let mut owvresults = vec![]; + + // Process and merge all results since we're not fanning out + let mut opt_owvresult: Option = None; while let Some(res) = unord.next().await { match res { - Ok(r) => owvresults.push(r), + Ok(r) => { + opt_owvresult = match opt_owvresult { + Some(mut owvresult) => { + owvresult.merge(r); + Some(owvresult) + } + None => Some(r), + }; + } Err(e) => { veilid_log!(self debug "outbound watch change error: {}", e); } } } - // Update state - { + // Update state with merged results if we have them + if let Some(owvresult) = opt_owvresult { let inner = &mut *self.inner.lock().await; - for owvresult in owvresults { - self.process_outbound_watch_value_result_inner(inner, record_key, owvresult); - } + self.process_outbound_watch_value_result_inner(inner, record_key, owvresult); } } - /// Perform fanout to add per-node watches to an outbound watch - /// Must have no current state, or have a match to desired parameters + /// Perform fanout to add or update per-node watches to an outbound watch pub(super) async fn process_outbound_watch_reconcile( &self, watch_lock: AsyncTagLockGuard, @@ -586,7 +607,7 @@ impl StorageManager { // Get the nodes already active on this watch, // and the parameters to fanout with for the rest - let (per_node_state, reconcile_params) = { + let (per_node_state, per_node_params) = { let inner = &mut *self.inner.lock().await; let Some(outbound_watch) = inner .outbound_watch_manager @@ -604,13 +625,9 @@ impl StorageManager { }; // Get active per node states - let mut per_node_state = if let Some(state) = outbound_watch.state() { - // Assert matching parameters - if state.params() != &desired { - veilid_log!(self warn "watch being reconciled should have had matching current and desired parameters"); - return; - } - state + let (mut per_node_state, per_node_params) = if let Some(state) = outbound_watch.state() + { + let per_node_state = state .nodes() .iter() .map(|pnk| { @@ -624,9 +641,12 @@ impl StorageManager { .unwrap(), ) }) - .collect() + .collect(); + let per_node_params = state.get_per_node_params(&desired); + + (per_node_state, per_node_params) } else { - HashMap::new() + (HashMap::new(), desired) }; // Add in any inactive per node states @@ -641,15 +661,13 @@ impl StorageManager { } } - let reconcile_params = desired.clone(); - - (per_node_state, reconcile_params) + (per_node_state, per_node_params) }; // Now fan out with parameters and get new per node watches let cur_ts = Timestamp::now(); let res = self - .outbound_watch_value(watch_lock.clone(), reconcile_params, per_node_state) + .outbound_watch_value(watch_lock.clone(), per_node_params, per_node_state) .await; { @@ -762,6 +780,7 @@ impl StorageManager { } state.edit(&inner.outbound_watch_manager.per_node_states, |editor| { + editor.set_params(desired); editor.retain_nodes(|x| !remove_nodes.contains(x)); editor.add_nodes(added_nodes); }); @@ -816,7 +835,7 @@ impl StorageManager { } }; return Some(pin_dyn_future!(fut)); - } else if outbound_watch.needs_renew(®istry, cur_ts) { + } else if outbound_watch.needs_renew(®istry, consensus_count, cur_ts) { // Outbound watch expired but can be renewed let watch_lock = opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?; @@ -850,6 +869,101 @@ impl StorageManager { None } + /// Perform an inspection of the record's subkeys to see if we have the latest data + /// If not, then get the first changed subkey and post a ValueChanged update about it + /// Can be processed in the foreground, or by the background operation queue + pub(super) fn get_change_inspection_operation( + &self, + record_key: TypedKey, + subkeys: ValueSubkeyRangeSet, + ) -> PinBoxFutureStatic<()> { + let fut = { + let registry = self.registry(); + async move { + let this = registry.storage_manager(); + + let report = match this + .inspect_record(record_key, subkeys.clone(), DHTReportScope::SyncGet) + .await + { + Ok(v) => v, + Err(e) => { + veilid_log!(this debug "Failed to inspect record for changes: {}", e); + return; + } + }; + let mut changed_subkeys = report.changed_subkeys(); + + // Get changed first changed subkey until we find one to report + let mut n = 0; + while !changed_subkeys.is_empty() { + let first_changed_subkey = changed_subkeys.first().unwrap(); + + let value = match this.get_value(record_key, first_changed_subkey, true).await { + Ok(v) => v, + Err(e) => { + veilid_log!(this debug "Failed to get changed record: {}", e); + return; + } + }; + + if let Some(value) = value { + if value.seq() > report.local_seqs()[n] { + // Calculate the update + let (changed_subkeys, remaining_count, value) = { + let _watch_lock = + this.outbound_watch_lock_table.lock_tag(record_key).await; + let inner = &mut *this.inner.lock().await; + + // Get the outbound watch + let Some(outbound_watch) = inner + .outbound_watch_manager + .outbound_watches + .get_mut(&record_key) + else { + // No outbound watch means no callback + return; + }; + + let Some(state) = outbound_watch.state_mut() else { + // No outbound watch current state means no callback + return; + }; + + // the remaining updates count + let remaining_count = state.remaining_count().saturating_sub(1); + state.edit( + &inner.outbound_watch_manager.per_node_states, + |editor| { + editor.set_remaining_count(remaining_count); + }, + ); + + (changed_subkeys, remaining_count, value) + }; + + // Send the update + this.update_callback_value_change( + record_key, + changed_subkeys, + remaining_count, + Some(value), + ); + + // Update was sent, we're done + return; + } + } + + // If we didn't send an update, remove the first changed subkey and try again + changed_subkeys.pop_first(); + n += 1; + } + } + }; + pin_dyn_future!(fut) + } + /// Handle a received 'Watch Value' query #[allow(clippy::too_many_arguments)] #[instrument(level = "trace", target = "dht", skip_all)] @@ -897,20 +1011,20 @@ impl StorageManager { pub async fn inbound_value_changed( &self, record_key: TypedKey, - subkeys: ValueSubkeyRangeSet, + mut subkeys: ValueSubkeyRangeSet, count: u32, value: Option>, inbound_node_id: TypedKey, watch_id: u64, ) -> VeilidAPIResult> { // Operate on the watch for this record - let _watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await; + let watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await; // Update local record store with new value - let (is_value_seq_newer, value, remaining_count) = { + let (report_value_change, value, remaining_count, reportable_subkeys) = { let inner = &mut *self.inner.lock().await; - { + let watched_subkeys = { // Get the outbound watch let Some(outbound_watch) = inner .outbound_watch_manager @@ -922,7 +1036,12 @@ impl StorageManager { }; let Some(state) = outbound_watch.state() else { - // No outbound watch current state means no callback + // No outbound watch current state means no callback (we haven't reconciled yet) + return Ok(NetworkResult::value(())); + }; + + let Some(desired) = outbound_watch.desired() else { + // No outbound watch desired state means no callback (we are cancelling) return Ok(NetworkResult::value(())); }; @@ -947,7 +1066,7 @@ impl StorageManager { // If watch id doesn't match it's for an older watch and should be ignored if per_node_state.watch_id != watch_id { // No per node state means no callback - veilid_log!(self warn "incorred watch id for per node state in outbound watch: {:?} {} != {}", pnk, per_node_state.watch_id, watch_id); + veilid_log!(self warn "incorrect watch id for per node state in outbound watch: {:?} {} != {}", pnk, per_node_state.watch_id, watch_id); return Ok(NetworkResult::value(())); } @@ -991,18 +1110,22 @@ impl StorageManager { ); per_node_state.count = count; } - } + + desired.subkeys + }; + + // No subkeys means remote node cancelled, but we already captured that with the + // assignment of 'count' to the per_node_state above, so we can just jump out here + let Some(mut first_subkey) = subkeys.first() else { + return Ok(NetworkResult::value(())); + }; // Null out default value let value = value.filter(|value| *value.value_data() != ValueData::default()); // Set the local value - let mut is_value_seq_newer = false; + let mut report_value_change = false; if let Some(value) = &value { - let Some(first_subkey) = subkeys.first() else { - apibail_internal!("should not have value without first subkey"); - }; - let last_get_result = Self::handle_get_local_value_inner(inner, record_key, first_subkey, true) .await?; @@ -1025,14 +1148,25 @@ impl StorageManager { } // Make sure this value would actually be newer - is_value_seq_newer = true; + report_value_change = true; if let Some(last_value) = &last_get_result.opt_value { if value.value_data().seq() <= last_value.value_data().seq() { - // inbound value is older than or equal to the sequence number that we have, just return the one we have - is_value_seq_newer = false; + // inbound value is older than or equal to the sequence number that we have + // so we're not going to report this + report_value_change = false; + + // Shrink up the subkey range because we're removing the first value from the things we'd possibly report on + subkeys.pop_first().unwrap(); + if subkeys.is_empty() { + // If there's nothing left to report, just return no + return Ok(NetworkResult::value(())); + } + first_subkey = subkeys.first().unwrap(); } } - if is_value_seq_newer { + + // Keep the value because it is newer than the one we have + if report_value_change { Self::handle_set_local_value_inner( inner, record_key, @@ -1044,38 +1178,71 @@ impl StorageManager { } } - // If we got an actual update, decrement the total remaining watch count + // If our watched subkey range differs from the reported change's range + // we should only report changes that we care about + let reportable_subkeys = subkeys.intersect(&watched_subkeys); + if let Some(first_reportable_subkey) = reportable_subkeys.first() { + if first_reportable_subkey != first_subkey { + report_value_change = false; + } + } else { + report_value_change = false; + } + // Get the outbound watch - let outbound_watch = inner + let Some(outbound_watch) = inner .outbound_watch_manager .outbound_watches .get_mut(&record_key) - .unwrap(); + else { + // No outbound watch means no callback + return Ok(NetworkResult::value(())); + }; - let state = outbound_watch.state_mut().unwrap(); + let Some(state) = outbound_watch.state_mut() else { + // No outbound watch current state means no callback + return Ok(NetworkResult::value(())); + }; - if is_value_seq_newer { + // If we're going to report, update the remaining change count + if report_value_change { let remaining_count = state.remaining_count().saturating_sub(1); state.edit(&inner.outbound_watch_manager.per_node_states, |editor| { editor.set_remaining_count(remaining_count); }); } - (is_value_seq_newer, value, state.remaining_count()) + ( + report_value_change, + value, + state.remaining_count(), + reportable_subkeys, + ) }; + drop(watch_lock); + // Announce ValueChanged VeilidUpdate - // * if the value in the update had a newer sequence number - // * if more than a single subkey has changed - // * cancellations (count=0) are sent by process_outbound_watch_dead(), not here - let do_update = is_value_seq_newer || subkeys.len() > 1; - if do_update { - let value = if is_value_seq_newer { - Some(value.unwrap().value_data().clone()) - } else { - None - }; - self.update_callback_value_change(record_key, subkeys, remaining_count, value); + // Cancellations (count=0) are sent by process_outbound_watch_dead(), not here + if report_value_change { + // We have a value with a newer sequence number to report + let value = value.unwrap().value_data().clone(); + self.update_callback_value_change( + record_key, + reportable_subkeys, + remaining_count, + Some(value), + ); + } else if reportable_subkeys.len() > 0 { + // We have subkeys that have be reported as possibly changed + // but not a specific record reported, so we should defer reporting and + // inspect the range to see what changed + + // Queue this up for inspection + let inner = &mut *self.inner.lock().await; + inner + .outbound_watch_manager + .enqueue_change_inspect(record_key, reportable_subkeys); } Ok(NetworkResult::value(())) diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 70bc2851..45e72479 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -1494,6 +1494,11 @@ impl VeilidAPI { let registry = self.core_context()?.registry(); let storage_manager = registry.storage_manager(); + self.with_debug_cache(|dc| { + dc.opened_record_contexts.clear(); + }); + storage_manager.close_all_records().await?; + let scope = get_debug_argument_at(&args, 1, "debug_record_purge", "scope", get_string)?; let bytes = get_debug_argument_at(&args, 2, "debug_record_purge", "bytes", get_number).ok(); let out = match scope.as_str() { diff --git a/veilid-core/src/veilid_api/types/dht/dht_record_report.rs b/veilid-core/src/veilid_api/types/dht/dht_record_report.rs index 82896e85..92c75d58 100644 --- a/veilid-core/src/veilid_api/types/dht/dht_record_report.rs +++ b/veilid-core/src/veilid_api/types/dht/dht_record_report.rs @@ -51,6 +51,20 @@ impl DHTRecordReport { pub fn network_seqs(&self) -> &[ValueSeqNum] { &self.network_seqs } + pub fn changed_subkeys(&self) -> ValueSubkeyRangeSet { + let mut changed = ValueSubkeyRangeSet::new(); + for ((sk, lseq), nseq) in self + .subkeys + .iter() + .zip(self.local_seqs.iter()) + .zip(self.network_seqs.iter()) + { + if nseq > lseq { + changed.insert(sk); + } + } + changed + } } impl fmt::Debug for DHTRecordReport { diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index b62197cb..244547a7 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -1,5 +1,4 @@ # Routing context veilid tests - from typing import Any, Awaitable, Callable, Optional import pytest import asyncio @@ -8,6 +7,7 @@ import os import veilid from veilid import ValueSubkey, Timestamp, SafetySelection +from veilid.types import VeilidJSONEncoder ################################################################## BOGUS_KEY = veilid.TypedKey.from_value( @@ -275,6 +275,13 @@ async def test_watch_dht_values(): await api1.debug("record purge local") await api1.debug("record purge remote") + # Clear the change queue if record purge cancels old watches + while True: + try: + upd = await asyncio.wait_for(value_change_queue.get(), timeout=3) + except asyncio.TimeoutError: + break + # make routing contexts rc0 = await api0.new_routing_context() rc1 = await api1.new_routing_context() @@ -287,6 +294,8 @@ async def test_watch_dht_values(): vd = await rc0.set_dht_value(rec0.key, ValueSubkey(3), b"BLAH") assert vd is None + await sync(rc0, [rec0]) + # Server 0: Make a watch on all the subkeys ts = await rc0.watch_dht_values(rec0.key, [], Timestamp(0), 0xFFFFFFFF) assert ts != 0 @@ -297,18 +306,20 @@ async def test_watch_dht_values(): # Server 1: Now set the subkey and trigger an update vd = await rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH") assert vd is None + await sync(rc1, [rec1]) # Server 0: Now we should NOT get an update because the update is the same as our local copy - update = None + upd = None try: - update = await asyncio.wait_for(value_change_queue.get(), timeout=10) + upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) except asyncio.TimeoutError: pass - assert update is None + assert upd is None # Server 1: Now set subkey and trigger an update vd = await rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH") assert vd is None + await sync(rc1, [rec1]) # Server 0: Wait for the update upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) @@ -322,6 +333,7 @@ async def test_watch_dht_values(): # Server 1: Now set subkey and trigger an update vd = await rc1.set_dht_value(rec1.key, ValueSubkey(4), b"BZORT") assert vd is None + await sync(rc1, [rec1]) # Server 0: Wait for the update upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) @@ -339,6 +351,7 @@ async def test_watch_dht_values(): # Server 1: Now set multiple subkeys and trigger an update vd = await asyncio.gather(*[rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH BLAH"), rc1.set_dht_value(rec1.key, ValueSubkey(4), b"BZORT BZORT")]) assert vd == [None, None] + await sync(rc1, [rec1]) # Server 0: Wait for the update upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) @@ -350,28 +363,42 @@ async def test_watch_dht_values(): assert upd.detail.value.data == b"BZORT BZORT" # Server 0: Now we should NOT get any other update - update = None + upd = None try: - update = await asyncio.wait_for(value_change_queue.get(), timeout=10) + upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) except asyncio.TimeoutError: pass - assert update is None + if upd is not None: + print(f"bad update: {VeilidJSONEncoder.dumps(upd)}") + assert upd is None # Now cancel the update still_active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(3), ValueSubkey(9))]) assert not still_active + # Server 0: Wait for the cancellation update + upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) + + # Server 0: Verify only one update came back + assert upd.detail.key == rec0.key + assert upd.detail.count == 0 + assert upd.detail.subkeys == [] + assert upd.detail.value is None + # Now set multiple subkeys vd = await asyncio.gather(*[rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH BLAH BLAH"), rc1.set_dht_value(rec1.key, ValueSubkey(5), b"BZORT BZORT BZORT")]) assert vd == [None, None] + await sync(rc1, [rec1]) # Now we should NOT get an update - update = None + upd = None try: - update = await asyncio.wait_for(value_change_queue.get(), timeout=10) + upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) except asyncio.TimeoutError: pass - assert update is None + if upd is not None: + print(f"bad update: {VeilidJSONEncoder.dumps(upd)}") + assert upd is None # Clean up await rc1.close_dht_record(rec1.key) @@ -380,7 +407,6 @@ async def test_watch_dht_values(): await rc0.delete_dht_record(rec0.key) - @pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running") @pytest.mark.skipif(os.getenv("STRESS") != "1", reason="stress test takes a long time") @pytest.mark.asyncio @@ -817,21 +843,23 @@ async def test_dht_write_read_full_subkeys_local(): async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescriptor]): - print('syncing records to the network') syncrecords = records.copy() - while len(syncrecords) > 0: + if len(syncrecords) == 0: + return + while True: donerecords = set() subkeysleft = 0 for desc in records: rr = await rc.inspect_dht_record(desc.key, []) left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys] if left == 0: - if veilid.ValueSeqNum.NONE not in rr.local_seqs: - donerecords.add(desc) + donerecords.add(desc) else: subkeysleft += left syncrecords = [x for x in syncrecords if x not in donerecords] - print(f' {len(syncrecords)} records {subkeysleft} subkeys left') + if len(syncrecords) == 0: + break + print(f' syncing {len(syncrecords)} records {subkeysleft} subkeys left') time.sleep(1) diff --git a/veilid-python/veilid/state.py b/veilid-python/veilid/state.py index becfd29e..c28256f7 100644 --- a/veilid-python/veilid/state.py +++ b/veilid-python/veilid/state.py @@ -59,6 +59,8 @@ class VeilidStateAttachment: j["attached_uptime"], ) + def to_json(self) -> dict: + return self.__dict__ class AnswerStats: @@ -114,6 +116,9 @@ class AnswerStats: j["consecutive_lost_answers_minimum"], ) + def to_json(self) -> dict: + return self.__dict__ + class RPCStats: messages_sent: int messages_rcvd: int @@ -172,6 +177,9 @@ class RPCStats: AnswerStats.from_json(j["answer_ordered"]), ) + def to_json(self) -> dict: + return self.__dict__ + class LatencyStats: fastest: TimestampDuration @@ -213,6 +221,9 @@ class LatencyStats: TimestampDuration(j["p75"]), ) + def to_json(self) -> dict: + return self.__dict__ + class TransferStats: total: ByteCount @@ -365,6 +376,9 @@ class PeerStats: StateStats.from_json(j["state"]), ) + def to_json(self) -> dict: + return self.__dict__ + class PeerTableData: node_ids: list[str] @@ -381,6 +395,9 @@ class PeerTableData: """JSON object hook""" return cls(j["node_ids"], j["peer_address"], PeerStats.from_json(j["peer_stats"])) + def to_json(self) -> dict: + return self.__dict__ + class VeilidStateNetwork: started: bool @@ -410,6 +427,9 @@ class VeilidStateNetwork: [PeerTableData.from_json(peer) for peer in j["peers"]], ) + def to_json(self) -> dict: + return self.__dict__ + class VeilidStateConfig: config: VeilidConfig @@ -422,6 +442,9 @@ class VeilidStateConfig: """JSON object hook""" return cls(VeilidConfig.from_json(j["config"])) + def to_json(self) -> dict: + return self.__dict__ + class VeilidState: attachment: VeilidStateAttachment @@ -447,6 +470,9 @@ class VeilidState: VeilidStateConfig.from_json(j["config"]), ) + def to_json(self) -> dict: + return self.__dict__ + class VeilidLog: log_level: VeilidLogLevel @@ -463,6 +489,9 @@ class VeilidLog: """JSON object hook""" return cls(VeilidLogLevel(j["log_level"]), j["message"], j["backtrace"]) + def to_json(self) -> dict: + return self.__dict__ + class VeilidAppMessage: sender: Optional[TypedKey] @@ -483,6 +512,9 @@ class VeilidAppMessage: urlsafe_b64decode_no_pad(j["message"]), ) + def to_json(self) -> dict: + return self.__dict__ + class VeilidAppCall: sender: Optional[TypedKey] @@ -506,6 +538,9 @@ class VeilidAppCall: OperationId(j["call_id"]), ) + def to_json(self) -> dict: + return self.__dict__ + class VeilidRouteChange: dead_routes: list[RouteId] @@ -523,6 +558,9 @@ class VeilidRouteChange: [RouteId(route) for route in j["dead_remote_routes"]], ) + def to_json(self) -> dict: + return self.__dict__ + class VeilidValueChange: key: TypedKey @@ -546,6 +584,9 @@ class VeilidValueChange: None if j["value"] is None else ValueData.from_json(j["value"]), ) + def to_json(self) -> dict: + return self.__dict__ + class VeilidUpdateKind(StrEnum): LOG = "Log" @@ -610,3 +651,6 @@ class VeilidUpdate: case _: raise ValueError("Unknown VeilidUpdateKind") return cls(kind, detail) + + def to_json(self) -> dict: + return self.__dict__