From 3d88416baa4abe4dfc6f225cbfd858aed7ce9c46 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 12 Apr 2025 16:24:01 -0400 Subject: [PATCH] watchvalue fixes --- veilid-core/proto/veilid.capnp | 2 +- veilid-core/proto/veilid_capnp.rs | 52 +++++++++---------- veilid-core/src/storage_manager/debug.rs | 6 +-- .../outbound_watch_manager/mod.rs | 29 +++++++++++ .../outbound_watch_manager/outbound_watch.rs | 33 ++++++++++-- .../outbound_watch_parameters.rs | 18 +++++++ .../outbound_watch_state.rs | 36 ++++++++++++- .../outbound_watch_manager/per_node_state.rs | 36 +++++++++++++ .../src/storage_manager/watch_value.rs | 18 +++++-- 9 files changed, 190 insertions(+), 40 deletions(-) diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index df07b21f..170e5365 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -354,7 +354,7 @@ struct OperationSetValueA @0x9378d0732dc95be2 { struct OperationWatchValueQ @0xf9a5a6c547b9b228 { key @0 :TypedKey; # key for value to watch subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges). An empty range here should not be specified unless cancelling a watch (count=0). - expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max) + expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (watch can return less, 0 for max) count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous) watchId @4 :UInt64; # if 0, request a new watch. if >0, existing watch id watcher @5 :PublicKey; # the watcher performing the watch, can be the owner or a schema member, or a generated anonymous watch keypair diff --git a/veilid-core/proto/veilid_capnp.rs b/veilid-core/proto/veilid_capnp.rs index 3434d036..d8b47f87 100644 --- a/veilid-core/proto/veilid_capnp.rs +++ b/veilid-core/proto/veilid_capnp.rs @@ -14167,7 +14167,7 @@ pub mod operation_watch_value_q { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(4, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(76, 65, 0, 0, 116, 69, 0, 0), + ::capnp::word(76, 65, 0, 0, 119, 69, 0, 0), ::capnp::word(21, 0, 0, 0, 66, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -14514,7 +14514,7 @@ pub mod operation_watch_value_a { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(118, 69, 0, 0, 185, 71, 0, 0), + ::capnp::word(121, 69, 0, 0, 188, 71, 0, 0), ::capnp::word(21, 0, 0, 0, 66, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -14816,7 +14816,7 @@ pub mod operation_inspect_value_q { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(2, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(187, 71, 0, 0, 95, 73, 0, 0), + ::capnp::word(190, 71, 0, 0, 98, 73, 0, 0), ::capnp::word(21, 0, 0, 0, 82, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -15114,7 +15114,7 @@ pub mod operation_inspect_value_a { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(3, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(97, 73, 0, 0, 220, 75, 0, 0), + ::capnp::word(100, 73, 0, 0, 223, 75, 0, 0), ::capnp::word(21, 0, 0, 0, 82, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -15443,7 +15443,7 @@ pub mod operation_value_changed { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(3, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(222, 75, 0, 0, 84, 78, 0, 0), + ::capnp::word(225, 75, 0, 0, 87, 78, 0, 0), ::capnp::word(21, 0, 0, 0, 74, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -15724,7 +15724,7 @@ pub mod operation_supply_block_q { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(86, 78, 0, 0, 228, 78, 0, 0), + ::capnp::word(89, 78, 0, 0, 231, 78, 0, 0), ::capnp::word(21, 0, 0, 0, 74, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -15946,7 +15946,7 @@ pub mod operation_supply_block_a { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(230, 78, 0, 0, 26, 80, 0, 0), + ::capnp::word(233, 78, 0, 0, 29, 80, 0, 0), ::capnp::word(21, 0, 0, 0, 74, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -16180,7 +16180,7 @@ pub mod operation_find_block_q { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(28, 80, 0, 0, 164, 80, 0, 0), + ::capnp::word(31, 80, 0, 0, 167, 80, 0, 0), ::capnp::word(21, 0, 0, 0, 58, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -16437,7 +16437,7 @@ pub mod operation_find_block_a { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(3, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(166, 80, 0, 0, 119, 82, 0, 0), + ::capnp::word(169, 80, 0, 0, 122, 82, 0, 0), ::capnp::word(21, 0, 0, 0, 58, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -16737,7 +16737,7 @@ pub mod operation_signal { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 2, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(121, 82, 0, 0, 42, 83, 0, 0), + ::capnp::word(124, 82, 0, 0, 45, 83, 0, 0), ::capnp::word(21, 0, 0, 0, 26, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -16851,7 +16851,7 @@ pub static ENCODED_NODE: [::capnp::Word; 28] = [ ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(44, 83, 0, 0, 230, 83, 0, 0), + ::capnp::word(47, 83, 0, 0, 233, 83, 0, 0), ::capnp::word(21, 0, 0, 0, 50, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -16921,7 +16921,7 @@ pub static ENCODED_NODE: [::capnp::Word; 38] = [ ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(232, 83, 0, 0, 88, 85, 0, 0), + ::capnp::word(235, 83, 0, 0, 91, 85, 0, 0), ::capnp::word(21, 0, 0, 0, 250, 0, 0, 0), ::capnp::word(33, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -17128,7 +17128,7 @@ pub mod tunnel_endpoint { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(90, 85, 0, 0, 53, 86, 0, 0), + ::capnp::word(93, 85, 0, 0, 56, 86, 0, 0), ::capnp::word(21, 0, 0, 0, 18, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -17408,7 +17408,7 @@ pub mod full_tunnel { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(2, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(55, 86, 0, 0, 204, 87, 0, 0), + ::capnp::word(58, 86, 0, 0, 207, 87, 0, 0), ::capnp::word(21, 0, 0, 0, 242, 0, 0, 0), ::capnp::word(33, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -17691,7 +17691,7 @@ pub mod partial_tunnel { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(206, 87, 0, 0, 0, 89, 0, 0), + ::capnp::word(209, 87, 0, 0, 3, 89, 0, 0), ::capnp::word(21, 0, 0, 0, 10, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -17944,7 +17944,7 @@ pub mod operation_start_tunnel_q { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(0, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(2, 89, 0, 0, 86, 90, 0, 0), + ::capnp::word(5, 89, 0, 0, 89, 90, 0, 0), ::capnp::word(21, 0, 0, 0, 74, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -18222,7 +18222,7 @@ pub mod operation_start_tunnel_a { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 2, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(88, 90, 0, 0, 86, 91, 0, 0), + ::capnp::word(91, 90, 0, 0, 89, 91, 0, 0), ::capnp::word(21, 0, 0, 0, 74, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -18493,7 +18493,7 @@ pub mod operation_complete_tunnel_q { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(88, 91, 0, 0, 9, 93, 0, 0), + ::capnp::word(91, 91, 0, 0, 12, 93, 0, 0), ::capnp::word(21, 0, 0, 0, 98, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -18788,7 +18788,7 @@ pub mod operation_complete_tunnel_a { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 2, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(11, 93, 0, 0, 9, 94, 0, 0), + ::capnp::word(14, 93, 0, 0, 12, 94, 0, 0), ::capnp::word(21, 0, 0, 0, 98, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -19008,7 +19008,7 @@ pub mod operation_cancel_tunnel_q { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(0, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(11, 94, 0, 0, 146, 94, 0, 0), + ::capnp::word(14, 94, 0, 0, 149, 94, 0, 0), ::capnp::word(21, 0, 0, 0, 82, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -19238,7 +19238,7 @@ pub mod operation_cancel_tunnel_a { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(0, 0, 7, 0, 0, 0, 2, 0), ::capnp::word(4, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(148, 94, 0, 0, 143, 95, 0, 0), + ::capnp::word(151, 94, 0, 0, 146, 95, 0, 0), ::capnp::word(21, 0, 0, 0, 82, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -19480,7 +19480,7 @@ pub mod question { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(2, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(174, 95, 0, 0, 21, 100, 0, 0), + ::capnp::word(177, 95, 0, 0, 24, 100, 0, 0), ::capnp::word(21, 0, 0, 0, 226, 0, 0, 0), ::capnp::word(33, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -20470,7 +20470,7 @@ pub mod statement { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(58, 100, 0, 0, 5, 102, 0, 0), + ::capnp::word(61, 100, 0, 0, 8, 102, 0, 0), ::capnp::word(21, 0, 0, 0, 234, 0, 0, 0), ::capnp::word(33, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -21137,7 +21137,7 @@ pub mod answer { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(33, 102, 0, 0, 189, 105, 0, 0), + ::capnp::word(36, 102, 0, 0, 192, 105, 0, 0), ::capnp::word(21, 0, 0, 0, 210, 0, 0, 0), ::capnp::word(33, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -21903,7 +21903,7 @@ pub mod operation { ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(2, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(191, 105, 0, 0, 254, 107, 0, 0), + ::capnp::word(194, 105, 0, 0, 1, 108, 0, 0), ::capnp::word(21, 0, 0, 0, 234, 0, 0, 0), ::capnp::word(33, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -22327,6 +22327,6 @@ pub mod operation { } } -//BUILDHASH:196f6bbf9c5b0ec79e01c72ef11ada6bef828ecbd7e4a9ee9021e9e2574e3352 +//BUILDHASH:d299ae03245f6c89b9bc93626cd5df415ad3ddf5fb23f9b4395809c1bba656e5 //CAPNPDESIREDVERSIONHASH:bfec2e34583ada7e6af2cb73993fb75a3f7147a6c943e5ff5f5c4294fc577b90 diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs index f0a03d0d..4b1b1018 100644 --- a/veilid-core/src/storage_manager/debug.rs +++ b/veilid-core/src/storage_manager/debug.rs @@ -30,11 +30,7 @@ impl StorageManager { } pub async fn debug_watched_records(&self) -> String { let inner = self.inner.lock().await; - let mut out = "[\n".to_owned(); - for (k, v) in &inner.outbound_watch_manager.outbound_watches { - out += &format!(" {} {:?}\n", k, v); - } - format!("{}]\n", out) + inner.outbound_watch_manager.to_string() } pub async fn debug_offline_records(&self) -> String { let inner = self.inner.lock().await; diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs b/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs index 3bbe190d..c6ac381f 100644 --- a/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs +++ b/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs @@ -20,6 +20,35 @@ pub(in crate::storage_manager) struct OutboundWatchManager { pub per_node_state: HashMap, } +impl fmt::Display for OutboundWatchManager { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut out = "outbound_watches: [\n".to_owned(); + { + let mut keys = self.outbound_watches.keys().copied().collect::>(); + keys.sort(); + + for k in keys { + let v = self.outbound_watches.get(&k).unwrap(); + out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.to_string())); + } + } + out += "]\n"; + out += "per_node_state: [\n"; + { + let mut keys = self.per_node_state.keys().copied().collect::>(); + keys.sort(); + + for k in keys { + let v = self.per_node_state.get(&k).unwrap(); + out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.to_string())); + } + } + out += "]\n"; + + write!(f, "{}", out) + } +} + impl Default for OutboundWatchManager { fn default() -> Self { Self::new() diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs index 25c59e90..99dba136 100644 --- a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs +++ b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs @@ -13,6 +13,25 @@ pub(in crate::storage_manager) struct OutboundWatch { desired: Option, } +impl fmt::Display for OutboundWatch { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Desired: {}\nState:\n{}\n", + if let Some(desired) = &self.desired { + desired.to_string() + } else { + "None".to_owned() + }, + if let Some(state) = &self.state { + indent_all_by(4, state.to_string()) + } else { + "None".to_owned() + }, + ) + } +} + impl OutboundWatch { /// Create new outbound watch with desired parameters pub fn new(desired: OutboundWatchParameters) -> Self { @@ -68,6 +87,8 @@ impl OutboundWatch { veilid_log!(registry warn "should have checked for is_dead first"); return false; } + + // If there is no current watch then there is nothing to cancel let Some(state) = self.state() else { return false; }; @@ -79,7 +100,7 @@ impl OutboundWatch { } // If we have expired and can't renew, then cancel - if cur_ts >= state.params().expiration_ts { + if state.params().expiration_ts.as_u64() != 0 && cur_ts >= state.params().expiration_ts { return true; } @@ -98,13 +119,17 @@ impl OutboundWatch { veilid_log!(registry warn "should have checked for is_dead and needs_cancel first"); return false; } + // If there is no current watch then there is nothing to renew let Some(state) = self.state() else { return false; }; + // If the watch has per node watches that have expired, // but we can extend our watch then renew - if cur_ts >= state.min_expiration_ts() && cur_ts < state.params().expiration_ts { + if cur_ts >= state.min_expiration_ts() + && (state.params().expiration_ts.as_u64() == 0 || cur_ts < state.params().expiration_ts) + { return true; } @@ -145,7 +170,7 @@ impl OutboundWatch { return false; }; - // If there is a desired watch but no current watch, then reconcile + // If there is a desired watch but no current state, then reconcile let Some(state) = self.state() else { return true; }; @@ -157,7 +182,7 @@ impl OutboundWatch { } // If we are still working on getting the 'current' state to match // the 'desired' state, then do the reconcile if we are within the timeframe for it - if state.nodes().len() != consensus_count + if state.nodes().len() < consensus_count && cur_ts >= state.next_reconcile_ts().unwrap_or_default() { return true; diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_parameters.rs b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_parameters.rs index 127030ae..637feab4 100644 --- a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_parameters.rs +++ b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_parameters.rs @@ -17,3 +17,21 @@ pub struct OutboundWatchParameters { /// What safety selection to use on the network pub safety_selection: SafetySelection, } + +impl fmt::Display for OutboundWatchParameters { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{{ expiration={}, count={}, subkeys={}, opt_watcher={}, safety_selection={:?} }}", + self.expiration_ts, + self.count, + self.subkeys, + if let Some(watcher) = &self.opt_watcher { + watcher.to_string() + } else { + "None".to_owned() + }, + self.safety_selection + ) + } +} diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_state.rs b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_state.rs index 39e325d3..dce18d4d 100644 --- a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_state.rs +++ b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch_state.rs @@ -17,6 +17,36 @@ pub(in crate::storage_manager) struct OutboundWatchState { value_changed_routes: BTreeSet, } +impl fmt::Display for OutboundWatchState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut value_changed_routes = self + .value_changed_routes + .iter() + .map(|x| x.to_string()) + .collect::>(); + value_changed_routes.sort(); + + write!( + f, + "params: {}\nnodes: [{}]\nremaining_count: {}\nopt_next_reconcile_ts: {}\nmin_expiration_ts: {}\nvalue_changed_routes: [{}]\n", + self.params, + self.nodes + .iter() + .map(|x| x.node_id.to_string()) + .collect::>() + .join(","), + self.remaining_count, + if let Some(next_reconcile_ts) = &self.opt_next_reconcile_ts { + next_reconcile_ts.to_string() + } else { + "None".to_owned() + }, + self.min_expiration_ts, + value_changed_routes.join(","), + ) + } +} + pub(in crate::storage_manager) struct OutboundWatchStateEditor<'a> { state: &'a mut OutboundWatchState, } @@ -27,7 +57,11 @@ impl OutboundWatchStateEditor<'_> { self.state.params = params; } pub fn add_nodes>(&mut self, nodes: I) { - self.state.nodes.extend(nodes); + for node in nodes { + if !self.state.nodes.contains(&node) { + self.state.nodes.push(node); + } + } } pub fn retain_nodes bool>(&mut self, f: F) { self.state.nodes.retain(f); diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/per_node_state.rs b/veilid-core/src/storage_manager/outbound_watch_manager/per_node_state.rs index e88e261b..2c754da2 100644 --- a/veilid-core/src/storage_manager/outbound_watch_manager/per_node_state.rs +++ b/veilid-core/src/storage_manager/outbound_watch_manager/per_node_state.rs @@ -10,6 +10,16 @@ pub(in crate::storage_manager) struct PerNodeKey { pub node_id: TypedKey, } +impl fmt::Display for PerNodeKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{{ record_key={}, node_id={} }}", + self.record_key, self.node_id + ) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub(in crate::storage_manager) struct PerNodeState { /// Watch Id @@ -28,3 +38,29 @@ pub(in crate::storage_manager) struct PerNodeState { /// Which private route is responsible for receiving ValueChanged notifications pub opt_value_changed_route: Option, } + +impl fmt::Display for PerNodeState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{{ watch_id={}, safety_selection={:?}, opt_watcher={}, expiration_ts={}, count={}, watch_node_ref={}, opt_value_changed_route={} }}", + self.watch_id, + self.safety_selection, + if let Some(watcher) = &self.opt_watcher { + watcher.to_string() + } else { + "None".to_owned() + }, + self.expiration_ts, + self.count, + if let Some(watch_node_ref) = &self.watch_node_ref { + watch_node_ref.to_string() + } else { + "None".to_string() + }, + if let Some(value_changed_route)= &self.opt_value_changed_route { + value_changed_route.to_string() + } else { + "None".to_string() + } + ) + } +} diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 48791da1..889dfdb9 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -715,6 +715,7 @@ impl StorageManager { let watch_node_ref = Some(accepted_watch.node_ref); let opt_value_changed_route = accepted_watch.opt_value_changed_route; + // Insert state, possibly overwriting an existing one inner.outbound_watch_manager.per_node_state.insert( pnk, PerNodeState { @@ -939,7 +940,14 @@ impl StorageManager { if count > per_node_state.count { // If count is greater than our requested count then this is invalid, cancel the watch // XXX: Should this be a punishment? - veilid_log!(self debug "watch count went backward: {}: {} > {}", record_key, count, per_node_state.count); + veilid_log!(self debug + "watch count went backward: {} @ {} id={}: {} > {}", + record_key, + inbound_node_id, + watch_id, + count, + per_node_state.count + ); // Force count to zero for this node id so it gets cancelled out by the background process per_node_state.count = 0; @@ -949,16 +957,20 @@ impl StorageManager { // Log this because watch counts should always be decrementing non a per-node basis. // XXX: Should this be a punishment? veilid_log!(self debug - "watch count duplicate: {}: {} == {}", + "watch count duplicate: {} @ {} id={}: {} == {}", record_key, + inbound_node_id, + watch_id, count, per_node_state.count ); } else { // Reduce the per-node watch count veilid_log!(self debug - "watch count decremented: {}: {} < {}", + "watch count decremented: {} @ {} id={}: {} < {}", record_key, + inbound_node_id, + watch_id, count, per_node_state.count );