From e2f750f207e5d8f70dd2730a621fce5c8ddfd60a Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Sat, 12 Apr 2025 21:37:07 -0400
Subject: [PATCH] more watchvalue fixes

---
 veilid-core/src/storage_manager/debug.rs      |  6 ++
 veilid-core/src/storage_manager/mod.rs        | 36 +++++--
 .../outbound_watch_manager/mod.rs             | 66 +++++++++++--
 .../outbound_watch_manager/outbound_watch.rs  | 60 +++++++-----
 .../tasks/check_outbound_watches.rs           | 52 +---------
 .../src/storage_manager/watch_value.rs        | 96 +++++++++++--------
 veilid-core/src/veilid_api/debug.rs           |  2 +-
 7 files changed, 193 insertions(+), 125 deletions(-)

diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs
index 4b1b1018..0f69938e 100644
--- a/veilid-core/src/storage_manager/debug.rs
+++ b/veilid-core/src/storage_manager/debug.rs
@@ -51,6 +51,9 @@ impl StorageManager {
     pub async fn purge_local_records(&self, reclaim: Option<usize>) -> String {
         let mut inner = self.inner.lock().await;
+        if !inner.opened_records.is_empty() {
+            return "records still opened".to_owned();
+        }
         let Some(local_record_store) = &mut inner.local_record_store else {
             return "not initialized".to_owned();
         };
@@ -62,6 +65,9 @@
     }
     pub async fn purge_remote_records(&self, reclaim: Option<usize>) -> String {
         let mut inner = self.inner.lock().await;
+        if !inner.opened_records.is_empty() {
+            return "records still opened".to_owned();
+        }
         let Some(remote_record_store) = &mut inner.remote_record_store else {
             return "not initialized".to_owned();
         };
diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index 46b9b563..934d283b 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -34,6 +34,8 @@ const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
 const CHECK_OUTBOUND_WATCHES_INTERVAL_SECS: u32 = 1;
 /// Frequency to retry reconciliation of watches that are not at consensus
 const RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS: u32 = 30;
+/// How long before expiration to try to renew per-node watches
+const RENEW_OUTBOUND_WATCHES_DURATION_SECS: u32 = 30;
 /// Frequency to check for expired server-side watched records
 const CHECK_WATCHED_RECORDS_INTERVAL_SECS: u32 = 1;
 /// Table store table for storage manager metadata
@@ -387,7 +389,7 @@ impl StorageManager {
         for v in inner.outbound_watch_manager.outbound_watches.values() {
             if let Some(current) = v.state() {
                 let node_refs =
-                    current.watch_node_refs(&inner.outbound_watch_manager.per_node_state);
+                    current.watch_node_refs(&inner.outbound_watch_manager.per_node_states);
                 for node_ref in &node_refs {
                     let mut found = false;
                     for nid in node_ref.node_ids().iter() {
@@ -554,9 +556,20 @@
             return Ok(());
         };
 
-        // Set the watch to cancelled if we have one
-        // Will process cancellation in the background
-        inner.outbound_watch_manager.set_desired_watch(key, None);
+        Ok(())
+    }
+
+    /// Close all opened records
+    #[instrument(level = "trace", target = "stor", skip_all)]
+    pub async fn close_all_records(&self) -> VeilidAPIResult<()> {
+        // Attempt to close the record, returning the opened record if it wasn't already closed
+        let mut inner = self.inner.lock().await;
+        let keys = inner.opened_records.keys().copied().collect::<Vec<_>>();
+        for key in keys {
+            let Some(_opened_record) = Self::close_record_inner(&mut inner, key)? else {
+                return Ok(());
+            };
+        }
 
         Ok(())
     }
@@ -565,10 +578,12 @@
     #[instrument(level = "trace", target = "stor", skip_all)]
     pub async fn delete_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
         // Ensure the record is closed
-        self.close_record(key).await?;
+        let mut inner = self.inner.lock().await;
+        let Some(_opened_record) = Self::close_record_inner(&mut inner, key)? else {
+            return Ok(());
+        };
 
         // Get record from the local store
-        let mut inner = self.inner.lock().await;
         let Some(local_record_store) = inner.local_record_store.as_mut() else {
             apibail_not_initialized!();
         };
@@ -940,8 +955,9 @@ // Process this watch's state machine operations until we are done
         loop {
             let opt_op_fut = {
-                let inner = self.inner.lock().await;
-                let Some(outbound_watch) = inner.outbound_watch_manager.outbound_watches.get(&key)
+                let mut inner = self.inner.lock().await;
+                let Some(outbound_watch) =
+                    inner.outbound_watch_manager.outbound_watches.get_mut(&key)
                 else {
                     // Watch is gone
                     return Ok(Timestamp::new(0));
                 };
@@ -1633,6 +1649,10 @@
             return Err(VeilidAPIError::key_not_found(key));
         }
 
+        // Set the watch to cancelled if we have one
+        // Will process cancellation in the background
+        inner.outbound_watch_manager.set_desired_watch(key, None);
+
         Ok(inner.opened_records.remove(&key))
     }
 
diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs b/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs
index c6ac381f..1c996da4 100644
--- a/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs
+++ b/veilid-core/src/storage_manager/outbound_watch_manager/mod.rs
@@ -17,7 +17,7 @@ pub(in crate::storage_manager) struct OutboundWatchManager {
     /// Each watch per record key
     pub outbound_watches: HashMap<TypedKey, OutboundWatch>,
     /// Last known active watch per node+record
-    pub per_node_state: HashMap<PerNodeKey, PerNodeState>,
+    pub per_node_states: HashMap<PerNodeKey, PerNodeState>,
 }
 
 impl fmt::Display for OutboundWatchManager {
@@ -33,13 +33,13 @@
             }
         }
         out += "]\n";
-        out += "per_node_state: [\n";
+        out += "per_node_states: [\n";
         {
-            let mut keys = self.per_node_state.keys().copied().collect::<Vec<_>>();
+            let mut keys = self.per_node_states.keys().copied().collect::<Vec<_>>();
             keys.sort();
             for k in keys {
-                let v = self.per_node_state.get(&k).unwrap();
+                let v = self.per_node_states.get(&k).unwrap();
                 out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.to_string()));
             }
         }
         out += "]\n";
@@ -59,7 +59,7 @@ impl OutboundWatchManager {
     pub fn new() -> Self {
         Self {
             outbound_watches: HashMap::new(),
-            per_node_state: HashMap::new(),
+            per_node_states: HashMap::new(),
         }
     }
 
@@ -91,7 +91,7 @@ pub fn set_next_reconcile_ts(&mut self, record_key: TypedKey, next_ts: Timestamp) {
         if let Some(outbound_watch) = self.outbound_watches.get_mut(&record_key) {
             if let Some(state) = outbound_watch.state_mut() {
-                state.edit(&self.per_node_state, |editor| {
+                state.edit(&self.per_node_states, |editor| {
                     editor.set_next_reconcile_ts(next_ts);
                 });
             }
         }
     }
@@ -103,4 +103,58 @@
             .get(&record_key)
             .and_then(|x| x.state().map(|y| y.min_expiration_ts()))
     }
+
+    /// Iterate all per-node watches and remove ones with dead nodes from outbound watches
+    /// This may trigger reconciliation to increase the number of active per-node watches
+    /// for an outbound watch that is still alive
+    pub fn update_per_node_states(&mut self, cur_ts: Timestamp) {
+        // Node is unreachable
+        let mut dead_pnks = HashSet::new();
+        // Per-node expiration reached
+        let mut expired_pnks = HashSet::new();
+        // Count reached
+        let mut finished_pnks = HashSet::new();
+
+        for (pnk, pns) in &self.per_node_states {
+            if pns.count == 0 {
+                // If per-node watch is done, add to finished list
+                finished_pnks.insert(*pnk);
+            } else if !pns
+                .watch_node_ref
+                .as_ref()
+                .unwrap()
+                .state(cur_ts)
+                .is_alive()
+            {
+                // If node is unreachable add to dead list
+                dead_pnks.insert(*pnk);
+            } else if cur_ts >= pns.expiration_ts {
+                // If per-node watch has expired add to expired list
+                expired_pnks.insert(*pnk);
+            }
+        }
+
+        // Go through and remove nodes that are dead or finished from active states
+        // If an expired per-node watch is still referenced, it may be renewable
+        // so remove it from the expired list
+        for v in self.outbound_watches.values_mut() {
+            let Some(current) = v.state_mut() else {
+                continue;
+            };
+
+            // Don't drop expired per-node watches that could be renewed (still referenced by this watch)
+            for node in current.nodes() {
+                expired_pnks.remove(node);
+            }
+
+            // Remove dead and finished per-node watch nodes from this outbound watch
+            current.edit(&self.per_node_states, |editor| {
+                editor.retain_nodes(|x| !dead_pnks.contains(x) && !finished_pnks.contains(x));
+            });
+        }
+
+        // Drop finished per-node watches and unreferenced expired per-node watches
+        self.per_node_states
+            .retain(|k, _| !finished_pnks.contains(k) && !expired_pnks.contains(k));
+    }
 }
diff --git a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs
index 99dba136..c3e48083 100644
--- a/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs
+++ b/veilid-core/src/storage_manager/outbound_watch_manager/outbound_watch.rs
@@ -76,38 +76,51 @@ impl OutboundWatch {
         self.desired = desired;
     }
 
+    /// Check for desired state changes
+    pub fn update_desired_state(&mut self, cur_ts: Timestamp) {
+        let Some(desired) = self.desired.as_ref() else {
+            // No desired parameters means this is already done
+            return;
+        };
+
+        // Check if desired parameters have expired
+        if desired.expiration_ts.as_u64() != 0 && desired.expiration_ts <= cur_ts {
+            // Expired
+            self.set_desired(None);
+            return;
+        }
+
+        // Check if the existing state has no remaining count
+        if let Some(state) = self.state.as_ref() {
+            if state.remaining_count() == 0 {
+                // No remaining count
+                self.set_desired(None);
+            }
+        }
+    }
+
     /// Returns true if this outbound watch can be removed from the table
     pub fn is_dead(&self) -> bool {
         self.desired.is_none() && self.state.is_none()
     }
 
     /// Returns true if this outbound watch needs to be cancelled
-    pub fn needs_cancel(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
+    pub fn needs_cancel(&self, registry: &VeilidComponentRegistry) -> bool {
         if self.is_dead() {
             veilid_log!(registry warn "should have checked for is_dead first");
             return false;
         }
 
         // If there is no current watch then there is nothing to cancel
-        let Some(state) = self.state() else {
+        let Some(state) = self.state.as_ref() else {
             return false;
         };
 
-        // If the total number of changes has been reached
-        // then we're done and should cancel
-        if state.remaining_count() == 0 {
-            return true;
-        }
-
-        // If we have expired and can't renew, then cancel
-        if state.params().expiration_ts.as_u64() != 0 && cur_ts >= state.params().expiration_ts {
-            return true;
-        }
-
         // If the desired parameters is None then cancel
         let Some(desired) = self.desired.as_ref() else {
             return true;
         };
 
+        // If the desired parameters is different than the current parameters
         // then cancel so we can eventually reconcile to the new parameters
         state.params() != desired
     }
 
     /// Returns true if this outbound watch can be renewed
     pub fn needs_renew(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
-        if self.is_dead() || self.needs_cancel(registry, cur_ts) {
+        if self.is_dead() || self.needs_cancel(registry) {
             veilid_log!(registry warn "should have checked for is_dead and needs_cancel first");
             return false;
         }
 
         // If there is no current watch then there is nothing to renew
-        let Some(state) = self.state() else {
+        let Some(state) = self.state.as_ref() else {
             return false;
         };
 
         // If the watch has per node watches that have expired,
-        // but we can extend our watch then renew
-        if cur_ts >= state.min_expiration_ts()
-            && (state.params().expiration_ts.as_u64() == 0 || cur_ts < state.params().expiration_ts)
+        // but we can extend our watch then renew. Do this only within RENEW_OUTBOUND_WATCHES_DURATION_SECS
+        // of the actual expiration. If we're looking at this after the actual expiration, don't try because
+        // the watch id will have died
+
+        let renew_ts = cur_ts + TimestampDuration::new_secs(RENEW_OUTBOUND_WATCHES_DURATION_SECS);
+        if renew_ts >= state.min_expiration_ts()
+            && cur_ts < state.min_expiration_ts()
+            && (state.params().expiration_ts.as_u64() == 0
+                || renew_ts < state.params().expiration_ts)
         {
             return true;
         }
@@ -156,10 +175,7 @@
         consensus_count: usize,
         cur_ts: Timestamp,
     ) -> bool {
-        if self.is_dead()
-            || self.needs_cancel(registry, cur_ts)
-            || self.needs_renew(registry, cur_ts)
-        {
+        if self.is_dead() || self.needs_cancel(registry) || self.needs_renew(registry, cur_ts) {
             veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first");
             return false;
         }
diff --git a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs
index 9ae181cb..aac170fc 100644
--- a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs
+++ b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs
@@ -13,56 +13,14 @@ impl StorageManager {
 
         let cur_ts = Timestamp::now();
 
-        // Iterate all per-node watches and remove dead ones from outbound watches
-        let mut dead_pnks = HashSet::new();
-        for (pnk, pns) in &inner.outbound_watch_manager.per_node_state {
-            if !pns
-                .watch_node_ref
-                .as_ref()
-                .unwrap()
-                .state(cur_ts)
-                .is_alive()
-            {
-                dead_pnks.insert(*pnk);
-            }
-        }
-        for v in inner.outbound_watch_manager.outbound_watches.values_mut() {
-            let Some(current) = v.state_mut() else {
-                continue;
-            };
-
-            current.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
-                editor.retain_nodes(|x| !dead_pnks.contains(x));
-            });
-        }
-
-        // Iterate all per-node watches and remove expired ones that are unreferenced
-        let mut expired_pnks = HashSet::new();
-        for (pnk, pns) in &inner.outbound_watch_manager.per_node_state {
-            if cur_ts >= pns.expiration_ts || pns.count == 0 {
-                expired_pnks.insert(*pnk);
-            }
-        }
-        for v in inner.outbound_watch_manager.outbound_watches.values() {
-            // If it's still referenced, keep it
-            let Some(current) = v.state() else {
-                continue;
-            };
-            for pnk in current.nodes() {
-                expired_pnks.remove(pnk);
-            }
-        }
-        inner
-            .outbound_watch_manager
-            .per_node_state
-            .retain(|k, _| !expired_pnks.contains(k));
+        // Update per-node watch states
+        // Desired state updates are performed by get_next_outbound_watch_operation
+        inner.outbound_watch_manager.update_per_node_states(cur_ts);
 
         // Iterate all outbound watches and determine what work needs doing if any
-        for (k, v) in &inner.outbound_watch_manager.outbound_watches {
-            let k = *k;
-
+        for (k, v) in &mut inner.outbound_watch_manager.outbound_watches {
             // Get next work on watch and queue it if we have something to do
-            if let Some(op_fut) = self.get_next_outbound_watch_operation(k, None, cur_ts, v) {
+            if let Some(op_fut) = self.get_next_outbound_watch_operation(*k, None, cur_ts, v) {
                 self.background_operation_processor.add_future(op_fut);
             };
         }
diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index 889dfdb9..a3a976ea 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -74,7 +74,7 @@ impl StorageManager {
         )?;
 
         if wva.answer.accepted {
-            veilid_log!(self debug "Outbound watch canceled: id={} ({})", wva.answer.watch_id, watch_node);
+            veilid_log!(self debug "Outbound watch cancelled: id={} ({})", wva.answer.watch_id, watch_node);
             Ok(true)
         } else {
             veilid_log!(self debug "Outbound watch id did not exist: id={} ({})", watch_id, watch_node);
@@ -125,6 +125,8 @@
         if wva.answer.accepted {
             if watch_id != wva.answer.watch_id {
                 veilid_log!(self debug "WatchValue changed: id={}->{} expiration_ts={} ({})", watch_id, wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), watch_node);
+            } else if wva.answer.expiration_ts.as_u64() == 0 {
+                veilid_log!(self debug "WatchValue not renewed: id={} ({})", watch_id, watch_node);
             } else {
                 veilid_log!(self debug "WatchValue renewed: id={} expiration_ts={} ({})", watch_id, display_ts(wva.answer.expiration_ts.as_u64()), watch_node);
             }
@@ -361,6 +363,9 @@
         if outbound_watch.desired().is_some() {
             veilid_log!(self warn "dead watch still had desired params");
         }
+
+        // Send valuechange with dead count and no subkeys to inform the api that this watch is now gone completely
+        self.update_callback_value_change(record_key, ValueSubkeyRangeSet::new(), 0, None);
     }
 
     /// Get the list of remaining active watch ids
@@ -395,7 +400,7 @@
         for pnk in state.nodes() {
             let Some(per_node_state) = inner
                 .outbound_watch_manager
-                .per_node_state
+                .per_node_states
                 .get(pnk)
                 .cloned()
             else {
@@ -406,7 +411,7 @@
             per_node_states.push((*pnk, per_node_state));
         }
 
-        state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
+        state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
             editor.retain_nodes(|x| !missing_pnks.contains(x));
         });
 
@@ -456,7 +461,7 @@
         for pnk in cancelled {
             if inner
                 .outbound_watch_manager
-                .per_node_state
+                .per_node_states
                 .remove(&pnk)
                 .is_none()
             {
@@ -477,9 +482,6 @@
             // Mark as dead now that we cancelled
             outbound_watch.clear_state();
         }
-
-        // Send valuechange with dead count and no subkeys to inform the api that this was cancelled
-        self.update_callback_value_change(record_key, ValueSubkeyRangeSet::new(), 0, None);
     }
 
     /// See which existing per-node watches can be renewed
@@ -514,7 +516,7 @@
         for pnk in state.nodes() {
             let Some(per_node_state) = inner
                 .outbound_watch_manager
-                .per_node_state
+                .per_node_states
                 .get(pnk)
                 .cloned()
             else {
@@ -524,7 +526,7 @@
             };
             per_node_states.push((*pnk, per_node_state));
         }
-        state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
+        state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
             editor.retain_nodes(|x| !missing_pnks.contains(x));
         });
 
@@ -616,7 +618,7 @@
                         *pnk,
                         inner
                             .outbound_watch_manager
-                            .per_node_state
+                            .per_node_states
                             .get(pnk)
                             .cloned()
                             .unwrap(),
@@ -628,7 +630,7 @@
         };
 
         // Add in any inactive per node states
-        for (pnk, pns) in &inner.outbound_watch_manager.per_node_state {
+        for (pnk, pns) in &inner.outbound_watch_manager.per_node_states {
             // Skip any we have already
             if per_node_state.contains_key(pnk) {
                 continue;
             }
@@ -707,28 +709,37 @@
                 node_id,
             };
 
-            let watch_id = accepted_watch.watch_id;
-            let opt_watcher = desired.opt_watcher;
-            let safety_selection = desired.safety_selection;
             let expiration_ts = accepted_watch.expiration_ts;
             let count = state.remaining_count();
-            let watch_node_ref = Some(accepted_watch.node_ref);
-            let opt_value_changed_route = accepted_watch.opt_value_changed_route;
-            // Insert state, possibly overwriting an existing one
-            inner.outbound_watch_manager.per_node_state.insert(
-                pnk,
-                PerNodeState {
-                    watch_id,
-                    safety_selection,
-                    opt_watcher,
-                    expiration_ts,
-                    count,
-                    watch_node_ref,
-                    opt_value_changed_route,
-                },
-            );
-            added_nodes.push(pnk);
+            // Check for accepted watch that came back with a dead watch
+            // (non renewal, watch id didn't exist, didn't renew in time)
+            if expiration_ts.as_u64() != 0 && count > 0 {
+                // Insert state, possibly overwriting an existing one
+                let watch_id = accepted_watch.watch_id;
+                let opt_watcher = desired.opt_watcher;
+                let safety_selection = desired.safety_selection;
+                let watch_node_ref = Some(accepted_watch.node_ref);
+                let opt_value_changed_route = accepted_watch.opt_value_changed_route;
+
+                inner.outbound_watch_manager.per_node_states.insert(
+                    pnk,
+                    PerNodeState {
+                        watch_id,
+                        safety_selection,
+                        opt_watcher,
+                        expiration_ts,
+                        count,
+                        watch_node_ref,
+                        opt_value_changed_route,
+                    },
+                );
+                added_nodes.push(pnk);
+            } else {
+                // Remove per node state because this watch id was not renewed
+                inner.outbound_watch_manager.per_node_states.remove(&pnk);
+                remove_nodes.insert(pnk);
+            }
         }
         // Eliminate rejected
         for rejected_node_ref in owvresult.rejected {
@@ -737,7 +748,7 @@
                 record_key,
                 node_id,
             };
-            inner.outbound_watch_manager.per_node_state.remove(&pnk);
+            inner.outbound_watch_manager.per_node_states.remove(&pnk);
             remove_nodes.insert(pnk);
         }
         // Drop unanswered but leave in per node state
@@ -750,26 +761,30 @@
             remove_nodes.insert(pnk);
         }
 
-        state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
+        state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
             editor.retain_nodes(|x| !remove_nodes.contains(x));
             editor.add_nodes(added_nodes);
         });
     }
 
     /// Get the next operation for a particular watch's state machine
-    /// Can be processed in the foreground, or by the bacgkround operation queue
+    /// Can be processed in the foreground, or by the background operation queue
    pub(super) fn get_next_outbound_watch_operation(
         &self,
         key: TypedKey,
         opt_watch_lock: Option<AsyncTagLockGuard<TypedKey>>,
         cur_ts: Timestamp,
-        outbound_watch: &OutboundWatch,
+        outbound_watch: &mut OutboundWatch,
     ) -> Option<PinBoxFutureStatic<()>> {
         let registry = self.registry();
         let consensus_count = self
             .config()
             .with(|c| c.network.dht.get_value_count as usize);
 
+        // Terminate the 'desired' params for watches
+        // that have no remaining count or have expired
+        outbound_watch.update_desired_state(cur_ts);
+
         // Check states
         if outbound_watch.is_dead() {
             // Outbound watch is dead
@@ -786,7 +801,7 @@ impl StorageManager {
                 }
             };
             return Some(pin_dyn_future!(fut));
-        } else if outbound_watch.needs_cancel(&registry, cur_ts) {
+        } else if outbound_watch.needs_cancel(&registry) {
             // Outbound watch needs to be cancelled
             let watch_lock =
                 opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
@@ -922,7 +937,7 @@
 
             // Get per node state
             let Some(per_node_state) =
-                inner.outbound_watch_manager.per_node_state.get_mut(&pnk)
+                inner.outbound_watch_manager.per_node_states.get_mut(&pnk)
             else {
                 // No per node state means no callback
                 veilid_log!(self warn "missing per node state in outbound watch: {:?}", pnk);
@@ -1040,8 +1055,8 @@
         let state = outbound_watch.state_mut().unwrap();
 
         if is_value_seq_newer {
-            let remaining_count = state.remaining_count() - 1;
-            state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
+            let remaining_count = state.remaining_count().saturating_sub(1);
+            state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
                 editor.set_remaining_count(remaining_count);
             });
         }
@@ -1052,9 +1067,8 @@
         // Announce ValueChanged VeilidUpdate
         // * if the value in the update had a newer sequence number
        // * if more than a single subkey has changed
-        // * if the count was zero meaning cancelled
-
-        let do_update = is_value_seq_newer || subkeys.len() > 1 || remaining_count == 0;
+        // * cancellations (count=0) are sent by process_outbound_watch_dead(), not here
+        let do_update = is_value_seq_newer || subkeys.len() > 1;
         if do_update {
             let value = if is_value_seq_newer {
                 Some(value.unwrap().value_data().clone())
diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs
index fdc46c63..70bc2851 100644
--- a/veilid-core/src/veilid_api/debug.rs
+++ b/veilid-core/src/veilid_api/debug.rs
@@ -1807,7 +1807,7 @@ impl VeilidAPI {
                 parse_duration,
             )
             .ok()
-            .map(|dur| dur + get_timestamp())
+            .map(|dur| if dur == 0 { 0 } else { dur + get_timestamp() })
             .unwrap_or_else(|| {
                 rest_defaults = true;
                 Default::default()
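
Editor's note (illustrative only, not part of the patch above): the needs_renew() change in
outbound_watch.rs renews a per-node watch only inside a short window before its earliest
expiration, never after it, since the watch id on the server dies at expiration. The sketch
below restates that condition with plain u64 second timestamps in place of veilid-core's
Timestamp/TimestampDuration types; the function name and parameters are hypothetical.

// Illustrative sketch of the renewal-window condition introduced in this patch,
// using plain u64 seconds instead of veilid-core's Timestamp types.
const RENEW_OUTBOUND_WATCHES_DURATION_SECS: u64 = 30;

/// Renew only when cur_ts is within the renewal window before min_expiration_ts,
/// has not yet passed it, and (for a bounded watch) the window ends before the
/// requested overall expiration.
fn should_renew(cur_ts: u64, min_expiration_ts: u64, params_expiration_ts: u64) -> bool {
    let renew_ts = cur_ts + RENEW_OUTBOUND_WATCHES_DURATION_SECS;
    renew_ts >= min_expiration_ts
        && cur_ts < min_expiration_ts
        && (params_expiration_ts == 0 || renew_ts < params_expiration_ts)
}

fn main() {
    // Within 30s of the per-node expiration and not yet expired: renew
    assert!(should_renew(100, 120, 0));
    // Already past the per-node expiration: the watch id is gone, do not renew
    assert!(!should_renew(130, 120, 0));
    // Renewal window would run past a bounded overall expiration: do not renew
    assert!(!should_renew(100, 120, 125));
}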