diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index e01fd7b1..023e1f58 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -269,24 +269,35 @@ impl RPCProcessor { #[cfg(feature = "debug-dht")] log_rpc!(debug "Not close enough for watch value"); - (false, Timestamp::default(), watch_id.unwrap_or_default()) + (false, 0, watch_id.unwrap_or_default()) } else { // Accepted, lets try to watch or cancel it + let params = WatchParameters { + subkeys, + expiration: Timestamp::new(expiration), + count, + watcher, + target, + }; + // See if we have this record ourselves, if so, accept the watch let storage_manager = self.storage_manager(); - let (ret_expiration, ret_watch_id) = network_result_try!(storage_manager - .inbound_watch_value( - key, - subkeys.clone(), - Timestamp::new(expiration), - count, - watch_id, - target, - watcher - ) + let watch_result = network_result_try!(storage_manager + .inbound_watch_value(key, params, watch_id,) .await .map_err(RPCError::internal)?); + + // Encode the watch result + // Rejections and cancellations are treated the same way by clients + let (ret_expiration, ret_watch_id) = match watch_result { + WatchResult::Created { id, expiration } => (expiration.as_u64(), id), + WatchResult::Changed { expiration } => { + (expiration.as_u64(), watch_id.unwrap_or_default()) + } + WatchResult::Cancelled => (0, watch_id.unwrap_or_default()), + WatchResult::Rejected => (0, watch_id.unwrap_or_default()), + }; (true, ret_expiration, ret_watch_id) }; @@ -309,7 +320,7 @@ impl RPCProcessor { // Make WatchValue answer let watch_value_a = RPCOperationWatchValueA::new( ret_accepted, - ret_expiration.as_u64(), + ret_expiration, closer_to_key_peers, ret_watch_id, )?; diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 7fae7899..d5248985 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,27 +1,21 @@ mod debug; mod get_value; -mod keys; -mod limited_size; mod record_store; -mod record_store_limits; mod set_value; mod storage_manager_inner; mod tasks; mod types; mod watch_value; -use keys::*; -use limited_size::*; -use record_store::*; -use record_store_limits::*; -use storage_manager_inner::*; - -pub use types::*; - use super::*; use network_manager::*; +use record_store::*; use routing_table::*; use rpc_processor::*; +use storage_manager_inner::*; + +pub use record_store::{WatchParameters, WatchResult}; +pub use types::*; /// The maximum size of a single subkey const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN; @@ -317,9 +311,9 @@ impl StorageManager { ) .await?; if let Some(owvresult) = opt_owvresult { - if owvresult.expiration_ts.as_u64() == 0 { + if owvresult.expiration_ts.as_u64() != 0 { log_stor!(debug - "close record watch cancel should have old expiration, but got zero" + "close record watch cancel should have zero expiration" ); } } else { @@ -649,7 +643,7 @@ impl StorageManager { expiration.as_u64() }; - // If the expiration time is less than our minimum expiration time (or zero) consider this watch cancelled + // If the expiration time is less than our minimum expiration time (or zero) consider this watch inactive let mut expiration_ts = owvresult.expiration_ts; if expiration_ts.as_u64() < min_expiration_ts { return Ok(Timestamp::new(0)); @@ -664,7 +658,7 @@ impl StorageManager { if count == 0 { // Expiration returned should be zero if we requested a cancellation if expiration_ts.as_u64() != 0 { - log_stor!(debug "got unexpired watch despite asking for a cancellation"); + log_stor!(debug "got active watch despite asking for a cancellation"); } return Ok(Timestamp::new(0)); } @@ -719,12 +713,14 @@ impl StorageManager { active_watch.count }; - // Update the watch + // Update the watch. This just calls through to the above watch_values() function + // This will update the active_watch so we don't need to do that in this routine. let expiration_ts = self .watch_values(key, subkeys, active_watch.expiration_ts, count) .await?; - // A zero expiration time means the watch is done or nothing is left, and the watch is no longer active + // A zero expiration time returned from watch_value() means the watch is done + // or no subkeys are left, and the watch is no longer active if expiration_ts.as_u64() == 0 { // Return false indicating the watch is completely gone return Ok(false); diff --git a/veilid-core/src/storage_manager/keys.rs b/veilid-core/src/storage_manager/record_store/keys.rs similarity index 100% rename from veilid-core/src/storage_manager/keys.rs rename to veilid-core/src/storage_manager/record_store/keys.rs diff --git a/veilid-core/src/storage_manager/limited_size.rs b/veilid-core/src/storage_manager/record_store/limited_size.rs similarity index 100% rename from veilid-core/src/storage_manager/limited_size.rs rename to veilid-core/src/storage_manager/record_store/limited_size.rs diff --git a/veilid-core/src/storage_manager/types/local_record_detail.rs b/veilid-core/src/storage_manager/record_store/local_record_detail.rs similarity index 100% rename from veilid-core/src/storage_manager/types/local_record_detail.rs rename to veilid-core/src/storage_manager/record_store/local_record_detail.rs diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store/mod.rs similarity index 82% rename from veilid-core/src/storage_manager/record_store.rs rename to veilid-core/src/storage_manager/record_store/mod.rs index 79012da0..6f343b26 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -4,6 +4,27 @@ /// This store does not perform any validation on the schema, and all ValueRecordData passed in must have been previously validated. /// Uses an in-memory store for the records, backed by the TableStore. Subkey data is LRU cached and rotated out by a limits policy, /// and backed to the TableStore for persistence. +mod keys; +mod limited_size; +mod local_record_detail; +mod opened_record; +mod record; +mod record_data; +mod record_store_limits; +mod remote_record_detail; +mod watch; + +pub(super) use keys::*; +pub(super) use limited_size::*; +pub(super) use local_record_detail::*; +pub(super) use opened_record::*; +pub(super) use record::*; +pub(super) use record_data::*; +pub(super) use record_store_limits::*; +pub(super) use remote_record_detail::*; +pub(super) use watch::*; +pub use watch::{WatchParameters, WatchResult}; + use super::*; use hashlink::LruCache; @@ -22,31 +43,6 @@ where in_total_storage: bool, } -/// An individual watch -#[derive(Debug, Clone)] -struct WatchedRecordWatch { - id: u64, - subkeys: ValueSubkeyRangeSet, - expiration: Timestamp, - count: u32, - target: Target, - watcher: CryptoKey, - changed: ValueSubkeyRangeSet, -} - -#[derive(Debug, Default, Clone)] -/// A record being watched for changes -pub(super) struct WatchedRecord { - /// The list of active watchers - watchers: Vec, -} - -pub(super) enum WatchUpdateMode { - NoUpdate, - UpdateAll, - ExcludeTarget(Target), -} - pub(super) struct RecordStore where D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, @@ -72,10 +68,9 @@ where /// The list of records that have changed since last flush to disk (optimization for batched writes) changed_records: HashSet, /// The list of records being watched for changes - watched_records: HashMap, + watched_records: HashMap, /// The list of watched records that have changed values since last notification changed_watched_values: HashSet, - /// A mutex to ensure we handle this concurrently purge_dead_records_mutex: Arc>, } @@ -423,6 +418,11 @@ where Ok(()) } + pub(super) fn contains_record(&mut self, key: TypedKey) -> bool { + let rtk = RecordTableKey { key }; + self.record_index.contains_key(&rtk) + } + pub(super) fn with_record(&mut self, key: TypedKey, f: F) -> Option where F: FnOnce(&Record) -> R, @@ -626,20 +626,30 @@ where &mut self, key: TypedKey, subkey: ValueSubkey, - opt_ignore_target: Option, + watch_update_mode: WatchUpdateMode, ) { + let (do_update, opt_ignore_target) = match watch_update_mode { + WatchUpdateMode::NoUpdate => (false, None), + WatchUpdateMode::UpdateAll => (true, None), + WatchUpdateMode::ExcludeTarget(target) => (true, Some(target)), + }; + if !do_update { + return; + } + let rtk = RecordTableKey { key }; let Some(wr) = self.watched_records.get_mut(&rtk) else { return; }; + // Update all watchers let mut changed = false; - for w in &mut wr.watchers { + for w in &mut wr.watches { // If this watcher is watching the changed subkey then add to the watcher's changed list // Don't bother marking changes for value sets coming from the same watching node/target because they // are already going to be aware of the changes in that case - if Some(&w.target) != opt_ignore_target.as_ref() - && w.subkeys.contains(subkey) + if Some(&w.params.target) != opt_ignore_target.as_ref() + && w.params.subkeys.contains(subkey) && w.changed.insert(subkey) { changed = true; @@ -743,37 +753,154 @@ where // Update storage space self.total_storage_space.commit().unwrap(); - // Update watched value - - let (do_update, opt_ignore_target) = match watch_update_mode { - WatchUpdateMode::NoUpdate => (false, None), - WatchUpdateMode::UpdateAll => (true, None), - WatchUpdateMode::ExcludeTarget(target) => (true, Some(target)), - }; - if do_update { - self.update_watched_value(key, subkey, opt_ignore_target) - .await; - } + // Send updates to + self.update_watched_value(key, subkey, watch_update_mode) + .await; Ok(()) } - /// Add an inbound record watch for changes + pub async fn _change_existing_watch( + &mut self, + key: TypedKey, + params: WatchParameters, + watch_id: u64, + ) -> VeilidAPIResult { + if params.count == 0 { + apibail_internal!("cancel watch should not have gotten here"); + } + if params.expiration.as_u64() == 0 { + apibail_internal!("zero expiration should have been resolved to max by now"); + } + // Get the watch list for this record + let rtk = RecordTableKey { key }; + let Some(watch_list) = self.watched_records.get_mut(&rtk) else { + // No watches, nothing to change + return Ok(WatchResult::Rejected); + }; + + // Check each watch to see if we have an exact match for the id to change + for w in &mut watch_list.watches { + // If the watch id doesn't match, then we're not updating + // Also do not allow the watcher key to change + if w.id == watch_id && w.params.watcher == params.watcher { + // Updating an existing watch + w.params = params; + return Ok(WatchResult::Changed { + expiration: w.params.expiration, + }); + } + } + + // No existing watch found + Ok(WatchResult::Rejected) + } + + pub async fn _create_new_watch( + &mut self, + key: TypedKey, + params: WatchParameters, + member_check: Box bool + Send>, + ) -> VeilidAPIResult { + // Generate a record-unique watch id > 0 + let rtk = RecordTableKey { key }; + let mut id = 0; + while id == 0 { + id = get_random_u64(); + } + if let Some(watched_record) = self.watched_records.get_mut(&rtk) { + // Make sure it doesn't match any other id (unlikely, but lets be certain) + 'x: loop { + for w in &mut watched_record.watches { + if w.id == id { + loop { + id = id.overflowing_add(1).0; + if id != 0 { + break; + } + } + continue 'x; + } + } + break; + } + } + + // Calculate watch limits + let mut watch_count = 0; + let mut target_watch_count = 0; + + let is_member = member_check(params.watcher); + + let rtk = RecordTableKey { key }; + if let Some(watched_record) = self.watched_records.get_mut(&rtk) { + // Total up the number of watches for this key + for w in &mut watched_record.watches { + // See if this watch should be counted toward any limits + let count_watch = if is_member { + // If the watcher is a member of the schema, then consider the total per-watcher key + w.params.watcher == params.watcher + } else { + // If the watcher is not a member of the schema, the check if this watch is an anonymous watch and contributes to per-record key total + !member_check(w.params.watcher) + }; + + // For any watch, if the target matches our also tally that separately + // If the watcher is a member of the schema, then consider the total per-target-per-watcher key + // If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-target-per-record key + if count_watch { + watch_count += 1; + if w.params.target == params.target { + target_watch_count += 1; + } + } + } + } + + // For members, no more than one watch per target per watcher per record + // For anonymous, no more than one watch per target per record + if target_watch_count > 0 { + // Too many watches + return Ok(WatchResult::Rejected); + } + + // Check watch table for limits + let watch_limit = if is_member { + self.limits.member_watch_limit + } else { + self.limits.public_watch_limit + }; + if watch_count >= watch_limit { + return Ok(WatchResult::Rejected); + } + + // Ok this is an acceptable new watch, add it + let watch_list = self.watched_records.entry(rtk).or_default(); + let expiration = params.expiration; + watch_list.watches.push(Watch { + params, + id, + changed: ValueSubkeyRangeSet::new(), + }); + Ok(WatchResult::Created { id, expiration }) + } + + /// Add or update an inbound record watch for changes #[allow(clippy::too_many_arguments)] pub async fn watch_record( &mut self, key: TypedKey, - subkeys: ValueSubkeyRangeSet, - mut expiration: Timestamp, - count: u32, - watch_id: Option, - target: Target, - watcher: PublicKey, - ) -> VeilidAPIResult> { - // If subkeys is empty or count is zero then we're cancelling a watch completely - if subkeys.is_empty() || count == 0 { - if let Some(watch_id) = watch_id { - return self.cancel_watch(key, watch_id, watcher).await; + mut params: WatchParameters, + opt_watch_id: Option, + ) -> VeilidAPIResult { + // If count is zero then we're cancelling a watch completely + if params.count == 0 { + if let Some(watch_id) = opt_watch_id { + let cancelled = self.cancel_watch(key, watch_id, params.watcher).await?; + if cancelled { + return Ok(WatchResult::Cancelled); + } + return Ok(WatchResult::Rejected); } apibail_internal!("shouldn't have let a None watch id get here"); } @@ -783,161 +910,67 @@ where let max_ts = cur_ts + self.limits.max_watch_expiration.as_u64(); let min_ts = cur_ts + self.limits.min_watch_expiration.as_u64(); - if expiration.as_u64() == 0 || expiration.as_u64() > max_ts { + if params.expiration.as_u64() == 0 || params.expiration.as_u64() > max_ts { // Clamp expiration max time (or set zero expiration to max) - expiration = Timestamp::new(max_ts); - } else if expiration.as_u64() < min_ts { + params.expiration = Timestamp::new(max_ts); + } else if params.expiration.as_u64() < min_ts { // Don't add watches with too low of an expiration time - return Ok(None); + if let Some(watch_id) = opt_watch_id { + let cancelled = self.cancel_watch(key, watch_id, params.watcher).await?; + if cancelled { + return Ok(WatchResult::Cancelled); + } + } + return Ok(WatchResult::Rejected); } - // Get the record being watched - let Some(is_member) = self.with_record(key, |record| { - // Check if the watcher specified is a schema member + // Make a closure to check for member vs anonymous + let Some(member_check) = self.with_record(key, |record| { let schema = record.schema(); - (*record.owner()) == watcher || schema.is_member(&watcher) + let owner = *record.owner(); + Box::new(move |watcher| owner == params.watcher || schema.is_member(&watcher)) }) else { // Record not found - return Ok(None); + return Ok(WatchResult::Rejected); }; - // Generate a record-unique watch id > 0 if one is not specified - let rtk = RecordTableKey { key }; - let mut new_watch = false; - let watch_id = watch_id.unwrap_or_else(|| { - new_watch = true; - let mut id = 0; - while id == 0 { - id = get_random_u64(); - } - if let Some(watched_record) = self.watched_records.get_mut(&rtk) { - // Make sure it doesn't match any other id (unlikely, but lets be certain) - 'x: loop { - for w in &mut watched_record.watchers { - if w.id == id { - loop { - id = id.overflowing_add(1).0; - if id != 0 { - break; - } - } - continue 'x; - } - } - break; - } - } - id - }); - - // See if we are updating an existing watch - // with the watcher matched on target - let mut watch_count = 0; - let mut target_watch_count = 0; - if let Some(watched_record) = self.watched_records.get_mut(&rtk) { - for w in &mut watched_record.watchers { - // Total up the number of watches for this key - // If the watcher is a member of the schema, then consider the total per-watcher key - // If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-record key - if !is_member || w.watcher == watcher { - watch_count += 1; - - // For any watch, if the target matches our also tally that separately - // If the watcher is a member of the schema, then consider the total per-target-per-watcher key - // If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-target-per-record key - if w.target == target { - target_watch_count += 1; - } - } - - // If this is not a new watch and the watch id doesn't match, then we're not updating - if !new_watch && w.id == watch_id { - // Updating an existing watch - // You can change a watcher key, or target via this update - // as well as the subkey range, expiration and count of the watch - w.watcher = watcher; - w.target = target; - w.subkeys = subkeys; - w.expiration = expiration; - w.count = count; - return Ok(Some((expiration, watch_id))); - } - } - } - - // Only proceed here if this is a new watch - if !new_watch { - // Not a new watch - return Ok(None); - } - - // For members, no more than one watch per target per watcher per record - // For anonymous, no more than one watch per target per record - if target_watch_count > 0 { - // Too many watches - return Ok(None); - } - - // Adding a new watcher to a watch - // Check watch table for limits - if is_member { - // Member watch - if watch_count >= self.limits.member_watch_limit { - // Too many watches - return Ok(None); - } + // Create or update depending on if a watch id is specified or not + if let Some(watch_id) = opt_watch_id { + self._change_existing_watch(key, params, watch_id).await } else { - // Public watch - // No more than one - if watch_count >= self.limits.public_watch_limit { - // Too many watches - return Ok(None); - } + self._create_new_watch(key, params, member_check).await } - - // Ok this is an acceptable new watch, add it - let watch = self.watched_records.entry(rtk).or_default(); - watch.watchers.push(WatchedRecordWatch { - id: watch_id, - subkeys, - expiration, - count, - target, - watcher, - changed: ValueSubkeyRangeSet::new(), - }); - Ok(Some((expiration, watch_id))) } /// Clear a specific watch for a record + /// returns true if the watch was found and cancelled async fn cancel_watch( &mut self, key: TypedKey, watch_id: u64, watcher: PublicKey, - ) -> VeilidAPIResult> { + ) -> VeilidAPIResult { if watch_id == 0 { - apibail_internal!("should not have let a a zero watch id get here"); + apibail_internal!("should not have let a zero watch id get here"); } // See if we are cancelling an existing watch - // with the watcher matched on target let rtk = RecordTableKey { key }; let mut is_empty = false; - let mut ret = None; - if let Some(watch) = self.watched_records.get_mut(&rtk) { + let mut ret = false; + if let Some(watch_list) = self.watched_records.get_mut(&rtk) { let mut dead_watcher = None; - for (wn, w) in watch.watchers.iter_mut().enumerate() { + for (wn, w) in watch_list.watches.iter_mut().enumerate() { // Must match the watch id and the watcher key to cancel - if w.id == watch_id && w.watcher == watcher { + if w.id == watch_id && w.params.watcher == watcher { // Canceling an existing watch dead_watcher = Some(wn); - ret = Some((w.expiration, watch_id)); + ret = true; break; } } if let Some(dw) = dead_watcher { - watch.watchers.remove(dw); - if watch.watchers.is_empty() { + watch_list.watches.remove(dw); + if watch_list.watches.is_empty() { is_empty = true; } } @@ -953,8 +986,8 @@ where pub fn move_watches( &mut self, key: TypedKey, - in_watch: Option<(WatchedRecord, bool)>, - ) -> Option<(WatchedRecord, bool)> { + in_watch: Option<(WatchList, bool)>, + ) -> Option<(WatchList, bool)> { let rtk = RecordTableKey { key }; let out = self.watched_records.remove(&rtk); if let Some(in_watch) = in_watch { @@ -983,7 +1016,7 @@ where if let Some(watch) = self.watched_records.get_mut(&rtk) { // Process watch notifications let mut dead_watchers = vec![]; - for (wn, w) in watch.watchers.iter_mut().enumerate() { + for (wn, w) in watch.watches.iter_mut().enumerate() { // Get the subkeys that have changed let subkeys = w.changed.clone(); @@ -996,14 +1029,14 @@ where // Reduce the count of changes sent // if count goes to zero mark this watcher dead - w.count -= 1; - let count = w.count; + w.params.count -= 1; + let count = w.params.count; if count == 0 { dead_watchers.push(wn); } evcis.push(EarlyValueChangedInfo { - target: w.target, + target: w.params.target, key: rtk.key, subkeys, count, @@ -1013,8 +1046,8 @@ where // Remove in reverse so we don't have to offset the index to remove the right key for dw in dead_watchers.iter().rev().copied() { - watch.watchers.remove(dw); - if watch.watchers.is_empty() { + watch.watches.remove(dw); + if watch.watches.is_empty() { empty_watched_records.push(rtk); } } diff --git a/veilid-core/src/storage_manager/types/opened_record.rs b/veilid-core/src/storage_manager/record_store/opened_record.rs similarity index 100% rename from veilid-core/src/storage_manager/types/opened_record.rs rename to veilid-core/src/storage_manager/record_store/opened_record.rs diff --git a/veilid-core/src/storage_manager/types/record.rs b/veilid-core/src/storage_manager/record_store/record.rs similarity index 100% rename from veilid-core/src/storage_manager/types/record.rs rename to veilid-core/src/storage_manager/record_store/record.rs diff --git a/veilid-core/src/storage_manager/types/record_data.rs b/veilid-core/src/storage_manager/record_store/record_data.rs similarity index 100% rename from veilid-core/src/storage_manager/types/record_data.rs rename to veilid-core/src/storage_manager/record_store/record_data.rs diff --git a/veilid-core/src/storage_manager/record_store_limits.rs b/veilid-core/src/storage_manager/record_store/record_store_limits.rs similarity index 100% rename from veilid-core/src/storage_manager/record_store_limits.rs rename to veilid-core/src/storage_manager/record_store/record_store_limits.rs diff --git a/veilid-core/src/storage_manager/types/remote_record_detail.rs b/veilid-core/src/storage_manager/record_store/remote_record_detail.rs similarity index 100% rename from veilid-core/src/storage_manager/types/remote_record_detail.rs rename to veilid-core/src/storage_manager/record_store/remote_record_detail.rs diff --git a/veilid-core/src/storage_manager/record_store/watch.rs b/veilid-core/src/storage_manager/record_store/watch.rs new file mode 100644 index 00000000..60440a29 --- /dev/null +++ b/veilid-core/src/storage_manager/record_store/watch.rs @@ -0,0 +1,66 @@ +use super::*; + +/// Watch parameters used to configure a watch +#[derive(Debug, Clone)] +pub struct WatchParameters { + /// The range of subkeys being watched, empty meaning full + pub subkeys: ValueSubkeyRangeSet, + /// When this watch will expire + pub expiration: Timestamp, + /// How many updates are left before forced expiration + pub count: u32, + /// The watching schema member key, or an anonymous key + pub watcher: PublicKey, + /// The place where updates are sent + pub target: Target, +} + +/// Watch result to return with answer +/// Default result is cancelled/expired/inactive/rejected +#[derive(Debug, Clone)] +pub enum WatchResult { + /// A new watch was created + Created { + /// The new id of the watch + id: u64, + /// The expiration timestamp of the watch. This should never be zero. + expiration: Timestamp, + }, + /// An existing watch was modified + Changed { + /// The new expiration timestamp of the modified watch. This should never be zero. + expiration: Timestamp, + }, + /// An existing watch was cancelled + Cancelled, + /// The request was rejected due to invalid parameters or a missing watch + Rejected, +} + +/// An individual watch +#[derive(Debug, Clone)] +pub struct Watch { + /// The configuration of the watch + pub params: WatchParameters, + /// A unique id per record assigned at watch creation time. Used to disambiguate a client's version of a watch + pub id: u64, + /// What has changed since the last update + pub changed: ValueSubkeyRangeSet, +} + +#[derive(Debug, Default, Clone)] +/// A record being watched for changes +pub struct WatchList { + /// The list of active watches + pub watches: Vec, +} + +/// How a watch gets updated when a value changes +pub enum WatchUpdateMode { + /// Update no watchers + NoUpdate, + /// Update all watchers + UpdateAll, + /// Update all watchers except ones that come from a specific target + ExcludeTarget(Target), +} diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index 3f5fa6ac..6a198ca6 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -537,26 +537,6 @@ impl StorageManagerInner { Ok(()) } - #[allow(clippy::too_many_arguments)] - pub(super) async fn handle_watch_local_value( - &mut self, - key: TypedKey, - subkeys: ValueSubkeyRangeSet, - expiration: Timestamp, - count: u32, - watch_id: Option, - target: Target, - watcher: PublicKey, - ) -> VeilidAPIResult> { - // See if it's in the local record store - let Some(local_record_store) = self.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - local_record_store - .watch_record(key, subkeys, expiration, count, watch_id, target, watcher) - .await - } - pub(super) async fn handle_get_remote_value( &mut self, key: TypedKey, @@ -614,26 +594,6 @@ impl StorageManagerInner { Ok(()) } - #[allow(clippy::too_many_arguments)] - pub(super) async fn handle_watch_remote_value( - &mut self, - key: TypedKey, - subkeys: ValueSubkeyRangeSet, - expiration: Timestamp, - count: u32, - watch_id: Option, - target: Target, - watcher: PublicKey, - ) -> VeilidAPIResult> { - // See if it's in the remote record store - let Some(remote_record_store) = self.remote_record_store.as_mut() else { - apibail_not_initialized!(); - }; - remote_record_store - .watch_record(key, subkeys, expiration, count, watch_id, target, watcher) - .await - } - /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] fn get_key(vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey where diff --git a/veilid-core/src/storage_manager/types/mod.rs b/veilid-core/src/storage_manager/types/mod.rs index 71c1e84c..9eea4ad8 100644 --- a/veilid-core/src/storage_manager/types/mod.rs +++ b/veilid-core/src/storage_manager/types/mod.rs @@ -1,17 +1,7 @@ -mod local_record_detail; -mod opened_record; -mod record; -mod record_data; -mod remote_record_detail; mod signed_value_data; mod signed_value_descriptor; use super::*; -pub(super) use local_record_detail::*; -pub(super) use opened_record::*; -pub(super) use record::*; -pub(super) use record_data::*; -pub(super) use remote_record_detail::*; pub use signed_value_data::*; pub use signed_value_descriptor::*; diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 94306559..d1093695 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -20,7 +20,7 @@ pub(super) struct OutboundWatchValueResult { } impl StorageManager { - /// Perform a 'watch value' query on the network + /// Perform a 'watch value' query on the network using fanout #[allow(clippy::too_many_arguments)] pub(super) async fn outbound_watch_value( &self, @@ -91,12 +91,12 @@ impl StorageManager { // Keep answer if we got one if wva.answer.accepted { - if expiration_ts.as_u64() > 0 { + if wva.answer.expiration_ts.as_u64() > 0 { // If the expiration time is greater than zero this watch is active log_stor!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64())); } else { - // If we asked for a zero notification count, then this is a cancelled watch - log_stor!(debug "Watch cancelled: id={}", wva.answer.watch_id); + // If the returned expiration time is zero, this watch was cancelled, or inactive + log_stor!(debug "Watch inactive: id={}", wva.answer.watch_id); } let mut ctx = context.lock(); ctx.opt_watch_value_result = Some(OutboundWatchValueResult { @@ -186,51 +186,40 @@ impl StorageManager { pub async fn inbound_watch_value( &self, key: TypedKey, - subkeys: ValueSubkeyRangeSet, - expiration: Timestamp, - count: u32, + params: WatchParameters, watch_id: Option, - target: Target, - watcher: PublicKey, - ) -> VeilidAPIResult> { + ) -> VeilidAPIResult> { let mut inner = self.lock().await?; // Validate input - if (subkeys.is_empty() || count == 0) && (watch_id.unwrap_or_default() == 0) { + if params.count == 0 && (watch_id.unwrap_or_default() == 0) { // Can't cancel a watch without a watch id return VeilidAPIResult::Ok(NetworkResult::invalid_message( "can't cancel watch without id", )); } - // See if this is a remote or local value - let (_is_local, opt_ret) = { - // See if the subkey we are watching has a local value - let opt_ret = inner - .handle_watch_local_value( - key, - subkeys.clone(), - expiration, - count, - watch_id, - target, - watcher, - ) - .await?; - if opt_ret.is_some() { - (true, opt_ret) - } else { - // See if the subkey we are watching is a remote value - let opt_ret = inner - .handle_watch_remote_value( - key, subkeys, expiration, count, watch_id, target, watcher, - ) - .await?; - (false, opt_ret) - } + // Try from local and remote record stores + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); }; - - Ok(NetworkResult::value(opt_ret.unwrap_or_default())) + if local_record_store.contains_record(key) { + return local_record_store + .watch_record(key, params, watch_id) + .await + .map(NetworkResult::value); + } + let Some(remote_record_store) = inner.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if remote_record_store.contains_record(key) { + return remote_record_store + .watch_record(key, params, watch_id) + .await + .map(NetworkResult::value); + } + // No record found + Ok(NetworkResult::value(WatchResult::Rejected)) } /// Handle a received 'Value Changed' statement