watch ids

This commit is contained in:
Christien Rioux 2024-03-04 23:04:29 -05:00
parent 4b632d8156
commit e009b1097b
13 changed files with 481 additions and 183 deletions

View File

@ -356,20 +356,23 @@ struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges), if empty, watch everything
expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max)
count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous)
watcher @4 :PublicKey; # optional: the watcher performing the watch, can be the owner or a schema member
signature @5 :Signature; # optional: signature of the watcher, must be one of the schema members or the key owner. signature covers: key, subkeys, expiration, count
watchId @4 :UInt64; # optional: (0 = unspecified) existing watch id to update or cancel unless this is a new watch
watcher @5 :PublicKey; # the watcher performing the watch, can be the owner or a schema member, or a generated anonymous watch keypair
signature @6 :Signature; # signature of the watcher, signature covers: key, subkeys, expiration, count, watchId
}
struct OperationWatchValueA @0xa726cab7064ba893 {
expiration @0 :UInt64; # timestamp when this watch will expire in usec since epoch (0 if watch was rejected). if watch is being cancelled (with count = 0), this will be the non-zero former expiration time.
peers @1 :List(PeerInfo); # returned list of other nodes to ask that could propagate watches
watchId @2 :UInt64; # random id for watch instance on this node
}
struct OperationValueChanged @0xd1c59ebdd8cc1bf6 {
key @0 :TypedKey; # key for value that changed
subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time)
count @2 :UInt32; # remaining changes left (0 means watch has expired)
value @3 :SignedValueData; # first value that changed (the rest can be gotten with getvalue)
watchId @3 :UInt64; # watch id this value change came from
value @4 :SignedValueData; # first value that changed (the rest can be gotten with getvalue)
}
struct OperationSupplyBlockQ @0xadbf4c542d749971 {

View File

@ -13822,6 +13822,10 @@ pub mod operation_watch_value_q {
self.reader.get_data_field::<u32>(2)
}
#[inline]
pub fn get_watch_id(self) -> u64 {
self.reader.get_data_field::<u64>(2)
}
#[inline]
pub fn get_watcher(self) -> ::capnp::Result<crate::veilid_capnp::key256::Reader<'a>> {
::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(2), ::core::option::Option::None)
}
@ -13841,7 +13845,7 @@ pub mod operation_watch_value_q {
pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> }
impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> {
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 2, pointers: 4 };
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 3, pointers: 4 };
}
impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> {
const TYPE_ID: u64 = _private::TYPE_ID;
@ -13940,6 +13944,14 @@ pub mod operation_watch_value_q {
self.builder.set_data_field::<u32>(2, value);
}
#[inline]
pub fn get_watch_id(self) -> u64 {
self.builder.get_data_field::<u64>(2)
}
#[inline]
pub fn set_watch_id(&mut self, value: u64) {
self.builder.set_data_field::<u64>(2, value);
}
#[inline]
pub fn get_watcher(self) -> ::capnp::Result<crate::veilid_capnp::key256::Builder<'a>> {
::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(2), ::core::option::Option::None)
}
@ -13991,17 +14003,17 @@ pub mod operation_watch_value_q {
}
}
mod _private {
pub static ENCODED_NODE: [::capnp::Word; 115] = [
pub static ENCODED_NODE: [::capnp::Word; 130] = [
::capnp::word(0, 0, 0, 0, 5, 0, 6, 0),
::capnp::word(40, 178, 185, 71, 197, 166, 165, 249),
::capnp::word(19, 0, 0, 0, 1, 0, 2, 0),
::capnp::word(19, 0, 0, 0, 1, 0, 3, 0),
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(4, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(21, 0, 0, 0, 66, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 87, 1, 0, 0),
::capnp::word(33, 0, 0, 0, 143, 1, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(112, 114, 111, 116, 111, 47, 118, 101),
@ -14010,49 +14022,56 @@ pub mod operation_watch_value_q {
::capnp::word(116, 105, 111, 110, 87, 97, 116, 99),
::capnp::word(104, 86, 97, 108, 117, 101, 81, 0),
::capnp::word(0, 0, 0, 0, 1, 0, 1, 0),
::capnp::word(24, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(28, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(153, 0, 0, 0, 34, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(148, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(160, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(1, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(157, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(152, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(180, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(2, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(177, 0, 0, 0, 90, 0, 0, 0),
::capnp::word(181, 0, 0, 0, 34, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(176, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(188, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(1, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(185, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(180, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(208, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(2, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(205, 0, 0, 0, 90, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(204, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(216, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(3, 0, 0, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 3, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(185, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(213, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(180, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(192, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(208, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(220, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(4, 0, 0, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 4, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(189, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(217, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(184, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(196, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(5, 0, 0, 0, 3, 0, 0, 0),
::capnp::word(212, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(224, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(5, 0, 0, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 5, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(193, 0, 0, 0, 82, 0, 0, 0),
::capnp::word(221, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(192, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(204, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(216, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(228, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(6, 0, 0, 0, 3, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 6, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(225, 0, 0, 0, 82, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(224, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(236, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(107, 101, 121, 0, 0, 0, 0, 0),
::capnp::word(16, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(41, 27, 230, 241, 169, 103, 213, 226),
@ -14090,6 +14109,14 @@ pub mod operation_watch_value_q {
::capnp::word(8, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(119, 97, 116, 99, 104, 73, 100, 0),
::capnp::word(9, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(9, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(119, 97, 116, 99, 104, 101, 114, 0),
::capnp::word(16, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(13, 169, 246, 134, 50, 78, 228, 221),
@ -14114,8 +14141,9 @@ pub mod operation_watch_value_q {
1 => <::capnp::struct_list::Owned<crate::veilid_capnp::subkey_range::Owned> as ::capnp::introspect::Introspect>::introspect(),
2 => <u64 as ::capnp::introspect::Introspect>::introspect(),
3 => <u32 as ::capnp::introspect::Introspect>::introspect(),
4 => <crate::veilid_capnp::key256::Owned as ::capnp::introspect::Introspect>::introspect(),
5 => <crate::veilid_capnp::signature512::Owned as ::capnp::introspect::Introspect>::introspect(),
4 => <u64 as ::capnp::introspect::Introspect>::introspect(),
5 => <crate::veilid_capnp::key256::Owned as ::capnp::introspect::Introspect>::introspect(),
6 => <crate::veilid_capnp::signature512::Owned as ::capnp::introspect::Introspect>::introspect(),
_ => panic!("invalid field index {}", index),
}
}
@ -14127,7 +14155,7 @@ pub mod operation_watch_value_q {
nonunion_members: NONUNION_MEMBERS,
members_by_discriminant: MEMBERS_BY_DISCRIMINANT,
};
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3,4,5];
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3,4,5,6];
pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[];
pub const TYPE_ID: u64 = 0xf9a5_a6c5_47b9_b228;
}
@ -14206,11 +14234,15 @@ pub mod operation_watch_value_a {
pub fn has_peers(&self) -> bool {
!self.reader.get_pointer_field(0).is_null()
}
#[inline]
pub fn get_watch_id(self) -> u64 {
self.reader.get_data_field::<u64>(1)
}
}
pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> }
impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> {
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 1, pointers: 1 };
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 2, pointers: 1 };
}
impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> {
const TYPE_ID: u64 = _private::TYPE_ID;
@ -14284,6 +14316,14 @@ pub mod operation_watch_value_a {
pub fn has_peers(&self) -> bool {
!self.builder.is_pointer_field_null(0)
}
#[inline]
pub fn get_watch_id(self) -> u64 {
self.builder.get_data_field::<u64>(1)
}
#[inline]
pub fn set_watch_id(&mut self, value: u64) {
self.builder.set_data_field::<u64>(1, value);
}
}
pub struct Pipeline { _typeless: ::capnp::any_pointer::Pipeline }
@ -14295,17 +14335,17 @@ pub mod operation_watch_value_a {
impl Pipeline {
}
mod _private {
pub static ENCODED_NODE: [::capnp::Word; 54] = [
pub static ENCODED_NODE: [::capnp::Word; 69] = [
::capnp::word(0, 0, 0, 0, 5, 0, 6, 0),
::capnp::word(147, 168, 75, 6, 183, 202, 38, 167),
::capnp::word(19, 0, 0, 0, 1, 0, 1, 0),
::capnp::word(19, 0, 0, 0, 1, 0, 2, 0),
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(21, 0, 0, 0, 66, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 119, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 175, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(112, 114, 111, 116, 111, 47, 118, 101),
@ -14314,21 +14354,28 @@ pub mod operation_watch_value_a {
::capnp::word(116, 105, 111, 110, 87, 97, 116, 99),
::capnp::word(104, 86, 97, 108, 117, 101, 65, 0),
::capnp::word(0, 0, 0, 0, 1, 0, 1, 0),
::capnp::word(8, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(12, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(41, 0, 0, 0, 90, 0, 0, 0),
::capnp::word(69, 0, 0, 0, 90, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(40, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(52, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(68, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(80, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(1, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(49, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(77, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(44, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(72, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(72, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(100, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(2, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(97, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(92, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(104, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(101, 120, 112, 105, 114, 97, 116, 105),
::capnp::word(111, 110, 0, 0, 0, 0, 0, 0),
::capnp::word(9, 0, 0, 0, 0, 0, 0, 0),
@ -14350,11 +14397,20 @@ pub mod operation_watch_value_a {
::capnp::word(14, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(119, 97, 116, 99, 104, 73, 100, 0),
::capnp::word(9, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(9, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
];
pub fn get_field_types(index: u16) -> ::capnp::introspect::Type {
match index {
0 => <u64 as ::capnp::introspect::Introspect>::introspect(),
1 => <::capnp::struct_list::Owned<crate::veilid_capnp::peer_info::Owned> as ::capnp::introspect::Introspect>::introspect(),
2 => <u64 as ::capnp::introspect::Introspect>::introspect(),
_ => panic!("invalid field index {}", index),
}
}
@ -14366,7 +14422,7 @@ pub mod operation_watch_value_a {
nonunion_members: NONUNION_MEMBERS,
members_by_discriminant: MEMBERS_BY_DISCRIMINANT,
};
pub static NONUNION_MEMBERS : &[u16] = &[0,1];
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2];
pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[];
pub const TYPE_ID: u64 = 0xa726_cab7_064b_a893;
}
@ -14454,6 +14510,10 @@ pub mod operation_value_changed {
self.reader.get_data_field::<u32>(0)
}
#[inline]
pub fn get_watch_id(self) -> u64 {
self.reader.get_data_field::<u64>(1)
}
#[inline]
pub fn get_value(self) -> ::capnp::Result<crate::veilid_capnp::signed_value_data::Reader<'a>> {
::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(2), ::core::option::Option::None)
}
@ -14465,7 +14525,7 @@ pub mod operation_value_changed {
pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> }
impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> {
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 1, pointers: 3 };
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 2, pointers: 3 };
}
impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> {
const TYPE_ID: u64 = _private::TYPE_ID;
@ -14556,6 +14616,14 @@ pub mod operation_value_changed {
self.builder.set_data_field::<u32>(0, value);
}
#[inline]
pub fn get_watch_id(self) -> u64 {
self.builder.get_data_field::<u64>(1)
}
#[inline]
pub fn set_watch_id(&mut self, value: u64) {
self.builder.set_data_field::<u64>(1, value);
}
#[inline]
pub fn get_value(self) -> ::capnp::Result<crate::veilid_capnp::signed_value_data::Builder<'a>> {
::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(2), ::core::option::Option::None)
}
@ -14588,17 +14656,17 @@ pub mod operation_value_changed {
}
}
mod _private {
pub static ENCODED_NODE: [::capnp::Word; 84] = [
pub static ENCODED_NODE: [::capnp::Word; 99] = [
::capnp::word(0, 0, 0, 0, 5, 0, 6, 0),
::capnp::word(246, 27, 204, 216, 189, 158, 197, 209),
::capnp::word(19, 0, 0, 0, 1, 0, 1, 0),
::capnp::word(19, 0, 0, 0, 1, 0, 2, 0),
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(3, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(21, 0, 0, 0, 74, 1, 0, 0),
::capnp::word(41, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(37, 0, 0, 0, 231, 0, 0, 0),
::capnp::word(37, 0, 0, 0, 31, 1, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(112, 114, 111, 116, 111, 47, 118, 101),
@ -14608,35 +14676,42 @@ pub mod operation_value_changed {
::capnp::word(101, 67, 104, 97, 110, 103, 101, 100),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 1, 0, 1, 0),
::capnp::word(16, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(20, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(97, 0, 0, 0, 34, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(92, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(104, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(1, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(101, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(96, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(124, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(2, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(121, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(116, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(128, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(3, 0, 0, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 3, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(125, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(125, 0, 0, 0, 34, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(120, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(132, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(1, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(129, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(124, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(152, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(2, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(149, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(144, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(156, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(3, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 3, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(153, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(148, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(160, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(4, 0, 0, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 4, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(157, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(152, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(164, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(107, 101, 121, 0, 0, 0, 0, 0),
::capnp::word(16, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(41, 27, 230, 241, 169, 103, 213, 226),
@ -14665,6 +14740,14 @@ pub mod operation_value_changed {
::capnp::word(8, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(119, 97, 116, 99, 104, 73, 100, 0),
::capnp::word(9, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(9, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(118, 97, 108, 117, 101, 0, 0, 0),
::capnp::word(16, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(61, 42, 159, 22, 111, 65, 183, 180),
@ -14679,7 +14762,8 @@ pub mod operation_value_changed {
0 => <crate::veilid_capnp::typed_key::Owned as ::capnp::introspect::Introspect>::introspect(),
1 => <::capnp::struct_list::Owned<crate::veilid_capnp::subkey_range::Owned> as ::capnp::introspect::Introspect>::introspect(),
2 => <u32 as ::capnp::introspect::Introspect>::introspect(),
3 => <crate::veilid_capnp::signed_value_data::Owned as ::capnp::introspect::Introspect>::introspect(),
3 => <u64 as ::capnp::introspect::Introspect>::introspect(),
4 => <crate::veilid_capnp::signed_value_data::Owned as ::capnp::introspect::Introspect>::introspect(),
_ => panic!("invalid field index {}", index),
}
}
@ -14691,7 +14775,7 @@ pub mod operation_value_changed {
nonunion_members: NONUNION_MEMBERS,
members_by_discriminant: MEMBERS_BY_DISCRIMINANT,
};
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3];
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3,4];
pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[];
pub const TYPE_ID: u64 = 0xd1c5_9ebd_d8cc_1bf6;
}
@ -21293,4 +21377,4 @@ pub mod operation {
}
}
//BUILDHASH:ab4fd70d40c9e543f799ce326dd41c61c7ea78132fb53f164156073d9786a9f6
//BUILDHASH:539ec27eab88af2af5785cd8c1145478f30cd3fe2c08cd0ec7f18d2f4f3c2128

View File

@ -8,6 +8,7 @@ pub(in crate::rpc_processor) struct RPCOperationValueChanged {
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
watch_id: u64,
value: SignedValueData,
}
@ -17,6 +18,7 @@ impl RPCOperationValueChanged {
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
watch_id: u64,
value: SignedValueData,
) -> Result<Self, RPCError> {
// Needed because RangeSetBlaze uses different types here all the time
@ -31,12 +33,16 @@ impl RPCOperationValueChanged {
key,
subkeys,
count,
watch_id,
value,
})
}
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
// validation must be done by storage manager as this is more complicated
if self.watch_id == 0 {
return Err(RPCError::protocol("ValueChanged does not have a valid id"));
}
// further validation must be done by storage manager as this is more complicated
Ok(())
}
@ -55,14 +61,25 @@ impl RPCOperationValueChanged {
self.count
}
#[allow(dead_code)]
pub fn watch_id(&self) -> u64 {
self.watch_id
}
#[allow(dead_code)]
pub fn value(&self) -> &SignedValueData {
&self.value
}
#[allow(dead_code)]
pub fn destructure(self) -> (TypedKey, ValueSubkeyRangeSet, u32, SignedValueData) {
(self.key, self.subkeys, self.count, self.value)
pub fn destructure(self) -> (TypedKey, ValueSubkeyRangeSet, u32, u64, SignedValueData) {
(
self.key,
self.subkeys,
self.count,
self.watch_id,
self.value,
)
}
pub fn decode(
@ -93,11 +110,14 @@ impl RPCOperationValueChanged {
}
let count = reader.get_count();
let v_reader = reader.get_value().map_err(RPCError::protocol)?;
let watch_id = reader.get_watch_id();
let value = decode_signed_value_data(&v_reader)?;
Ok(Self {
key,
subkeys,
count,
watch_id,
value,
})
}
@ -121,6 +141,7 @@ impl RPCOperationValueChanged {
}
builder.set_count(self.count);
builder.set_watch_id(self.watch_id);
let mut v_builder = builder.reborrow().init_value();
encode_signed_value_data(&self.value, &mut v_builder)?;

View File

@ -9,6 +9,7 @@ pub(in crate::rpc_processor) struct RPCOperationWatchValueQ {
subkeys: ValueSubkeyRangeSet,
expiration: u64,
count: u32,
watch_id: Option<u64>,
watcher: PublicKey,
signature: Signature,
}
@ -20,6 +21,7 @@ impl RPCOperationWatchValueQ {
subkeys: ValueSubkeyRangeSet,
expiration: u64,
count: u32,
watch_id: Option<u64>,
watcher: KeyPair,
vcrypto: CryptoSystemVersion,
) -> Result<Self, RPCError> {
@ -31,7 +33,7 @@ impl RPCOperationWatchValueQ {
return Err(RPCError::protocol("WatchValueQ subkeys length too long"));
}
let signature_data = Self::make_signature_data(&key, &subkeys, expiration, count);
let signature_data = Self::make_signature_data(&key, &subkeys, expiration, count, watch_id);
let signature = vcrypto
.sign(&watcher.key, &watcher.secret, &signature_data)
.map_err(RPCError::protocol)?;
@ -41,6 +43,7 @@ impl RPCOperationWatchValueQ {
subkeys,
expiration,
count,
watch_id,
watcher: watcher.key,
signature,
})
@ -52,6 +55,7 @@ impl RPCOperationWatchValueQ {
subkeys: &ValueSubkeyRangeSet,
expiration: u64,
count: u32,
watch_id: Option<u64>,
) -> Vec<u8> {
// Needed because RangeSetBlaze uses different types here all the time
#[allow(clippy::unnecessary_cast)]
@ -66,6 +70,9 @@ impl RPCOperationWatchValueQ {
}
sig_data.extend_from_slice(&expiration.to_le_bytes());
sig_data.extend_from_slice(&count.to_le_bytes());
if let Some(watch_id) = watch_id {
sig_data.extend_from_slice(&watch_id.to_le_bytes());
}
sig_data
}
@ -74,8 +81,13 @@ impl RPCOperationWatchValueQ {
return Err(RPCError::protocol("unsupported cryptosystem"));
};
let sig_data =
Self::make_signature_data(&self.key, &self.subkeys, self.expiration, self.count);
let sig_data = Self::make_signature_data(
&self.key,
&self.subkeys,
self.expiration,
self.count,
self.watch_id,
);
vcrypto
.verify(&self.watcher, &sig_data, &self.signature)
.map_err(RPCError::protocol)?;
@ -102,6 +114,11 @@ impl RPCOperationWatchValueQ {
self.count
}
#[allow(dead_code)]
pub fn watch_id(&self) -> Option<u64> {
self.watch_id
}
#[allow(dead_code)]
pub fn watcher(&self) -> &PublicKey {
&self.watcher
@ -118,6 +135,7 @@ impl RPCOperationWatchValueQ {
ValueSubkeyRangeSet,
u64,
u32,
Option<u64>,
PublicKey,
Signature,
) {
@ -126,6 +144,7 @@ impl RPCOperationWatchValueQ {
self.subkeys,
self.expiration,
self.count,
self.watch_id,
self.watcher,
self.signature,
)
@ -159,6 +178,11 @@ impl RPCOperationWatchValueQ {
let expiration = reader.get_expiration();
let count = reader.get_count();
let watch_id = if reader.get_watch_id() == 0 {
Some(reader.get_watch_id())
} else {
None
};
let w_reader = reader.get_watcher().map_err(RPCError::protocol)?;
let watcher = decode_key256(&w_reader);
@ -171,6 +195,7 @@ impl RPCOperationWatchValueQ {
subkeys,
expiration,
count,
watch_id,
watcher,
signature,
})
@ -196,6 +221,7 @@ impl RPCOperationWatchValueQ {
}
builder.set_expiration(self.expiration);
builder.set_count(self.count);
builder.set_watch_id(self.watch_id.unwrap_or(0u64));
let mut w_builder = builder.reborrow().init_watcher();
encode_key256(&self.watcher, &mut w_builder);
@ -211,18 +237,26 @@ impl RPCOperationWatchValueQ {
pub(in crate::rpc_processor) struct RPCOperationWatchValueA {
expiration: u64,
peers: Vec<PeerInfo>,
watch_id: u64,
}
impl RPCOperationWatchValueA {
#[allow(dead_code)]
pub fn new(expiration: u64, peers: Vec<PeerInfo>) -> Result<Self, RPCError> {
pub fn new(expiration: u64, peers: Vec<PeerInfo>, watch_id: u64) -> Result<Self, RPCError> {
if peers.len() > MAX_WATCH_VALUE_A_PEERS_LEN {
return Err(RPCError::protocol("WatchValueA peers length too long"));
}
Ok(Self { expiration, peers })
Ok(Self {
expiration,
peers,
watch_id,
})
}
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
if self.watch_id == 0 {
return Err(RPCError::protocol("WatchValueA does not have a valid id"));
}
PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone());
Ok(())
}
@ -236,8 +270,12 @@ impl RPCOperationWatchValueA {
&self.peers
}
#[allow(dead_code)]
pub fn destructure(self) -> (u64, Vec<PeerInfo>) {
(self.expiration, self.peers)
pub fn watch_id(&self) -> u64 {
self.watch_id
}
#[allow(dead_code)]
pub fn destructure(self) -> (u64, Vec<PeerInfo>, u64) {
(self.expiration, self.peers, self.watch_id)
}
pub fn decode(
@ -258,8 +296,13 @@ impl RPCOperationWatchValueA {
let peer_info = decode_peer_info(&p)?;
peers.push(peer_info);
}
let watch_id = reader.get_watch_id();
Ok(Self { expiration, peers })
Ok(Self {
expiration,
peers,
watch_id,
})
}
pub fn encode(
&self,
@ -277,6 +320,7 @@ impl RPCOperationWatchValueA {
let mut pi_builder = peers_builder.reborrow().get(i as u32);
encode_peer_info(peer, &mut pi_builder)?;
}
builder.set_watch_id(self.watch_id);
Ok(())
}

View File

@ -14,6 +14,7 @@ impl RPCProcessor {
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
watch_id: u64,
value: SignedValueData,
) -> RPCNetworkResult<()> {
// Ensure destination is never using a safety route
@ -22,7 +23,7 @@ impl RPCProcessor {
"Never send value changes over safety routes",
));
}
let value_changed = RPCOperationValueChanged::new(key, subkeys, count, value)?;
let value_changed = RPCOperationValueChanged::new(key, subkeys, count, watch_id, value)?;
let statement =
RPCStatement::new(RPCStatementDetail::ValueChanged(Box::new(value_changed)));
@ -33,7 +34,7 @@ impl RPCProcessor {
pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Get the statement
let (_, _, _, kind) = msg.operation.destructure();
let (key, subkeys, count, value) = match kind {
let (key, subkeys, count, watch_id, value) = match kind {
RPCOperationKind::Statement(s) => match s.destructure() {
RPCStatementDetail::ValueChanged(s) => s.destructure(),
_ => panic!("not a value changed statement"),
@ -69,7 +70,8 @@ impl RPCProcessor {
);
let debug_string_stmt = format!(
"IN <== ValueChanged({} #{:?}+{}{}) from {} <= {}",
"IN <== ValueChanged(id={} {} #{:?}+{}{}) from {} <= {}",
watch_id,
key,
subkeys,
count,
@ -84,7 +86,14 @@ impl RPCProcessor {
// Save the subkey, creating a new record if necessary
let storage_manager = self.storage_manager();
storage_manager
.inbound_value_changed(key, subkeys, count, Arc::new(value), inbound_node_id)
.inbound_value_changed(
key,
subkeys,
count,
Arc::new(value),
inbound_node_id,
watch_id,
)
.await
.map_err(RPCError::internal)?;

View File

@ -4,6 +4,7 @@ use super::*;
pub struct WatchValueAnswer {
pub expiration_ts: Timestamp,
pub peers: Vec<PeerInfo>,
pub watch_id: u64,
}
impl RPCProcessor {
@ -22,6 +23,7 @@ impl RPCProcessor {
ret.peers.len
),err)
)]
#[allow(clippy::too_many_arguments)]
pub async fn rpc_call_watch_value(
self,
dest: Destination,
@ -30,6 +32,7 @@ impl RPCProcessor {
expiration: Timestamp,
count: u32,
watcher: KeyPair,
watch_id: Option<u64>,
) -> RPCNetworkResult<Answer<WatchValueAnswer>> {
// Ensure destination never has a private route
// and get the target noderef so we can validate the response
@ -48,8 +51,18 @@ impl RPCProcessor {
};
let debug_string = format!(
"OUT ==> WatchValueQ({} {}@{}+{}) => {} (watcher={})",
key, subkeys, expiration, count, dest, watcher.key
"OUT ==> WatchValueQ({} {} {}@{}+{}) => {} (watcher={}) ",
if let Some(watch_id) = watch_id {
format!("id={} ", watch_id)
} else {
"".to_owned()
},
key,
subkeys,
expiration,
count,
dest,
watcher.key
);
// Send the watchvalue question
@ -58,6 +71,7 @@ impl RPCProcessor {
subkeys.clone(),
expiration.as_u64(),
count,
watch_id,
watcher,
vcrypto.clone(),
)?;
@ -90,12 +104,13 @@ impl RPCProcessor {
},
_ => return Ok(NetworkResult::invalid_message("not an answer")),
};
let (expiration, peers) = watch_value_a.destructure();
let question_watch_id = watch_id;
let (expiration, peers, watch_id) = watch_value_a.destructure();
#[cfg(feature = "debug-dht")]
{
let debug_string_answer = format!(
"OUT <== WatchValueA({} #{:?}@{} peers={}) <= {}",
"OUT <== WatchValueA(id={} {} #{:?}@{} peers={}) <= {}",
watch_id,
key,
subkeys,
expiration,
@ -112,6 +127,16 @@ impl RPCProcessor {
log_rpc!(debug "Peers: {:#?}", peer_ids);
}
// Validate returned answer watch id is the same as the question watch id if it exists
if let Some(question_watch_id) = question_watch_id {
if question_watch_id != watch_id {
return Ok(NetworkResult::invalid_message(format!(
"answer watch id={} doesn't match question watch id={}",
watch_id, question_watch_id,
)));
}
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
Ok(v) => v,
@ -139,6 +164,7 @@ impl RPCProcessor {
WatchValueAnswer {
expiration_ts: Timestamp::new(expiration),
peers,
watch_id,
},
)))
}
@ -185,7 +211,8 @@ impl RPCProcessor {
};
// Destructure
let (key, subkeys, expiration, count, watcher, _signature) = watch_value_q.destructure();
let (key, subkeys, expiration, count, watch_id, watcher, _signature) =
watch_value_q.destructure();
// Get target for ValueChanged notifications
let dest = network_result_try!(self.get_respond_to_destination(&msg));
@ -194,7 +221,12 @@ impl RPCProcessor {
#[cfg(feature = "debug-dht")]
{
let debug_string = format!(
"IN <=== WatchValueQ({} {}@{}+{}) <== {} (watcher={})",
"IN <=== WatchValueQ({}{} {}@{}+{}) <== {} (watcher={})",
if let Some(watch_id) = watch_id {
format!("id={} ", watch_id)
} else {
"".to_owned()
},
key,
subkeys,
expiration,
@ -216,13 +248,13 @@ impl RPCProcessor {
let c = self.config.get();
c.network.dht.set_value_count as usize
};
let ret_expiration = if closer_to_key_peers.len() >= set_value_count {
let (ret_expiration, ret_watch_id) = if closer_to_key_peers.len() >= set_value_count {
// Not close enough
#[cfg(feature = "debug-dht")]
log_rpc!(debug "Not close enough for watch value");
Timestamp::default()
(Timestamp::default(), 0)
} else {
// Close enough, lets watch it
@ -234,6 +266,7 @@ impl RPCProcessor {
subkeys.clone(),
Timestamp::new(expiration),
count,
watch_id,
target,
watcher
)
@ -244,7 +277,8 @@ impl RPCProcessor {
#[cfg(feature = "debug-dht")]
{
let debug_string_answer = format!(
"IN ===> WatchValueA({} #{} expiration={} peers={}) ==> {}",
"IN ===> WatchValueA(id={} {} #{} expiration={} peers={}) ==> {}",
ret_watch_id,
key,
subkeys,
ret_expiration,
@ -263,6 +297,7 @@ impl RPCProcessor {
} else {
vec![]
},
ret_watch_id,
)?;
// Send GetValue answer

View File

@ -17,14 +17,16 @@ impl StorageManager {
}
pub(crate) async fn debug_opened_records(&self) -> String {
let inner = self.inner.lock().await;
format!(
"{:#?}",
inner
.opened_records
.keys()
.copied()
.collect::<Vec<TypedKey>>()
)
let mut out = "[\n".to_owned();
for (k, v) in &inner.opened_records {
let writer = if let Some(w) = v.writer() {
w.to_string()
} else {
"".to_owned()
};
out += &format!(" {} {},\n", k, writer);
}
format!("{}]\n", out)
}
pub(crate) async fn purge_local_records(&self, reclaim: Option<usize>) -> String {

View File

@ -43,6 +43,7 @@ struct ValueChangedInfo {
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
watch_id: u64,
value: Arc<SignedValueData>,
}
@ -311,6 +312,7 @@ impl StorageManager {
0,
opened_record.safety_selection(),
opened_record.writer().cloned(),
Some(active_watch.id),
Some(active_watch.watch_node),
)
.await?;
@ -582,14 +584,16 @@ impl StorageManager {
subkeys
};
// Get the safety selection and the writer we opened this record with
let (safety_selection, opt_writer, opt_watch_node) = {
// Get the safety selection and the writer we opened this record
// and whatever active watch id and watch node we may have in case this is a watch update
let (safety_selection, opt_writer, opt_watch_id, opt_watch_node) = {
let Some(opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
};
(
opened_record.safety_selection(),
opened_record.writer().cloned(),
opened_record.active_watch().map(|aw| aw.id),
opened_record.active_watch().map(|aw| aw.watch_node.clone()),
)
};
@ -613,6 +617,7 @@ impl StorageManager {
count,
safety_selection,
opt_writer,
opt_watch_id,
opt_watch_node,
)
.await?;
@ -663,6 +668,7 @@ impl StorageManager {
// Keep a record of the watch
opened_record.set_active_watch(ActiveWatch {
id: owvresult.watch_id,
expiration_ts,
watch_node: owvresult.watch_node,
opt_value_changed_route: owvresult.opt_value_changed_route,
@ -744,7 +750,7 @@ impl StorageManager {
.map_err(VeilidAPIError::from)?;
network_result_value_or_log!(rpc_processor
.rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, (*vc.value).clone())
.rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, vc.watch_id, (*vc.value).clone() )
.await
.map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {});

View File

@ -25,6 +25,7 @@ where
/// An individual watch
#[derive(Debug, Clone)]
struct WatchedRecordWatch {
id: u64,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
@ -757,19 +758,24 @@ where
Ok(())
}
/// Add a record watch for changes
/// Add an inbound record watch for changes
#[allow(clippy::too_many_arguments)]
pub async fn watch_record(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
mut expiration: Timestamp,
count: u32,
watch_id: Option<u64>,
target: Target,
watcher: CryptoKey,
) -> VeilidAPIResult<Option<Timestamp>> {
watcher: PublicKey,
) -> VeilidAPIResult<Option<(Timestamp, u64)>> {
// If subkeys is empty or count is zero then we're cancelling a watch completely
if subkeys.is_empty() || count == 0 {
return self.cancel_watch(key, target, watcher).await;
if let Some(watch_id) = watch_id {
return self.cancel_watch(key, watch_id, watcher).await;
}
apibail_internal!("shouldn't have let a None watch id get here");
}
// See if expiration timestamp is too far in the future or not enough in the future
@ -795,26 +801,81 @@ where
return Ok(None);
};
// Generate a record-unique watch id > 0 if one is not specified
let rtk = RecordTableKey { key };
let mut new_watch = false;
let watch_id = watch_id.unwrap_or_else(|| {
new_watch = true;
let mut id = 0;
while id == 0 {
id = get_random_u64();
}
if let Some(watched_record) = self.watched_records.get_mut(&rtk) {
// Make sure it doesn't match any other id (unlikely, but lets be certain)
'x: loop {
for w in &mut watched_record.watchers {
if w.id == id {
loop {
id = id.overflowing_add(1).0;
if id != 0 {
break;
}
}
continue 'x;
}
}
break;
}
}
id
});
// See if we are updating an existing watch
// with the watcher matched on target
let mut watch_count = 0;
let rtk = RecordTableKey { key };
if let Some(watch) = self.watched_records.get_mut(&rtk) {
for w in &mut watch.watchers {
if w.watcher == watcher {
let mut target_watch_count = 0;
if let Some(watched_record) = self.watched_records.get_mut(&rtk) {
for w in &mut watched_record.watchers {
// Total up the number of watches for this key
// If the watcher is a member of the schema, then consider the total per-watcher key
// If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-record key
if !is_member || w.watcher == watcher {
watch_count += 1;
// Only one watch for an anonymous watcher
// Allow members to have one watch per target
if !is_member || w.target == target {
// For any watch, if the target matches our also tally that separately
// If the watcher is a member of the schema, then consider the total per-target-per-watcher key
// If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-target-per-record key
if w.target == target {
target_watch_count += 1;
}
}
// If this is not a new watch and the watch id doesn't match, then we're not updating
if !new_watch && w.id == watch_id {
// Updating an existing watch
// You can change a watcher key, or target via this update
// as well as the subkey range, expiration and count of the watch
w.watcher = watcher;
w.target = target;
w.subkeys = subkeys;
w.expiration = expiration;
w.count = count;
return Ok(Some(expiration));
return Ok(Some((expiration, watch_id)));
}
}
}
// Only proceed here if this is a new watch
if !new_watch {
// Not a new watch
return Ok(None);
}
// For members, no more than one watch per target per watcher per record
// For anonymous, no more than one watch per target per record
if target_watch_count > 0 {
// Too many watches
return Ok(None);
}
// Adding a new watcher to a watch
@ -827,6 +888,7 @@ where
}
} else {
// Public watch
// No more than one
if watch_count >= self.limits.public_watch_limit {
// Too many watches
return Ok(None);
@ -836,6 +898,7 @@ where
// Ok this is an acceptable new watch, add it
let watch = self.watched_records.entry(rtk).or_default();
watch.watchers.push(WatchedRecordWatch {
id: watch_id,
subkeys,
expiration,
count,
@ -843,45 +906,35 @@ where
watcher,
changed: ValueSubkeyRangeSet::new(),
});
Ok(Some(expiration))
Ok(Some((expiration, watch_id)))
}
/// Add a record watch for changes
/// Clear a specific watch for a record
async fn cancel_watch(
&mut self,
key: TypedKey,
target: Target,
watcher: CryptoKey,
) -> VeilidAPIResult<Option<Timestamp>> {
// Get the record being watched
let Some(is_member) = self.with_record(key, |record| {
// Check if the watcher specified is a schema member
let schema = record.schema();
(*record.owner()) == watcher || schema.is_member(&watcher)
}) else {
// Record not found
return Ok(None);
};
watch_id: u64,
watcher: PublicKey,
) -> VeilidAPIResult<Option<(Timestamp, u64)>> {
if watch_id == 0 {
apibail_internal!("should not have let a a zero watch id get here");
}
// See if we are cancelling an existing watch
// with the watcher matched on target
let rtk = RecordTableKey { key };
let mut is_empty = false;
let mut ret_timestamp = None;
let mut ret = None;
if let Some(watch) = self.watched_records.get_mut(&rtk) {
let mut dead_watcher = None;
for (wn, w) in watch.watchers.iter_mut().enumerate() {
if w.watcher == watcher {
// Only one watch for an anonymous watcher
// Allow members to have one watch per target
if !is_member || w.target == target {
// Must match the watch id and the watcher key to cancel
if w.id == watch_id && w.watcher == watcher {
// Canceling an existing watch
dead_watcher = Some(wn);
ret_timestamp = Some(w.expiration);
ret = Some((w.expiration, watch_id));
break;
}
}
}
if let Some(dw) = dead_watcher {
watch.watchers.remove(dw);
if watch.watchers.is_empty() {
@ -893,7 +946,7 @@ where
self.watched_records.remove(&rtk);
}
Ok(ret_timestamp)
Ok(ret)
}
/// Move watches from one store to another
@ -921,6 +974,7 @@ where
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
watch_id: u64,
}
let mut evcis = vec![];
@ -953,6 +1007,7 @@ where
key: rtk.key,
subkeys,
count,
watch_id: w.id,
});
}
@ -996,6 +1051,7 @@ where
key: evci.key,
subkeys: evci.subkeys,
count: evci.count,
watch_id: evci.watch_id,
value,
});
}

View File

@ -537,21 +537,23 @@ impl StorageManagerInner {
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_watch_local_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
watch_id: Option<u64>,
target: Target,
watcher: CryptoKey,
) -> VeilidAPIResult<Option<Timestamp>> {
watcher: PublicKey,
) -> VeilidAPIResult<Option<(Timestamp, u64)>> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
local_record_store
.watch_record(key, subkeys, expiration, count, target, watcher)
.watch_record(key, subkeys, expiration, count, watch_id, target, watcher)
.await
}
@ -612,21 +614,23 @@ impl StorageManagerInner {
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_watch_remote_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
watch_id: Option<u64>,
target: Target,
watcher: CryptoKey,
) -> VeilidAPIResult<Option<Timestamp>> {
watcher: PublicKey,
) -> VeilidAPIResult<Option<(Timestamp, u64)>> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
remote_record_store
.watch_record(key, subkeys, expiration, count, target, watcher)
.watch_record(key, subkeys, expiration, count, watch_id, target, watcher)
.await
}

View File

@ -2,6 +2,8 @@ use super::*;
#[derive(Clone, Debug)]
pub(in crate::storage_manager) struct ActiveWatch {
/// The watch id returned from the watch node
pub id: u64,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which node accepted the watch

View File

@ -11,6 +11,8 @@ struct OutboundWatchValueContext {
pub(super) struct OutboundWatchValueResult {
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// What watch id was returned
pub watch_id: u64,
/// Which node accepted the watch
pub watch_node: NodeRef,
/// Which private route is responsible for receiving ValueChanged notifications
@ -29,6 +31,7 @@ impl StorageManager {
count: u32,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
opt_watch_id: Option<u64>,
opt_watch_node: Option<NodeRef>,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
let routing_table = rpc_processor.routing_table();
@ -79,7 +82,8 @@ impl StorageManager {
subkeys,
expiration,
count,
watcher
watcher,
opt_watch_id
)
.await?
);
@ -88,14 +92,15 @@ impl StorageManager {
if wva.answer.expiration_ts.as_u64() > 0 {
if count > 0 {
// If we asked for a nonzero notification count, then this is an accepted watch
log_stor!(debug "Watch accepted: expiration_ts={}", debug_ts(wva.answer.expiration_ts.as_u64()));
log_stor!(debug "Watch accepted: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
} else {
// If we asked for a zero notification count, then this is a cancelled watch
log_stor!(debug "Watch cancelled");
log_stor!(debug "Watch cancelled: id={}", wva.answer.watch_id);
}
let mut ctx = context.lock();
ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
expiration_ts: wva.answer.expiration_ts,
watch_id: wva.answer.watch_id,
watch_node: next_node.clone(),
opt_value_changed_route: wva.reply_private_route,
});
@ -176,35 +181,55 @@ impl StorageManager {
}
/// Handle a received 'Watch Value' query
#[allow(clippy::too_many_arguments)]
pub async fn inbound_watch_value(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
watch_id: Option<u64>,
target: Target,
watcher: CryptoKey,
) -> VeilidAPIResult<NetworkResult<Timestamp>> {
watcher: PublicKey,
) -> VeilidAPIResult<NetworkResult<(Timestamp, u64)>> {
let mut inner = self.lock().await?;
// Validate input
if (subkeys.is_empty() || count == 0) && (watch_id.unwrap_or_default() == 0) {
// Can't cancel a watch without a watch id
return VeilidAPIResult::Ok(NetworkResult::invalid_message(
"can't cancel watch without id",
));
}
// See if this is a remote or local value
let (_is_local, opt_expiration_ts) = {
let (_is_local, opt_ret) = {
// See if the subkey we are watching has a local value
let opt_expiration_ts = inner
.handle_watch_local_value(key, subkeys.clone(), expiration, count, target, watcher)
let opt_ret = inner
.handle_watch_local_value(
key,
subkeys.clone(),
expiration,
count,
watch_id,
target,
watcher,
)
.await?;
if opt_expiration_ts.is_some() {
(true, opt_expiration_ts)
if opt_ret.is_some() {
(true, opt_ret)
} else {
// See if the subkey we are watching is a remote value
let opt_expiration_ts = inner
.handle_watch_remote_value(key, subkeys, expiration, count, target, watcher)
let opt_ret = inner
.handle_watch_remote_value(
key, subkeys, expiration, count, watch_id, target, watcher,
)
.await?;
(false, opt_expiration_ts)
(false, opt_ret)
}
};
Ok(NetworkResult::value(opt_expiration_ts.unwrap_or_default()))
Ok(NetworkResult::value(opt_ret.unwrap_or_default()))
}
/// Handle a received 'Value Changed' statement
@ -215,26 +240,33 @@ impl StorageManager {
mut count: u32,
value: Arc<SignedValueData>,
inbound_node_id: TypedKey,
watch_id: u64,
) -> VeilidAPIResult<()> {
// Update local record store with new value
let (res, opt_update_callback) = {
let mut inner = self.lock().await?;
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
// Don't process update if the record is closed
return Ok(());
};
let Some(mut active_watch) = opened_record.active_watch() else {
// No active watch means no callback
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
return Ok(());
};
// No active watch means no callback
let Some(mut active_watch) = opened_record.active_watch() else {
return Ok(());
};
// If the watch id doesn't match, then don't process this
if active_watch.id != watch_id {
return Ok(());
}
// If the reporting node is not the same as our watch, don't process the value change
if !active_watch
.watch_node
.node_ids()
.contains(&inbound_node_id)
{
// If the reporting node is not the same as our watch, don't process the value change
return Ok(());
}

View File

@ -354,7 +354,7 @@ impl RoutingContext {
/// continue to report changes via the callback. Nodes that agree to doing watches will be put on our 'ping' list to ensure they are still around
/// otherwise the watch will be cancelled and will have to be re-watched.
///
/// There is only one watch permitted per record. If a change to a watch is desired, the first one must will be overwritten.
/// There is only one watch permitted per record. If a change to a watch is desired, the first one will be overwritten.
/// * `key` is the record key to watch. it must first be opened for reading or writing.
/// * `subkeys` is the the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys.
/// * `expiration` is the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.