break everything / xfer

This commit is contained in:
Christien Rioux 2024-03-05 15:28:11 -05:00
parent 72066360ad
commit 9b4e490994
26 changed files with 200 additions and 88 deletions

View File

@ -346,7 +346,7 @@ struct OperationSetValueQ @0xbac06191ff8bdbc5 {
}
struct OperationSetValueA @0x9378d0732dc95be2 {
set @0 :Bool; # true if the set was close enough to be set
set @0 :Bool; # true if the set was accepted
value @1 :SignedValueData; # optional: the current value at the key if the set seq number was lower or equal to what was there before
peers @2 :List(PeerInfo); # returned 'closer peer' information on either success or failure
}
@ -356,15 +356,16 @@ struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges), if empty, watch everything
expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max)
count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous)
watchId @4 :UInt64; # optional: (0 = unspecified) existing watch id to update or cancel unless this is a new watch
watchId @4 :UInt64; # if 0, request a new watch. if >0, existing watch id
watcher @5 :PublicKey; # the watcher performing the watch, can be the owner or a schema member, or a generated anonymous watch keypair
signature @6 :Signature; # signature of the watcher, signature covers: key, subkeys, expiration, count, watchId
}
struct OperationWatchValueA @0xa726cab7064ba893 {
expiration @0 :UInt64; # timestamp when this watch will expire in usec since epoch (0 if watch was rejected). if watch is being cancelled (with count = 0), this will be the non-zero former expiration time.
peers @1 :List(PeerInfo); # returned list of other nodes to ask that could propagate watches
watchId @2 :UInt64; # random id for watch instance on this node
accepted @0 :Bool; # true if the watch was close enough to be accepted
expiration @1 :UInt64; # timestamp when this watch will expire in usec since epoch (0 if watch was cancelled/dropped)
peers @2 :List(PeerInfo); # returned list of other nodes to ask that could propagate watches
watchId @3 :UInt64; # (0 = id not allocated if rejecting new watch) random id for watch instance on this node
}
struct OperationValueChanged @0xd1c59ebdd8cc1bf6 {

View File

@ -14223,8 +14223,12 @@ pub mod operation_watch_value_a {
self.reader.total_size()
}
#[inline]
pub fn get_accepted(self) -> bool {
self.reader.get_bool_field(0)
}
#[inline]
pub fn get_expiration(self) -> u64 {
self.reader.get_data_field::<u64>(0)
self.reader.get_data_field::<u64>(1)
}
#[inline]
pub fn get_peers(self) -> ::capnp::Result<::capnp::struct_list::Reader<'a,crate::veilid_capnp::peer_info::Owned>> {
@ -14236,13 +14240,13 @@ pub mod operation_watch_value_a {
}
#[inline]
pub fn get_watch_id(self) -> u64 {
self.reader.get_data_field::<u64>(1)
self.reader.get_data_field::<u64>(2)
}
}
pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> }
impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> {
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 2, pointers: 1 };
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 3, pointers: 1 };
}
impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> {
const TYPE_ID: u64 = _private::TYPE_ID;
@ -14293,12 +14297,20 @@ pub mod operation_watch_value_a {
self.builder.as_reader().total_size()
}
#[inline]
pub fn get_accepted(self) -> bool {
self.builder.get_bool_field(0)
}
#[inline]
pub fn set_accepted(&mut self, value: bool) {
self.builder.set_bool_field(0, value);
}
#[inline]
pub fn get_expiration(self) -> u64 {
self.builder.get_data_field::<u64>(0)
self.builder.get_data_field::<u64>(1)
}
#[inline]
pub fn set_expiration(&mut self, value: u64) {
self.builder.set_data_field::<u64>(0, value);
self.builder.set_data_field::<u64>(1, value);
}
#[inline]
pub fn get_peers(self) -> ::capnp::Result<::capnp::struct_list::Builder<'a,crate::veilid_capnp::peer_info::Owned>> {
@ -14318,11 +14330,11 @@ pub mod operation_watch_value_a {
}
#[inline]
pub fn get_watch_id(self) -> u64 {
self.builder.get_data_field::<u64>(1)
self.builder.get_data_field::<u64>(2)
}
#[inline]
pub fn set_watch_id(&mut self, value: u64) {
self.builder.set_data_field::<u64>(1, value);
self.builder.set_data_field::<u64>(2, value);
}
}
@ -14335,17 +14347,17 @@ pub mod operation_watch_value_a {
impl Pipeline {
}
mod _private {
pub static ENCODED_NODE: [::capnp::Word; 69] = [
pub static ENCODED_NODE: [::capnp::Word; 85] = [
::capnp::word(0, 0, 0, 0, 5, 0, 6, 0),
::capnp::word(147, 168, 75, 6, 183, 202, 38, 167),
::capnp::word(19, 0, 0, 0, 1, 0, 2, 0),
::capnp::word(19, 0, 0, 0, 1, 0, 3, 0),
::capnp::word(2, 171, 52, 55, 3, 232, 252, 143),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(21, 0, 0, 0, 66, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 175, 0, 0, 0),
::capnp::word(33, 0, 0, 0, 231, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(112, 114, 111, 116, 111, 47, 118, 101),
@ -14354,28 +14366,44 @@ pub mod operation_watch_value_a {
::capnp::word(116, 105, 111, 110, 87, 97, 116, 99),
::capnp::word(104, 86, 97, 108, 117, 101, 65, 0),
::capnp::word(0, 0, 0, 0, 1, 0, 1, 0),
::capnp::word(12, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(16, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(69, 0, 0, 0, 90, 0, 0, 0),
::capnp::word(97, 0, 0, 0, 74, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(68, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(80, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(1, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(96, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(108, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(1, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(77, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(105, 0, 0, 0, 90, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(72, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(100, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(2, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(104, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(116, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(2, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(97, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(113, 0, 0, 0, 50, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(108, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(136, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(3, 0, 0, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 3, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(133, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(128, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(140, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(97, 99, 99, 101, 112, 116, 101, 100),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(1, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(1, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(92, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(104, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(101, 120, 112, 105, 114, 97, 116, 105),
::capnp::word(111, 110, 0, 0, 0, 0, 0, 0),
::capnp::word(9, 0, 0, 0, 0, 0, 0, 0),
@ -14408,9 +14436,10 @@ pub mod operation_watch_value_a {
];
pub fn get_field_types(index: u16) -> ::capnp::introspect::Type {
match index {
0 => <u64 as ::capnp::introspect::Introspect>::introspect(),
1 => <::capnp::struct_list::Owned<crate::veilid_capnp::peer_info::Owned> as ::capnp::introspect::Introspect>::introspect(),
2 => <u64 as ::capnp::introspect::Introspect>::introspect(),
0 => <bool as ::capnp::introspect::Introspect>::introspect(),
1 => <u64 as ::capnp::introspect::Introspect>::introspect(),
2 => <::capnp::struct_list::Owned<crate::veilid_capnp::peer_info::Owned> as ::capnp::introspect::Introspect>::introspect(),
3 => <u64 as ::capnp::introspect::Introspect>::introspect(),
_ => panic!("invalid field index {}", index),
}
}
@ -14422,7 +14451,7 @@ pub mod operation_watch_value_a {
nonunion_members: NONUNION_MEMBERS,
members_by_discriminant: MEMBERS_BY_DISCRIMINANT,
};
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2];
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3];
pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[];
pub const TYPE_ID: u64 = 0xa726_cab7_064b_a893;
}
@ -21377,4 +21406,4 @@ pub mod operation {
}
}
//BUILDHASH:539ec27eab88af2af5785cd8c1145478f30cd3fe2c08cd0ec7f18d2f4f3c2128
//BUILDHASH:2361d45ebb46feb1cecc71c1756fc90ff94487663e8d7211177e6df6e4033386

View File

@ -45,6 +45,8 @@ impl RPCOperationAppCallQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCOperationAppCallA {
message: Vec<u8>,

View File

@ -38,6 +38,8 @@ impl RPCOperationCancelTunnelQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) enum RPCOperationCancelTunnelA {

View File

@ -75,6 +75,8 @@ impl RPCOperationCompleteTunnelQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) enum RPCOperationCompleteTunnelA {

View File

@ -44,6 +44,8 @@ impl RPCOperationFindBlockQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCOperationFindBlockA {
data: Vec<u8>,

View File

@ -73,6 +73,8 @@ impl RPCOperationFindNodeQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCOperationFindNodeA {
peers: Vec<PeerInfo>,

View File

@ -75,6 +75,8 @@ impl RPCOperationGetValueQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCOperationGetValueA {
value: Option<SignedValueData>,

View File

@ -109,6 +109,8 @@ impl RPCOperationSetValueQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCOperationSetValueA {
set: bool,

View File

@ -65,6 +65,8 @@ impl RPCOperationStartTunnelQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) enum RPCOperationStartTunnelA {

View File

@ -42,6 +42,8 @@ impl RPCOperationStatusQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCOperationStatusA {
node_status: Option<NodeStatus>,

View File

@ -42,6 +42,8 @@ impl RPCOperationSupplyBlockQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCOperationSupplyBlockA {
expiration: u64,

View File

@ -29,6 +29,10 @@ impl RPCOperationValueChanged {
return Err(RPCError::protocol("ValueChanged subkeys length too long"));
}
if watch_id == 0 {
return Err(RPCError::protocol("ValueChanged needs a nonzero watch id"));
}
Ok(Self {
key,
subkeys,

View File

@ -33,6 +33,11 @@ impl RPCOperationWatchValueQ {
return Err(RPCError::protocol("WatchValueQ subkeys length too long"));
}
// Count is zero means cancelling, so there should always be a watch id in this case
if count == 0 && watch_id.is_none() {
return Err(RPCError::protocol("can't cancel zero watch id"));
}
let signature_data = Self::make_signature_data(&key, &subkeys, expiration, count, watch_id);
let signature = vcrypto
.sign(&watcher.key, &watcher.secret, &signature_data)
@ -91,6 +96,12 @@ impl RPCOperationWatchValueQ {
vcrypto
.verify(&self.watcher, &sig_data, &self.signature)
.map_err(RPCError::protocol)?;
// Count is zero means cancelling, so there should always be a watch id in this case
if self.count == 0 && self.watch_id.is_none() {
return Err(RPCError::protocol("can't cancel zero watch id"));
}
Ok(())
}
@ -233,8 +244,11 @@ impl RPCOperationWatchValueQ {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCOperationWatchValueA {
accepted: bool,
expiration: u64,
peers: Vec<PeerInfo>,
watch_id: u64,
@ -242,11 +256,17 @@ pub(in crate::rpc_processor) struct RPCOperationWatchValueA {
impl RPCOperationWatchValueA {
#[allow(dead_code)]
pub fn new(expiration: u64, peers: Vec<PeerInfo>, watch_id: u64) -> Result<Self, RPCError> {
pub fn new(
accepted: bool,
expiration: u64,
peers: Vec<PeerInfo>,
watch_id: u64,
) -> Result<Self, RPCError> {
if peers.len() > MAX_WATCH_VALUE_A_PEERS_LEN {
return Err(RPCError::protocol("WatchValueA peers length too long"));
}
Ok(Self {
accepted,
expiration,
peers,
watch_id,
@ -254,13 +274,14 @@ impl RPCOperationWatchValueA {
}
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
if self.watch_id == 0 {
return Err(RPCError::protocol("WatchValueA does not have a valid id"));
}
PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone());
Ok(())
}
#[allow(dead_code)]
pub fn accepted(&self) -> bool {
self.accepted
}
#[allow(dead_code)]
pub fn expiration(&self) -> u64 {
self.expiration
@ -274,13 +295,14 @@ impl RPCOperationWatchValueA {
self.watch_id
}
#[allow(dead_code)]
pub fn destructure(self) -> (u64, Vec<PeerInfo>, u64) {
(self.expiration, self.peers, self.watch_id)
pub fn destructure(self) -> (bool, u64, Vec<PeerInfo>, u64) {
(self.accepted, self.expiration, self.peers, self.watch_id)
}
pub fn decode(
reader: &veilid_capnp::operation_watch_value_a::Reader,
) -> Result<Self, RPCError> {
let accepted = reader.get_accepted();
let expiration = reader.get_expiration();
let peers_reader = reader.get_peers().map_err(RPCError::protocol)?;
if peers_reader.len() as usize > MAX_WATCH_VALUE_A_PEERS_LEN {
@ -299,6 +321,7 @@ impl RPCOperationWatchValueA {
let watch_id = reader.get_watch_id();
Ok(Self {
accepted,
expiration,
peers,
watch_id,
@ -308,6 +331,7 @@ impl RPCOperationWatchValueA {
&self,
builder: &mut veilid_capnp::operation_watch_value_a::Builder,
) -> Result<(), RPCError> {
builder.set_accepted(self.accepted);
builder.set_expiration(self.expiration);
let mut peers_builder = builder.reborrow().init_peers(

View File

@ -55,6 +55,8 @@ impl RPCProcessor {
)))
}
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_app_call_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled

View File

@ -19,6 +19,8 @@ impl RPCProcessor {
self.statement(dest, statement).await
}
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_app_message(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled

View File

@ -84,6 +84,8 @@ impl RPCProcessor {
)))
}
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_find_node_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ensure this never came over a private route, safety route is okay though

View File

@ -168,6 +168,8 @@ impl RPCProcessor {
)))
}
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_get_value_q(
&self,

View File

@ -181,6 +181,8 @@ impl RPCProcessor {
)))
}
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_set_value_q(
&self,

View File

@ -32,6 +32,8 @@ impl RPCProcessor {
self.statement(dest, statement).await
}
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_signal(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled

View File

@ -200,6 +200,8 @@ impl RPCProcessor {
)))
}
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Get the question

View File

@ -56,6 +56,8 @@ impl RPCProcessor {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
let routing_table = self.routing_table();

View File

@ -31,6 +31,8 @@ impl RPCProcessor {
self.statement(dest, statement).await
}
////////////////////////////////////////////////////////////////////////////////////////////////
pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Get the statement
let (_, _, _, kind) = msg.operation.destructure();

View File

@ -2,6 +2,7 @@ use super::*;
#[derive(Clone, Debug)]
pub struct WatchValueAnswer {
pub accepted: bool,
pub expiration_ts: Timestamp,
pub peers: Vec<PeerInfo>,
pub watch_id: u64,
@ -105,11 +106,12 @@ impl RPCProcessor {
_ => return Ok(NetworkResult::invalid_message("not an answer")),
};
let question_watch_id = watch_id;
let (expiration, peers, watch_id) = watch_value_a.destructure();
let (accepted, expiration, peers, watch_id) = watch_value_a.destructure();
#[cfg(feature = "debug-dht")]
{
let debug_string_answer = format!(
"OUT <== WatchValueA(id={} {} #{:?}@{} peers={}) <= {}",
"OUT <== WatchValueA({}id={} {} #{:?}@{} peers={}) <= {}",
if accepted { "+accept " } else { "" },
watch_id,
key,
subkeys,
@ -127,13 +129,22 @@ impl RPCProcessor {
log_rpc!(debug "Peers: {:#?}", peer_ids);
}
// Validate returned answer watch id is the same as the question watch id if it exists
if let Some(question_watch_id) = question_watch_id {
if question_watch_id != watch_id {
return Ok(NetworkResult::invalid_message(format!(
"answer watch id={} doesn't match question watch id={}",
watch_id, question_watch_id,
)));
// Validate accepted requests
if accepted {
// Verify returned answer watch id is the same as the question watch id if it exists
if let Some(question_watch_id) = question_watch_id {
if question_watch_id != watch_id {
return Ok(NetworkResult::invalid_message(format!(
"answer watch id={} doesn't match question watch id={}",
watch_id, question_watch_id,
)));
}
}
// Validate if a watch is created/updated, that it has a nonzero id
if expiration != 0 && watch_id == 0 {
return Ok(NetworkResult::invalid_message(
"zero watch id returned on accepted or cancelled watch",
));
}
}
@ -162,6 +173,7 @@ impl RPCProcessor {
latency,
reply_private_route,
WatchValueAnswer {
accepted,
expiration_ts: Timestamp::new(expiration),
peers,
watch_id,
@ -169,6 +181,8 @@ impl RPCProcessor {
)))
}
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
let routing_table = self.routing_table();
@ -243,41 +257,44 @@ impl RPCProcessor {
routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT, CAP_DHT_WATCH])
);
// See if we would have accepted this as a set
// See if we would have accepted this as a set, same set_value_count for watches
let set_value_count = {
let c = self.config.get();
c.network.dht.set_value_count as usize
};
let (ret_expiration, ret_watch_id) = if closer_to_key_peers.len() >= set_value_count {
// Not close enough
let (ret_accepted, ret_expiration, ret_watch_id) =
if closer_to_key_peers.len() >= set_value_count {
// Not close enough, not accepted
#[cfg(feature = "debug-dht")]
log_rpc!(debug "Not close enough for watch value");
#[cfg(feature = "debug-dht")]
log_rpc!(debug "Not close enough for watch value");
(Timestamp::default(), 0)
} else {
// Close enough, lets watch it
(false, Timestamp::default(), watch_id.unwrap_or_default())
} else {
// Accepted, lets try to watch or cancel it
// See if we have this record ourselves, if so, accept the watch
let storage_manager = self.storage_manager();
network_result_try!(storage_manager
.inbound_watch_value(
key,
subkeys.clone(),
Timestamp::new(expiration),
count,
watch_id,
target,
watcher
)
.await
.map_err(RPCError::internal)?)
};
// See if we have this record ourselves, if so, accept the watch
let storage_manager = self.storage_manager();
let (ret_expiration, ret_watch_id) = network_result_try!(storage_manager
.inbound_watch_value(
key,
subkeys.clone(),
Timestamp::new(expiration),
count,
watch_id,
target,
watcher
)
.await
.map_err(RPCError::internal)?);
(true, ret_expiration, ret_watch_id)
};
#[cfg(feature = "debug-dht")]
{
let debug_string_answer = format!(
"IN ===> WatchValueA(id={} {} #{} expiration={} peers={}) ==> {}",
"IN ===> WatchValueA({}id={} {} #{} expiration={} peers={}) ==> {}",
if ret_accepted { "+accept " } else { "" },
ret_watch_id,
key,
subkeys,
@ -291,12 +308,9 @@ impl RPCProcessor {
// Make WatchValue answer
let watch_value_a = RPCOperationWatchValueA::new(
ret_accepted,
ret_expiration.as_u64(),
if ret_expiration.as_u64() == 0 {
closer_to_key_peers
} else {
vec![]
},
closer_to_key_peers,
ret_watch_id,
)?;

View File

@ -317,10 +317,9 @@ impl StorageManager {
)
.await?;
if let Some(owvresult) = opt_owvresult {
if owvresult.expiration_ts.as_u64() != 0 {
if owvresult.expiration_ts.as_u64() == 0 {
log_stor!(debug
"close record watch cancel got unexpected expiration: {}",
owvresult.expiration_ts
"close record watch cancel should have old expiration, but got zero"
);
}
} else {
@ -567,7 +566,7 @@ impl StorageManager {
Ok(None)
}
/// Add a watch to a DHT value
/// Add or change a watch to a DHT value
pub async fn watch_values(
&self,
key: TypedKey,
@ -605,7 +604,7 @@ impl StorageManager {
// Drop the lock for network access
drop(inner);
xxx continue here, make sure watch value semantics respect the 'accepted' flag and appropriate return values everywhere
// Use the safety selection we opened the record with
// Use the writer we opened with as the 'watcher' as well
let opt_owvresult = self
@ -622,9 +621,9 @@ impl StorageManager {
)
.await?;
// If we did not get a valid response return a zero timestamp
// If we did not get a valid response assume nothing changed
let Some(owvresult) = opt_owvresult else {
return Ok(Timestamp::new(0));
apibail_try_again!("did not get a valid response");
};
// Clear any existing watch if the watch succeeded or got cancelled
@ -656,7 +655,7 @@ impl StorageManager {
return Ok(Timestamp::new(0));
}
// If the expiration time is greated than our maximum expiration time, clamp our local watch so we ignore extra valuechanged messages
// If the expiration time is greater than our maximum expiration time, clamp our local watch so we ignore extra valuechanged messages
if expiration_ts.as_u64() > max_expiration_ts {
expiration_ts = Timestamp::new(max_expiration_ts);
}
@ -723,9 +722,11 @@ impl StorageManager {
// A zero expiration time means the watch is done or nothing is left, and the watch is no longer active
if expiration_ts.as_u64() == 0 {
// Return false indicating the watch is completely gone
return Ok(false);
}
// Return true because the the watch was changed
Ok(true)
}

View File

@ -349,19 +349,20 @@ impl RoutingContext {
storage_manager.set_value(key, subkey, data, writer).await
}
/// Add a watch to a DHT value that informs the user via an VeilidUpdate::ValueChange callback when the record has subkeys change.
/// Add or update a watch to a DHT value that informs the user via an VeilidUpdate::ValueChange callback when the record has subkeys change.
/// One remote node will be selected to perform the watch and it will offer an expiration time based on a suggestion, and make an attempt to
/// continue to report changes via the callback. Nodes that agree to doing watches will be put on our 'ping' list to ensure they are still around
/// otherwise the watch will be cancelled and will have to be re-watched.
///
/// There is only one watch permitted per record. If a change to a watch is desired, the first one will be overwritten.
/// There is only one watch permitted per record. If a change to a watch is desired, the previous one will be overwritten.
/// * `key` is the record key to watch. it must first be opened for reading or writing.
/// * `subkeys` is the the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys.
/// * `expiration` is the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.
/// * `count` is the number of times the watch will be sent, maximum. A zero value here is equivalent to a cancellation.
///
/// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future, and the returned timestamp will
/// be no later than the requested expiration, but -may- be before the requested expiration.
/// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future,
/// and the returned timestamp will be no later than the requested expiration, but -may- be before the requested expiration.
/// If the returned timestamp is zero it indicates that the watch creation or update has failed. In the case of a faild update, the watch is considered cancelled.
///
/// DHT watches are accepted with the following conditions:
/// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record