From 965a6e2af7a27da40048378fd756958748ebd740 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 29 Mar 2025 10:39:20 -0400 Subject: [PATCH] switch --- veilid-core/src/storage_manager/mod.rs | 16 +++++---- .../record_store/opened_record.rs | 22 +++++++++++++ .../src/storage_manager/watch_value.rs | 33 +++++-------------- 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 1e4c3d4d..620ceb70 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -860,16 +860,14 @@ impl StorageManager { // Get the safety selection and the writer we opened this record // and whatever active watch we may have in case this is a watch update - let (safety_selection, opt_writer, opt_watch_nodes) = { + let (safety_selection, opt_writer, opt_active_watch) = { let Some(opened_record) = inner.opened_records.get(&key) else { apibail_generic!("record not open"); }; ( opened_record.safety_selection(), opened_record.writer().cloned(), - opened_record - .active_watch() - .map(|aw| aw.watch_nodes.clone()), + opened_record.active_watch().cloned(), ) }; @@ -909,10 +907,10 @@ impl StorageManager { count, safety_selection, opt_writer, - opt_watch_id, - opt_watch_node, + opt_active_watch.as_ref(), ) .await?; + // If we did not get a valid response assume nothing changed let Some(owvresult) = opt_owvresult else { apibail_try_again!("did not get a valid response"); @@ -940,9 +938,15 @@ impl StorageManager { expiration.as_u64() }; + // Build a new active watch from the watchvalue result + // If the expiration time is less than our minimum expiration time (or zero) consider this watch inactive let mut expiration_ts = owvresult.expiration_ts; if expiration_ts.as_u64() < min_expiration_ts { + // Try to fire out a last-chance watch cancellation, so the + if let Some(active_watch) = opt_active_watch.as_ref() { + self.last_change_cancel_watch(active_watch).await; + } return Ok(Timestamp::new(0)); } diff --git a/veilid-core/src/storage_manager/record_store/opened_record.rs b/veilid-core/src/storage_manager/record_store/opened_record.rs index 7d235e42..e90d1e5c 100644 --- a/veilid-core/src/storage_manager/record_store/opened_record.rs +++ b/veilid-core/src/storage_manager/record_store/opened_record.rs @@ -37,6 +37,20 @@ pub(in crate::storage_manager) struct ActiveWatch { pub opt_value_changed_route: Option, } +impl ActiveWatch { + pub fn new(params: ActiveWatchParameters, opt_value_changed_route: Option) -> Self { + let remaining_count = params.count; + + Self { + params, + per_node, + min_expiration_ts, + remaining_count, + opt_value_changed_route, + } + } +} + /// The state associated with a local record when it is opened /// This is not serialized to storage as it is ephemeral for the lifetime of the opened record #[derive(Clone, Debug, Default)] @@ -107,6 +121,14 @@ impl OpenedRecord { }); } + pub fn set_active_watch(&mut self, active_watch: ActiveWatch) { + assert!( + self.active_watch.is_none(), + "should have cleared watch first before setting a new one" + ); + self.active_watch = Some(active_watch); + } + pub fn clear_active_watch(&mut self) { self.active_watch = None; } diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 1059d62a..20980c53 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -37,8 +37,7 @@ impl StorageManager { subkeys: ValueSubkeyRangeSet, safety_selection: SafetySelection, opt_watcher: Option, - watch_id: u64, - watch_node: NodeRef, + active_watch: &ActiveWatch, ) -> VeilidAPIResult> { let routing_domain = RoutingDomain::PublicInternet; @@ -87,17 +86,13 @@ impl StorageManager { count: u32, safety_selection: SafetySelection, opt_watcher: Option, - watch_id: u64, - watch_node: NodeRef, + active_watch: &ActiveWatch, ) -> VeilidAPIResult> { let routing_domain = RoutingDomain::PublicInternet; if count == 0 { apibail_internal!("cancel should be done with outbound_watch_value_cancel"); } - if watch_id == 0 { - apibail_internal!("watch id should not be zero when changing watch"); - } // Get the appropriate watcher key, if anonymous use a static anonymous watch key // which lives for the duration of the app's runtime @@ -148,18 +143,13 @@ impl StorageManager { count: u32, safety_selection: SafetySelection, opt_watcher: Option, - opt_watch_id: Option, - opt_watch_node: Option, + opt_active_watch: Option<&ActiveWatch>, ) -> VeilidAPIResult> { // if the count is zero, we are cancelling if count == 0 { - // Ensure watch id is specified - let Some(watch_id) = opt_watch_id else { - apibail_internal!("Must specify a watch id in order to cancel it"); - }; - // Ensure watch node is specified - let Some(watch_node) = opt_watch_node else { - apibail_internal!("Must specify a watch node in order to cancel it"); + // Ensure active watch is specified + let Some(active_watch) = opt_active_watch else { + apibail_internal!("Must specify an active watch in order to cancel it"); }; return self .outbound_watch_value_cancel( @@ -167,18 +157,14 @@ impl StorageManager { subkeys, safety_selection, opt_watcher, - watch_id, - watch_node, + active_watch, ) .await; } // if the watch id and watch node are specified, then we're trying to change an existing watch // first try to do that, then fall back to fanout for a new watch id - if let Some(watch_id) = opt_watch_id { - let Some(watch_node) = opt_watch_node else { - apibail_internal!("Must specify a watch node in order to change it"); - }; + if let Some(active_watch) = opt_active_watch { if let Some(res) = self .outbound_watch_value_change( key, @@ -187,8 +173,7 @@ impl StorageManager { count, safety_selection, opt_watcher, - watch_id, - watch_node, + active_watch, ) .await? {