diff --git a/veilid-core/src/storage_manager/outbound_watch.rs b/veilid-core/src/storage_manager/outbound_watch.rs new file mode 100644 index 00000000..2104480b --- /dev/null +++ b/veilid-core/src/storage_manager/outbound_watch.rs @@ -0,0 +1,131 @@ +use super::*; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(in crate::storage_manager) struct PerNodeOutboundWatch { + /// The watch id returned from the watch node + pub id: u64, + /// The expiration of a successful watch + pub expiration_ts: Timestamp, + /// Which node accepted the watch + pub watch_node_id: TypedKey, + /// Resolved watch node reference + #[serde(skip)] + pub watch_node_ref: Option, + /// How many value change notifications are left + pub count: u32, +} + +/// Requested parameters for watch +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(in crate::storage_manager) struct OutboundWatchParameters { + /// Requested expiration timestamp + pub expiration_ts: Timestamp, + /// How many notifications the requestor asked for + pub count: u32, + /// Subkeys requested for this watch + pub subkeys: ValueSubkeyRangeSet, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(in crate::storage_manager) struct OutboundWatchCurrent { + /// Requested parameters + pub params: OutboundWatchParameters, + /// Outbound watches per node + pub per_node: Vec, + /// Minimum expiration time for all our nodes + pub min_expiration_ts: Timestamp, + /// How many value change updates remain + pub remaining_count: u32, + /// Which private route is responsible for receiving ValueChanged notifications + pub opt_value_changed_route: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(in crate::storage_manager) struct OutboundWatch { + /// Current state + /// None means inactive/cancelled + pub current: Option, + + /// Desired parameters + /// None means cancelled + pub desired: Option, +} + +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +pub(in crate::storage_manager) struct PerNodeKey { + /// Watched record key + pub record_key: TypedKey, + /// Watching node id + pub node_id: TypedKey, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(in crate::storage_manager) struct PerNodeState { + /// Watch Id + pub watch_id: u64, + /// SafetySpec used to contact the node + pub safety_spec: SafetySpec, + /// The expiration of a successful watch + pub expiration_ts: Timestamp, + /// How many value change notifications are left + pub count: u32, + + /// Resolved watch node reference + #[serde(skip)] + pub watch_node_ref: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(in crate::storage_manager) struct OutboundWatchState { + /// Each watch per record key + pub outbound_watches: HashMap, + /// Last known active watch per node+record + pub per_node_state: HashMap, +} + +impl OutboundWatchCurrent { + pub fn new( + params: OutboundWatchParameters, + opt_value_changed_route: Option, + ) -> Self { + let remaining_count = params.count; + let min_expiration_ts = params.expiration_ts; + + Self { + params, + per_node: vec![], + min_expiration_ts, + remaining_count, + opt_value_changed_route, + } + } + + pub fn per_node_outbound_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeOutboundWatch> { + self.per_node.iter().find(|x| x.id == watch_id) + } + + pub fn per_node_outbound_watch_by_id_mut( + &mut self, + watch_id: u64, + ) -> Option<&mut PerNodeOutboundWatch> { + self.per_node.iter_mut().find(|x| x.id == watch_id) + } + + pub fn remove_per_node_outbound_watch_by_id(&mut self, watch_id: u64) { + let Some(n) = self.per_node.iter().position(|x| x.id == watch_id) else { + return; + }; + self.per_node.remove(n); + + self.update_min_expiration_ts(); + } + + fn update_min_expiration_ts(&mut self) { + self.min_expiration_ts = self + .per_node + .iter() + .map(|x| x.expiration_ts) + .reduce(|a, b| a.min(b)) + .unwrap_or(self.params.expiration_ts); + } +} diff --git a/veilid-core/src/storage_manager/record_store/inbound_watch.rs b/veilid-core/src/storage_manager/record_store/inbound_watch.rs new file mode 100644 index 00000000..a9ab3cc5 --- /dev/null +++ b/veilid-core/src/storage_manager/record_store/inbound_watch.rs @@ -0,0 +1,66 @@ +use super::*; + +/// Watch parameters used to configure a watch +#[derive(Debug, Clone)] +pub struct InboundWatchParameters { + /// The range of subkeys being watched, empty meaning full + pub subkeys: ValueSubkeyRangeSet, + /// When this watch will expire + pub expiration: Timestamp, + /// How many updates are left before forced expiration + pub count: u32, + /// The watching schema member key, or an anonymous key + pub watcher: PublicKey, + /// The place where updates are sent + pub target: Target, +} + +/// Watch result to return with answer +/// Default result is cancelled/expired/inactive/rejected +#[derive(Debug, Clone)] +pub enum InboundWatchResult { + /// A new watch was created + Created { + /// The new id of the watch + id: u64, + /// The expiration timestamp of the watch. This should never be zero. + expiration: Timestamp, + }, + /// An existing watch was modified + Changed { + /// The new expiration timestamp of the modified watch. This should never be zero. + expiration: Timestamp, + }, + /// An existing watch was cancelled + Cancelled, + /// The request was rejected due to invalid parameters or a missing watch + Rejected, +} + +/// An individual watch +#[derive(Debug, Clone)] +pub struct InboundWatch { + /// The configuration of the watch + pub params: InboundWatchParameters, + /// A unique id per record assigned at watch creation time. Used to disambiguate a client's version of a watch + pub id: u64, + /// What has changed since the last update + pub changed: ValueSubkeyRangeSet, +} + +#[derive(Debug, Default, Clone)] +/// A record being watched for changes +pub struct InboundWatchList { + /// The list of active watches + pub watches: Vec, +} + +/// How a watch gets updated when a value changes +pub enum InboundWatchUpdateMode { + /// Update no watchers + NoUpdate, + /// Update all watchers + UpdateAll, + /// Update all watchers except ones that come from a specific target + ExcludeTarget(Target), +} diff --git a/veilid-core/src/storage_manager/tasks/check_inbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_inbound_watches.rs new file mode 100644 index 00000000..3c8638b3 --- /dev/null +++ b/veilid-core/src/storage_manager/tasks/check_inbound_watches.rs @@ -0,0 +1,23 @@ +use super::*; + +impl StorageManager { + // Check if server-side watches have expired + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn check_inbound_watches_task_routine( + &self, + _stop_token: StopToken, + _last_ts: Timestamp, + _cur_ts: Timestamp, + ) -> EyreResult<()> { + let mut inner = self.inner.lock().await; + + if let Some(local_record_store) = &mut inner.local_record_store { + local_record_store.check_watched_records(); + } + if let Some(remote_record_store) = &mut inner.remote_record_store { + remote_record_store.check_watched_records(); + } + + Ok(()) + } +} diff --git a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs new file mode 100644 index 00000000..dcb26038 --- /dev/null +++ b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs @@ -0,0 +1,114 @@ +use super::*; + +impl StorageManager { + async fn background_outbound_watch_cancel( + self, + watch_locked_key: AsyncTagLockGuard, + safety_selection: SafetySelection, + opt_watcher: Option, + mut outbound_watch: OutboundWatch, + ) { + let key = watch_locked_key.tag(); + + // Last ditch cancellation of per-node watches that may not be fully exhausted + let cancelled_ids = self + .outbound_watch_value_cancel_set(key, safety_selection, opt_watcher, &outbound_watch) + .await; + + // Remove any fully cancelled watch ids + for cancelled_id in cancelled_ids { + outbound_watch.remove_per_node_outbound_watch_by_id(cancelled_id); + } + + // Ensure the watch is put into cancelled state + outbound_watch.remaining_count = 0; + + // Update the opened record + let mut inner = self.inner.lock().await; + let Some(opened_record) = inner.opened_records.get_mut(&key) else { + // Already closed + return; + }; + opened_record.clear_outbound_watch(); + } + + // Check if client-side watches on opened records either have dead nodes or if the watch has expired + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn check_outbound_watches_task_routine( + &self, + _stop_token: StopToken, + _last_ts: Timestamp, + _cur_ts: Timestamp, + ) -> EyreResult<()> { + let mut inner = self.inner.lock().await; + + let routing_table = self.routing_table(); + //let update_callback = self.update_callback(); + + let cur_ts = Timestamp::now(); + for (k, v) in inner.opened_records.iter_mut() { + let Some(outbound_watch) = v.outbound_watch() else { + continue; + }; + + // See if the watch is expired or out of updates + if outbound_watch.min_expiration_ts <= cur_ts || outbound_watch.remaining_count == 0 { + // See if we can lock the outbound watch + let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(*k) else { + // Watch is busy, come back later + continue; + }; + + let outbound_watch = outbound_watch.clone(); + let safety_selection = v.safety_selection(); + let opt_watcher = v.writer().cloned(); + + self.background_operation_processor.add_future( + self.clone().background_outbound_watch_cancel( + watch_lock, + safety_selection, + opt_watcher, + outbound_watch, + ), + ); + + // Clear active watch + v.remove_active_watch(outbound_watch.id); + + // // Send valuechange with dead count and no subkeys + // update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { + // key: *k, + // subkeys: ValueSubkeyRangeSet::new(), + // count: 0, + // value: None, + // }))); + } + + // See if the private route we're using is dead + let mut is_dead = false; + + if !is_dead { + if let Some(value_changed_route) = outbound_watch.opt_value_changed_route { + if routing_table + .route_spec_store() + .get_route_id_for_key(&value_changed_route) + .is_none() + { + // Route we would receive value changes on is dead + is_dead = true; + } + } + } + + for outbound_watch in &outbound_watch.per_node { + // See if the active watch's node is dead + if !outbound_watch.watch_node.state(cur_ts).is_alive() { + // Watched node is dead + is_dead = true; + } + } + } + + Ok(()) + } +}