This commit is contained in:
Christien Rioux 2025-03-29 10:39:20 -04:00
parent e636237b2d
commit 965a6e2af7
3 changed files with 41 additions and 30 deletions

View File

@ -860,16 +860,14 @@ impl StorageManager {
// Get the safety selection and the writer we opened this record
// and whatever active watch we may have in case this is a watch update
let (safety_selection, opt_writer, opt_watch_nodes) = {
let (safety_selection, opt_writer, opt_active_watch) = {
let Some(opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
};
(
opened_record.safety_selection(),
opened_record.writer().cloned(),
opened_record
.active_watch()
.map(|aw| aw.watch_nodes.clone()),
opened_record.active_watch().cloned(),
)
};
@ -909,10 +907,10 @@ impl StorageManager {
count,
safety_selection,
opt_writer,
opt_watch_id,
opt_watch_node,
opt_active_watch.as_ref(),
)
.await?;
// If we did not get a valid response assume nothing changed
let Some(owvresult) = opt_owvresult else {
apibail_try_again!("did not get a valid response");
@ -940,9 +938,15 @@ impl StorageManager {
expiration.as_u64()
};
// Build a new active watch from the watchvalue result
// If the expiration time is less than our minimum expiration time (or zero) consider this watch inactive
let mut expiration_ts = owvresult.expiration_ts;
if expiration_ts.as_u64() < min_expiration_ts {
// Try to fire out a last-chance watch cancellation, so the
if let Some(active_watch) = opt_active_watch.as_ref() {
self.last_change_cancel_watch(active_watch).await;
}
return Ok(Timestamp::new(0));
}

View File

@ -37,6 +37,20 @@ pub(in crate::storage_manager) struct ActiveWatch {
pub opt_value_changed_route: Option<PublicKey>,
}
impl ActiveWatch {
pub fn new(params: ActiveWatchParameters, opt_value_changed_route: Option<CryptoKey>) -> Self {
let remaining_count = params.count;
Self {
params,
per_node,
min_expiration_ts,
remaining_count,
opt_value_changed_route,
}
}
}
/// The state associated with a local record when it is opened
/// This is not serialized to storage as it is ephemeral for the lifetime of the opened record
#[derive(Clone, Debug, Default)]
@ -107,6 +121,14 @@ impl OpenedRecord {
});
}
pub fn set_active_watch(&mut self, active_watch: ActiveWatch) {
assert!(
self.active_watch.is_none(),
"should have cleared watch first before setting a new one"
);
self.active_watch = Some(active_watch);
}
pub fn clear_active_watch(&mut self) {
self.active_watch = None;
}

View File

@ -37,8 +37,7 @@ impl StorageManager {
subkeys: ValueSubkeyRangeSet,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
watch_id: u64,
watch_node: NodeRef,
active_watch: &ActiveWatch,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
let routing_domain = RoutingDomain::PublicInternet;
@ -87,17 +86,13 @@ impl StorageManager {
count: u32,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
watch_id: u64,
watch_node: NodeRef,
active_watch: &ActiveWatch,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
let routing_domain = RoutingDomain::PublicInternet;
if count == 0 {
apibail_internal!("cancel should be done with outbound_watch_value_cancel");
}
if watch_id == 0 {
apibail_internal!("watch id should not be zero when changing watch");
}
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
// which lives for the duration of the app's runtime
@ -148,18 +143,13 @@ impl StorageManager {
count: u32,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
opt_watch_id: Option<u64>,
opt_watch_node: Option<NodeRef>,
opt_active_watch: Option<&ActiveWatch>,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
// if the count is zero, we are cancelling
if count == 0 {
// Ensure watch id is specified
let Some(watch_id) = opt_watch_id else {
apibail_internal!("Must specify a watch id in order to cancel it");
};
// Ensure watch node is specified
let Some(watch_node) = opt_watch_node else {
apibail_internal!("Must specify a watch node in order to cancel it");
// Ensure active watch is specified
let Some(active_watch) = opt_active_watch else {
apibail_internal!("Must specify an active watch in order to cancel it");
};
return self
.outbound_watch_value_cancel(
@ -167,18 +157,14 @@ impl StorageManager {
subkeys,
safety_selection,
opt_watcher,
watch_id,
watch_node,
active_watch,
)
.await;
}
// if the watch id and watch node are specified, then we're trying to change an existing watch
// first try to do that, then fall back to fanout for a new watch id
if let Some(watch_id) = opt_watch_id {
let Some(watch_node) = opt_watch_node else {
apibail_internal!("Must specify a watch node in order to change it");
};
if let Some(active_watch) = opt_active_watch {
if let Some(res) = self
.outbound_watch_value_change(
key,
@ -187,8 +173,7 @@ impl StorageManager {
count,
safety_selection,
opt_watcher,
watch_id,
watch_node,
active_watch,
)
.await?
{