From 6d41039a5b517776bda2524fe310b65905bf5f45 Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Wed, 2 Apr 2025 15:27:35 -0500
Subject: [PATCH] [ci skip] updating watch logic

---
 veilid-core/src/storage_manager/mod.rs        |  49 +-
 .../src/storage_manager/outbound_watch.rs     | 245 +++++++---
 .../tasks/check_outbound_watches.rs           | 431 ++++++++++++++----
 .../src/storage_manager/watch_value.rs        |  40 +-
 4 files changed, 594 insertions(+), 171 deletions(-)

diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index dff868d6..d04684f2 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -64,7 +64,7 @@ struct StorageManagerInner {
     /// Record subkeys that are currently being written to in the foreground
     pub active_subkey_writes: HashMap,
     /// State management for outbound watches
-    pub outbound_watches: HashMap,
+    pub outbound_watch_state: OutboundWatchState,
     /// Storage manager metadata that is persistent, including copy of offline subkey writes
     pub metadata_db: Option,
     /// Background processing task (not part of attachment manager tick tree so it happens when detached too)
@@ -80,6 +80,7 @@ impl fmt::Debug for StorageManagerInner {
             .field("remote_record_store", &self.remote_record_store)
             .field("offline_subkey_writes", &self.offline_subkey_writes)
             .field("active_subkey_writes", &self.active_subkey_writes)
+            .field("outbound_watch_state", &self.outbound_watch_state)
             //.field("metadata_db", &self.metadata_db)
             //.field("tick_future", &self.tick_future)
             .finish()
@@ -870,18 +871,22 @@ impl StorageManager {
         expiration: Timestamp,
         count: u32,
     ) -> VeilidAPIResult {
+        // Obtain the watch change lock
+        // (may need to wait for background operations to complete on the watch)
+        let watch_lock = self.outbound_watch_lock_table.lock_tag(key).await;
+
+        // Obtain the inner state lock
         let inner = self.inner.lock().await;
 
         // Get the safety selection and the writer we opened this record
-        // and whatever active watch we may have in case this is a watch update
-        let (safety_selection, opt_writer, opt_active_watch) = {
+        let (safety_selection, opt_writer) = {
             let Some(opened_record) = inner.opened_records.get(&key) else {
+                // Record must be opened already to change watch
                 apibail_generic!("record not open");
            };
             (
                 opened_record.safety_selection(),
                 opened_record.writer().cloned(),
-                opened_record.outbound_watch().cloned(),
             )
         };
 
@@ -903,14 +908,44 @@ impl StorageManager {
         };
         let subkeys = schema.truncate_subkeys(&subkeys, None);
 
-        // Get rpc processor and drop mutex so we don't block while requesting the watch from the network
-        if !self.dht_is_online() {
-            apibail_try_again!("offline, try again later");
+        // Calculate desired watch parameters
+        let desired = if count == 0 {
+            // Cancel
+            None
+        } else {
+            // Get the minimum and maximum expiration timestamp we will accept
+            let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| {
+                (
+                    TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
+                    TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)),
+                )
+            });
+            let cur_ts = get_timestamp();
+            let min_expiration_ts = Timestamp::new(cur_ts + rpc_timeout_us.as_u64());
+            let expiration_ts = if expiration.as_u64() == 0 {
+                expiration
+            } else if expiration < min_expiration_ts {
+                apibail_invalid_argument!("expiration is too soon", "expiration", expiration);
+            } else {
+                expiration
+            };
+
+            // Create or modify
+            Some(OutboundWatchParameters {
+                expiration_ts,
+                count,
+                subkeys,
+            })
         };
 
+        // Modify the 'desired' state of the watch or add one if it does not exist
+        inner.outbound_watch_state.set_desired_watch(key, desired);
+
         // Drop the lock for network access
         drop(inner);
 
+        // xxx continue here, make a 'reconcile outbound watch' routine that can be called imperatively, wait for it etc.
+
         // Use the safety selection we opened the record with
         // Use the writer we opened with as the 'watcher' as well
         let opt_owvresult = self
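
Note on the hunk above: watch_values() no longer performs any network activity itself; it validates the request and records the 'desired' parameters, leaving the RPC work to the background reconciler. A minimal sketch of that validation logic follows. It is not part of the patch: it uses plain u64 microsecond values in place of Timestamp/TimestampDuration, and the DesiredWatch type and desired_from_request function are illustrative names only.

    // Sketch only: how an API-level watch request maps onto the new 'desired' state.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct DesiredWatch {
        expiration_ts_us: u64, // 0 = renew indefinitely
        count: u32,
    }

    fn desired_from_request(
        cur_ts_us: u64,
        rpc_timeout_us: u64,
        expiration_ts_us: u64,
        count: u32,
    ) -> Result<Option<DesiredWatch>, &'static str> {
        // count == 0 is a cancellation: there is no desired watch at all
        if count == 0 {
            return Ok(None);
        }
        // A nonzero expiration must be at least one RPC timeout in the future,
        // otherwise the watch could expire before it is even established
        let min_expiration_ts_us = cur_ts_us + rpc_timeout_us;
        if expiration_ts_us != 0 && expiration_ts_us < min_expiration_ts_us {
            return Err("expiration is too soon");
        }
        Ok(Some(DesiredWatch { expiration_ts_us, count }))
    }
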
diff --git a/veilid-core/src/storage_manager/outbound_watch.rs b/veilid-core/src/storage_manager/outbound_watch.rs
index 2104480b..4d1e3643 100644
--- a/veilid-core/src/storage_manager/outbound_watch.rs
+++ b/veilid-core/src/storage_manager/outbound_watch.rs
@@ -1,37 +1,27 @@
 use super::*;
 
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub(in crate::storage_manager) struct PerNodeOutboundWatch {
-    /// The watch id returned from the watch node
-    pub id: u64,
-    /// The expiration of a successful watch
-    pub expiration_ts: Timestamp,
-    /// Which node accepted the watch
-    pub watch_node_id: TypedKey,
-    /// Resolved watch node reference
-    #[serde(skip)]
-    pub watch_node_ref: Option,
-    /// How many value change notifications are left
-    pub count: u32,
-}
+impl_veilid_log_facility!("stor");
 
 /// Requested parameters for watch
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
 pub(in crate::storage_manager) struct OutboundWatchParameters {
-    /// Requested expiration timestamp
+    /// Requested expiration timestamp. A zero timestamp here indicates
+    /// that the watch is to be renewed indefinitely
     pub expiration_ts: Timestamp,
     /// How many notifications the requestor asked for
     pub count: u32,
     /// Subkeys requested for this watch
     pub subkeys: ValueSubkeyRangeSet,
+    /// What key to use to perform the watch
+    pub opt_watcher: Option,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub(in crate::storage_manager) struct OutboundWatchCurrent {
     /// Requested parameters
     pub params: OutboundWatchParameters,
-    /// Outbound watches per node
-    pub per_node: Vec,
+    /// Nodes that have an active watch on our behalf
+    pub nodes: Vec,
     /// Minimum expiration time for all our nodes
     pub min_expiration_ts: Timestamp,
     /// How many value change updates remain
@@ -51,7 +41,98 @@ pub(in crate::storage_manager) struct OutboundWatch {
     pub desired: Option,
 }
 
-#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
+impl OutboundWatch {
+    /// Returns true if this outbound watch can be removed from the table
+    pub fn is_dead(&self) -> bool {
+        self.desired.is_none() && self.current.is_none()
+    }
+
+    /// Returns true if this outbound watch needs to be cancelled
+    pub fn needs_cancel(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
+        if self.is_dead() {
+            veilid_log!(registry warn "should have checked for is_dead first");
+            return false;
+        }
+        let Some(current) = self.current.as_ref() else {
+            return false;
+        };
+
+        // If the total number of changes has been reached
+        // then we're done and should cancel
+        if current.remaining_count == 0 {
+            return true;
+        }
+
+        // If we have expired and can't renew, then cancel
+        if cur_ts >= current.params.expiration_ts {
+            return true;
+        }
+
+        // If the desired parameters are None, then cancel
+        let Some(desired) = self.desired.as_ref() else {
+            return true;
+        };
+        // If the desired parameters are different from the current parameters,
+        // then cancel so we can eventually reconcile to the new parameters
+        current.params != *desired
+    }
+
+    /// Returns true if this outbound watch can be renewed
+    pub fn needs_renew(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
+        if self.is_dead() || self.needs_cancel(registry, cur_ts) {
+            veilid_log!(registry warn "should have checked for is_dead and needs_cancel first");
+            return false;
+        }
+        // If there is no current watch then there is nothing to renew
+        let Some(current) = self.current.as_ref() else {
+            return false;
+        };
+        cur_ts >= current.min_expiration_ts && cur_ts < current.params.expiration_ts
+    }
+
+    /// Returns true if there is work to be done on getting the outbound
+    /// watch to its desired state
+    pub fn needs_reconcile(
+        &self,
+        registry: &VeilidComponentRegistry,
+        consensus_count: usize,
+        cur_ts: Timestamp,
+    ) -> bool {
+        if self.is_dead()
+            || self.needs_cancel(registry, cur_ts)
+            || self.needs_renew(registry, cur_ts)
+        {
+            veilid_log!(registry warn "should have checked for is_dead, needs_cancel, and needs_renew first");
+            return false;
+        }
+
+        // If desired is none, then is_dead() or needs_cancel() should have been true
+        let Some(desired) = self.desired.as_ref() else {
+            veilid_log!(registry warn "is_dead() or needs_cancel() should have been true");
+            return false;
+        };
+
+        // If there is a desired watch but no current watch, then reconcile
+        let Some(current) = self.current.as_ref() else {
+            return true;
+        };
+
+        // If the params are different, then needs_cancel() should have returned true
+        if current.params != *desired {
+            veilid_log!(registry warn "needs_cancel() should have returned true");
+            return false;
+        }
+        // If we are still working on getting the 'current' state to match
+        // the 'desired' state, then reconcile
+        if current.nodes.len() != consensus_count {
+            return true;
+        }
+        // No work to do on this watch
+        false
+    }
+}
+
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
 pub(in crate::storage_manager) struct PerNodeKey {
     /// Watched record key
     pub record_key: TypedKey,
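
The predicates added in the hunk above are meant to be evaluated in a fixed order, and each one warns if an earlier check was skipped. A hypothetical helper (not present in this patch) that spells out the evaluation order the background task relies on; it assumes the OutboundWatch, VeilidComponentRegistry and Timestamp types from this codebase, and the WatchAction enum and classify() names are invented for illustration:

    // Illustration only: the explicit decision ladder behind the four predicates.
    enum WatchAction {
        Drop,      // is_dead(): remove the entry from the table
        Cancel,    // needs_cancel(): tell the remote nodes to stop watching
        Renew,     // needs_renew(): refresh per-node watches before they expire
        Reconcile, // needs_reconcile(): fan out until 'current' matches 'desired'
        Nothing,   // no work this tick
    }

    fn classify(
        watch: &OutboundWatch,
        registry: &VeilidComponentRegistry,
        consensus_count: usize,
        cur_ts: Timestamp,
    ) -> WatchAction {
        if watch.is_dead() {
            WatchAction::Drop
        } else if watch.needs_cancel(registry, cur_ts) {
            WatchAction::Cancel
        } else if watch.needs_renew(registry, cur_ts) {
            WatchAction::Renew
        } else if watch.needs_reconcile(registry, consensus_count, cur_ts) {
            WatchAction::Reconcile
        } else {
            WatchAction::Nothing
        }
    }
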
@@ -63,13 +144,14 @@ pub(in crate::storage_manager) struct PerNodeState {
     /// Watch Id
     pub watch_id: u64,
-    /// SafetySpec used to contact the node
-    pub safety_spec: SafetySpec,
+    /// SafetySelection used to contact the node
+    pub safety_selection: SafetySelection,
+    /// What key was used to perform the watch
+    pub opt_watcher: Option,
     /// The expiration of a successful watch
     pub expiration_ts: Timestamp,
     /// How many value change notifications are left
     pub count: u32,
-
     /// Resolved watch node reference
     #[serde(skip)]
     pub watch_node_ref: Option,
@@ -83,49 +165,94 @@ pub(in crate::storage_manager) struct OutboundWatchState {
     pub per_node_state: HashMap,
 }
 
-impl OutboundWatchCurrent {
-    pub fn new(
-        params: OutboundWatchParameters,
-        opt_value_changed_route: Option,
-    ) -> Self {
-        let remaining_count = params.count;
-        let min_expiration_ts = params.expiration_ts;
+impl Default for OutboundWatchState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
 
-        Self {
-            params,
-            per_node: vec![],
-            min_expiration_ts,
-            remaining_count,
-            opt_value_changed_route,
+impl OutboundWatchState {
+    pub fn new() -> Self {
+        Self {
+            outbound_watches: HashMap::new(),
+            per_node_state: HashMap::new(),
         }
     }
 
-    pub fn per_node_outbound_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeOutboundWatch> {
-        self.per_node.iter().find(|x| x.id == watch_id)
-    }
-
-    pub fn per_node_outbound_watch_by_id_mut(
+    pub fn set_desired_watch(
         &mut self,
-        watch_id: u64,
-    ) -> Option<&mut PerNodeOutboundWatch> {
-        self.per_node.iter_mut().find(|x| x.id == watch_id)
-    }
+        record_key: TypedKey,
+        desired_watch: Option,
+    ) {
+        match self.outbound_watches.get_mut(&record_key) {
+            Some(w) => {
+                // Replace desired watch
+                w.desired = desired_watch;
 
-    pub fn remove_per_node_outbound_watch_by_id(&mut self, watch_id: u64) {
-        let Some(n) = self.per_node.iter().position(|x| x.id == watch_id) else {
-            return;
-        };
-        self.per_node.remove(n);
-
-        self.update_min_expiration_ts();
-    }
-
-    fn update_min_expiration_ts(&mut self) {
-        self.min_expiration_ts = self
-            .per_node
-            .iter()
-            .map(|x| x.expiration_ts)
-            .reduce(|a, b| a.min(b))
-            .unwrap_or(self.params.expiration_ts);
+                // Remove if the watch is done
+                if w.current.is_none() && w.desired.is_none() {
+                    self.outbound_watches.remove(&record_key);
+                }
+            }
+            None => {
+                // Watch does not exist, add one if that's what is desired
+                if desired_watch.is_some() {
+                    self.outbound_watches.insert(
+                        record_key,
+                        OutboundWatch {
+                            current: None,
+                            desired: desired_watch,
+                        },
+                    );
+                }
+            }
+        }
     }
 }
+
+// impl OutboundWatchCurrent {
+//     pub fn new(
+//         params: OutboundWatchParameters,
+//         opt_value_changed_route: Option,
+//     ) -> Self {
+//         let remaining_count = params.count;
+//         let min_expiration_ts = params.expiration_ts;
+
+//         Self {
+//             params,
+//             per_node: vec![],
+//             min_expiration_ts,
+//             remaining_count,
+//             opt_value_changed_route,
+//         }
+//     }

+//     pub fn per_node_outbound_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeOutboundWatch> {
+//         self.per_node.iter().find(|x| x.id == watch_id)
+//     }

+//     pub fn per_node_outbound_watch_by_id_mut(
+//         &mut self,
+//         watch_id: u64,
+//     ) -> Option<&mut PerNodeOutboundWatch> {
+//         self.per_node.iter_mut().find(|x| x.id == watch_id)
+//     }

+//     pub fn remove_per_node_outbound_watch_by_id(&mut self, watch_id: u64) {
+//         let Some(n) = self.per_node.iter().position(|x| x.id == watch_id) else {
+//             return;
+//         };
+//         self.per_node.remove(n);

+//         self.update_min_expiration_ts();
+//     }

+//     fn update_min_expiration_ts(&mut self) {
+//         self.min_expiration_ts = self
+//             .per_node
+//             .iter()
+//             .map(|x| x.expiration_ts)
+//             .reduce(|a, b| a.min(b))
+//             .unwrap_or(self.params.expiration_ts);
+//     }
+// }
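
For context, a small usage sketch of the new state table. This is not part of the patch: record_key is assumed to be a TypedKey already in scope and the parameter values are made up for illustration.

    // Request an indefinitely-renewed watch on every subkey of a record.
    // A zero expiration means "renew indefinitely" per the field docs above.
    let mut state = OutboundWatchState::new();
    state.set_desired_watch(
        record_key,
        Some(OutboundWatchParameters {
            expiration_ts: Timestamp::new(0),
            count: u32::MAX,
            subkeys: ValueSubkeyRangeSet::full(),
            opt_watcher: None, // fall back to the anonymous watch key
        }),
    );

    // Clearing the desired parameters marks the watch for cancellation. The entry is
    // only dropped from the table once its 'current' state is also gone; until then
    // the background task sees needs_cancel() and contacts the nodes holding the watch.
    state.set_desired_watch(record_key, None);
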
diff --git a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs
index dcb26038..6a22be36 100644
--- a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs
+++ b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs
@@ -1,114 +1,377 @@
+use futures_util::StreamExt as _;
+
 use super::*;
 
 impl StorageManager {
-    async fn background_outbound_watch_cancel(
-        self,
-        watch_locked_key: AsyncTagLockGuard,
-        safety_selection: SafetySelection,
-        opt_watcher: Option,
-        mut outbound_watch: OutboundWatch,
+    /// Remove dead watches from the table
+    pub(super) async fn process_outbound_watch_dead(
+        &self,
+        watch_lock: AsyncTagLockGuard,
     ) {
-        let key = watch_locked_key.tag();
+        let record_key = watch_lock.tag();
 
-        // Last ditch cancellation of per-node watches that may not be fully exhausted
-        let cancelled_ids = self
-            .outbound_watch_value_cancel_set(key, safety_selection, opt_watcher, &outbound_watch)
-            .await;
-
-        // Remove any fully cancelled watch ids
-        for cancelled_id in cancelled_ids {
-            outbound_watch.remove_per_node_outbound_watch_by_id(cancelled_id);
-        }
-
-        // Ensure the watch is put into cancelled state
-        outbound_watch.remaining_count = 0;
-
-        // Update the opened record
         let mut inner = self.inner.lock().await;
-        let Some(opened_record) = inner.opened_records.get_mut(&key) else {
-            // Already closed
+        let Some(outbound_watch) = inner
+            .outbound_watch_state
+            .outbound_watches
+            .remove(&record_key)
+        else {
+            veilid_log!(self warn "dead watch should have still been in the table");
             return;
         };
-        opened_record.clear_outbound_watch();
+
+        if outbound_watch.current.is_some() {
+            veilid_log!(self warn "dead watch still had current state");
+        }
+        if outbound_watch.desired.is_some() {
+            veilid_log!(self warn "dead watch still had desired params");
+        }
     }
 
+    /// Get the list of remaining active watch ids
+    /// and call their nodes to cancel the watch
+    pub(super) async fn process_outbound_watch_cancel(
+        &self,
+        watch_lock: AsyncTagLockGuard,
+    ) {
+        let record_key = watch_lock.tag();
+
+        // If we can't do this operation right now, don't try
+        if !self.dht_is_online() {
+            return;
+        }
+
+        let per_node_states = {
+            let inner = &mut *self.inner.lock().await;
+            let Some(outbound_watch) = inner
+                .outbound_watch_state
+                .outbound_watches
+                .get_mut(&record_key)
+            else {
+                veilid_log!(self warn "watch being cancelled should have still been in the table");
+                return;
+            };
+            let Some(current) = &mut outbound_watch.current else {
+                veilid_log!(self warn "watch being cancelled should have current state");
+                return;
+            };
+            let mut per_node_states = vec![];
+            let mut dead_pnks = BTreeSet::new();
+            for pnk in &current.nodes {
+                let Some(per_node_state) =
+                    inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
+                else {
+                    veilid_log!(self warn "missing per-node state for watch");
+                    dead_pnks.insert(*pnk);
+                    continue;
+                };
+                per_node_states.push((*pnk, per_node_state));
+            }
+            current.nodes.retain(|x| !dead_pnks.contains(x));
+
+            per_node_states
+        };
+
+        // Now reach out to each node and cancel their watch ids
+        let mut unord = FuturesUnordered::new();
+        for (pnk, pns) in per_node_states {
+            unord.push(async move {
+                let res = self
+                    .outbound_watch_value_cancel(
+                        pnk.record_key,
+                        pns.safety_selection,
+                        pns.opt_watcher,
+                        pns.watch_node_ref.unwrap(),
+                        pns.watch_id,
+                    )
+                    .await;
+                (pnk, res)
+            });
+        }
+
+        let mut cancelled = vec![];
+        while let Some((pnk, res)) = unord.next().await {
+            match res {
+                Ok(_) => {
+                    // Remove from 'per node states' because we got some response
+                    cancelled.push(pnk);
+                }
+                Err(e) => {
+                    veilid_log!(self debug "outbound watch cancel error: {}", e);
+                    // Leave in the 'per node states' for now because we couldn't contact the node
+                    // but remove from this watch. We'll try the cancel again if we reach this node again during fanout.
+                }
+            }
+        }
+
+        // Update state
+        {
+            let inner = &mut *self.inner.lock().await;
+            let Some(outbound_watch) = inner
+                .outbound_watch_state
+                .outbound_watches
+                .get_mut(&record_key)
+            else {
+                veilid_log!(self warn "watch being cancelled should have still been in the table");
+                return;
+            };
+            let Some(current) = &mut outbound_watch.current else {
+                veilid_log!(self warn "watch being cancelled should have current state");
+                return;
+            };
+
+            // Mark as dead now that we cancelled
+            outbound_watch.current = None;
+        }
+    }
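
The cancel pass above issues one RPC per node concurrently through FuturesUnordered and harvests results as they complete. The same shape in isolation, as a self-contained sketch: cancel_one stands in for the real outbound_watch_value_cancel call and node ids are plain integers.

    use futures_util::{stream::FuturesUnordered, StreamExt};

    async fn cancel_one(node_id: u64) -> Result<bool, String> {
        // stand-in for the real per-node cancel RPC
        let _ = node_id;
        Ok(true)
    }

    async fn cancel_all(node_ids: Vec<u64>) -> Vec<u64> {
        let mut unord = FuturesUnordered::new();
        for node_id in node_ids {
            // Pair each result with the node it came from so bookkeeping stays simple
            unord.push(async move { (node_id, cancel_one(node_id).await) });
        }

        let mut cancelled = vec![];
        while let Some((node_id, res)) = unord.next().await {
            match res {
                // Any response at all means the node processed the cancel
                Ok(_) => cancelled.push(node_id),
                // On error, keep the per-node state around so a later pass can retry
                Err(_) => {}
            }
        }
        cancelled
    }
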
 
+    pub(super) async fn process_outbound_watch_renew(
+        &self,
+        watch_lock: AsyncTagLockGuard,
+    ) {
+        let record_key = watch_lock.tag();
+
+        // If we can't do this operation right now, don't try
+        if !self.dht_is_online() {
+            return;
+        }
+
+        let (per_node_states, params) = {
+            let inner = &mut *self.inner.lock().await;
+            let Some(outbound_watch) = inner
+                .outbound_watch_state
+                .outbound_watches
+                .get_mut(&record_key)
+            else {
+                veilid_log!(self warn "watch being renewed should have still been in the table");
+                return;
+            };
+            let Some(current) = &mut outbound_watch.current else {
+                veilid_log!(self warn "watch being renewed should have current state");
+                return;
+            };
+            let mut per_node_states = vec![];
+            let mut dead_pnks = BTreeSet::new();
+            for pnk in &current.nodes {
+                let Some(per_node_state) =
+                    inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
+                else {
+                    veilid_log!(self warn "missing per-node state for watch");
+                    dead_pnks.insert(*pnk);
+                    continue;
+                };
+                per_node_states.push((*pnk, per_node_state));
+            }
+            current.nodes.retain(|x| !dead_pnks.contains(x));
+
+            (per_node_states, current.params.clone())
+        };
+
+        // Now reach out to each node and renew their watches
+        let mut unord = FuturesUnordered::new();
+        let cur_ts = Timestamp::now();
+        for (pnk, pns) in per_node_states {
+            unord.push(async move {
+                let res = self
+                    .outbound_watch_value_change(
+                        pnk.record_key,
+                        params.clone(),
+                        pns.safety_selection,
+                        pns.watch_node_ref.unwrap(),
+                        pns.watch_id,
+                    )
+                    .await;
+                (pnk, res)
+            });
+        }
+
+        let mut renewed = vec![];
+        let mut rejected = vec![];
+        while let Some((pnk, res)) = unord.next().await {
+            match res {
+                Ok(accepted) => {
+                    // Note per node states we should keep vs throw away
+                    if accepted {
+                        renewed.push(pnk);
+                    } else {
+                        rejected.push(pnk);
+                    }
+                }
+                Err(e) => {
+                    veilid_log!(self debug "outbound watch renew error: {}", e);
+                    // Leave in the 'per node states' for now because we couldn't contact the node
+                    // but remove from this watch.
+                    rejected.push(pnk);
+                }
+            }
+        }
+
+        // // Update state
+        // {
+        //     let inner = &mut *self.inner.lock().await;
+        //     let Some(outbound_watch) = inner
+        //         .outbound_watch_state
+        //         .outbound_watches
+        //         .get_mut(&record_key)
+        //     else {
+        //         veilid_log!(self warn "watch being cancelled should have still been in the table");
+        //         return;
+        //     };
+        //     let Some(current) = &mut outbound_watch.current else {
+        //         veilid_log!(self warn "watch being cancelled should have current state");
+        //         return;
+        //     };

+        //     // Mark as dead now that we cancelled
+        //     outbound_watch.current = None;
+        // }
+    }
+
+    pub(super) async fn process_outbound_watch_reconcile(
+        &self,
+        watch_lock: AsyncTagLockGuard,
+    ) {
+        let record_key = watch_lock.tag();
+        //
+    }
 
     // Check if client-side watches on opened records either have dead nodes or if the watch has expired
-    #[instrument(level = "trace", target = "stor", skip_all, err)]
+    //#[instrument(level = "trace", target = "stor", skip_all, err)]
     pub(super) async fn check_outbound_watches_task_routine(
         &self,
         _stop_token: StopToken,
         _last_ts: Timestamp,
         _cur_ts: Timestamp,
     ) -> EyreResult<()> {
-        let mut inner = self.inner.lock().await;
-
-        let routing_table = self.routing_table();
-        //let update_callback = self.update_callback();
+        let inner = self.inner.lock().await;
 
+        // Iterate all outbound watches
+        let registry = self.registry();
         let cur_ts = Timestamp::now();
-        for (k, v) in inner.opened_records.iter_mut() {
-            let Some(outbound_watch) = v.outbound_watch() else {
-                continue;
-            };
+        let consensus_count = self
+            .config()
+            .with(|c| c.network.dht.get_value_count as usize);
 
-            // See if the watch is expired or out of updates
-            if outbound_watch.min_expiration_ts <= cur_ts || outbound_watch.remaining_count == 0 {
-                // See if we can lock the outbound watch
-                let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(*k) else {
-                    // Watch is busy, come back later
+        // Determine what work needs doing if any
+        for (k, v) in &inner.outbound_watch_state.outbound_watches {
+            let k = *k;
+            if v.is_dead() {
+                // Outbound watch is dead
+                let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
                     continue;
                 };
-                let outbound_watch = outbound_watch.clone();
-                let safety_selection = v.safety_selection();
-                let opt_watcher = v.writer().cloned();
-
-                self.background_operation_processor.add_future(
-                    self.clone().background_outbound_watch_cancel(
-                        watch_lock,
-                        safety_selection,
-                        opt_watcher,
-                        outbound_watch,
-                    ),
-                );
-
-                // Clear active watch
-                v.remove_active_watch(outbound_watch.id);
-
-                // // Send valuechange with dead count and no subkeys
-                // update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
-                //     key: *k,
-                //     subkeys: ValueSubkeyRangeSet::new(),
-                //     count: 0,
-                //     value: None,
-                // })));
-            }
-
-            // See if the private route we're using is dead
-            let mut is_dead = false;
-
-            if !is_dead {
-                if let Some(value_changed_route) = outbound_watch.opt_value_changed_route {
-                    if routing_table
-                        .route_spec_store()
-                        .get_route_id_for_key(&value_changed_route)
-                        .is_none()
-                    {
-                        // Route we would receive value changes on is dead
-                        is_dead = true;
+                let fut = {
+                    let registry = self.registry();
+                    async move {
+                        registry
+                            .storage_manager()
+                            .process_outbound_watch_dead(watch_lock)
+                            .await
                     }
-                }
-            }
-
-            for outbound_watch in &outbound_watch.per_node {
-                // See if the active watch's node is dead
-                if !outbound_watch.watch_node.state(cur_ts).is_alive() {
-                    // Watched node is dead
-                    is_dead = true;
-                }
+                };
+                self.background_operation_processor.add_future(fut);
+            } else if v.needs_cancel(&registry, cur_ts) {
+                // Outbound watch needs to be cancelled
+                let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
+                    continue;
+                };
+                let fut = {
+                    let registry = self.registry();
+                    async move {
+                        registry
+                            .storage_manager()
+                            .process_outbound_watch_cancel(watch_lock)
+                            .await
+                    }
+                };
+                self.background_operation_processor.add_future(fut);
+            } else if v.needs_renew(&registry, cur_ts) {
+                // Outbound watch expired but can be renewed
+                let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
+                    continue;
+                };
+                let fut = {
+                    let registry = self.registry();
+                    async move {
+                        registry
+                            .storage_manager()
+                            .process_outbound_watch_renew(watch_lock)
+                            .await
+                    }
+                };
+                self.background_operation_processor.add_future(fut);
+            } else if v.needs_reconcile(&registry, consensus_count, cur_ts) {
+                // Outbound watch parameters have changed or it needs more nodes
+                let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
+                    continue;
+                };
+                let fut = {
+                    let registry = self.registry();
+                    async move {
+                        registry
+                            .storage_manager()
+                            .process_outbound_watch_reconcile(watch_lock)
+                            .await
+                    }
+                };
+                self.background_operation_processor.add_future(fut);
+            }
         }
 
+        // // See if the watch is expired or out of updates
+        // if outbound_watch.min_expiration_ts <= cur_ts || outbound_watch.remaining_count == 0 {
+        //     // See if we can lock the outbound watch
+        //     let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(*k) else {
+        //         // Watch is busy, come back later
+        //         continue;
+        //     };

+        //     let outbound_watch = outbound_watch.clone();
+        //     let safety_selection = v.safety_selection();
+        //     let opt_watcher = v.writer().cloned();

+        //     self.background_operation_processor.add_future(
+        //         self.clone().background_outbound_watch_cancel(
+        //             watch_lock,
+        //             safety_selection,
+        //             opt_watcher,
+        //             outbound_watch,
+        //         ),
+        //     );

+        //     // Send valuechange with dead count and no subkeys
+        //     update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
+        //         key: *k,
+        //         subkeys: ValueSubkeyRangeSet::new(),
+        //         count: 0,
+        //         value: None,
+        //     })));

+        //     // See if the private route we're using is dead
+        //     let mut is_dead = false;

+        //     if !is_dead {
+        //         if let Some(value_changed_route) = outbound_watch.opt_value_changed_route {
+        //             if routing_table
+        //                 .route_spec_store()
+        //                 .get_route_id_for_key(&value_changed_route)
+        //                 .is_none()
+        //             {
+        //                 // Route we would receive value changes on is dead
+        //                 is_dead = true;
+        //             }
+        //         }
+        //     }

+        //     for outbound_watch in &outbound_watch.per_node {
+        //         // See if the active watch's node is dead
+        //         if !outbound_watch.watch_node.state(cur_ts).is_alive() {
+        //             // Watched node is dead
+        //             is_dead = true;
+        //         }
+        //     }
+        // }
+
         Ok(())
     }
 }
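
A design note on the locking in this file: the same per-record tag lock guards both paths. watch_values() in mod.rs awaits lock_tag() and therefore waits for any in-flight background operation on that record, while the tick routine above uses try_lock_tag() and simply skips a busy watch until a later tick. Because each guard is moved into the future handed to background_operation_processor, the record stays locked for the full duration of the dead/cancel/renew/reconcile operation. The two call sites, excerpted from this patch for comparison:

    // API path (mod.rs): wait for any background work on this watch to finish
    let watch_lock = self.outbound_watch_lock_table.lock_tag(key).await;

    // Background tick (this file): never block the tick; skip busy watches instead
    let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
        continue;
    };
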
diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index c11b995b..5806e4d8 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -84,8 +84,8 @@ impl StorageManager {
     pub(super) async fn outbound_watch_value_cancel(
         &self,
         key: TypedKey,
-        safety_selection: SafetySelection,
         opt_watcher: Option,
+        safety_selection: SafetySelection,
         watch_node: NodeRef,
         watch_id: u64,
     ) -> VeilidAPIResult {
@@ -126,32 +126,31 @@ impl StorageManager {
     pub(super) async fn outbound_watch_value_change(
         &self,
         key: TypedKey,
-        subkeys: ValueSubkeyRangeSet,
-        expiration: Timestamp,
-        count: u32,
+        params: OutboundWatchParameters,
         safety_selection: SafetySelection,
-        opt_watcher: Option,
-        active_watch: &OutboundWatch,
+        watch_node: NodeRef,
+        watch_id: u64,
     ) -> VeilidAPIResult> {
         let routing_domain = RoutingDomain::PublicInternet;
 
-        if count == 0 {
+        if params.count == 0 {
             apibail_internal!("cancel should be done with outbound_watch_value_cancel");
         }
 
         // Get the appropriate watcher key, if anonymous use a static anonymous watch key
         // which lives for the duration of the app's runtime
-        let watcher =
-            opt_watcher.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value);
+        let watcher = params
+            .opt_watcher
+            .unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value);
 
         let wva = VeilidAPIError::from_network_result(
             pin_future!(self.rpc_processor().rpc_call_watch_value(
                 Destination::direct(watch_node.routing_domain_filtered(routing_domain))
                     .with_safety(safety_selection),
                 key,
-                subkeys,
-                expiration,
-                count,
+                params.subkeys,
+                params.expiration_ts,
+                params.count,
                 watcher,
                 Some(watch_id),
            ))
@@ -166,9 +165,11 @@
             }
 
             Ok(Some(OutboundWatchValueResult {
-                expiration_ts: wva.answer.expiration_ts,
-                watch_id: wva.answer.watch_id,
-                watch_node,
+                watch_nodes: vec![WatchNode {
+                    watch_id: wva.answer.watch_id,
+                    node_ref: watch_node,
+                    expiration_ts: wva.answer.expiration_ts,
+                }],
                 opt_value_changed_route: wva.reply_private_route,
             }))
         } else {
@@ -177,18 +178,15 @@
         }
     }
 
-    /// Perform a 'watch value' query on the network using fanout
+    /// Perform a 'watch value' query on the network using fanout XXX rewrite this so api-based cancel/change/new make sense
     #[allow(clippy::too_many_arguments)]
     #[instrument(level = "trace", target = "dht", skip_all, err)]
     pub(super) async fn outbound_watch_value(
         &self,
         key: TypedKey,
-        subkeys: ValueSubkeyRangeSet,
-        expiration: Timestamp,
-        count: u32,
+        params: OutboundWatchParameters,
         safety_selection: SafetySelection,
-        opt_watcher: Option,
-        opt_active_watch: Option<&OutboundWatch>,
+        active_nodes: Vec,
     ) -> VeilidAPIResult> {
         // if the count is zero, we are cancelling
         if count == 0 {