watchvalue fixes

This commit is contained in:
Christien Rioux 2025-04-12 16:24:01 -04:00
parent 5de4d80151
commit 3d88416baa
9 changed files with 190 additions and 40 deletions

View File

@ -354,7 +354,7 @@ struct OperationSetValueA @0x9378d0732dc95be2 {
struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
key @0 :TypedKey; # key for value to watch
subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges). An empty range here should not be specified unless cancelling a watch (count=0).
expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max)
expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (watch can return less, 0 for max)
count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous)
watchId @4 :UInt64; # if 0, request a new watch. if >0, existing watch id
watcher @5 :PublicKey; # the watcher performing the watch, can be the owner or a schema member, or a generated anonymous watch keypair

View File

@ -14167,7 +14167,7 @@ pub mod operation_watch_value_q {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(4, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(76, 65, 0, 0, 116, 69, 0, 0),
::capnp::word(76, 65, 0, 0, 119, 69, 0, 0),
::capnp::word(21, 0, 0, 0, 66, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -14514,7 +14514,7 @@ pub mod operation_watch_value_a {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(118, 69, 0, 0, 185, 71, 0, 0),
::capnp::word(121, 69, 0, 0, 188, 71, 0, 0),
::capnp::word(21, 0, 0, 0, 66, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -14816,7 +14816,7 @@ pub mod operation_inspect_value_q {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(2, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(187, 71, 0, 0, 95, 73, 0, 0),
::capnp::word(190, 71, 0, 0, 98, 73, 0, 0),
::capnp::word(21, 0, 0, 0, 82, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -15114,7 +15114,7 @@ pub mod operation_inspect_value_a {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(3, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(97, 73, 0, 0, 220, 75, 0, 0),
::capnp::word(100, 73, 0, 0, 223, 75, 0, 0),
::capnp::word(21, 0, 0, 0, 82, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -15443,7 +15443,7 @@ pub mod operation_value_changed {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(3, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(222, 75, 0, 0, 84, 78, 0, 0),
::capnp::word(225, 75, 0, 0, 87, 78, 0, 0),
::capnp::word(21, 0, 0, 0, 74, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -15724,7 +15724,7 @@ pub mod operation_supply_block_q {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(86, 78, 0, 0, 228, 78, 0, 0),
::capnp::word(89, 78, 0, 0, 231, 78, 0, 0),
::capnp::word(21, 0, 0, 0, 74, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -15946,7 +15946,7 @@ pub mod operation_supply_block_a {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(230, 78, 0, 0, 26, 80, 0, 0),
::capnp::word(233, 78, 0, 0, 29, 80, 0, 0),
::capnp::word(21, 0, 0, 0, 74, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -16180,7 +16180,7 @@ pub mod operation_find_block_q {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(28, 80, 0, 0, 164, 80, 0, 0),
::capnp::word(31, 80, 0, 0, 167, 80, 0, 0),
::capnp::word(21, 0, 0, 0, 58, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -16437,7 +16437,7 @@ pub mod operation_find_block_a {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(3, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(166, 80, 0, 0, 119, 82, 0, 0),
::capnp::word(169, 80, 0, 0, 122, 82, 0, 0),
::capnp::word(21, 0, 0, 0, 58, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -16737,7 +16737,7 @@ pub mod operation_signal {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 2, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(121, 82, 0, 0, 42, 83, 0, 0),
::capnp::word(124, 82, 0, 0, 45, 83, 0, 0),
::capnp::word(21, 0, 0, 0, 26, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -16851,7 +16851,7 @@ pub static ENCODED_NODE: [::capnp::Word; 28] = [
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(44, 83, 0, 0, 230, 83, 0, 0),
::capnp::word(47, 83, 0, 0, 233, 83, 0, 0),
::capnp::word(21, 0, 0, 0, 50, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -16921,7 +16921,7 @@ pub static ENCODED_NODE: [::capnp::Word; 38] = [
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(232, 83, 0, 0, 88, 85, 0, 0),
::capnp::word(235, 83, 0, 0, 91, 85, 0, 0),
::capnp::word(21, 0, 0, 0, 250, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -17128,7 +17128,7 @@ pub mod tunnel_endpoint {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(90, 85, 0, 0, 53, 86, 0, 0),
::capnp::word(93, 85, 0, 0, 56, 86, 0, 0),
::capnp::word(21, 0, 0, 0, 18, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -17408,7 +17408,7 @@ pub mod full_tunnel {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(2, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(55, 86, 0, 0, 204, 87, 0, 0),
::capnp::word(58, 86, 0, 0, 207, 87, 0, 0),
::capnp::word(21, 0, 0, 0, 242, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -17691,7 +17691,7 @@ pub mod partial_tunnel {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(206, 87, 0, 0, 0, 89, 0, 0),
::capnp::word(209, 87, 0, 0, 3, 89, 0, 0),
::capnp::word(21, 0, 0, 0, 10, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -17944,7 +17944,7 @@ pub mod operation_start_tunnel_q {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(0, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(2, 89, 0, 0, 86, 90, 0, 0),
::capnp::word(5, 89, 0, 0, 89, 90, 0, 0),
::capnp::word(21, 0, 0, 0, 74, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -18222,7 +18222,7 @@ pub mod operation_start_tunnel_a {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 2, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(88, 90, 0, 0, 86, 91, 0, 0),
::capnp::word(91, 90, 0, 0, 89, 91, 0, 0),
::capnp::word(21, 0, 0, 0, 74, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -18493,7 +18493,7 @@ pub mod operation_complete_tunnel_q {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(88, 91, 0, 0, 9, 93, 0, 0),
::capnp::word(91, 91, 0, 0, 12, 93, 0, 0),
::capnp::word(21, 0, 0, 0, 98, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -18788,7 +18788,7 @@ pub mod operation_complete_tunnel_a {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 2, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(11, 93, 0, 0, 9, 94, 0, 0),
::capnp::word(14, 93, 0, 0, 12, 94, 0, 0),
::capnp::word(21, 0, 0, 0, 98, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -19008,7 +19008,7 @@ pub mod operation_cancel_tunnel_q {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(0, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(11, 94, 0, 0, 146, 94, 0, 0),
::capnp::word(14, 94, 0, 0, 149, 94, 0, 0),
::capnp::word(21, 0, 0, 0, 82, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -19238,7 +19238,7 @@ pub mod operation_cancel_tunnel_a {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(0, 0, 7, 0, 0, 0, 2, 0),
::capnp::word(4, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(148, 94, 0, 0, 143, 95, 0, 0),
::capnp::word(151, 94, 0, 0, 146, 95, 0, 0),
::capnp::word(21, 0, 0, 0, 82, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -19480,7 +19480,7 @@ pub mod question {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(2, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(174, 95, 0, 0, 21, 100, 0, 0),
::capnp::word(177, 95, 0, 0, 24, 100, 0, 0),
::capnp::word(21, 0, 0, 0, 226, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -20470,7 +20470,7 @@ pub mod statement {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(58, 100, 0, 0, 5, 102, 0, 0),
::capnp::word(61, 100, 0, 0, 8, 102, 0, 0),
::capnp::word(21, 0, 0, 0, 234, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -21137,7 +21137,7 @@ pub mod answer {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(33, 102, 0, 0, 189, 105, 0, 0),
::capnp::word(36, 102, 0, 0, 192, 105, 0, 0),
::capnp::word(21, 0, 0, 0, 210, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -21903,7 +21903,7 @@ pub mod operation {
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(2, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(191, 105, 0, 0, 254, 107, 0, 0),
::capnp::word(194, 105, 0, 0, 1, 108, 0, 0),
::capnp::word(21, 0, 0, 0, 234, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@ -22327,6 +22327,6 @@ pub mod operation {
}
}
//BUILDHASH:196f6bbf9c5b0ec79e01c72ef11ada6bef828ecbd7e4a9ee9021e9e2574e3352
//BUILDHASH:d299ae03245f6c89b9bc93626cd5df415ad3ddf5fb23f9b4395809c1bba656e5
//CAPNPDESIREDVERSIONHASH:bfec2e34583ada7e6af2cb73993fb75a3f7147a6c943e5ff5f5c4294fc577b90

View File

@ -30,11 +30,7 @@ impl StorageManager {
}
pub async fn debug_watched_records(&self) -> String {
let inner = self.inner.lock().await;
let mut out = "[\n".to_owned();
for (k, v) in &inner.outbound_watch_manager.outbound_watches {
out += &format!(" {} {:?}\n", k, v);
}
format!("{}]\n", out)
inner.outbound_watch_manager.to_string()
}
pub async fn debug_offline_records(&self) -> String {
let inner = self.inner.lock().await;

View File

@ -20,6 +20,35 @@ pub(in crate::storage_manager) struct OutboundWatchManager {
pub per_node_state: HashMap<PerNodeKey, PerNodeState>,
}
impl fmt::Display for OutboundWatchManager {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut out = "outbound_watches: [\n".to_owned();
{
let mut keys = self.outbound_watches.keys().copied().collect::<Vec<_>>();
keys.sort();
for k in keys {
let v = self.outbound_watches.get(&k).unwrap();
out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.to_string()));
}
}
out += "]\n";
out += "per_node_state: [\n";
{
let mut keys = self.per_node_state.keys().copied().collect::<Vec<_>>();
keys.sort();
for k in keys {
let v = self.per_node_state.get(&k).unwrap();
out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.to_string()));
}
}
out += "]\n";
write!(f, "{}", out)
}
}
impl Default for OutboundWatchManager {
fn default() -> Self {
Self::new()

View File

@ -13,6 +13,25 @@ pub(in crate::storage_manager) struct OutboundWatch {
desired: Option<OutboundWatchParameters>,
}
impl fmt::Display for OutboundWatch {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Desired: {}\nState:\n{}\n",
if let Some(desired) = &self.desired {
desired.to_string()
} else {
"None".to_owned()
},
if let Some(state) = &self.state {
indent_all_by(4, state.to_string())
} else {
"None".to_owned()
},
)
}
}
impl OutboundWatch {
/// Create new outbound watch with desired parameters
pub fn new(desired: OutboundWatchParameters) -> Self {
@ -68,6 +87,8 @@ impl OutboundWatch {
veilid_log!(registry warn "should have checked for is_dead first");
return false;
}
// If there is no current watch then there is nothing to cancel
let Some(state) = self.state() else {
return false;
};
@ -79,7 +100,7 @@ impl OutboundWatch {
}
// If we have expired and can't renew, then cancel
if cur_ts >= state.params().expiration_ts {
if state.params().expiration_ts.as_u64() != 0 && cur_ts >= state.params().expiration_ts {
return true;
}
@ -98,13 +119,17 @@ impl OutboundWatch {
veilid_log!(registry warn "should have checked for is_dead and needs_cancel first");
return false;
}
// If there is no current watch then there is nothing to renew
let Some(state) = self.state() else {
return false;
};
// If the watch has per node watches that have expired,
// but we can extend our watch then renew
if cur_ts >= state.min_expiration_ts() && cur_ts < state.params().expiration_ts {
if cur_ts >= state.min_expiration_ts()
&& (state.params().expiration_ts.as_u64() == 0 || cur_ts < state.params().expiration_ts)
{
return true;
}
@ -145,7 +170,7 @@ impl OutboundWatch {
return false;
};
// If there is a desired watch but no current watch, then reconcile
// If there is a desired watch but no current state, then reconcile
let Some(state) = self.state() else {
return true;
};
@ -157,7 +182,7 @@ impl OutboundWatch {
}
// If we are still working on getting the 'current' state to match
// the 'desired' state, then do the reconcile if we are within the timeframe for it
if state.nodes().len() != consensus_count
if state.nodes().len() < consensus_count
&& cur_ts >= state.next_reconcile_ts().unwrap_or_default()
{
return true;

View File

@ -17,3 +17,21 @@ pub struct OutboundWatchParameters {
/// What safety selection to use on the network
pub safety_selection: SafetySelection,
}
impl fmt::Display for OutboundWatchParameters {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{{ expiration={}, count={}, subkeys={}, opt_watcher={}, safety_selection={:?} }}",
self.expiration_ts,
self.count,
self.subkeys,
if let Some(watcher) = &self.opt_watcher {
watcher.to_string()
} else {
"None".to_owned()
},
self.safety_selection
)
}
}

View File

@ -17,6 +17,36 @@ pub(in crate::storage_manager) struct OutboundWatchState {
value_changed_routes: BTreeSet<PublicKey>,
}
impl fmt::Display for OutboundWatchState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut value_changed_routes = self
.value_changed_routes
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>();
value_changed_routes.sort();
write!(
f,
"params: {}\nnodes: [{}]\nremaining_count: {}\nopt_next_reconcile_ts: {}\nmin_expiration_ts: {}\nvalue_changed_routes: [{}]\n",
self.params,
self.nodes
.iter()
.map(|x| x.node_id.to_string())
.collect::<Vec<_>>()
.join(","),
self.remaining_count,
if let Some(next_reconcile_ts) = &self.opt_next_reconcile_ts {
next_reconcile_ts.to_string()
} else {
"None".to_owned()
},
self.min_expiration_ts,
value_changed_routes.join(","),
)
}
}
pub(in crate::storage_manager) struct OutboundWatchStateEditor<'a> {
state: &'a mut OutboundWatchState,
}
@ -27,7 +57,11 @@ impl OutboundWatchStateEditor<'_> {
self.state.params = params;
}
pub fn add_nodes<I: IntoIterator<Item = PerNodeKey>>(&mut self, nodes: I) {
self.state.nodes.extend(nodes);
for node in nodes {
if !self.state.nodes.contains(&node) {
self.state.nodes.push(node);
}
}
}
pub fn retain_nodes<F: FnMut(&PerNodeKey) -> bool>(&mut self, f: F) {
self.state.nodes.retain(f);

View File

@ -10,6 +10,16 @@ pub(in crate::storage_manager) struct PerNodeKey {
pub node_id: TypedKey,
}
impl fmt::Display for PerNodeKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{{ record_key={}, node_id={} }}",
self.record_key, self.node_id
)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeState {
/// Watch Id
@ -28,3 +38,29 @@ pub(in crate::storage_manager) struct PerNodeState {
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
}
impl fmt::Display for PerNodeState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{{ watch_id={}, safety_selection={:?}, opt_watcher={}, expiration_ts={}, count={}, watch_node_ref={}, opt_value_changed_route={} }}",
self.watch_id,
self.safety_selection,
if let Some(watcher) = &self.opt_watcher {
watcher.to_string()
} else {
"None".to_owned()
},
self.expiration_ts,
self.count,
if let Some(watch_node_ref) = &self.watch_node_ref {
watch_node_ref.to_string()
} else {
"None".to_string()
},
if let Some(value_changed_route)= &self.opt_value_changed_route {
value_changed_route.to_string()
} else {
"None".to_string()
}
)
}
}

View File

@ -715,6 +715,7 @@ impl StorageManager {
let watch_node_ref = Some(accepted_watch.node_ref);
let opt_value_changed_route = accepted_watch.opt_value_changed_route;
// Insert state, possibly overwriting an existing one
inner.outbound_watch_manager.per_node_state.insert(
pnk,
PerNodeState {
@ -939,7 +940,14 @@ impl StorageManager {
if count > per_node_state.count {
// If count is greater than our requested count then this is invalid, cancel the watch
// XXX: Should this be a punishment?
veilid_log!(self debug "watch count went backward: {}: {} > {}", record_key, count, per_node_state.count);
veilid_log!(self debug
"watch count went backward: {} @ {} id={}: {} > {}",
record_key,
inbound_node_id,
watch_id,
count,
per_node_state.count
);
// Force count to zero for this node id so it gets cancelled out by the background process
per_node_state.count = 0;
@ -949,16 +957,20 @@ impl StorageManager {
// Log this because watch counts should always be decrementing non a per-node basis.
// XXX: Should this be a punishment?
veilid_log!(self debug
"watch count duplicate: {}: {} == {}",
"watch count duplicate: {} @ {} id={}: {} == {}",
record_key,
inbound_node_id,
watch_id,
count,
per_node_state.count
);
} else {
// Reduce the per-node watch count
veilid_log!(self debug
"watch count decremented: {}: {} < {}",
"watch count decremented: {} @ {} id={}: {} < {}",
record_key,
inbound_node_id,
watch_id,
count,
per_node_state.count
);