From e009b1097b11de2d330f359dcd16df7011dd0a07 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 4 Mar 2024 23:04:29 -0500 Subject: [PATCH] watch ids --- veilid-core/proto/veilid.capnp | 9 +- veilid-core/proto/veilid_capnp.rs | 234 ++++++++++++------ .../operations/operation_value_changed.rs | 27 +- .../operations/operation_watch_value.rs | 60 ++++- .../src/rpc_processor/rpc_value_changed.rs | 17 +- .../src/rpc_processor/rpc_watch_value.rs | 55 +++- veilid-core/src/storage_manager/debug.rs | 18 +- veilid-core/src/storage_manager/mod.rs | 12 +- .../src/storage_manager/record_store.rs | 140 +++++++---- .../storage_manager/storage_manager_inner.rs | 16 +- .../storage_manager/types/opened_record.rs | 2 + .../src/storage_manager/watch_value.rs | 72 ++++-- veilid-core/src/veilid_api/routing_context.rs | 2 +- 13 files changed, 481 insertions(+), 183 deletions(-) diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 6202a29e..84c7c541 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -356,20 +356,23 @@ struct OperationWatchValueQ @0xf9a5a6c547b9b228 { subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges), if empty, watch everything expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max) count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous) - watcher @4 :PublicKey; # optional: the watcher performing the watch, can be the owner or a schema member - signature @5 :Signature; # optional: signature of the watcher, must be one of the schema members or the key owner. signature covers: key, subkeys, expiration, count + watchId @4 :UInt64; # optional: (0 = unspecified) existing watch id to update or cancel unless this is a new watch + watcher @5 :PublicKey; # the watcher performing the watch, can be the owner or a schema member, or a generated anonymous watch keypair + signature @6 :Signature; # signature of the watcher, signature covers: key, subkeys, expiration, count, watchId } struct OperationWatchValueA @0xa726cab7064ba893 { expiration @0 :UInt64; # timestamp when this watch will expire in usec since epoch (0 if watch was rejected). if watch is being cancelled (with count = 0), this will be the non-zero former expiration time. peers @1 :List(PeerInfo); # returned list of other nodes to ask that could propagate watches + watchId @2 :UInt64; # random id for watch instance on this node } struct OperationValueChanged @0xd1c59ebdd8cc1bf6 { key @0 :TypedKey; # key for value that changed subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time) count @2 :UInt32; # remaining changes left (0 means watch has expired) - value @3 :SignedValueData; # first value that changed (the rest can be gotten with getvalue) + watchId @3 :UInt64; # watch id this value change came from + value @4 :SignedValueData; # first value that changed (the rest can be gotten with getvalue) } struct OperationSupplyBlockQ @0xadbf4c542d749971 { diff --git a/veilid-core/proto/veilid_capnp.rs b/veilid-core/proto/veilid_capnp.rs index ec7837ac..9b484c10 100644 --- a/veilid-core/proto/veilid_capnp.rs +++ b/veilid-core/proto/veilid_capnp.rs @@ -13822,6 +13822,10 @@ pub mod operation_watch_value_q { self.reader.get_data_field::(2) } #[inline] + pub fn get_watch_id(self) -> u64 { + self.reader.get_data_field::(2) + } + #[inline] pub fn get_watcher(self) -> ::capnp::Result> { ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(2), ::core::option::Option::None) } @@ -13841,7 +13845,7 @@ pub mod operation_watch_value_q { pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> } impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> { - const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 2, pointers: 4 }; + const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 3, pointers: 4 }; } impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> { const TYPE_ID: u64 = _private::TYPE_ID; @@ -13940,6 +13944,14 @@ pub mod operation_watch_value_q { self.builder.set_data_field::(2, value); } #[inline] + pub fn get_watch_id(self) -> u64 { + self.builder.get_data_field::(2) + } + #[inline] + pub fn set_watch_id(&mut self, value: u64) { + self.builder.set_data_field::(2, value); + } + #[inline] pub fn get_watcher(self) -> ::capnp::Result> { ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(2), ::core::option::Option::None) } @@ -13991,17 +14003,17 @@ pub mod operation_watch_value_q { } } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 115] = [ + pub static ENCODED_NODE: [::capnp::Word; 130] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(40, 178, 185, 71, 197, 166, 165, 249), - ::capnp::word(19, 0, 0, 0, 1, 0, 2, 0), + ::capnp::word(19, 0, 0, 0, 1, 0, 3, 0), ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(4, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(21, 0, 0, 0, 66, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(33, 0, 0, 0, 87, 1, 0, 0), + ::capnp::word(33, 0, 0, 0, 143, 1, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(112, 114, 111, 116, 111, 47, 118, 101), @@ -14010,49 +14022,56 @@ pub mod operation_watch_value_q { ::capnp::word(116, 105, 111, 110, 87, 97, 116, 99), ::capnp::word(104, 86, 97, 108, 117, 101, 81, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), - ::capnp::word(24, 0, 0, 0, 3, 0, 4, 0), + ::capnp::word(28, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(153, 0, 0, 0, 34, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(148, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(160, 0, 0, 0, 2, 0, 1, 0), - ::capnp::word(1, 0, 0, 0, 1, 0, 0, 0), - ::capnp::word(0, 0, 1, 0, 1, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(157, 0, 0, 0, 66, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(152, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(180, 0, 0, 0, 2, 0, 1, 0), - ::capnp::word(2, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(177, 0, 0, 0, 90, 0, 0, 0), + ::capnp::word(181, 0, 0, 0, 34, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(176, 0, 0, 0, 3, 0, 1, 0), ::capnp::word(188, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(1, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(185, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(180, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(208, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(2, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(205, 0, 0, 0, 90, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(204, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(216, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(3, 0, 0, 0, 2, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 3, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(185, 0, 0, 0, 50, 0, 0, 0), + ::capnp::word(213, 0, 0, 0, 50, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(180, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(192, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(208, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(220, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(4, 0, 0, 0, 2, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 4, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(189, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(217, 0, 0, 0, 66, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(184, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(196, 0, 0, 0, 2, 0, 1, 0), - ::capnp::word(5, 0, 0, 0, 3, 0, 0, 0), + ::capnp::word(212, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(224, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(5, 0, 0, 0, 2, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 5, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(193, 0, 0, 0, 82, 0, 0, 0), + ::capnp::word(221, 0, 0, 0, 66, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(192, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(204, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(216, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(228, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(6, 0, 0, 0, 3, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 6, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(225, 0, 0, 0, 82, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(224, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(236, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(107, 101, 121, 0, 0, 0, 0, 0), ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(41, 27, 230, 241, 169, 103, 213, 226), @@ -14090,6 +14109,14 @@ pub mod operation_watch_value_q { ::capnp::word(8, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(119, 97, 116, 99, 104, 73, 100, 0), + ::capnp::word(9, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(9, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(119, 97, 116, 99, 104, 101, 114, 0), ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(13, 169, 246, 134, 50, 78, 228, 221), @@ -14114,8 +14141,9 @@ pub mod operation_watch_value_q { 1 => <::capnp::struct_list::Owned as ::capnp::introspect::Introspect>::introspect(), 2 => ::introspect(), 3 => ::introspect(), - 4 => ::introspect(), - 5 => ::introspect(), + 4 => ::introspect(), + 5 => ::introspect(), + 6 => ::introspect(), _ => panic!("invalid field index {}", index), } } @@ -14127,7 +14155,7 @@ pub mod operation_watch_value_q { nonunion_members: NONUNION_MEMBERS, members_by_discriminant: MEMBERS_BY_DISCRIMINANT, }; - pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3,4,5]; + pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3,4,5,6]; pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[]; pub const TYPE_ID: u64 = 0xf9a5_a6c5_47b9_b228; } @@ -14206,11 +14234,15 @@ pub mod operation_watch_value_a { pub fn has_peers(&self) -> bool { !self.reader.get_pointer_field(0).is_null() } + #[inline] + pub fn get_watch_id(self) -> u64 { + self.reader.get_data_field::(1) + } } pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> } impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> { - const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 1, pointers: 1 }; + const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 2, pointers: 1 }; } impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> { const TYPE_ID: u64 = _private::TYPE_ID; @@ -14284,6 +14316,14 @@ pub mod operation_watch_value_a { pub fn has_peers(&self) -> bool { !self.builder.is_pointer_field_null(0) } + #[inline] + pub fn get_watch_id(self) -> u64 { + self.builder.get_data_field::(1) + } + #[inline] + pub fn set_watch_id(&mut self, value: u64) { + self.builder.set_data_field::(1, value); + } } pub struct Pipeline { _typeless: ::capnp::any_pointer::Pipeline } @@ -14295,17 +14335,17 @@ pub mod operation_watch_value_a { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 54] = [ + pub static ENCODED_NODE: [::capnp::Word; 69] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(147, 168, 75, 6, 183, 202, 38, 167), - ::capnp::word(19, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(19, 0, 0, 0, 1, 0, 2, 0), ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(21, 0, 0, 0, 66, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(33, 0, 0, 0, 119, 0, 0, 0), + ::capnp::word(33, 0, 0, 0, 175, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(112, 114, 111, 116, 111, 47, 118, 101), @@ -14314,21 +14354,28 @@ pub mod operation_watch_value_a { ::capnp::word(116, 105, 111, 110, 87, 97, 116, 99), ::capnp::word(104, 86, 97, 108, 117, 101, 65, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), - ::capnp::word(8, 0, 0, 0, 3, 0, 4, 0), + ::capnp::word(12, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(41, 0, 0, 0, 90, 0, 0, 0), + ::capnp::word(69, 0, 0, 0, 90, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(40, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(52, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(68, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(80, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(1, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 1, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(49, 0, 0, 0, 50, 0, 0, 0), + ::capnp::word(77, 0, 0, 0, 50, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(44, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(72, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(72, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(100, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(2, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(97, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(92, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(104, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(101, 120, 112, 105, 114, 97, 116, 105), ::capnp::word(111, 110, 0, 0, 0, 0, 0, 0), ::capnp::word(9, 0, 0, 0, 0, 0, 0, 0), @@ -14350,11 +14397,20 @@ pub mod operation_watch_value_a { ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(119, 97, 116, 99, 104, 73, 100, 0), + ::capnp::word(9, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(9, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ]; pub fn get_field_types(index: u16) -> ::capnp::introspect::Type { match index { 0 => ::introspect(), 1 => <::capnp::struct_list::Owned as ::capnp::introspect::Introspect>::introspect(), + 2 => ::introspect(), _ => panic!("invalid field index {}", index), } } @@ -14366,7 +14422,7 @@ pub mod operation_watch_value_a { nonunion_members: NONUNION_MEMBERS, members_by_discriminant: MEMBERS_BY_DISCRIMINANT, }; - pub static NONUNION_MEMBERS : &[u16] = &[0,1]; + pub static NONUNION_MEMBERS : &[u16] = &[0,1,2]; pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[]; pub const TYPE_ID: u64 = 0xa726_cab7_064b_a893; } @@ -14454,6 +14510,10 @@ pub mod operation_value_changed { self.reader.get_data_field::(0) } #[inline] + pub fn get_watch_id(self) -> u64 { + self.reader.get_data_field::(1) + } + #[inline] pub fn get_value(self) -> ::capnp::Result> { ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(2), ::core::option::Option::None) } @@ -14465,7 +14525,7 @@ pub mod operation_value_changed { pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> } impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> { - const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 1, pointers: 3 }; + const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 2, pointers: 3 }; } impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> { const TYPE_ID: u64 = _private::TYPE_ID; @@ -14556,6 +14616,14 @@ pub mod operation_value_changed { self.builder.set_data_field::(0, value); } #[inline] + pub fn get_watch_id(self) -> u64 { + self.builder.get_data_field::(1) + } + #[inline] + pub fn set_watch_id(&mut self, value: u64) { + self.builder.set_data_field::(1, value); + } + #[inline] pub fn get_value(self) -> ::capnp::Result> { ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(2), ::core::option::Option::None) } @@ -14588,17 +14656,17 @@ pub mod operation_value_changed { } } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 84] = [ + pub static ENCODED_NODE: [::capnp::Word; 99] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(246, 27, 204, 216, 189, 158, 197, 209), - ::capnp::word(19, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(19, 0, 0, 0, 1, 0, 2, 0), ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(3, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(21, 0, 0, 0, 74, 1, 0, 0), ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(37, 0, 0, 0, 231, 0, 0, 0), + ::capnp::word(37, 0, 0, 0, 31, 1, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(112, 114, 111, 116, 111, 47, 118, 101), @@ -14608,35 +14676,42 @@ pub mod operation_value_changed { ::capnp::word(101, 67, 104, 97, 110, 103, 101, 100), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), - ::capnp::word(16, 0, 0, 0, 3, 0, 4, 0), + ::capnp::word(20, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(97, 0, 0, 0, 34, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(92, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(104, 0, 0, 0, 2, 0, 1, 0), - ::capnp::word(1, 0, 0, 0, 1, 0, 0, 0), - ::capnp::word(0, 0, 1, 0, 1, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(101, 0, 0, 0, 66, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(96, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(124, 0, 0, 0, 2, 0, 1, 0), - ::capnp::word(2, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(121, 0, 0, 0, 50, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(116, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(128, 0, 0, 0, 2, 0, 1, 0), - ::capnp::word(3, 0, 0, 0, 2, 0, 0, 0), - ::capnp::word(0, 0, 1, 0, 3, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(125, 0, 0, 0, 50, 0, 0, 0), + ::capnp::word(125, 0, 0, 0, 34, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(120, 0, 0, 0, 3, 0, 1, 0), ::capnp::word(132, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(1, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(129, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(124, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(152, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(2, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(149, 0, 0, 0, 50, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(144, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(156, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(3, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 3, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(153, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(148, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(160, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(4, 0, 0, 0, 2, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 4, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(157, 0, 0, 0, 50, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(152, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(164, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(107, 101, 121, 0, 0, 0, 0, 0), ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(41, 27, 230, 241, 169, 103, 213, 226), @@ -14665,6 +14740,14 @@ pub mod operation_value_changed { ::capnp::word(8, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(119, 97, 116, 99, 104, 73, 100, 0), + ::capnp::word(9, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(9, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(118, 97, 108, 117, 101, 0, 0, 0), ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(61, 42, 159, 22, 111, 65, 183, 180), @@ -14679,7 +14762,8 @@ pub mod operation_value_changed { 0 => ::introspect(), 1 => <::capnp::struct_list::Owned as ::capnp::introspect::Introspect>::introspect(), 2 => ::introspect(), - 3 => ::introspect(), + 3 => ::introspect(), + 4 => ::introspect(), _ => panic!("invalid field index {}", index), } } @@ -14691,7 +14775,7 @@ pub mod operation_value_changed { nonunion_members: NONUNION_MEMBERS, members_by_discriminant: MEMBERS_BY_DISCRIMINANT, }; - pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3]; + pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3,4]; pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[]; pub const TYPE_ID: u64 = 0xd1c5_9ebd_d8cc_1bf6; } @@ -21293,4 +21377,4 @@ pub mod operation { } } -//BUILDHASH:ab4fd70d40c9e543f799ce326dd41c61c7ea78132fb53f164156073d9786a9f6 +//BUILDHASH:539ec27eab88af2af5785cd8c1145478f30cd3fe2c08cd0ec7f18d2f4f3c2128 diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs index 1fecc143..46f78698 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs @@ -8,6 +8,7 @@ pub(in crate::rpc_processor) struct RPCOperationValueChanged { key: TypedKey, subkeys: ValueSubkeyRangeSet, count: u32, + watch_id: u64, value: SignedValueData, } @@ -17,6 +18,7 @@ impl RPCOperationValueChanged { key: TypedKey, subkeys: ValueSubkeyRangeSet, count: u32, + watch_id: u64, value: SignedValueData, ) -> Result { // Needed because RangeSetBlaze uses different types here all the time @@ -31,12 +33,16 @@ impl RPCOperationValueChanged { key, subkeys, count, + watch_id, value, }) } pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { - // validation must be done by storage manager as this is more complicated + if self.watch_id == 0 { + return Err(RPCError::protocol("ValueChanged does not have a valid id")); + } + // further validation must be done by storage manager as this is more complicated Ok(()) } @@ -55,14 +61,25 @@ impl RPCOperationValueChanged { self.count } + #[allow(dead_code)] + pub fn watch_id(&self) -> u64 { + self.watch_id + } + #[allow(dead_code)] pub fn value(&self) -> &SignedValueData { &self.value } #[allow(dead_code)] - pub fn destructure(self) -> (TypedKey, ValueSubkeyRangeSet, u32, SignedValueData) { - (self.key, self.subkeys, self.count, self.value) + pub fn destructure(self) -> (TypedKey, ValueSubkeyRangeSet, u32, u64, SignedValueData) { + ( + self.key, + self.subkeys, + self.count, + self.watch_id, + self.value, + ) } pub fn decode( @@ -93,11 +110,14 @@ impl RPCOperationValueChanged { } let count = reader.get_count(); let v_reader = reader.get_value().map_err(RPCError::protocol)?; + let watch_id = reader.get_watch_id(); let value = decode_signed_value_data(&v_reader)?; + Ok(Self { key, subkeys, count, + watch_id, value, }) } @@ -121,6 +141,7 @@ impl RPCOperationValueChanged { } builder.set_count(self.count); + builder.set_watch_id(self.watch_id); let mut v_builder = builder.reborrow().init_value(); encode_signed_value_data(&self.value, &mut v_builder)?; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs index bc137b74..1509fde8 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs @@ -9,6 +9,7 @@ pub(in crate::rpc_processor) struct RPCOperationWatchValueQ { subkeys: ValueSubkeyRangeSet, expiration: u64, count: u32, + watch_id: Option, watcher: PublicKey, signature: Signature, } @@ -20,6 +21,7 @@ impl RPCOperationWatchValueQ { subkeys: ValueSubkeyRangeSet, expiration: u64, count: u32, + watch_id: Option, watcher: KeyPair, vcrypto: CryptoSystemVersion, ) -> Result { @@ -31,7 +33,7 @@ impl RPCOperationWatchValueQ { return Err(RPCError::protocol("WatchValueQ subkeys length too long")); } - let signature_data = Self::make_signature_data(&key, &subkeys, expiration, count); + let signature_data = Self::make_signature_data(&key, &subkeys, expiration, count, watch_id); let signature = vcrypto .sign(&watcher.key, &watcher.secret, &signature_data) .map_err(RPCError::protocol)?; @@ -41,6 +43,7 @@ impl RPCOperationWatchValueQ { subkeys, expiration, count, + watch_id, watcher: watcher.key, signature, }) @@ -52,6 +55,7 @@ impl RPCOperationWatchValueQ { subkeys: &ValueSubkeyRangeSet, expiration: u64, count: u32, + watch_id: Option, ) -> Vec { // Needed because RangeSetBlaze uses different types here all the time #[allow(clippy::unnecessary_cast)] @@ -66,6 +70,9 @@ impl RPCOperationWatchValueQ { } sig_data.extend_from_slice(&expiration.to_le_bytes()); sig_data.extend_from_slice(&count.to_le_bytes()); + if let Some(watch_id) = watch_id { + sig_data.extend_from_slice(&watch_id.to_le_bytes()); + } sig_data } @@ -74,8 +81,13 @@ impl RPCOperationWatchValueQ { return Err(RPCError::protocol("unsupported cryptosystem")); }; - let sig_data = - Self::make_signature_data(&self.key, &self.subkeys, self.expiration, self.count); + let sig_data = Self::make_signature_data( + &self.key, + &self.subkeys, + self.expiration, + self.count, + self.watch_id, + ); vcrypto .verify(&self.watcher, &sig_data, &self.signature) .map_err(RPCError::protocol)?; @@ -102,6 +114,11 @@ impl RPCOperationWatchValueQ { self.count } + #[allow(dead_code)] + pub fn watch_id(&self) -> Option { + self.watch_id + } + #[allow(dead_code)] pub fn watcher(&self) -> &PublicKey { &self.watcher @@ -118,6 +135,7 @@ impl RPCOperationWatchValueQ { ValueSubkeyRangeSet, u64, u32, + Option, PublicKey, Signature, ) { @@ -126,6 +144,7 @@ impl RPCOperationWatchValueQ { self.subkeys, self.expiration, self.count, + self.watch_id, self.watcher, self.signature, ) @@ -159,6 +178,11 @@ impl RPCOperationWatchValueQ { let expiration = reader.get_expiration(); let count = reader.get_count(); + let watch_id = if reader.get_watch_id() == 0 { + Some(reader.get_watch_id()) + } else { + None + }; let w_reader = reader.get_watcher().map_err(RPCError::protocol)?; let watcher = decode_key256(&w_reader); @@ -171,6 +195,7 @@ impl RPCOperationWatchValueQ { subkeys, expiration, count, + watch_id, watcher, signature, }) @@ -196,6 +221,7 @@ impl RPCOperationWatchValueQ { } builder.set_expiration(self.expiration); builder.set_count(self.count); + builder.set_watch_id(self.watch_id.unwrap_or(0u64)); let mut w_builder = builder.reborrow().init_watcher(); encode_key256(&self.watcher, &mut w_builder); @@ -211,18 +237,26 @@ impl RPCOperationWatchValueQ { pub(in crate::rpc_processor) struct RPCOperationWatchValueA { expiration: u64, peers: Vec, + watch_id: u64, } impl RPCOperationWatchValueA { #[allow(dead_code)] - pub fn new(expiration: u64, peers: Vec) -> Result { + pub fn new(expiration: u64, peers: Vec, watch_id: u64) -> Result { if peers.len() > MAX_WATCH_VALUE_A_PEERS_LEN { return Err(RPCError::protocol("WatchValueA peers length too long")); } - Ok(Self { expiration, peers }) + Ok(Self { + expiration, + peers, + watch_id, + }) } pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { + if self.watch_id == 0 { + return Err(RPCError::protocol("WatchValueA does not have a valid id")); + } PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); Ok(()) } @@ -236,8 +270,12 @@ impl RPCOperationWatchValueA { &self.peers } #[allow(dead_code)] - pub fn destructure(self) -> (u64, Vec) { - (self.expiration, self.peers) + pub fn watch_id(&self) -> u64 { + self.watch_id + } + #[allow(dead_code)] + pub fn destructure(self) -> (u64, Vec, u64) { + (self.expiration, self.peers, self.watch_id) } pub fn decode( @@ -258,8 +296,13 @@ impl RPCOperationWatchValueA { let peer_info = decode_peer_info(&p)?; peers.push(peer_info); } + let watch_id = reader.get_watch_id(); - Ok(Self { expiration, peers }) + Ok(Self { + expiration, + peers, + watch_id, + }) } pub fn encode( &self, @@ -277,6 +320,7 @@ impl RPCOperationWatchValueA { let mut pi_builder = peers_builder.reborrow().get(i as u32); encode_peer_info(peer, &mut pi_builder)?; } + builder.set_watch_id(self.watch_id); Ok(()) } diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index 1f34f434..002480b8 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -14,6 +14,7 @@ impl RPCProcessor { key: TypedKey, subkeys: ValueSubkeyRangeSet, count: u32, + watch_id: u64, value: SignedValueData, ) -> RPCNetworkResult<()> { // Ensure destination is never using a safety route @@ -22,7 +23,7 @@ impl RPCProcessor { "Never send value changes over safety routes", )); } - let value_changed = RPCOperationValueChanged::new(key, subkeys, count, value)?; + let value_changed = RPCOperationValueChanged::new(key, subkeys, count, watch_id, value)?; let statement = RPCStatement::new(RPCStatementDetail::ValueChanged(Box::new(value_changed))); @@ -33,7 +34,7 @@ impl RPCProcessor { pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Get the statement let (_, _, _, kind) = msg.operation.destructure(); - let (key, subkeys, count, value) = match kind { + let (key, subkeys, count, watch_id, value) = match kind { RPCOperationKind::Statement(s) => match s.destructure() { RPCStatementDetail::ValueChanged(s) => s.destructure(), _ => panic!("not a value changed statement"), @@ -69,7 +70,8 @@ impl RPCProcessor { ); let debug_string_stmt = format!( - "IN <== ValueChanged({} #{:?}+{}{}) from {} <= {}", + "IN <== ValueChanged(id={} {} #{:?}+{}{}) from {} <= {}", + watch_id, key, subkeys, count, @@ -84,7 +86,14 @@ impl RPCProcessor { // Save the subkey, creating a new record if necessary let storage_manager = self.storage_manager(); storage_manager - .inbound_value_changed(key, subkeys, count, Arc::new(value), inbound_node_id) + .inbound_value_changed( + key, + subkeys, + count, + Arc::new(value), + inbound_node_id, + watch_id, + ) .await .map_err(RPCError::internal)?; diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index 8b08a642..ca0285bc 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -4,6 +4,7 @@ use super::*; pub struct WatchValueAnswer { pub expiration_ts: Timestamp, pub peers: Vec, + pub watch_id: u64, } impl RPCProcessor { @@ -22,6 +23,7 @@ impl RPCProcessor { ret.peers.len ),err) )] + #[allow(clippy::too_many_arguments)] pub async fn rpc_call_watch_value( self, dest: Destination, @@ -30,6 +32,7 @@ impl RPCProcessor { expiration: Timestamp, count: u32, watcher: KeyPair, + watch_id: Option, ) -> RPCNetworkResult> { // Ensure destination never has a private route // and get the target noderef so we can validate the response @@ -48,8 +51,18 @@ impl RPCProcessor { }; let debug_string = format!( - "OUT ==> WatchValueQ({} {}@{}+{}) => {} (watcher={})", - key, subkeys, expiration, count, dest, watcher.key + "OUT ==> WatchValueQ({} {} {}@{}+{}) => {} (watcher={}) ", + if let Some(watch_id) = watch_id { + format!("id={} ", watch_id) + } else { + "".to_owned() + }, + key, + subkeys, + expiration, + count, + dest, + watcher.key ); // Send the watchvalue question @@ -58,6 +71,7 @@ impl RPCProcessor { subkeys.clone(), expiration.as_u64(), count, + watch_id, watcher, vcrypto.clone(), )?; @@ -90,12 +104,13 @@ impl RPCProcessor { }, _ => return Ok(NetworkResult::invalid_message("not an answer")), }; - - let (expiration, peers) = watch_value_a.destructure(); + let question_watch_id = watch_id; + let (expiration, peers, watch_id) = watch_value_a.destructure(); #[cfg(feature = "debug-dht")] { let debug_string_answer = format!( - "OUT <== WatchValueA({} #{:?}@{} peers={}) <= {}", + "OUT <== WatchValueA(id={} {} #{:?}@{} peers={}) <= {}", + watch_id, key, subkeys, expiration, @@ -112,6 +127,16 @@ impl RPCProcessor { log_rpc!(debug "Peers: {:#?}", peer_ids); } + // Validate returned answer watch id is the same as the question watch id if it exists + if let Some(question_watch_id) = question_watch_id { + if question_watch_id != watch_id { + return Ok(NetworkResult::invalid_message(format!( + "answer watch id={} doesn't match question watch id={}", + watch_id, question_watch_id, + ))); + } + } + // Validate peers returned are, in fact, closer to the key than the node we sent this to let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { Ok(v) => v, @@ -139,6 +164,7 @@ impl RPCProcessor { WatchValueAnswer { expiration_ts: Timestamp::new(expiration), peers, + watch_id, }, ))) } @@ -185,7 +211,8 @@ impl RPCProcessor { }; // Destructure - let (key, subkeys, expiration, count, watcher, _signature) = watch_value_q.destructure(); + let (key, subkeys, expiration, count, watch_id, watcher, _signature) = + watch_value_q.destructure(); // Get target for ValueChanged notifications let dest = network_result_try!(self.get_respond_to_destination(&msg)); @@ -194,7 +221,12 @@ impl RPCProcessor { #[cfg(feature = "debug-dht")] { let debug_string = format!( - "IN <=== WatchValueQ({} {}@{}+{}) <== {} (watcher={})", + "IN <=== WatchValueQ({}{} {}@{}+{}) <== {} (watcher={})", + if let Some(watch_id) = watch_id { + format!("id={} ", watch_id) + } else { + "".to_owned() + }, key, subkeys, expiration, @@ -216,13 +248,13 @@ impl RPCProcessor { let c = self.config.get(); c.network.dht.set_value_count as usize }; - let ret_expiration = if closer_to_key_peers.len() >= set_value_count { + let (ret_expiration, ret_watch_id) = if closer_to_key_peers.len() >= set_value_count { // Not close enough #[cfg(feature = "debug-dht")] log_rpc!(debug "Not close enough for watch value"); - Timestamp::default() + (Timestamp::default(), 0) } else { // Close enough, lets watch it @@ -234,6 +266,7 @@ impl RPCProcessor { subkeys.clone(), Timestamp::new(expiration), count, + watch_id, target, watcher ) @@ -244,7 +277,8 @@ impl RPCProcessor { #[cfg(feature = "debug-dht")] { let debug_string_answer = format!( - "IN ===> WatchValueA({} #{} expiration={} peers={}) ==> {}", + "IN ===> WatchValueA(id={} {} #{} expiration={} peers={}) ==> {}", + ret_watch_id, key, subkeys, ret_expiration, @@ -263,6 +297,7 @@ impl RPCProcessor { } else { vec![] }, + ret_watch_id, )?; // Send GetValue answer diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs index 84e47a34..2332ce31 100644 --- a/veilid-core/src/storage_manager/debug.rs +++ b/veilid-core/src/storage_manager/debug.rs @@ -17,14 +17,16 @@ impl StorageManager { } pub(crate) async fn debug_opened_records(&self) -> String { let inner = self.inner.lock().await; - format!( - "{:#?}", - inner - .opened_records - .keys() - .copied() - .collect::>() - ) + let mut out = "[\n".to_owned(); + for (k, v) in &inner.opened_records { + let writer = if let Some(w) = v.writer() { + w.to_string() + } else { + "".to_owned() + }; + out += &format!(" {} {},\n", k, writer); + } + format!("{}]\n", out) } pub(crate) async fn purge_local_records(&self, reclaim: Option) -> String { diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 40656391..bb1b7923 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -43,6 +43,7 @@ struct ValueChangedInfo { key: TypedKey, subkeys: ValueSubkeyRangeSet, count: u32, + watch_id: u64, value: Arc, } @@ -311,6 +312,7 @@ impl StorageManager { 0, opened_record.safety_selection(), opened_record.writer().cloned(), + Some(active_watch.id), Some(active_watch.watch_node), ) .await?; @@ -582,14 +584,16 @@ impl StorageManager { subkeys }; - // Get the safety selection and the writer we opened this record with - let (safety_selection, opt_writer, opt_watch_node) = { + // Get the safety selection and the writer we opened this record + // and whatever active watch id and watch node we may have in case this is a watch update + let (safety_selection, opt_writer, opt_watch_id, opt_watch_node) = { let Some(opened_record) = inner.opened_records.get(&key) else { apibail_generic!("record not open"); }; ( opened_record.safety_selection(), opened_record.writer().cloned(), + opened_record.active_watch().map(|aw| aw.id), opened_record.active_watch().map(|aw| aw.watch_node.clone()), ) }; @@ -613,6 +617,7 @@ impl StorageManager { count, safety_selection, opt_writer, + opt_watch_id, opt_watch_node, ) .await?; @@ -663,6 +668,7 @@ impl StorageManager { // Keep a record of the watch opened_record.set_active_watch(ActiveWatch { + id: owvresult.watch_id, expiration_ts, watch_node: owvresult.watch_node, opt_value_changed_route: owvresult.opt_value_changed_route, @@ -744,7 +750,7 @@ impl StorageManager { .map_err(VeilidAPIError::from)?; network_result_value_or_log!(rpc_processor - .rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, (*vc.value).clone()) + .rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, vc.watch_id, (*vc.value).clone() ) .await .map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {}); diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index 32f73710..79012da0 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -25,6 +25,7 @@ where /// An individual watch #[derive(Debug, Clone)] struct WatchedRecordWatch { + id: u64, subkeys: ValueSubkeyRangeSet, expiration: Timestamp, count: u32, @@ -757,19 +758,24 @@ where Ok(()) } - /// Add a record watch for changes + /// Add an inbound record watch for changes + #[allow(clippy::too_many_arguments)] pub async fn watch_record( &mut self, key: TypedKey, subkeys: ValueSubkeyRangeSet, mut expiration: Timestamp, count: u32, + watch_id: Option, target: Target, - watcher: CryptoKey, - ) -> VeilidAPIResult> { + watcher: PublicKey, + ) -> VeilidAPIResult> { // If subkeys is empty or count is zero then we're cancelling a watch completely if subkeys.is_empty() || count == 0 { - return self.cancel_watch(key, target, watcher).await; + if let Some(watch_id) = watch_id { + return self.cancel_watch(key, watch_id, watcher).await; + } + apibail_internal!("shouldn't have let a None watch id get here"); } // See if expiration timestamp is too far in the future or not enough in the future @@ -795,28 +801,83 @@ where return Ok(None); }; + // Generate a record-unique watch id > 0 if one is not specified + let rtk = RecordTableKey { key }; + let mut new_watch = false; + let watch_id = watch_id.unwrap_or_else(|| { + new_watch = true; + let mut id = 0; + while id == 0 { + id = get_random_u64(); + } + if let Some(watched_record) = self.watched_records.get_mut(&rtk) { + // Make sure it doesn't match any other id (unlikely, but lets be certain) + 'x: loop { + for w in &mut watched_record.watchers { + if w.id == id { + loop { + id = id.overflowing_add(1).0; + if id != 0 { + break; + } + } + continue 'x; + } + } + break; + } + } + id + }); + // See if we are updating an existing watch // with the watcher matched on target let mut watch_count = 0; - let rtk = RecordTableKey { key }; - if let Some(watch) = self.watched_records.get_mut(&rtk) { - for w in &mut watch.watchers { - if w.watcher == watcher { + let mut target_watch_count = 0; + if let Some(watched_record) = self.watched_records.get_mut(&rtk) { + for w in &mut watched_record.watchers { + // Total up the number of watches for this key + // If the watcher is a member of the schema, then consider the total per-watcher key + // If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-record key + if !is_member || w.watcher == watcher { watch_count += 1; - // Only one watch for an anonymous watcher - // Allow members to have one watch per target - if !is_member || w.target == target { - // Updating an existing watch - w.subkeys = subkeys; - w.expiration = expiration; - w.count = count; - return Ok(Some(expiration)); + // For any watch, if the target matches our also tally that separately + // If the watcher is a member of the schema, then consider the total per-target-per-watcher key + // If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-target-per-record key + if w.target == target { + target_watch_count += 1; } } + + // If this is not a new watch and the watch id doesn't match, then we're not updating + if !new_watch && w.id == watch_id { + // Updating an existing watch + // You can change a watcher key, or target via this update + // as well as the subkey range, expiration and count of the watch + w.watcher = watcher; + w.target = target; + w.subkeys = subkeys; + w.expiration = expiration; + w.count = count; + return Ok(Some((expiration, watch_id))); + } } } + // Only proceed here if this is a new watch + if !new_watch { + // Not a new watch + return Ok(None); + } + + // For members, no more than one watch per target per watcher per record + // For anonymous, no more than one watch per target per record + if target_watch_count > 0 { + // Too many watches + return Ok(None); + } + // Adding a new watcher to a watch // Check watch table for limits if is_member { @@ -827,6 +888,7 @@ where } } else { // Public watch + // No more than one if watch_count >= self.limits.public_watch_limit { // Too many watches return Ok(None); @@ -836,6 +898,7 @@ where // Ok this is an acceptable new watch, add it let watch = self.watched_records.entry(rtk).or_default(); watch.watchers.push(WatchedRecordWatch { + id: watch_id, subkeys, expiration, count, @@ -843,43 +906,33 @@ where watcher, changed: ValueSubkeyRangeSet::new(), }); - Ok(Some(expiration)) + Ok(Some((expiration, watch_id))) } - /// Add a record watch for changes + /// Clear a specific watch for a record async fn cancel_watch( &mut self, key: TypedKey, - target: Target, - watcher: CryptoKey, - ) -> VeilidAPIResult> { - // Get the record being watched - let Some(is_member) = self.with_record(key, |record| { - // Check if the watcher specified is a schema member - let schema = record.schema(); - (*record.owner()) == watcher || schema.is_member(&watcher) - }) else { - // Record not found - return Ok(None); - }; - + watch_id: u64, + watcher: PublicKey, + ) -> VeilidAPIResult> { + if watch_id == 0 { + apibail_internal!("should not have let a a zero watch id get here"); + } // See if we are cancelling an existing watch // with the watcher matched on target let rtk = RecordTableKey { key }; let mut is_empty = false; - let mut ret_timestamp = None; + let mut ret = None; if let Some(watch) = self.watched_records.get_mut(&rtk) { let mut dead_watcher = None; for (wn, w) in watch.watchers.iter_mut().enumerate() { - if w.watcher == watcher { - // Only one watch for an anonymous watcher - // Allow members to have one watch per target - if !is_member || w.target == target { - // Canceling an existing watch - dead_watcher = Some(wn); - ret_timestamp = Some(w.expiration); - break; - } + // Must match the watch id and the watcher key to cancel + if w.id == watch_id && w.watcher == watcher { + // Canceling an existing watch + dead_watcher = Some(wn); + ret = Some((w.expiration, watch_id)); + break; } } if let Some(dw) = dead_watcher { @@ -893,7 +946,7 @@ where self.watched_records.remove(&rtk); } - Ok(ret_timestamp) + Ok(ret) } /// Move watches from one store to another @@ -921,6 +974,7 @@ where key: TypedKey, subkeys: ValueSubkeyRangeSet, count: u32, + watch_id: u64, } let mut evcis = vec![]; @@ -953,6 +1007,7 @@ where key: rtk.key, subkeys, count, + watch_id: w.id, }); } @@ -996,6 +1051,7 @@ where key: evci.key, subkeys: evci.subkeys, count: evci.count, + watch_id: evci.watch_id, value, }); } diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index c5d1a0f3..3f5fa6ac 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -537,21 +537,23 @@ impl StorageManagerInner { Ok(()) } + #[allow(clippy::too_many_arguments)] pub(super) async fn handle_watch_local_value( &mut self, key: TypedKey, subkeys: ValueSubkeyRangeSet, expiration: Timestamp, count: u32, + watch_id: Option, target: Target, - watcher: CryptoKey, - ) -> VeilidAPIResult> { + watcher: PublicKey, + ) -> VeilidAPIResult> { // See if it's in the local record store let Some(local_record_store) = self.local_record_store.as_mut() else { apibail_not_initialized!(); }; local_record_store - .watch_record(key, subkeys, expiration, count, target, watcher) + .watch_record(key, subkeys, expiration, count, watch_id, target, watcher) .await } @@ -612,21 +614,23 @@ impl StorageManagerInner { Ok(()) } + #[allow(clippy::too_many_arguments)] pub(super) async fn handle_watch_remote_value( &mut self, key: TypedKey, subkeys: ValueSubkeyRangeSet, expiration: Timestamp, count: u32, + watch_id: Option, target: Target, - watcher: CryptoKey, - ) -> VeilidAPIResult> { + watcher: PublicKey, + ) -> VeilidAPIResult> { // See if it's in the remote record store let Some(remote_record_store) = self.remote_record_store.as_mut() else { apibail_not_initialized!(); }; remote_record_store - .watch_record(key, subkeys, expiration, count, target, watcher) + .watch_record(key, subkeys, expiration, count, watch_id, target, watcher) .await } diff --git a/veilid-core/src/storage_manager/types/opened_record.rs b/veilid-core/src/storage_manager/types/opened_record.rs index cbdd1c16..acde0a08 100644 --- a/veilid-core/src/storage_manager/types/opened_record.rs +++ b/veilid-core/src/storage_manager/types/opened_record.rs @@ -2,6 +2,8 @@ use super::*; #[derive(Clone, Debug)] pub(in crate::storage_manager) struct ActiveWatch { + /// The watch id returned from the watch node + pub id: u64, /// The expiration of a successful watch pub expiration_ts: Timestamp, /// Which node accepted the watch diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 526a76f6..6271923c 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -11,6 +11,8 @@ struct OutboundWatchValueContext { pub(super) struct OutboundWatchValueResult { /// The expiration of a successful watch pub expiration_ts: Timestamp, + /// What watch id was returned + pub watch_id: u64, /// Which node accepted the watch pub watch_node: NodeRef, /// Which private route is responsible for receiving ValueChanged notifications @@ -29,6 +31,7 @@ impl StorageManager { count: u32, safety_selection: SafetySelection, opt_watcher: Option, + opt_watch_id: Option, opt_watch_node: Option, ) -> VeilidAPIResult> { let routing_table = rpc_processor.routing_table(); @@ -79,7 +82,8 @@ impl StorageManager { subkeys, expiration, count, - watcher + watcher, + opt_watch_id ) .await? ); @@ -88,14 +92,15 @@ impl StorageManager { if wva.answer.expiration_ts.as_u64() > 0 { if count > 0 { // If we asked for a nonzero notification count, then this is an accepted watch - log_stor!(debug "Watch accepted: expiration_ts={}", debug_ts(wva.answer.expiration_ts.as_u64())); + log_stor!(debug "Watch accepted: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64())); } else { // If we asked for a zero notification count, then this is a cancelled watch - log_stor!(debug "Watch cancelled"); + log_stor!(debug "Watch cancelled: id={}", wva.answer.watch_id); } let mut ctx = context.lock(); ctx.opt_watch_value_result = Some(OutboundWatchValueResult { expiration_ts: wva.answer.expiration_ts, + watch_id: wva.answer.watch_id, watch_node: next_node.clone(), opt_value_changed_route: wva.reply_private_route, }); @@ -176,35 +181,55 @@ impl StorageManager { } /// Handle a received 'Watch Value' query + #[allow(clippy::too_many_arguments)] pub async fn inbound_watch_value( &self, key: TypedKey, subkeys: ValueSubkeyRangeSet, expiration: Timestamp, count: u32, + watch_id: Option, target: Target, - watcher: CryptoKey, - ) -> VeilidAPIResult> { + watcher: PublicKey, + ) -> VeilidAPIResult> { let mut inner = self.lock().await?; + // Validate input + if (subkeys.is_empty() || count == 0) && (watch_id.unwrap_or_default() == 0) { + // Can't cancel a watch without a watch id + return VeilidAPIResult::Ok(NetworkResult::invalid_message( + "can't cancel watch without id", + )); + } + // See if this is a remote or local value - let (_is_local, opt_expiration_ts) = { + let (_is_local, opt_ret) = { // See if the subkey we are watching has a local value - let opt_expiration_ts = inner - .handle_watch_local_value(key, subkeys.clone(), expiration, count, target, watcher) + let opt_ret = inner + .handle_watch_local_value( + key, + subkeys.clone(), + expiration, + count, + watch_id, + target, + watcher, + ) .await?; - if opt_expiration_ts.is_some() { - (true, opt_expiration_ts) + if opt_ret.is_some() { + (true, opt_ret) } else { // See if the subkey we are watching is a remote value - let opt_expiration_ts = inner - .handle_watch_remote_value(key, subkeys, expiration, count, target, watcher) + let opt_ret = inner + .handle_watch_remote_value( + key, subkeys, expiration, count, watch_id, target, watcher, + ) .await?; - (false, opt_expiration_ts) + (false, opt_ret) } }; - Ok(NetworkResult::value(opt_expiration_ts.unwrap_or_default())) + Ok(NetworkResult::value(opt_ret.unwrap_or_default())) } /// Handle a received 'Value Changed' statement @@ -215,26 +240,33 @@ impl StorageManager { mut count: u32, value: Arc, inbound_node_id: TypedKey, + watch_id: u64, ) -> VeilidAPIResult<()> { // Update local record store with new value let (res, opt_update_callback) = { let mut inner = self.lock().await?; + // Don't process update if the record is closed let Some(opened_record) = inner.opened_records.get_mut(&key) else { - // Don't process update if the record is closed - return Ok(()); - }; - let Some(mut active_watch) = opened_record.active_watch() else { - // No active watch means no callback return Ok(()); }; + // No active watch means no callback + let Some(mut active_watch) = opened_record.active_watch() else { + return Ok(()); + }; + + // If the watch id doesn't match, then don't process this + if active_watch.id != watch_id { + return Ok(()); + } + + // If the reporting node is not the same as our watch, don't process the value change if !active_watch .watch_node .node_ids() .contains(&inbound_node_id) { - // If the reporting node is not the same as our watch, don't process the value change return Ok(()); } diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 15f94c0f..81f5ace2 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -354,7 +354,7 @@ impl RoutingContext { /// continue to report changes via the callback. Nodes that agree to doing watches will be put on our 'ping' list to ensure they are still around /// otherwise the watch will be cancelled and will have to be re-watched. /// - /// There is only one watch permitted per record. If a change to a watch is desired, the first one must will be overwritten. + /// There is only one watch permitted per record. If a change to a watch is desired, the first one will be overwritten. /// * `key` is the record key to watch. it must first be opened for reading or writing. /// * `subkeys` is the the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys. /// * `expiration` is the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.