add missing expiration check for server side watches

This commit is contained in:
Christien Rioux 2024-03-24 21:27:34 -05:00
parent 77f0f852a5
commit b7aedbbe7d
5 changed files with 58 additions and 5 deletions

View File

@ -28,8 +28,10 @@ const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1; const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1;
/// Frequency to send ValueChanged notifications to the network /// Frequency to send ValueChanged notifications to the network
const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1; const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
/// Frequence to check for dead nodes and routes for active watches /// Frequency to check for dead nodes and routes for client-side active watches
const CHECK_ACTIVE_WATCHES_INTERVAL_SECS: u32 = 1; const CHECK_ACTIVE_WATCHES_INTERVAL_SECS: u32 = 1;
/// Frequency to check for expired server-side watched records
const CHECK_WATCHED_RECORDS_INTERVAL_SECS: u32 = 1;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
/// A single 'value changed' message to send /// A single 'value changed' message to send
@ -54,6 +56,7 @@ struct StorageManagerUnlockedInner {
offline_subkey_writes_task: TickTask<EyreReport>, offline_subkey_writes_task: TickTask<EyreReport>,
send_value_changes_task: TickTask<EyreReport>, send_value_changes_task: TickTask<EyreReport>,
check_active_watches_task: TickTask<EyreReport>, check_active_watches_task: TickTask<EyreReport>,
check_watched_records_task: TickTask<EyreReport>,
// Anonymous watch keys // Anonymous watch keys
anonymous_watch_keys: TypedKeyPairGroup, anonymous_watch_keys: TypedKeyPairGroup,
@ -90,6 +93,7 @@ impl StorageManager {
offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS), offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS),
send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS), send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS),
check_active_watches_task: TickTask::new(CHECK_ACTIVE_WATCHES_INTERVAL_SECS), check_active_watches_task: TickTask::new(CHECK_ACTIVE_WATCHES_INTERVAL_SECS),
check_watched_records_task: TickTask::new(CHECK_WATCHED_RECORDS_INTERVAL_SECS),
anonymous_watch_keys, anonymous_watch_keys,
} }

View File

@ -1109,6 +1109,23 @@ where
out.map(|r| (r, is_watched)) out.map(|r| (r, is_watched))
} }
/// See if any watched records have expired and clear them out
pub fn check_watched_records(&mut self) {
let now = get_aligned_timestamp();
self.watched_records.retain(|key, watch_list| {
watch_list.watches.retain(|w| {
w.params.count != 0 && w.params.expiration > now && !w.params.subkeys.is_empty()
});
if watch_list.watches.is_empty() {
// If we're removing the watched record, drop any changed watch values too
self.changed_watched_values.remove(key);
false
} else {
true
}
});
}
pub async fn take_value_changes(&mut self, changes: &mut Vec<ValueChangedInfo>) { pub async fn take_value_changes(&mut self, changes: &mut Vec<ValueChangedInfo>) {
// ValueChangedInfo but without the subkey data that requires a double mutable borrow to get // ValueChangedInfo but without the subkey data that requires a double mutable borrow to get
struct EarlyValueChangedInfo { struct EarlyValueChangedInfo {

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
impl StorageManager { impl StorageManager {
// Check if watches either have dead nodes or if the watch has expired // Check if client-side watches on opened records either have dead nodes or if the watch has expired
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(super) async fn check_active_watches_task_routine( pub(super) async fn check_active_watches_task_routine(
self, self,

View File

@ -1,4 +1,5 @@
pub mod check_active_watches; pub mod check_active_watches;
pub mod check_watched_records;
pub mod flush_record_stores; pub mod flush_record_stores;
pub mod offline_subkey_writes; pub mod offline_subkey_writes;
pub mod send_value_changes; pub mod send_value_changes;
@ -91,6 +92,27 @@ impl StorageManager {
) )
}); });
} }
// Set check watched records tick task
log_stor!(debug "starting checked watched records task");
{
let this = self.clone();
self.unlocked_inner
.check_watched_records_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.check_watched_records_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"StorageManager check watched records task routine"
)),
)
});
}
} }
pub async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
@ -100,6 +122,12 @@ impl StorageManager {
// Check active watches // Check active watches
self.unlocked_inner.check_active_watches_task.tick().await?; self.unlocked_inner.check_active_watches_task.tick().await?;
// Check watched records
self.unlocked_inner
.check_watched_records_task
.tick()
.await?;
// Run online-only tasks // Run online-only tasks
if self.online_writes_ready().await?.is_some() { if self.online_writes_ready().await?.is_some() {
// Run offline subkey writes task if there's work to be done // Run offline subkey writes task if there's work to be done
@ -117,6 +145,10 @@ impl StorageManager {
} }
pub(crate) async fn cancel_tasks(&self) { pub(crate) async fn cancel_tasks(&self) {
log_stor!(debug "stopping check watched records task");
if let Err(e) = self.unlocked_inner.check_watched_records_task.stop().await {
warn!("check_watched_records_task not stopped: {}", e);
}
log_stor!(debug "stopping check active watches task"); log_stor!(debug "stopping check active watches task");
if let Err(e) = self.unlocked_inner.check_active_watches_task.stop().await { if let Err(e) = self.unlocked_inner.check_active_watches_task.stop().await {
warn!("check_active_watches_task not stopped: {}", e); warn!("check_active_watches_task not stopped: {}", e);

View File

@ -479,10 +479,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: win32 name: win32
sha256: "464f5674532865248444b4c3daca12bd9bf2d7c47f759ce2617986e7229494a8" sha256: "8cb58b45c47dcb42ab3651533626161d6b67a2921917d8d429791f76972b3480"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "5.2.0" version: "5.3.0"
xdg_directories: xdg_directories:
dependency: transitive dependency: transitive
description: description:
@ -508,5 +508,5 @@ packages:
source: hosted source: hosted
version: "0.0.6" version: "0.0.6"
sdks: sdks:
dart: ">=3.3.0-279.1.beta <4.0.0" dart: ">=3.3.0 <4.0.0"
flutter: ">=3.19.1" flutter: ">=3.19.1"