From b7aedbbe7da5e7673e9b57398876d84382741af5 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 24 Mar 2024 21:27:34 -0500 Subject: [PATCH] add missing expiration check for server side watches --- veilid-core/src/storage_manager/mod.rs | 6 +++- .../src/storage_manager/record_store/mod.rs | 17 ++++++++++ .../tasks/check_active_watches.rs | 2 +- veilid-core/src/storage_manager/tasks/mod.rs | 32 +++++++++++++++++++ veilid-flutter/example/pubspec.lock | 6 ++-- 5 files changed, 58 insertions(+), 5 deletions(-) diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index b57b7a4d..79724517 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -28,8 +28,10 @@ const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1; const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1; /// Frequency to send ValueChanged notifications to the network const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1; -/// Frequence to check for dead nodes and routes for active watches +/// Frequency to check for dead nodes and routes for client-side active watches const CHECK_ACTIVE_WATCHES_INTERVAL_SECS: u32 = 1; +/// Frequency to check for expired server-side watched records +const CHECK_WATCHED_RECORDS_INTERVAL_SECS: u32 = 1; #[derive(Debug, Clone)] /// A single 'value changed' message to send @@ -54,6 +56,7 @@ struct StorageManagerUnlockedInner { offline_subkey_writes_task: TickTask, send_value_changes_task: TickTask, check_active_watches_task: TickTask, + check_watched_records_task: TickTask, // Anonymous watch keys anonymous_watch_keys: TypedKeyPairGroup, @@ -90,6 +93,7 @@ impl StorageManager { offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS), send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS), check_active_watches_task: TickTask::new(CHECK_ACTIVE_WATCHES_INTERVAL_SECS), + check_watched_records_task: TickTask::new(CHECK_WATCHED_RECORDS_INTERVAL_SECS), anonymous_watch_keys, } diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index d0d9711b..1214c6f2 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -1109,6 +1109,23 @@ where out.map(|r| (r, is_watched)) } + /// See if any watched records have expired and clear them out + pub fn check_watched_records(&mut self) { + let now = get_aligned_timestamp(); + self.watched_records.retain(|key, watch_list| { + watch_list.watches.retain(|w| { + w.params.count != 0 && w.params.expiration > now && !w.params.subkeys.is_empty() + }); + if watch_list.watches.is_empty() { + // If we're removing the watched record, drop any changed watch values too + self.changed_watched_values.remove(key); + false + } else { + true + } + }); + } + pub async fn take_value_changes(&mut self, changes: &mut Vec) { // ValueChangedInfo but without the subkey data that requires a double mutable borrow to get struct EarlyValueChangedInfo { diff --git a/veilid-core/src/storage_manager/tasks/check_active_watches.rs b/veilid-core/src/storage_manager/tasks/check_active_watches.rs index 89c37903..da49621a 100644 --- a/veilid-core/src/storage_manager/tasks/check_active_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_active_watches.rs @@ -1,7 +1,7 @@ use super::*; impl StorageManager { - // Check if watches either have dead nodes or if the watch has expired + // Check if client-side watches on opened records either have dead nodes or if the watch has expired #[instrument(level = "trace", skip(self), err)] pub(super) async fn check_active_watches_task_routine( self, diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs index bea1edd4..3e5df9cb 100644 --- a/veilid-core/src/storage_manager/tasks/mod.rs +++ b/veilid-core/src/storage_manager/tasks/mod.rs @@ -1,4 +1,5 @@ pub mod check_active_watches; +pub mod check_watched_records; pub mod flush_record_stores; pub mod offline_subkey_writes; pub mod send_value_changes; @@ -91,6 +92,27 @@ impl StorageManager { ) }); } + // Set check watched records tick task + log_stor!(debug "starting checked watched records task"); + { + let this = self.clone(); + self.unlocked_inner + .check_watched_records_task + .set_routine(move |s, l, t| { + Box::pin( + this.clone() + .check_watched_records_task_routine( + s, + Timestamp::new(l), + Timestamp::new(t), + ) + .instrument(trace_span!( + parent: None, + "StorageManager check watched records task routine" + )), + ) + }); + } } pub async fn tick(&self) -> EyreResult<()> { @@ -100,6 +122,12 @@ impl StorageManager { // Check active watches self.unlocked_inner.check_active_watches_task.tick().await?; + // Check watched records + self.unlocked_inner + .check_watched_records_task + .tick() + .await?; + // Run online-only tasks if self.online_writes_ready().await?.is_some() { // Run offline subkey writes task if there's work to be done @@ -117,6 +145,10 @@ impl StorageManager { } pub(crate) async fn cancel_tasks(&self) { + log_stor!(debug "stopping check watched records task"); + if let Err(e) = self.unlocked_inner.check_watched_records_task.stop().await { + warn!("check_watched_records_task not stopped: {}", e); + } log_stor!(debug "stopping check active watches task"); if let Err(e) = self.unlocked_inner.check_active_watches_task.stop().await { warn!("check_active_watches_task not stopped: {}", e); diff --git a/veilid-flutter/example/pubspec.lock b/veilid-flutter/example/pubspec.lock index cfef9d24..a535b345 100644 --- a/veilid-flutter/example/pubspec.lock +++ b/veilid-flutter/example/pubspec.lock @@ -479,10 +479,10 @@ packages: dependency: transitive description: name: win32 - sha256: "464f5674532865248444b4c3daca12bd9bf2d7c47f759ce2617986e7229494a8" + sha256: "8cb58b45c47dcb42ab3651533626161d6b67a2921917d8d429791f76972b3480" url: "https://pub.dev" source: hosted - version: "5.2.0" + version: "5.3.0" xdg_directories: dependency: transitive description: @@ -508,5 +508,5 @@ packages: source: hosted version: "0.0.6" sdks: - dart: ">=3.3.0-279.1.beta <4.0.0" + dart: ">=3.3.0 <4.0.0" flutter: ">=3.19.1"