diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs
index 8d0a9f00..729f22c1 100644
--- a/veilid-core/src/routing_table/mod.rs
+++ b/veilid-core/src/routing_table/mod.rs
@@ -276,6 +276,12 @@ impl RoutingTable {
             inner.route_spec_store = Some(route_spec_store);
         }
 
+        // Inform storage manager we are up
+        self.network_manager
+            .storage_manager()
+            .set_routing_table(Some(self.clone()))
+            .await;
+
         debug!("finished routing table init");
         Ok(())
     }
@@ -284,6 +290,12 @@ impl RoutingTable {
     pub async fn terminate(&self) {
         debug!("starting routing table terminate");
 
+        // Stop storage manager from using us
+        self.network_manager
+            .storage_manager()
+            .set_routing_table(None)
+            .await;
+
         // Stop tasks
         self.cancel_tasks().await;
 
diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index bb65ebc9..36ee21b7 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -33,6 +33,8 @@ const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
 const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1;
 /// Frequency to send ValueChanged notifications to the network
 const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
+/// Frequency to check for dead nodes and routes for active watches
+const CHECK_ACTIVE_WATCHES_INTERVAL_SECS: u32 = 1;
 
 #[derive(Debug, Clone)]
 /// A single 'value changed' message to send
@@ -55,6 +57,7 @@ struct StorageManagerUnlockedInner {
     flush_record_stores_task: TickTask<EyreReport>,
     offline_subkey_writes_task: TickTask<EyreReport>,
     send_value_changes_task: TickTask<EyreReport>,
+    check_active_watches_task: TickTask<EyreReport>,
 
     // Anonymous watch keys
     anonymous_watch_keys: TypedKeyPairGroup,
@@ -90,6 +93,7 @@ impl StorageManager {
            flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS),
            offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS),
            send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS),
+           check_active_watches_task: TickTask::new(CHECK_ACTIVE_WATCHES_INTERVAL_SECS),
 
            anonymous_watch_keys,
        }
@@ -149,7 +153,12 @@ impl StorageManager {
 
     pub async fn set_rpc_processor(&self, opt_rpc_processor: Option<RPCProcessor>) {
         let mut inner = self.inner.lock().await;
-        inner.rpc_processor = opt_rpc_processor
+        inner.opt_rpc_processor = opt_rpc_processor
+    }
+
+    pub async fn set_routing_table(&self, opt_routing_table: Option<RoutingTable>) {
+        let mut inner = self.inner.lock().await;
+        inner.opt_routing_table = opt_routing_table
     }
 
     async fn lock(&self) -> VeilidAPIResult<AsyncMutexGuardArc<StorageManagerInner>> {
@@ -161,7 +170,7 @@ impl StorageManager {
     }
 
     fn online_writes_ready_inner(inner: &StorageManagerInner) -> Option<RPCProcessor> {
-        if let Some(rpc_processor) = { inner.rpc_processor.clone() } {
+        if let Some(rpc_processor) = { inner.opt_rpc_processor.clone() } {
             if let Some(network_class) = rpc_processor
                 .routing_table()
                 .get_network_class(RoutingDomain::PublicInternet)
@@ -234,7 +243,7 @@ impl StorageManager {
         // No record yet, try to get it from the network
 
         // Get rpc processor and drop mutex so we don't block while getting the value from the network
-        let Some(rpc_processor) = inner.rpc_processor.clone() else {
+        let Some(rpc_processor) = inner.opt_rpc_processor.clone() else {
             apibail_try_again!("offline, try again later");
         };
 
@@ -284,7 +293,7 @@ impl StorageManager {
     pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
         let (opened_record, opt_rpc_processor) = {
             let mut inner = self.lock().await?;
-            (inner.close_record(key)?, inner.rpc_processor.clone())
+            (inner.close_record(key)?, inner.opt_rpc_processor.clone())
         };
 
         // Send a one-time cancel request for the watch if we have one and we're online
@@ -364,7 +373,7 @@ impl StorageManager {
         // Refresh if we can
 
         // Get rpc processor and drop mutex so we don't block while getting the value from the network
-        let Some(rpc_processor) = inner.rpc_processor.clone() else {
+        let Some(rpc_processor) = inner.opt_rpc_processor.clone() else {
             // Return the existing value if we have one if we aren't online
             if let Some(last_subkey_result_value) = last_subkey_result.value {
                 return Ok(Some(last_subkey_result_value.value_data().clone()));
@@ -562,7 +571,7 @@ impl StorageManager {
         };
 
         // Get rpc processor and drop mutex so we don't block while requesting the watch from the network
-        let Some(rpc_processor) = inner.rpc_processor.clone() else {
+        let Some(rpc_processor) = inner.opt_rpc_processor.clone() else {
             apibail_try_again!("offline, try again later");
         };
 
@@ -684,7 +693,7 @@ impl StorageManager {
     async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> {
         let rpc_processor = {
             let inner = self.inner.lock().await;
-            if let Some(rpc_processor) = &inner.rpc_processor {
+            if let Some(rpc_processor) = &inner.opt_rpc_processor {
                 rpc_processor.clone()
             } else {
                 apibail_try_again!("network is not available");
diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs
index a67b2ba1..58c56a90 100644
--- a/veilid-core/src/storage_manager/record_store.rs
+++ b/veilid-core/src/storage_manager/record_store.rs
@@ -859,7 +859,7 @@ where
         }
 
         let mut evcis = vec![];
-
+        let mut empty_watched_records = vec![];
         for rtk in self.changed_watched_values.drain() {
             if let Some(watch) = self.watched_records.get_mut(&rtk) {
                 // Process watch notifications
@@ -888,9 +888,15 @@ where
                 // Remove in reverse so we don't have to offset the index to remove the right key
                 for dw in dead_watchers.iter().rev().copied() {
                     watch.watchers.remove(dw);
+                    if watch.watchers.is_empty() {
+                        empty_watched_records.push(rtk);
+                    }
                 }
             }
         }
+        for ewr in empty_watched_records {
+            self.watched_records.remove(&ewr);
+        }
 
         for evci in evcis {
             // Get the first subkey data
diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs
index ad872c4e..198a4628 100644
--- a/veilid-core/src/storage_manager/storage_manager_inner.rs
+++ b/veilid-core/src/storage_manager/storage_manager_inner.rs
@@ -25,7 +25,9 @@ pub(super) struct StorageManagerInner {
     /// Storage manager metadata that is persistent, including copy of offline subkey writes
     pub metadata_db: Option<TableDB>,
     /// RPC processor if it is available
-    pub rpc_processor: Option<RPCProcessor>,
+    pub opt_rpc_processor: Option<RPCProcessor>,
+    /// Routing table if it is available
+    pub opt_routing_table: Option<RoutingTable>,
     /// Background processing task (not part of attachment manager tick tree so it happens when detached too)
     pub tick_future: Option<SendPinBoxFuture<()>>,
     /// Update callback to send ValueChanged notification to
@@ -78,7 +80,8 @@ impl StorageManagerInner {
             remote_record_store: Default::default(),
             offline_subkey_writes: Default::default(),
             metadata_db: Default::default(),
-            rpc_processor: Default::default(),
+            opt_rpc_processor: Default::default(),
+            opt_routing_table: Default::default(),
             tick_future: Default::default(),
             update_callback: None,
         }
@@ -437,7 +440,7 @@ impl StorageManagerInner {
         };
 
         // Get routing table to see if we still know about these nodes
-        let Some(routing_table) = self.rpc_processor.as_ref().map(|r| r.routing_table()) else {
+        let Some(routing_table) = self.opt_rpc_processor.as_ref().map(|r| r.routing_table()) else {
             apibail_try_again!("offline, try again later");
         };
 
diff --git a/veilid-core/src/storage_manager/tasks/check_active_watches.rs b/veilid-core/src/storage_manager/tasks/check_active_watches.rs
new file mode 100644
index 00000000..5ce1917c
--- /dev/null
+++ b/veilid-core/src/storage_manager/tasks/check_active_watches.rs
@@ -0,0 +1,66 @@
+use super::*;
+
+impl StorageManager {
+    // Check active watches for dead nodes and routes and clear them
+    #[instrument(level = "trace", skip(self), err)]
+    pub(super) async fn check_active_watches_task_routine(
+        self,
+        stop_token: StopToken,
+        _last_ts: Timestamp,
+        _cur_ts: Timestamp,
+    ) -> EyreResult<()> {
+        {
+            let mut inner = self.inner.lock().await;
+            let Some(routing_table) = inner.opt_routing_table.clone() else {
+                return Ok(());
+            };
+            let rss = routing_table.route_spec_store();
+
+            let opt_update_callback = inner.update_callback.clone();
+
+            let cur_ts = get_aligned_timestamp();
+            for (k, v) in inner.opened_records.iter_mut() {
+                // If no active watch, then skip this
+                let Some(active_watch) = v.active_watch() else {
+                    continue;
+                };
+
+                // See if the active watch's node is dead
+                let mut is_dead = false;
+                if matches!(
+                    active_watch.watch_node.state(cur_ts),
+                    BucketEntryState::Dead
+                ) {
+                    // Watched node is dead
+                    is_dead = true;
+                }
+
+                // See if the private route we're using is dead
+                if !is_dead {
+                    if let Some(value_changed_route) = active_watch.opt_value_changed_route {
+                        if rss.get_route_id_for_key(&value_changed_route).is_none() {
+                            // Route we would receive value changes on is dead
+                            is_dead = true;
+                        }
+                    }
+                }
+
+                if is_dead {
+                    if let Some(update_callback) = opt_update_callback.clone() {
+                        // Send valuechange with dead count and no subkeys
+                        update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
+                            key: *k,
+                            subkeys: ValueSubkeyRangeSet::new(),
+                            count: 0,
+                            value: ValueData::default(),
+                        })));
+                    }
+
+                    v.clear_active_watch();
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/veilid-core/src/storage_manager/tasks/flush_record_stores.rs b/veilid-core/src/storage_manager/tasks/flush_record_stores.rs
index af76ef28..a596baec 100644
--- a/veilid-core/src/storage_manager/tasks/flush_record_stores.rs
+++ b/veilid-core/src/storage_manager/tasks/flush_record_stores.rs
@@ -1,7 +1,7 @@
 use super::*;
 
 impl StorageManager {
-    // Flush records stores to disk and remove dead records and send watch notifications
+    // Flush records stores to disk and remove dead records
     #[instrument(level = "trace", skip(self), err)]
     pub(crate) async fn flush_record_stores_task_routine(
         self,
diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs
index ac8c52ca..e87e66a0 100644
--- a/veilid-core/src/storage_manager/tasks/mod.rs
+++ b/veilid-core/src/storage_manager/tasks/mod.rs
@@ -1,3 +1,4 @@
+pub mod check_active_watches;
 pub mod flush_record_stores;
 pub mod offline_subkey_writes;
 pub mod send_value_changes;
@@ -69,12 +70,36 @@ impl StorageManager {
                 )
             });
         }
+        // Set check active watches tick task
+        debug!("starting check active watches task");
+        {
+            let this = self.clone();
+            self.unlocked_inner
+                .check_active_watches_task
+                .set_routine(move |s, l, t| {
+                    Box::pin(
+                        this.clone()
+                            .check_active_watches_task_routine(
+                                s,
+                                Timestamp::new(l),
+                                Timestamp::new(t),
+                            )
+                            .instrument(trace_span!(
+                                parent: None,
+                                "StorageManager check active watches task routine"
+                            )),
+                    )
+                });
+        }
     }
 
     pub async fn tick(&self) -> EyreResult<()> {
         // Run the flush stores task
         self.unlocked_inner.flush_record_stores_task.tick().await?;
 
+        // Check active watches
+        self.unlocked_inner.check_active_watches_task.tick().await?;
+
         // Run online-only tasks
         if self.online_writes_ready().await?.is_some() {
             // Run offline subkey writes task if there's work to be done
@@ -92,6 +117,10 @@ impl StorageManager {
     }
 
     pub(crate) async fn cancel_tasks(&self) {
+        debug!("stopping check active watches task");
+        if let Err(e) = self.unlocked_inner.check_active_watches_task.stop().await {
+            warn!("check_active_watches_task not stopped: {}", e);
+        }
         debug!("stopping send value changes task");
         if let Err(e) = self.unlocked_inner.send_value_changes_task.stop().await {
             warn!("send_value_changes_task not stopped: {}", e);
diff --git a/veilid-core/src/storage_manager/tasks/send_value_changes.rs b/veilid-core/src/storage_manager/tasks/send_value_changes.rs
index ed054c00..5fe7866e 100644
--- a/veilid-core/src/storage_manager/tasks/send_value_changes.rs
+++ b/veilid-core/src/storage_manager/tasks/send_value_changes.rs
@@ -3,7 +3,7 @@ use futures_util::StreamExt;
 use stop_token::future::FutureExt;
 
 impl StorageManager {
-    // Flush records stores to disk and remove dead records and send watch notifications
+    // Send value change notifications across the network
     #[instrument(level = "trace", skip(self), err)]
     pub(super) async fn send_value_changes_task_routine(
         self,
diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index a65ad547..5ab0f516 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -219,7 +219,7 @@ impl StorageManager {
         &self,
         key: TypedKey,
         subkeys: ValueSubkeyRangeSet,
-        count: u32,
+        mut count: u32,
         value: Arc<SignedValueData>,
     ) -> VeilidAPIResult<()> {
         // Update local record store with new value
@@ -233,8 +233,40 @@ impl StorageManager {
             } else {
                 VeilidAPIResult::Ok(())
             };
+
+            let Some(opened_record) = inner.opened_records.get_mut(&key) else {
+                // Don't send update or update the ActiveWatch if this record is closed
+                return res;
+            };
+            let Some(mut active_watch) = opened_record.active_watch() else {
+                // No active watch means no callback
+                return res;
+            };
+
+            if count > active_watch.count {
+                // If count is greater than our requested count then this is invalid, cancel the watch
+                log_stor!(debug "watch count went backward: {}: {}/{}", key, count, active_watch.count);
+                // Force count to zero
+                count = 0;
+                opened_record.clear_active_watch();
+            } else if count == 0 {
+                // If count is zero, we're done, cancel the watch and the app can renew it if it wants
+                log_stor!(debug "watch count finished: {}", key);
+                opened_record.clear_active_watch();
+            } else {
+                log_stor!(
+                    "watch count decremented: {}: {}/{}",
+                    key,
+                    count,
+                    active_watch.count
+                );
+                active_watch.count = count;
+                opened_record.set_active_watch(active_watch);
+            }
+
             (res, inner.update_callback.clone())
         };
 
+        // Announce ValueChanged VeilidUpdate
         if let Some(update_callback) = opt_update_callback {
             update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
@@ -244,6 +276,7 @@ impl StorageManager {
                 value: value.value_data().clone(),
             })));
         }
+
         res
     }
 }
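
Usage note (not part of the patch): a minimal sketch of how an application consuming the veilid-core update callback might react to the dead-watch signal this change introduces, namely a VeilidUpdate::ValueChange whose count is 0 (with an empty subkey set in the dead node/route case). The handle_update, renew_watch, and apply_change names below are hypothetical application helpers, not veilid-core API.

use veilid_core::{VeilidUpdate, VeilidValueChange};

// Sketch of an application-side update handler. A ValueChange with count == 0 means
// the watch is no longer active (dead watch node or route, or the watch count ran out);
// the application can renew the watch if it still wants change notifications.
fn handle_update(update: VeilidUpdate) {
    if let VeilidUpdate::ValueChange(vc) = update {
        if vc.count == 0 {
            // Watch is gone; re-establish it if still wanted (hypothetical helper).
            renew_watch(&vc);
        } else {
            // Normal notification; vc.count watch firings remain (hypothetical helper).
            apply_change(&vc);
        }
    }
}

fn renew_watch(_vc: &VeilidValueChange) { /* application-specific: issue a new watch */ }
fn apply_change(_vc: &VeilidValueChange) { /* application-specific: use vc.subkeys / vc.value */ }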