diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 84c7c541..f94f669b 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -346,7 +346,7 @@ struct OperationSetValueQ @0xbac06191ff8bdbc5 { } struct OperationSetValueA @0x9378d0732dc95be2 { - set @0 :Bool; # true if the set was close enough to be set + set @0 :Bool; # true if the set was accepted value @1 :SignedValueData; # optional: the current value at the key if the set seq number was lower or equal to what was there before peers @2 :List(PeerInfo); # returned 'closer peer' information on either success or failure } @@ -356,15 +356,16 @@ struct OperationWatchValueQ @0xf9a5a6c547b9b228 { subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges), if empty, watch everything expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max) count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous) - watchId @4 :UInt64; # optional: (0 = unspecified) existing watch id to update or cancel unless this is a new watch + watchId @4 :UInt64; # if 0, request a new watch. if >0, existing watch id watcher @5 :PublicKey; # the watcher performing the watch, can be the owner or a schema member, or a generated anonymous watch keypair signature @6 :Signature; # signature of the watcher, signature covers: key, subkeys, expiration, count, watchId } struct OperationWatchValueA @0xa726cab7064ba893 { - expiration @0 :UInt64; # timestamp when this watch will expire in usec since epoch (0 if watch was rejected). if watch is being cancelled (with count = 0), this will be the non-zero former expiration time. - peers @1 :List(PeerInfo); # returned list of other nodes to ask that could propagate watches - watchId @2 :UInt64; # random id for watch instance on this node + accepted @0 :Bool; # true if the watch was close enough to be accepted + expiration @1 :UInt64; # timestamp when this watch will expire in usec since epoch (0 if watch was cancelled/dropped) + peers @2 :List(PeerInfo); # returned list of other nodes to ask that could propagate watches + watchId @3 :UInt64; # (0 = id not allocated if rejecting new watch) random id for watch instance on this node } struct OperationValueChanged @0xd1c59ebdd8cc1bf6 { diff --git a/veilid-core/proto/veilid_capnp.rs b/veilid-core/proto/veilid_capnp.rs index 9b484c10..f844bc9a 100644 --- a/veilid-core/proto/veilid_capnp.rs +++ b/veilid-core/proto/veilid_capnp.rs @@ -14223,8 +14223,12 @@ pub mod operation_watch_value_a { self.reader.total_size() } #[inline] + pub fn get_accepted(self) -> bool { + self.reader.get_bool_field(0) + } + #[inline] pub fn get_expiration(self) -> u64 { - self.reader.get_data_field::(0) + self.reader.get_data_field::(1) } #[inline] pub fn get_peers(self) -> ::capnp::Result<::capnp::struct_list::Reader<'a,crate::veilid_capnp::peer_info::Owned>> { @@ -14236,13 +14240,13 @@ pub mod operation_watch_value_a { } #[inline] pub fn get_watch_id(self) -> u64 { - self.reader.get_data_field::(1) + self.reader.get_data_field::(2) } } pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> } impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> { - const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 2, pointers: 1 }; + const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 3, pointers: 1 }; } impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> { const TYPE_ID: u64 = _private::TYPE_ID; @@ -14293,12 +14297,20 @@ pub mod operation_watch_value_a { self.builder.as_reader().total_size() } #[inline] + pub fn get_accepted(self) -> bool { + self.builder.get_bool_field(0) + } + #[inline] + pub fn set_accepted(&mut self, value: bool) { + self.builder.set_bool_field(0, value); + } + #[inline] pub fn get_expiration(self) -> u64 { - self.builder.get_data_field::(0) + self.builder.get_data_field::(1) } #[inline] pub fn set_expiration(&mut self, value: u64) { - self.builder.set_data_field::(0, value); + self.builder.set_data_field::(1, value); } #[inline] pub fn get_peers(self) -> ::capnp::Result<::capnp::struct_list::Builder<'a,crate::veilid_capnp::peer_info::Owned>> { @@ -14318,11 +14330,11 @@ pub mod operation_watch_value_a { } #[inline] pub fn get_watch_id(self) -> u64 { - self.builder.get_data_field::(1) + self.builder.get_data_field::(2) } #[inline] pub fn set_watch_id(&mut self, value: u64) { - self.builder.set_data_field::(1, value); + self.builder.set_data_field::(2, value); } } @@ -14335,17 +14347,17 @@ pub mod operation_watch_value_a { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 69] = [ + pub static ENCODED_NODE: [::capnp::Word; 85] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(147, 168, 75, 6, 183, 202, 38, 167), - ::capnp::word(19, 0, 0, 0, 1, 0, 2, 0), + ::capnp::word(19, 0, 0, 0, 1, 0, 3, 0), ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(21, 0, 0, 0, 66, 1, 0, 0), ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(33, 0, 0, 0, 175, 0, 0, 0), + ::capnp::word(33, 0, 0, 0, 231, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(112, 114, 111, 116, 111, 47, 118, 101), @@ -14354,28 +14366,44 @@ pub mod operation_watch_value_a { ::capnp::word(116, 105, 111, 110, 87, 97, 116, 99), ::capnp::word(104, 86, 97, 108, 117, 101, 65, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), - ::capnp::word(12, 0, 0, 0, 3, 0, 4, 0), + ::capnp::word(16, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(69, 0, 0, 0, 90, 0, 0, 0), + ::capnp::word(97, 0, 0, 0, 74, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(68, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(80, 0, 0, 0, 2, 0, 1, 0), - ::capnp::word(1, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(96, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(108, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(1, 0, 0, 0, 1, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 1, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(77, 0, 0, 0, 50, 0, 0, 0), + ::capnp::word(105, 0, 0, 0, 90, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(72, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(100, 0, 0, 0, 2, 0, 1, 0), - ::capnp::word(2, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(104, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(116, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(2, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(97, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(113, 0, 0, 0, 50, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(108, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(136, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(3, 0, 0, 0, 2, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 3, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(133, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(128, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(140, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(97, 99, 99, 101, 112, 116, 101, 100), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(1, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(1, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(92, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(104, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(101, 120, 112, 105, 114, 97, 116, 105), ::capnp::word(111, 110, 0, 0, 0, 0, 0, 0), ::capnp::word(9, 0, 0, 0, 0, 0, 0, 0), @@ -14408,9 +14436,10 @@ pub mod operation_watch_value_a { ]; pub fn get_field_types(index: u16) -> ::capnp::introspect::Type { match index { - 0 => ::introspect(), - 1 => <::capnp::struct_list::Owned as ::capnp::introspect::Introspect>::introspect(), - 2 => ::introspect(), + 0 => ::introspect(), + 1 => ::introspect(), + 2 => <::capnp::struct_list::Owned as ::capnp::introspect::Introspect>::introspect(), + 3 => ::introspect(), _ => panic!("invalid field index {}", index), } } @@ -14422,7 +14451,7 @@ pub mod operation_watch_value_a { nonunion_members: NONUNION_MEMBERS, members_by_discriminant: MEMBERS_BY_DISCRIMINANT, }; - pub static NONUNION_MEMBERS : &[u16] = &[0,1,2]; + pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3]; pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[]; pub const TYPE_ID: u64 = 0xa726_cab7_064b_a893; } @@ -21377,4 +21406,4 @@ pub mod operation { } } -//BUILDHASH:539ec27eab88af2af5785cd8c1145478f30cd3fe2c08cd0ec7f18d2f4f3c2128 +//BUILDHASH:2361d45ebb46feb1cecc71c1756fc90ff94487663e8d7211177e6df6e4033386 diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs b/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs index 9f8d5262..55c435d3 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs @@ -45,6 +45,8 @@ impl RPCOperationAppCallQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, Clone)] pub(in crate::rpc_processor) struct RPCOperationAppCallA { message: Vec, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs index e69997db..41c34248 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs @@ -38,6 +38,8 @@ impl RPCOperationCancelTunnelQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg(feature = "unstable-tunnels")] #[derive(Debug, Clone)] pub(in crate::rpc_processor) enum RPCOperationCancelTunnelA { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs index 0a429877..36a4b695 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs @@ -75,6 +75,8 @@ impl RPCOperationCompleteTunnelQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg(feature = "unstable-tunnels")] #[derive(Debug, Clone)] pub(in crate::rpc_processor) enum RPCOperationCompleteTunnelA { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs index 73a2c705..1fe48924 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs @@ -44,6 +44,8 @@ impl RPCOperationFindBlockQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, Clone)] pub(in crate::rpc_processor) struct RPCOperationFindBlockA { data: Vec, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs index a0d414f6..26efcc98 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs @@ -73,6 +73,8 @@ impl RPCOperationFindNodeQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, Clone)] pub(in crate::rpc_processor) struct RPCOperationFindNodeA { peers: Vec, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index e9af8d5c..0eb39514 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -75,6 +75,8 @@ impl RPCOperationGetValueQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, Clone)] pub(in crate::rpc_processor) struct RPCOperationGetValueA { value: Option, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs index be7ba9a3..154124bb 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs @@ -109,6 +109,8 @@ impl RPCOperationSetValueQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, Clone)] pub(in crate::rpc_processor) struct RPCOperationSetValueA { set: bool, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs index e9b2fe24..3b7d8ff4 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs @@ -65,6 +65,8 @@ impl RPCOperationStartTunnelQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg(feature = "unstable-tunnels")] #[derive(Debug, Clone)] pub(in crate::rpc_processor) enum RPCOperationStartTunnelA { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs index f2725c18..f77f5baf 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs @@ -42,6 +42,8 @@ impl RPCOperationStatusQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, Clone)] pub(in crate::rpc_processor) struct RPCOperationStatusA { node_status: Option, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs b/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs index e9a618b9..dbfa1a5c 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs @@ -42,6 +42,8 @@ impl RPCOperationSupplyBlockQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, Clone)] pub(in crate::rpc_processor) struct RPCOperationSupplyBlockA { expiration: u64, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs index 46f78698..121e27d4 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs @@ -29,6 +29,10 @@ impl RPCOperationValueChanged { return Err(RPCError::protocol("ValueChanged subkeys length too long")); } + if watch_id == 0 { + return Err(RPCError::protocol("ValueChanged needs a nonzero watch id")); + } + Ok(Self { key, subkeys, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs index 33a5005a..62a45271 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs @@ -33,6 +33,11 @@ impl RPCOperationWatchValueQ { return Err(RPCError::protocol("WatchValueQ subkeys length too long")); } + // Count is zero means cancelling, so there should always be a watch id in this case + if count == 0 && watch_id.is_none() { + return Err(RPCError::protocol("can't cancel zero watch id")); + } + let signature_data = Self::make_signature_data(&key, &subkeys, expiration, count, watch_id); let signature = vcrypto .sign(&watcher.key, &watcher.secret, &signature_data) @@ -91,6 +96,12 @@ impl RPCOperationWatchValueQ { vcrypto .verify(&self.watcher, &sig_data, &self.signature) .map_err(RPCError::protocol)?; + + // Count is zero means cancelling, so there should always be a watch id in this case + if self.count == 0 && self.watch_id.is_none() { + return Err(RPCError::protocol("can't cancel zero watch id")); + } + Ok(()) } @@ -233,8 +244,11 @@ impl RPCOperationWatchValueQ { } } +//////////////////////////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, Clone)] pub(in crate::rpc_processor) struct RPCOperationWatchValueA { + accepted: bool, expiration: u64, peers: Vec, watch_id: u64, @@ -242,11 +256,17 @@ pub(in crate::rpc_processor) struct RPCOperationWatchValueA { impl RPCOperationWatchValueA { #[allow(dead_code)] - pub fn new(expiration: u64, peers: Vec, watch_id: u64) -> Result { + pub fn new( + accepted: bool, + expiration: u64, + peers: Vec, + watch_id: u64, + ) -> Result { if peers.len() > MAX_WATCH_VALUE_A_PEERS_LEN { return Err(RPCError::protocol("WatchValueA peers length too long")); } Ok(Self { + accepted, expiration, peers, watch_id, @@ -254,13 +274,14 @@ impl RPCOperationWatchValueA { } pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { - if self.watch_id == 0 { - return Err(RPCError::protocol("WatchValueA does not have a valid id")); - } PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); Ok(()) } + #[allow(dead_code)] + pub fn accepted(&self) -> bool { + self.accepted + } #[allow(dead_code)] pub fn expiration(&self) -> u64 { self.expiration @@ -274,13 +295,14 @@ impl RPCOperationWatchValueA { self.watch_id } #[allow(dead_code)] - pub fn destructure(self) -> (u64, Vec, u64) { - (self.expiration, self.peers, self.watch_id) + pub fn destructure(self) -> (bool, u64, Vec, u64) { + (self.accepted, self.expiration, self.peers, self.watch_id) } pub fn decode( reader: &veilid_capnp::operation_watch_value_a::Reader, ) -> Result { + let accepted = reader.get_accepted(); let expiration = reader.get_expiration(); let peers_reader = reader.get_peers().map_err(RPCError::protocol)?; if peers_reader.len() as usize > MAX_WATCH_VALUE_A_PEERS_LEN { @@ -299,6 +321,7 @@ impl RPCOperationWatchValueA { let watch_id = reader.get_watch_id(); Ok(Self { + accepted, expiration, peers, watch_id, @@ -308,6 +331,7 @@ impl RPCOperationWatchValueA { &self, builder: &mut veilid_capnp::operation_watch_value_a::Builder, ) -> Result<(), RPCError> { + builder.set_accepted(self.accepted); builder.set_expiration(self.expiration); let mut peers_builder = builder.reborrow().init_peers( diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index 925516ff..d358b4a1 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -55,6 +55,8 @@ impl RPCProcessor { ))) } + //////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_app_call_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled diff --git a/veilid-core/src/rpc_processor/rpc_app_message.rs b/veilid-core/src/rpc_processor/rpc_app_message.rs index d0aac195..b77fda93 100644 --- a/veilid-core/src/rpc_processor/rpc_app_message.rs +++ b/veilid-core/src/rpc_processor/rpc_app_message.rs @@ -19,6 +19,8 @@ impl RPCProcessor { self.statement(dest, statement).await } + //////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_app_message(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 3eeeea34..8d456238 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -84,6 +84,8 @@ impl RPCProcessor { ))) } + //////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_find_node_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ensure this never came over a private route, safety route is okay though diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 0859c9b2..d9ddc35e 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -168,6 +168,8 @@ impl RPCProcessor { ))) } + //////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_get_value_q( &self, diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 36bead3a..01926a72 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -181,6 +181,8 @@ impl RPCProcessor { ))) } + //////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_set_value_q( &self, diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index aa36d37f..6c8c73ef 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -32,6 +32,8 @@ impl RPCProcessor { self.statement(dest, statement).await } + //////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_signal(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 9a203cb9..d08d50eb 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -200,6 +200,8 @@ impl RPCProcessor { ))) } + //////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Get the question diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 44e16711..07970dda 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -56,6 +56,8 @@ impl RPCProcessor { } } + //////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> RPCNetworkResult<()> { let routing_table = self.routing_table(); diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index 002480b8..d61e7892 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -31,6 +31,8 @@ impl RPCProcessor { self.statement(dest, statement).await } + //////////////////////////////////////////////////////////////////////////////////////////////// + pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Get the statement let (_, _, _, kind) = msg.operation.destructure(); diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index ca0285bc..e01fd7b1 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -2,6 +2,7 @@ use super::*; #[derive(Clone, Debug)] pub struct WatchValueAnswer { + pub accepted: bool, pub expiration_ts: Timestamp, pub peers: Vec, pub watch_id: u64, @@ -105,11 +106,12 @@ impl RPCProcessor { _ => return Ok(NetworkResult::invalid_message("not an answer")), }; let question_watch_id = watch_id; - let (expiration, peers, watch_id) = watch_value_a.destructure(); + let (accepted, expiration, peers, watch_id) = watch_value_a.destructure(); #[cfg(feature = "debug-dht")] { let debug_string_answer = format!( - "OUT <== WatchValueA(id={} {} #{:?}@{} peers={}) <= {}", + "OUT <== WatchValueA({}id={} {} #{:?}@{} peers={}) <= {}", + if accepted { "+accept " } else { "" }, watch_id, key, subkeys, @@ -127,13 +129,22 @@ impl RPCProcessor { log_rpc!(debug "Peers: {:#?}", peer_ids); } - // Validate returned answer watch id is the same as the question watch id if it exists - if let Some(question_watch_id) = question_watch_id { - if question_watch_id != watch_id { - return Ok(NetworkResult::invalid_message(format!( - "answer watch id={} doesn't match question watch id={}", - watch_id, question_watch_id, - ))); + // Validate accepted requests + if accepted { + // Verify returned answer watch id is the same as the question watch id if it exists + if let Some(question_watch_id) = question_watch_id { + if question_watch_id != watch_id { + return Ok(NetworkResult::invalid_message(format!( + "answer watch id={} doesn't match question watch id={}", + watch_id, question_watch_id, + ))); + } + } + // Validate if a watch is created/updated, that it has a nonzero id + if expiration != 0 && watch_id == 0 { + return Ok(NetworkResult::invalid_message( + "zero watch id returned on accepted or cancelled watch", + )); } } @@ -162,6 +173,7 @@ impl RPCProcessor { latency, reply_private_route, WatchValueAnswer { + accepted, expiration_ts: Timestamp::new(expiration), peers, watch_id, @@ -169,6 +181,8 @@ impl RPCProcessor { ))) } + //////////////////////////////////////////////////////////////////////////////////////////////// + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { let routing_table = self.routing_table(); @@ -243,41 +257,44 @@ impl RPCProcessor { routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT, CAP_DHT_WATCH]) ); - // See if we would have accepted this as a set + // See if we would have accepted this as a set, same set_value_count for watches let set_value_count = { let c = self.config.get(); c.network.dht.set_value_count as usize }; - let (ret_expiration, ret_watch_id) = if closer_to_key_peers.len() >= set_value_count { - // Not close enough + let (ret_accepted, ret_expiration, ret_watch_id) = + if closer_to_key_peers.len() >= set_value_count { + // Not close enough, not accepted - #[cfg(feature = "debug-dht")] - log_rpc!(debug "Not close enough for watch value"); + #[cfg(feature = "debug-dht")] + log_rpc!(debug "Not close enough for watch value"); - (Timestamp::default(), 0) - } else { - // Close enough, lets watch it + (false, Timestamp::default(), watch_id.unwrap_or_default()) + } else { + // Accepted, lets try to watch or cancel it - // See if we have this record ourselves, if so, accept the watch - let storage_manager = self.storage_manager(); - network_result_try!(storage_manager - .inbound_watch_value( - key, - subkeys.clone(), - Timestamp::new(expiration), - count, - watch_id, - target, - watcher - ) - .await - .map_err(RPCError::internal)?) - }; + // See if we have this record ourselves, if so, accept the watch + let storage_manager = self.storage_manager(); + let (ret_expiration, ret_watch_id) = network_result_try!(storage_manager + .inbound_watch_value( + key, + subkeys.clone(), + Timestamp::new(expiration), + count, + watch_id, + target, + watcher + ) + .await + .map_err(RPCError::internal)?); + (true, ret_expiration, ret_watch_id) + }; #[cfg(feature = "debug-dht")] { let debug_string_answer = format!( - "IN ===> WatchValueA(id={} {} #{} expiration={} peers={}) ==> {}", + "IN ===> WatchValueA({}id={} {} #{} expiration={} peers={}) ==> {}", + if ret_accepted { "+accept " } else { "" }, ret_watch_id, key, subkeys, @@ -291,12 +308,9 @@ impl RPCProcessor { // Make WatchValue answer let watch_value_a = RPCOperationWatchValueA::new( + ret_accepted, ret_expiration.as_u64(), - if ret_expiration.as_u64() == 0 { - closer_to_key_peers - } else { - vec![] - }, + closer_to_key_peers, ret_watch_id, )?; diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index bb1b7923..d4a88dd8 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -317,10 +317,9 @@ impl StorageManager { ) .await?; if let Some(owvresult) = opt_owvresult { - if owvresult.expiration_ts.as_u64() != 0 { + if owvresult.expiration_ts.as_u64() == 0 { log_stor!(debug - "close record watch cancel got unexpected expiration: {}", - owvresult.expiration_ts + "close record watch cancel should have old expiration, but got zero" ); } } else { @@ -567,7 +566,7 @@ impl StorageManager { Ok(None) } - /// Add a watch to a DHT value + /// Add or change a watch to a DHT value pub async fn watch_values( &self, key: TypedKey, @@ -605,7 +604,7 @@ impl StorageManager { // Drop the lock for network access drop(inner); - +xxx continue here, make sure watch value semantics respect the 'accepted' flag and appropriate return values everywhere // Use the safety selection we opened the record with // Use the writer we opened with as the 'watcher' as well let opt_owvresult = self @@ -622,9 +621,9 @@ impl StorageManager { ) .await?; - // If we did not get a valid response return a zero timestamp + // If we did not get a valid response assume nothing changed let Some(owvresult) = opt_owvresult else { - return Ok(Timestamp::new(0)); + apibail_try_again!("did not get a valid response"); }; // Clear any existing watch if the watch succeeded or got cancelled @@ -656,7 +655,7 @@ impl StorageManager { return Ok(Timestamp::new(0)); } - // If the expiration time is greated than our maximum expiration time, clamp our local watch so we ignore extra valuechanged messages + // If the expiration time is greater than our maximum expiration time, clamp our local watch so we ignore extra valuechanged messages if expiration_ts.as_u64() > max_expiration_ts { expiration_ts = Timestamp::new(max_expiration_ts); } @@ -723,9 +722,11 @@ impl StorageManager { // A zero expiration time means the watch is done or nothing is left, and the watch is no longer active if expiration_ts.as_u64() == 0 { + // Return false indicating the watch is completely gone return Ok(false); } + // Return true because the the watch was changed Ok(true) } diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 81f5ace2..0a176186 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -349,19 +349,20 @@ impl RoutingContext { storage_manager.set_value(key, subkey, data, writer).await } - /// Add a watch to a DHT value that informs the user via an VeilidUpdate::ValueChange callback when the record has subkeys change. + /// Add or update a watch to a DHT value that informs the user via an VeilidUpdate::ValueChange callback when the record has subkeys change. /// One remote node will be selected to perform the watch and it will offer an expiration time based on a suggestion, and make an attempt to /// continue to report changes via the callback. Nodes that agree to doing watches will be put on our 'ping' list to ensure they are still around /// otherwise the watch will be cancelled and will have to be re-watched. /// - /// There is only one watch permitted per record. If a change to a watch is desired, the first one will be overwritten. + /// There is only one watch permitted per record. If a change to a watch is desired, the previous one will be overwritten. /// * `key` is the record key to watch. it must first be opened for reading or writing. /// * `subkeys` is the the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys. /// * `expiration` is the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately. /// * `count` is the number of times the watch will be sent, maximum. A zero value here is equivalent to a cancellation. /// - /// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future, and the returned timestamp will - /// be no later than the requested expiration, but -may- be before the requested expiration. + /// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future, + /// and the returned timestamp will be no later than the requested expiration, but -may- be before the requested expiration. + /// If the returned timestamp is zero it indicates that the watch creation or update has failed. In the case of a faild update, the watch is considered cancelled. /// /// DHT watches are accepted with the following conditions: /// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record