diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp
index d4f1a9b1..6202a29e 100644
--- a/veilid-core/proto/veilid.capnp
+++ b/veilid-core/proto/veilid.capnp
@@ -361,7 +361,7 @@ struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
 }
 
 struct OperationWatchValueA @0xa726cab7064ba893 {
-    expiration @0 :UInt64;              # timestamp when this watch will expire in usec since epoch (0 if watch failed)
+    expiration @0 :UInt64;              # timestamp when this watch will expire in usec since epoch (0 if watch was rejected). if watch is being cancelled (with count = 0), this will be the non-zero former expiration time.
     peers @1 :List(PeerInfo);           # returned list of other nodes to ask that could propagate watches
 }
diff --git a/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs b/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs
index 604ac9a2..9127f00d 100644
--- a/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs
+++ b/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs
@@ -3,7 +3,7 @@ use super::*;
 /// The core representation of the RouteSpecStore that can be serialized
 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub(super) struct RouteSpecStoreContent {
-    /// All of the route sets we have allocated so far indexed by key
+    /// All of the route sets we have allocated so far indexed by key (many to one)
     id_by_key: HashMap,
     /// All of the route sets we have allocated so far
     details: HashMap,
diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs
index efa7f7f2..4cc2c64f 100644
--- a/veilid-core/src/rpc_processor/fanout_call.rs
+++ b/veilid-core/src/rpc_processor/fanout_call.rs
@@ -219,7 +219,10 @@ where
         Ok(())
     }
 
-    pub async fn run(self: Arc) -> TimeoutOr, RPCError>> {
+    pub async fn run(
+        self: Arc,
+        opt_init_fanout_queue: Option>,
+    ) -> TimeoutOr, RPCError>> {
         // Get timeout in milliseconds
         let timeout_ms = match us_to_ms(self.timeout_us.as_u64()).map_err(RPCError::internal) {
             Ok(v) => v,
@@ -229,7 +232,9 @@
         };
 
         // Initialize closest nodes list
-        if let Err(e) = self.clone().init_closest_nodes() {
+        if let Some(init_fanout_queue) = opt_init_fanout_queue {
+            self.clone().add_to_fanout_queue(&init_fanout_queue);
+        } else if let Err(e) = self.clone().init_closest_nodes() {
             return TimeoutOr::value(Err(e));
         }
diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs
index 009559ed..7c623409 100644
--- a/veilid-core/src/rpc_processor/mod.rs
+++ b/veilid-core/src/rpc_processor/mod.rs
@@ -196,12 +196,16 @@ struct WaitableReply {
 
 #[derive(Clone, Debug, Default)]
 pub struct Answer {
-    pub latency: TimestampDuration, // how long it took to get this answer
-    pub answer: T,                  // the answer itself
+    /// How long it took to get this answer
+    pub latency: TimestampDuration,
+    /// The private route requested to receive the reply
+    pub reply_private_route: Option,
+    /// The answer itself
+    pub answer: T,
 }
 impl Answer {
-    pub fn new(latency: TimestampDuration, answer: T) -> Self {
-        Self { latency, answer }
+    pub fn new(latency: TimestampDuration, reply_private_route: Option, answer: T) -> Self {
+        Self { latency, reply_private_route, answer }
     }
 }
 
@@ -512,7 +516,7 @@ impl RPCProcessor {
             check_done,
         );
 
-        fanout_call.run().await
+        fanout_call.run(None).await
     }
 
     /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
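The `run()` change above adds an optional seed queue so that callers such as `outbound_watch_value` can start the fanout at nodes they already know about instead of the closest-node search. The sketch below is a self-contained model of that fallback logic, not the crate's actual `FanoutCall`; the `Node`, `Fanout`, and `init_closest_nodes` names are stand-ins chosen for illustration.

// Stand-in for NodeRef; illustration only.
#[derive(Debug, Clone)]
struct Node(u64);

struct Fanout {
    queue: Vec<Node>,
}

impl Fanout {
    // Mirrors the shape of run(opt_init_fanout_queue): use the caller's seed
    // nodes when provided, otherwise fall back to the closest-nodes init.
    fn run(&mut self, opt_init_fanout_queue: Option<Vec<Node>>) -> Result<usize, String> {
        if let Some(init_fanout_queue) = opt_init_fanout_queue {
            self.queue.extend(init_fanout_queue);
        } else {
            self.init_closest_nodes()?;
        }
        // ... the real fanout loop would run here ...
        Ok(self.queue.len())
    }

    fn init_closest_nodes(&mut self) -> Result<(), String> {
        // Placeholder for the routing-table lookup the real code performs.
        self.queue.push(Node(0));
        Ok(())
    }
}

fn main() {
    let mut seeded = Fanout { queue: Vec::new() };
    let mut unseeded = Fanout { queue: Vec::new() };
    // Seeded, as outbound_watch_value now does with known value nodes:
    assert_eq!(seeded.run(Some(vec![Node(42), Node(43)])).unwrap(), 2);
    // Unseeded, as get_value/set_value still do with run(None):
    assert_eq!(unseeded.run(None).unwrap(), 1);
}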
diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs
index 239768ca..f82588ac 100644
--- a/veilid-core/src/rpc_processor/rpc_app_call.rs
+++ b/veilid-core/src/rpc_processor/rpc_app_call.rs
@@ -23,6 +23,9 @@ impl RPCProcessor {
         // Send the app call question
         let waitable_reply = network_result_try!(self.question(dest, question, None).await?);
 
+        // Keep the reply private route that was used to return with the answer
+        let reply_private_route = waitable_reply.reply_private_route.clone();
+
         // Wait for reply
         let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
             TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@@ -45,7 +48,11 @@ impl RPCProcessor {
         tracing::Span::current().record("ret.latency", latency.as_u64());
         #[cfg(feature = "verbose-tracing")]
         tracing::Span::current().record("ret.len", a_message.len());
-        Ok(NetworkResult::value(Answer::new(latency, a_message)))
+        Ok(NetworkResult::value(Answer::new(
+            latency,
+            reply_private_route,
+            a_message,
+        )))
     }
 
     #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs
index ded13285..3db9e39e 100644
--- a/veilid-core/src/rpc_processor/rpc_find_node.rs
+++ b/veilid-core/src/rpc_processor/rpc_find_node.rs
@@ -43,6 +43,9 @@ impl RPCProcessor {
         // Send the find_node request
         let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?);
 
+        // Keep the reply private route that was used to return with the answer
+        let reply_private_route = waitable_reply.reply_private_route.clone();
+
         // Wait for reply
         let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
             TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@@ -74,7 +77,11 @@ impl RPCProcessor {
             }
         }
 
-        Ok(NetworkResult::value(Answer::new(latency, peers)))
+        Ok(NetworkResult::value(Answer::new(
+            latency,
+            reply_private_route,
+            peers,
+        )))
     }
 
     #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs
index d45d211f..b251e97e 100644
--- a/veilid-core/src/rpc_processor/rpc_get_value.rs
+++ b/veilid-core/src/rpc_processor/rpc_get_value.rs
@@ -82,6 +82,9 @@ impl RPCProcessor {
             .await?
         );
 
+        // Keep the reply private route that was used to return with the answer
+        let reply_private_route = waitable_reply.reply_private_route.clone();
+
         // Wait for reply
         let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
             TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@@ -156,6 +159,7 @@ impl RPCProcessor {
         Ok(NetworkResult::value(Answer::new(
             latency,
+            reply_private_route,
             GetValueAnswer {
                 value,
                 peers,
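Each of the RPC callers above follows the same pattern: clone `reply_private_route` out of the `WaitableReply` before `wait_for_reply` consumes it, then thread it into the three-argument `Answer::new`. The snippet below is a simplified, self-contained model of that ordering constraint; `WaitableReply`, `Answer`, and `wait_for_reply` here are stand-ins with toy types, not the crate's real definitions.

// Toy stand-ins for the crate's WaitableReply / Answer<T>.
struct WaitableReply {
    reply_private_route: Option<String>,
}

struct Answer<T> {
    latency_us: u64,
    reply_private_route: Option<String>,
    answer: T,
}

// Consumes the reply object, so anything needed afterwards must be cloned first.
fn wait_for_reply(_reply: WaitableReply) -> (String, u64) {
    ("pong".to_string(), 1500)
}

fn main() {
    let waitable_reply = WaitableReply {
        reply_private_route: Some("private-route-key".to_string()),
    };

    // Keep the reply private route before the reply is consumed.
    let reply_private_route = waitable_reply.reply_private_route.clone();

    let (message, latency_us) = wait_for_reply(waitable_reply);
    let answer = Answer { latency_us, reply_private_route, answer: message };

    println!(
        "{} in {} us via {:?}",
        answer.answer, answer.latency_us, answer.reply_private_route
    );
}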
diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs
index b507b73e..397f3b03 100644
--- a/veilid-core/src/rpc_processor/rpc_set_value.rs
+++ b/veilid-core/src/rpc_processor/rpc_set_value.rs
@@ -96,6 +96,8 @@ impl RPCProcessor {
             .await?
         );
 
+        // Keep the reply private route that was used to return with the answer
+        let reply_private_route = waitable_reply.reply_private_route.clone();
         // Wait for reply
         let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await?
         {
@@ -174,6 +176,7 @@ impl RPCProcessor {
         Ok(NetworkResult::value(Answer::new(
             latency,
+            reply_private_route,
             SetValueAnswer { set, value, peers },
         )))
     }
diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs
index ae03778d..51832682 100644
--- a/veilid-core/src/rpc_processor/rpc_status.rs
+++ b/veilid-core/src/rpc_processor/rpc_status.rs
@@ -113,6 +113,9 @@ impl RPCProcessor {
         // Note what kind of ping this was and to what peer scope
         let send_data_method = waitable_reply.send_data_method.clone();
 
+        // Keep the reply private route that was used to return with the answer
+        let reply_private_route = waitable_reply.reply_private_route.clone();
+
         // Wait for reply
         let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
             TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@@ -190,7 +193,11 @@ impl RPCProcessor {
                 // sender info is irrelevant over relays and routes
             }
         };
-        Ok(NetworkResult::value(Answer::new(latency, opt_sender_info)))
+        Ok(NetworkResult::value(Answer::new(
+            latency,
+            reply_private_route,
+            opt_sender_info,
+        )))
     }
 
     #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs
index 8ec66489..7781398e 100644
--- a/veilid-core/src/rpc_processor/rpc_watch_value.rs
+++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs
@@ -77,6 +77,9 @@ impl RPCProcessor {
         let waitable_reply =
             network_result_try!(self.question(dest.clone(), question, None).await?);
 
+        // Keep the reply private route that was used to return with the answer
+        let reply_private_route = waitable_reply.reply_private_route.clone();
+
         // Wait for reply
         let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await?
         {
             TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@@ -138,6 +141,7 @@ impl RPCProcessor {
         Ok(NetworkResult::value(Answer::new(
             latency,
+            reply_private_route,
             WatchValueAnswer {
                 expiration_ts: Timestamp::new(expiration),
                 peers,
diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs
index abfe0272..2243af42 100644
--- a/veilid-core/src/storage_manager/get_value.rs
+++ b/veilid-core/src/storage_manager/get_value.rs
@@ -174,7 +174,7 @@ impl StorageManager {
             check_done,
         );
 
-        match fanout_call.run().await {
+        match fanout_call.run(None).await {
             // If we don't finish in the timeout (too much time passed checking for consensus)
             TimeoutOr::Timeout => {
                 // Return the best answer we've got
diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index aa2ad095..0db2f0b0 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -255,19 +255,52 @@ impl StorageManager {
     /// Close an opened local record
     pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
-        let mut inner = self.lock().await?;
-        inner.close_record(key)
+        let (opened_record, opt_rpc_processor) = {
+            let mut inner = self.lock().await?;
+            (inner.close_record(key)?, inner.rpc_processor.clone())
+        };
+
+        // Send a one-time cancel request for the watch if we have one and we're online
+        if let Some(active_watch) = opened_record.active_watch() {
+            if let Some(rpc_processor) = opt_rpc_processor {
+                // Use the safety selection we opened the record with
+                // Use the writer we opened with as the 'watcher' as well
+                let opt_owvresult = self
+                    .outbound_watch_value(
+                        rpc_processor,
+                        key,
+                        ValueSubkeyRangeSet::full(),
+                        Timestamp::new(0),
+                        0,
+                        opened_record.safety_selection(),
+                        opened_record.writer().cloned(),
+                        Some(active_watch.watch_node),
+                    )
+                    .await?;
+                if let Some(owvresult) = opt_owvresult {
+                    if owvresult.expiration_ts.as_u64() != 0 {
+                        log_stor!(debug
+                            "close record watch cancel got unexpected expiration: {}",
+                            owvresult.expiration_ts
+                        );
+                    }
+                } else {
+                    log_stor!(debug "close record watch cancel unsuccessful");
+                }
+            } else {
+                log_stor!(debug "skipping last-ditch watch cancel because we are offline");
+            }
+        }
+
+        Ok(())
     }
 
     /// Delete a local record
     pub async fn delete_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
-        let mut inner = self.lock().await?;
-        // Ensure the record is closed
-        if inner.opened_records.contains_key(&key) {
-            inner.close_record(key)?;
-        }
+        self.close_record(key).await?;
+        let mut inner = self.lock().await?;
         let Some(local_record_store) = inner.local_record_store.as_mut() else {
             apibail_not_initialized!();
         };
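The new `close_record` path above issues a best-effort watch cancellation (full subkey range, zero expiration, count 0) before discarding the opened-record state, and only logs when the cancel cannot be sent or returns an unexpected expiration. A simplified, self-contained model of that control flow is sketched below; `OpenedRecord`, `ActiveWatch`, and `outbound_watch_cancel` are stand-ins, not the storage manager's real API.

#[derive(Clone)]
struct ActiveWatch {
    watch_node: String,
}

struct OpenedRecord {
    active_watch: Option<ActiveWatch>,
}

// Models the count = 0 / expiration = 0 cancel request sent to the watch node;
// returns the expiration reported back, or None if the request got no answer.
fn outbound_watch_cancel(watch: &ActiveWatch) -> Option<u64> {
    println!("cancelling watch held by {}", watch.watch_node);
    Some(0)
}

fn close_record(record: OpenedRecord, online: bool) {
    // Closing always succeeds; the cancel is best-effort and failures are only logged.
    let Some(active_watch) = record.active_watch else {
        return;
    };
    if !online {
        println!("skipping last-ditch watch cancel because we are offline");
        return;
    }
    match outbound_watch_cancel(&active_watch) {
        Some(expiration) if expiration != 0 => {
            println!("close record watch cancel got unexpected expiration: {expiration}");
        }
        Some(_) => {}
        None => println!("close record watch cancel unsuccessful"),
    }
}

fn main() {
    let record = OpenedRecord {
        active_watch: Some(ActiveWatch { watch_node: "node-a".to_string() }),
    };
    close_record(record, true);
}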
@@ -482,14 +515,22 @@ impl StorageManager {
     ) -> VeilidAPIResult {
         let inner = self.lock().await?;
 
+        // Rewrite subkey range if empty to full
+        let subkeys = if subkeys.is_empty() {
+            ValueSubkeyRangeSet::full()
+        } else {
+            subkeys
+        };
+
         // Get the safety selection and the writer we opened this record with
-        let (safety_selection, opt_writer) = {
+        let (safety_selection, opt_writer, opt_watch_node) = {
             let Some(opened_record) = inner.opened_records.get(&key) else {
                 apibail_generic!("record not open");
             };
             (
                 opened_record.safety_selection(),
                 opened_record.writer().cloned(),
+                opened_record.active_watch().map(|aw| aw.watch_node.clone()),
             )
         };
@@ -502,60 +543,112 @@ impl StorageManager {
         drop(inner);
 
         // Use the safety selection we opened the record with
-        let expiration_ts = self
+        // Use the writer we opened with as the 'watcher' as well
+        let opt_owvresult = self
             .outbound_watch_value(
                 rpc_processor,
                 key,
-                subkeys,
+                subkeys.clone(),
                 expiration,
                 count,
                 safety_selection,
                 opt_writer,
+                opt_watch_node,
             )
             .await?;
 
-        Ok(expiration_ts)
+        // If we did not get a valid response return a zero timestamp
+        let Some(owvresult) = opt_owvresult else {
+            return Ok(Timestamp::new(0));
+        };
+
+        // Clear any existing watch if the watch succeeded or got cancelled
+        let mut inner = self.lock().await?;
+        let Some(opened_record) = inner.opened_records.get_mut(&key) else {
+            apibail_generic!("record not open");
+        };
+        opened_record.clear_active_watch();
+
+        // Get the minimum expiration timestamp we will accept
+        let rpc_timeout_us = {
+            let c = self.unlocked_inner.config.get();
+            TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms))
+        };
+        let cur_ts = get_timestamp();
+        let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
+
+        // If the expiration time is less than our minimum expiration time or greater than the requested time, consider this watch cancelled
+        if owvresult.expiration_ts.as_u64() < min_expiration_ts
+            || owvresult.expiration_ts.as_u64() > expiration.as_u64()
+        {
+            // Don't set the watch so we ignore any stray valuechanged messages
+            return Ok(Timestamp::new(0));
+        }
+
+        // If we requested a cancellation, then consider this watch cancelled
+        if count == 0 {
+            return Ok(Timestamp::new(0));
+        }
+
+        // Keep a record of the watch
+        opened_record.set_active_watch(ActiveWatch {
+            expiration_ts: owvresult.expiration_ts,
+            watch_node: owvresult.watch_node,
+            opt_value_changed_route: owvresult.opt_value_changed_route,
+            subkeys,
+            count,
+        });
+
+        Ok(owvresult.expiration_ts)
     }
 
-    // pub async fn cancel_watch_values(
-    //     &self,
-    //     key: TypedKey,
-    //     subkeys: ValueSubkeyRangeSet,
-    // ) -> VeilidAPIResult {
-    //     let inner = self.lock().await?;
+    pub async fn cancel_watch_values(
+        &self,
+        key: TypedKey,
+        subkeys: ValueSubkeyRangeSet,
+    ) -> VeilidAPIResult {
+        let (subkeys, active_watch) = {
+            let inner = self.lock().await?;
+            let Some(opened_record) = inner.opened_records.get(&key) else {
+                apibail_generic!("record not open");
+            };
 
-    //
-    //     // // Get the safety selection and the writer we opened this record with
-    //     // let (safety_selection, opt_writer) = {
-    //     //     let Some(opened_record) = inner.opened_records.get(&key) else {
-    //     //         apibail_generic!("record not open");
-    //     //     };
-    //     //     (
-    //     //         opened_record.safety_selection(),
-    //     //         opened_record.writer().cloned(),
-    //     //     )
-    //     // };
+            // See what watch we have currently if any
+            let Some(active_watch) = opened_record.active_watch() else {
+                // If we didn't have an active watch, then we can just return false because there's nothing to do here
+                return Ok(false);
+            };
 
-    //
-    //     // // Get rpc processor and drop mutex so we don't block while requesting the watch from the network
-    //     // let Some(rpc_processor) = inner.rpc_processor.clone() else {
-    //     //     apibail_try_again!("offline, try again later");
-    //     // };
+            // Rewrite subkey range if empty to full
+            let subkeys = if subkeys.is_empty() {
+                ValueSubkeyRangeSet::full()
+            } else {
+                subkeys
+            };
 
-    //
-    //     // // Drop the lock for network access
-    //     // drop(inner);
+            // Reduce the subkey range
+            let new_subkeys = active_watch.subkeys.difference(&subkeys);
 
-    //
-    //     // // Use the safety selection we opened the record with
-    //     // let expiration_ts = self
-    //     //     .outbound_watch_value(
-    //     //         rpc_processor,
-    //     //         key,
-    //     //         subkeys,
-    //     //         expiration,
-    //     //         count,
-    //     //         safety_selection,
-    //     //         opt_writer,
-    //     //     )
-    //     //     .await?;
+            (new_subkeys, active_watch)
+        };
 
-    //     // Ok(expiration_ts)
-    // }
+        // If we have no subkeys left, then set the count to zero to indicate a full cancellation
+        let count = if subkeys.is_empty() {
+            0
+        } else {
+            active_watch.count
+        };
+
+        // Update the watch
+        let expiration_ts = self
+            .watch_values(key, subkeys, active_watch.expiration_ts, count)
+            .await?;
+
+        // A zero expiration time means the watch is done or nothing is left, and the watch is no longer active
+        if expiration_ts.as_u64() == 0 {
+            return Ok(false);
+        }
+
+        Ok(true)
+    }
 }
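`watch_values` above only records an `ActiveWatch` when the returned expiration clears a minimum horizon (now plus the RPC timeout), does not exceed what was requested, and the request was not itself a cancellation. The function below is a self-contained restatement of that acceptance check with plain microsecond integers; the name `accept_watch_expiration` and the literal timestamps are illustrative only.

// Timestamps are in microseconds since epoch; returns the accepted expiration, or None
// when the watch should be treated as cancelled/rejected.
fn accept_watch_expiration(
    returned_expiration_us: u64,
    requested_expiration_us: u64,
    now_us: u64,
    rpc_timeout_us: u64,
    count: u32,
) -> Option<u64> {
    // Anything expiring before now + rpc_timeout could lapse before it is even usable,
    // and anything later than requested was not asked for: treat both as cancelled.
    let min_expiration_us = now_us + rpc_timeout_us;
    if returned_expiration_us < min_expiration_us || returned_expiration_us > requested_expiration_us {
        return None;
    }
    // A zero notification count was a cancellation request, so there is no watch to record.
    if count == 0 {
        return None;
    }
    Some(returned_expiration_us)
}

fn main() {
    let now = 1_000_000u64;
    let rpc_timeout = 10_000u64;
    // Accepted: expires after the minimum horizon and no later than requested.
    assert_eq!(
        accept_watch_expiration(1_500_000, 2_000_000, now, rpc_timeout, 3),
        Some(1_500_000)
    );
    // Rejected: expires before now + rpc_timeout.
    assert_eq!(accept_watch_expiration(1_005_000, 2_000_000, now, rpc_timeout, 3), None);
    // Rejected: a cancellation (count == 0) never records a watch.
    assert_eq!(accept_watch_expiration(1_500_000, 2_000_000, now, rpc_timeout, 0), None);
    println!("all checks passed");
}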
diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs
index b2bae1c5..ce9d6de1 100644
--- a/veilid-core/src/storage_manager/set_value.rs
+++ b/veilid-core/src/storage_manager/set_value.rs
@@ -165,7 +165,7 @@ impl StorageManager {
             check_done,
         );
 
-        match fanout_call.run().await {
+        match fanout_call.run(None).await {
             // If we don't finish in the timeout (too much time passed checking for consensus)
             TimeoutOr::Timeout => {
                 // Return the best answer we've got
diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs
index 4aa3008a..bf78b593 100644
--- a/veilid-core/src/storage_manager/storage_manager_inner.rs
+++ b/veilid-core/src/storage_manager/storage_manager_inner.rs
@@ -409,9 +409,9 @@ impl StorageManagerInner {
         Ok(descriptor)
     }
 
-    pub fn get_value_nodes(&mut self, key: TypedKey) -> VeilidAPIResult>> {
+    pub fn get_value_nodes(&self, key: TypedKey) -> VeilidAPIResult>> {
         // Get local record store
-        let Some(local_record_store) = self.local_record_store.as_mut() else {
+        let Some(local_record_store) = self.local_record_store.as_ref() else {
             apibail_not_initialized!();
         };
 
@@ -456,11 +456,11 @@ impl StorageManagerInner {
         Ok(())
     }
 
-    pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult<()> {
-        let Some(_opened_record) = self.opened_records.remove(&key) else {
+    pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult {
+        let Some(opened_record) = self.opened_records.remove(&key) else {
             apibail_generic!("record not open");
         };
-        Ok(())
+        Ok(opened_record)
     }
 
     pub async fn handle_get_local_value(
diff --git a/veilid-core/src/storage_manager/types/opened_record.rs b/veilid-core/src/storage_manager/types/opened_record.rs
index 8f47786c..e2a0a4a4 100644
--- a/veilid-core/src/storage_manager/types/opened_record.rs
+++ b/veilid-core/src/storage_manager/types/opened_record.rs
@@ -1,5 +1,19 @@
 use super::*;
 
+#[derive(Clone, Debug)]
+pub struct ActiveWatch {
+    /// The expiration of a successful watch
+    pub expiration_ts: Timestamp,
+    /// Which node accepted the watch
+    pub watch_node: NodeRef,
+    /// Which private route is responsible for receiving ValueChanged notifications
+    pub opt_value_changed_route: Option,
+    /// Which subkeys we are watching
+    pub subkeys: ValueSubkeyRangeSet,
+    /// How many notifications are left
+    pub count: u32,
+}
+
 /// The state associated with a local record when it is opened
 /// This is not serialized to storage as it is ephemeral for the lifetime of the opened record
 #[derive(Clone, Debug, Default)]
@@ -11,6 +25,9 @@ pub struct OpenedRecord {
     /// The safety selection in current use
     safety_selection: SafetySelection,
+
+    /// Active watch we have on this record
+    active_watch: Option,
 }
 
 impl OpenedRecord {
@@ -18,6 +35,7 @@ impl OpenedRecord {
         Self {
             writer,
             safety_selection,
+            active_watch: None,
         }
     }
 
@@ -28,4 +46,16 @@ impl OpenedRecord {
     pub fn safety_selection(&self) -> SafetySelection {
         self.safety_selection
     }
+
+    pub fn set_active_watch(&mut self, active_watch: ActiveWatch) {
+        self.active_watch = Some(active_watch);
+    }
+
+    pub fn clear_active_watch(&mut self) {
+        self.active_watch = None;
+    }
+
+    pub fn active_watch(&self) -> Option {
+        self.active_watch.clone()
+    }
 }
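The `ActiveWatch` bookkeeping added above is plain set/clear/get state on the opened record, with `active_watch()` returning a clone so the caller can drop the storage manager lock before using it. Below is a minimal stand-alone version of that pattern with simplified field types (`String`/`u64` instead of `NodeRef`/`Timestamp`), intended only as a usage illustration.

#[derive(Clone, Debug)]
struct ActiveWatch {
    expiration_ts_us: u64,
    watch_node: String,
    count: u32,
}

#[derive(Default, Debug)]
struct OpenedRecord {
    active_watch: Option<ActiveWatch>,
}

impl OpenedRecord {
    fn set_active_watch(&mut self, active_watch: ActiveWatch) {
        self.active_watch = Some(active_watch);
    }
    fn clear_active_watch(&mut self) {
        self.active_watch = None;
    }
    fn active_watch(&self) -> Option<ActiveWatch> {
        // Returned by value (a clone) so callers can use it after releasing any lock.
        self.active_watch.clone()
    }
}

fn main() {
    let mut record = OpenedRecord::default();
    record.set_active_watch(ActiveWatch {
        expiration_ts_us: 2_000_000,
        watch_node: "node-a".to_string(),
        count: 3,
    });
    if let Some(watch) = record.active_watch() {
        println!("watching via {} until {}", watch.watch_node, watch.expiration_ts_us);
    }
    record.clear_active_watch();
    assert!(record.active_watch().is_none());
}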
diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index b1062b9e..c9424ee5 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -2,8 +2,19 @@ use super::*;
 
 /// The context of the outbound_watch_value operation
 struct OutboundWatchValueContext {
-    /// The timestamp for the expiration of the watch we successfully got
-    pub opt_expiration_ts: Option,
+    /// A successful watch
+    pub opt_watch_value_result: Option,
+}
+
+/// The result of the outbound_watch_value operation
+#[derive(Debug, Clone)]
+struct OutboundWatchValueResult {
+    /// The expiration of a successful watch
+    pub expiration_ts: Timestamp,
+    /// Which node accepted the watch
+    pub watch_node: NodeRef,
+    /// Which private route is responsible for receiving ValueChanged notifications
+    pub opt_value_changed_route: Option,
 }
 
 impl StorageManager {
@@ -17,26 +28,30 @@ impl StorageManager {
         count: u32,
         safety_selection: SafetySelection,
         opt_watcher: Option,
-    ) -> VeilidAPIResult {
+        opt_watch_node: Option,
+    ) -> VeilidAPIResult> {
         let routing_table = rpc_processor.routing_table();
 
         // Get the DHT parameters for 'WatchValue', some of which are the same for 'WatchValue' operations
-        let (key_count, timeout_us, rpc_timeout_us) = {
+        let (key_count, timeout_us) = {
             let c = self.unlocked_inner.config.get();
             (
                 c.network.dht.max_find_node_count as usize,
                 TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
-                TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
             )
         };
 
-        // Get the minimum expiration timestamp we will accept
-        let cur_ts = get_timestamp();
-        let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
+        // Get the nodes we know are caching this value to seed the fanout
+        let opt_init_fanout_queue = if let Some(watch_node) = opt_watch_node {
+            Some(vec![watch_node])
+        } else {
+            let inner = self.inner.lock().await;
+            inner.get_value_nodes(key)?
+        };
 
         // Make do-watch-value answer context
         let context = Arc::new(Mutex::new(OutboundWatchValueContext {
-            opt_expiration_ts: None,
+            opt_watch_value_result: None,
         }));
 
         // Routine to call to generate fanout
@@ -59,11 +74,21 @@ impl StorageManager {
                     .await?
             );
 
-            // Keep the expiration_ts if we got one
-            if wva.answer.expiration_ts.as_u64() >= min_expiration_ts {
-                log_stor!(debug "Got expiration back: expiration_ts={}", wva.answer.expiration_ts);
+            // Keep answer if we got one
+            if wva.answer.expiration_ts.as_u64() > 0 {
+                if count > 0 {
+                    // If we asked for a nonzero notification count, then this is an accepted watch
+                    log_stor!(debug "Watch accepted: expiration_ts={}", wva.answer.expiration_ts);
+                } else {
+                    // If we asked for a zero notification count, then this is a cancelled watch
+                    log_stor!(debug "Watch cancelled");
+                }
                 let mut ctx = context.lock();
-                ctx.opt_expiration_ts = Some(wva.answer.expiration_ts);
+                ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
+                    expiration_ts: wva.answer.expiration_ts,
+                    watch_node: next_node.clone(),
+                    opt_value_changed_route: wva.reply_private_route,
+                });
             }
 
             // Return peers if we have some
@@ -78,7 +103,7 @@ impl StorageManager {
        let check_done = |_closest_nodes: &[NodeRef]| {
            // If a watch has succeeded, return done
            let ctx = context.lock();
-            if ctx.opt_expiration_ts.is_some() {
+            if ctx.opt_watch_value_result.is_some() {
                return Some(());
            }
            None
@@ -97,39 +122,39 @@ impl StorageManager {
             check_done,
         );
 
-        match fanout_call.run().await {
+        match fanout_call.run(opt_init_fanout_queue).await {
             // If we don't finish in the timeout (too much time passed without a successful watch)
             TimeoutOr::Timeout => {
                 // Return the best answer we've got
                 let ctx = context.lock();
-                if ctx.opt_expiration_ts.is_some() {
+                if ctx.opt_watch_value_result.is_some() {
                     log_stor!(debug "WatchValue Fanout Timeout Success");
                 } else {
                     log_stor!(debug "WatchValue Fanout Timeout Failure");
                 }
-                Ok(ctx.opt_expiration_ts.unwrap_or_default())
+                Ok(ctx.opt_watch_value_result.clone())
             }
             // If we finished with done
             TimeoutOr::Value(Ok(Some(()))) => {
                 // Return the best answer we've got
                 let ctx = context.lock();
-                if ctx.opt_expiration_ts.is_some() {
+                if ctx.opt_watch_value_result.is_some() {
                     log_stor!(debug "WatchValue Fanout Success");
                 } else {
                     log_stor!(debug "WatchValue Fanout Failure");
                 }
-                Ok(ctx.opt_expiration_ts.unwrap_or_default())
+                Ok(ctx.opt_watch_value_result.clone())
            }
            // If we ran out of nodes
            TimeoutOr::Value(Ok(None)) => {
                // Return the best answer we've got
                let ctx = context.lock();
-                if ctx.opt_expiration_ts.is_some() {
+                if ctx.opt_watch_value_result.is_some() {
                    log_stor!(debug "WatchValue Fanout Exhausted Success");
                } else {
                    log_stor!(debug "WatchValue Fanout Exhausted Failure");
                }
-                Ok(ctx.opt_expiration_ts.unwrap_or_default())
+                Ok(ctx.opt_watch_value_result.clone())
            }
            // Failed
            TimeoutOr::Value(Err(e)) => {
diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs
index 37f1e207..5520c6e7 100644
--- a/veilid-core/src/veilid_api/routing_context.rs
+++ b/veilid-core/src/veilid_api/routing_context.rs
@@ -350,6 +350,8 @@ impl RoutingContext {
     /// Cancels a watch early
     ///
     /// This is a convenience function that cancels watching all subkeys in a range
+    /// Returns Ok(true) if there is any remaining watch for this record
+    /// Returns Ok(false) if the entire watch has been cancelled
     pub async fn cancel_dht_watch(
         &self,
         key: TypedKey,
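`cancel_dht_watch` above now distinguishes a narrowed watch (`Ok(true)`, some subkeys still watched) from a fully cancelled one (`Ok(false)`). The sketch below models just that subkey bookkeeping with a `BTreeSet` of subkey numbers; the real code uses `ValueSubkeyRangeSet` and also updates the watch count, so this is an illustration of the return-value semantics only.

use std::collections::BTreeSet;

// Removes the cancelled subkeys from the watched set and reports whether any watch remains.
fn cancel_watch(watched: &mut BTreeSet<u32>, cancel: &BTreeSet<u32>) -> bool {
    watched.retain(|subkey| !cancel.contains(subkey));
    // true: some watch remains; false: the entire watch has been cancelled.
    !watched.is_empty()
}

fn main() {
    let mut watched: BTreeSet<u32> = (0..4).collect();
    // Cancel a subset: the watch stays alive for the remaining subkeys.
    assert!(cancel_watch(&mut watched, &BTreeSet::from([0, 1])));
    // Cancel the rest: the watch is gone.
    assert!(!cancel_watch(&mut watched, &BTreeSet::from([2, 3])));
    println!("watch fully cancelled");
}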
diff --git a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs
index 99ba294b..62d75fd9 100644
--- a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs
+++ b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs
@@ -16,6 +16,11 @@ impl ValueSubkeyRangeSet {
             data: Default::default(),
         }
     }
+    pub fn full() -> Self {
+        let mut data = RangeSetBlaze::new();
+        data.ranges_insert(u32::MIN..=u32::MAX);
+        Self { data }
+    }
     pub fn new_with_data(data: RangeSetBlaze) -> Self {
         Self { data }
     }
@@ -24,6 +29,23 @@ impl ValueSubkeyRangeSet {
         data.insert(value);
         Self { data }
     }
+
+    pub fn interset(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
+        Self::new_with_data(&self.data & &other.data)
+    }
+    pub fn difference(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
+        Self::new_with_data(&self.data - &other.data)
+    }
+    pub fn union(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
+        Self::new_with_data(&self.data | &other.data)
+    }
+
+    pub fn data(&self) -> RangeSetBlaze {
+        self.data.clone()
+    }
+    pub fn into_data(self) -> RangeSetBlaze {
+        self.data
+    }
 }
 
 impl FromStr for ValueSubkeyRangeSet {
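The new helpers above (`full`, `interset`, `difference`, `union`, `data`, `into_data`) are thin wrappers over the backing range set. The stand-alone example below exercises the same operations directly on `range_set_blaze::RangeSetBlaze`, which the `ranges_insert` call above suggests is the backing type; add the `range_set_blaze` crate to Cargo.toml to run it, and treat that crate choice as an assumption of this sketch.

use range_set_blaze::RangeSetBlaze;

fn main() {
    // full(): every possible subkey.
    let mut full = RangeSetBlaze::new();
    full.ranges_insert(u32::MIN..=u32::MAX);

    let watched = RangeSetBlaze::from_iter([0u32..=9]);
    let cancelled = RangeSetBlaze::from_iter([0u32..=4]);

    // difference(): what cancel_watch_values keeps watching after a partial cancel.
    let remaining = &watched - &cancelled;
    assert_eq!(remaining, RangeSetBlaze::from_iter([5u32..=9]));

    // Intersection and union, matching the other helpers.
    let overlap = &watched & &cancelled;
    let combined = &watched | &cancelled;
    assert_eq!(overlap, cancelled);
    assert_eq!(combined, watched);

    // Cancelling against full() leaves nothing, which maps to cancel_dht_watch() -> Ok(false).
    assert!((&watched - &full).is_empty());
    println!("remaining after partial cancel: {remaining}");
}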