From 922f4d9e152e5ba591ae52dd0edc11add5ace1fd Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Mon, 7 Apr 2025 15:04:26 -0400
Subject: [PATCH] [ci skip] xfer

---
 .../tasks/check_outbound_watches.rs        | 161 ++++++++++++------
 .../src/storage_manager/watch_value.rs     |  54 +-----
 2 files changed, 115 insertions(+), 100 deletions(-)

diff --git a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs
index 6a22be36..e3dc0845 100644
--- a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs
+++ b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs
@@ -27,6 +27,7 @@ impl StorageManager {
             veilid_log!(self warn "dead watch still had desired params");
         }
     }
+
     /// Get the list of remaining active watch ids
     /// and call their nodes to cancel the watch
     pub(super) async fn process_outbound_watch_cancel(
@@ -78,8 +79,8 @@ impl StorageManager {
             let res = self
                 .outbound_watch_value_cancel(
                     pnk.record_key,
-                    pns.safety_selection,
-                    pns.opt_watcher,
+                    pns.opt_watcher,
+                    pns.safety_selection,
                     pns.watch_node_ref.unwrap(),
                     pns.watch_id,
                 )
@@ -106,24 +107,27 @@ impl StorageManager {
         // Update state
         {
             let inner = &mut *self.inner.lock().await;
-            let Some(outbound_watch) = inner
-                .outbound_watch_state
-                .outbound_watches
-                .get_mut(&record_key)
-            else {
-                veilid_log!(self warn "watch being cancelled should have still been in the table");
-                return;
-            };
-            let Some(current) = &mut outbound_watch.current else {
-                veilid_log!(self warn "watch being cancelled should have current state");
-                return;
-            };
+            for pnk in cancelled {
+                let Some(outbound_watch) = inner
+                    .outbound_watch_state
+                    .outbound_watches
+                    .get_mut(&pnk.record_key)
+                else {
+                    veilid_log!(self warn "watch being cancelled should have still been in the table");
+                    return;
+                };
 
-            // Mark as dead now that we cancelled
-            outbound_watch.current = None;
+                // Mark as dead now that we cancelled
+                let Some(_current) = outbound_watch.current.take() else {
+                    veilid_log!(self warn "watch being cancelled should have current state");
+                    return;
+                };
+            }
         }
     }
 
+    /// See which existing per-node watches can be renewed
+    /// and drop the ones that can't be renewed or are already dead
     pub(super) async fn process_outbound_watch_renew(
         &self,
         watch_lock: AsyncTagLockGuard<TypedKey>,
@@ -135,18 +139,18 @@ impl StorageManager {
     ) {
         let record_key = watch_lock.tag();
 
         // If we can't do this operation right now, don't try
         if !self.dht_is_online() {
             return;
         }
 
-        let (per_node_states, params, safety_selection) = {
+        let (per_node_states, renew_params) = {
             let inner = &mut *self.inner.lock().await;
             let Some(outbound_watch) = inner
                 .outbound_watch_state
                 .outbound_watches
                 .get_mut(&record_key)
             else {
-                veilid_log!(self warn "watch being cancelled should have still been in the table");
+                veilid_log!(self warn "watch being renewed should have still been in the table");
                 return;
             };
             let Some(current) = &mut outbound_watch.current else {
-                veilid_log!(self warn "watch being cancelled should have current state");
+                veilid_log!(self warn "watch being renewed should have current state");
                 return;
             };
             let mut per_node_states = vec![];
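The renewal path in the hunks that follow asks each node only for the updates it still owes rather than the original subscription count: `current.params` is cloned into `renew_params`, `renew_params.count` is overwritten with `current.remaining_count`, and the change RPCs are then fanned out concurrently through a `FuturesUnordered`. A minimal sketch of that fan-out and its three-way triage, outside Veilid's real types (`Outcome` and `renew_one` here are hypothetical stand-ins, not part of the patch):

    use futures_util::stream::{FuturesUnordered, StreamExt};

    // Hypothetical three-way outcome mirroring the patch's
    // renewed / rejected / unanswered triage.
    enum Outcome {
        Renewed,    // node accepted the renewal; update its per-node state
        Rejected,   // node refused; drop its per-node state entirely
        Unanswered, // node unreachable; keep state, drop it from this watch
    }

    // Stand-in for an outbound_watch_value_change-style RPC.
    async fn renew_one(node_id: u64) -> Result<bool, String> {
        Ok(node_id % 2 == 0)
    }

    async fn renew_all(nodes: Vec<u64>) -> Vec<(u64, Outcome)> {
        let mut unord = FuturesUnordered::new();
        for node_id in nodes {
            // Move each id into its own future so all RPCs poll concurrently.
            unord.push(async move { (node_id, renew_one(node_id).await) });
        }
        let mut results = vec![];
        while let Some((node_id, res)) = unord.next().await {
            results.push((
                node_id,
                match res {
                    Ok(true) => Outcome::Renewed,
                    Ok(false) => Outcome::Rejected,
                    Err(_) => Outcome::Unanswered,
                },
            ));
        }
        results
    }

Gathering through `FuturesUnordered` keeps every RPC in flight at once while still letting results be triaged one at a time as they complete.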
@@ -163,13 +167,17 @@ impl StorageManager {
             }
             current.nodes.retain(|x| !dead_pnks.contains(x));
 
-            (per_node_states, current.params.clone())
+            // Change the params to update count
+            let mut renew_params = current.params.clone();
+            renew_params.count = current.remaining_count;
+
+            (per_node_states, renew_params)
         };
 
         // Now reach out to each node and renew their watches
         let mut unord = FuturesUnordered::new();
-        let cur_ts = Timestamp::now();
         for (pnk, pns) in per_node_states {
+            let params = renew_params.clone();
             unord.push(async move {
                 let res = self
                     .outbound_watch_value_change(
@@ -186,44 +194,70 @@ impl StorageManager {
         let mut renewed = vec![];
         let mut rejected = vec![];
+        let mut unanswered = vec![];
         while let Some((pnk, res)) = unord.next().await {
             match res {
-                Ok(accepted) => {
+                Ok(Some(r)) => {
                     // Note per node states we should keep vs throw away
-                    if accepted {
-                        renewed.push(pnk);
-                    } else {
-                        rejected.push(pnk);
-                    }
+                    renewed.push((pnk, r));
+                }
+                Ok(None) => {
+                    rejected.push(pnk);
                 }
                 Err(e) => {
-                    veilid_log!(self debug "outbound watch cancel error: {}", e);
+                    veilid_log!(self debug "outbound watch change error: {}", e);
                     // Leave in the 'per node states' for now because we couldn't contact the node
                     // but remove from this watch.
-                    rejected.push(pnk);
+
+                    // xxx should do something different for network unreachable vs host unreachable
+                    unanswered.push(pnk);
                 }
             }
         }
 
-        // // Update state
-        // {
-        //     let inner = &mut *self.inner.lock().await;
-        //     let Some(outbound_watch) = inner
-        //         .outbound_watch_state
-        //         .outbound_watches
-        //         .get_mut(&record_key)
-        //     else {
-        //         veilid_log!(self warn "watch being cancelled should have still been in the table");
-        //         return;
-        //     };
-        //     let Some(current) = &mut outbound_watch.current else {
-        //         veilid_log!(self warn "watch being cancelled should have current state");
-        //         return;
-        //     };
-
-        //     // Mark as dead now that we cancelled
-        //     outbound_watch.current = None;
-        // }
+        // Update state
+        {
+            let inner = &mut *self.inner.lock().await;
+            let Some(outbound_watch) = inner
+                .outbound_watch_state
+                .outbound_watches
+                .get_mut(&record_key)
+            else {
+                veilid_log!(self warn "watch being renewed should have still been in the table");
+                return;
+            };
+            let Some(current) = &mut outbound_watch.current else {
+                veilid_log!(self warn "watch being renewed should have current state");
+                return;
+            };
+
+            let mut dead_pnks = BTreeSet::new();
+
+            // Perform renewals
+            for (pnk, r) in renewed {
+                let watch_node = r.watch_nodes.first().cloned().unwrap();
+                let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
+                else {
+                    veilid_log!(self warn "missing per-node state for watch");
+                    dead_pnks.insert(pnk);
+                    continue;
+                };
+                per_node_state.count = renew_params.count;
+                per_node_state.expiration_ts = watch_node.expiration_ts;
+                per_node_state.watch_id = watch_node.watch_id;
+            }
+            // Eliminate rejected
+            for pnk in rejected {
+                inner.outbound_watch_state.per_node_state.remove(&pnk);
+                dead_pnks.insert(pnk);
+            }
+            // Drop unanswered but leave in per-node state
+            for pnk in unanswered {
+                dead_pnks.insert(pnk);
+            }
+
+            current.nodes.retain(|x| !dead_pnks.contains(x));
+        }
     }
 
     pub(super) async fn process_outbound_watch_reconcile(
         &self,
@@ -231,7 +265,13 @@ impl StorageManager {
         watch_lock: AsyncTagLockGuard<TypedKey>,
     ) {
         let record_key = watch_lock.tag();
-        //
+
+        // If we can't do this operation right now, don't try
+        if !self.dht_is_online() {
+            return;
+        }
+
+        xxx continue here
     }
 
     // Check if client-side watches on opened records either have dead nodes
     // or if the watch has expired
@@ -242,18 +282,41 @@ impl StorageManager {
         _last_ts: Timestamp,
         _cur_ts: Timestamp,
     ) -> EyreResult<()> {
-        let inner = self.inner.lock().await;
+        let mut inner = self.inner.lock().await;
 
-        // Iterate all outbound watches
         let registry = self.registry();
         let cur_ts = Timestamp::now();
         let consensus_count = self
             .config()
             .with(|c| c.network.dht.get_value_count as usize);
 
+        // Iterate all per-node watches and remove expired ones that are unreferenced
+        let mut dead_pnks = HashSet::new();
+        for (pnk, pns) in &inner.outbound_watch_state.per_node_state {
+            if cur_ts >= pns.expiration_ts || pns.count == 0 {
+                dead_pnks.insert(*pnk);
+            }
+        }
+        for (_, v) in &inner.outbound_watch_state.outbound_watches {
+            // If it's still referenced, keep it
+            let Some(current) = &v.current else {
+                continue;
+            };
+            for pnk in &current.nodes {
+                dead_pnks.remove(pnk);
+            }
+        }
+        inner
+            .outbound_watch_state
+            .per_node_state
+            .retain(|k, _| !dead_pnks.contains(k));
+
+        // Iterate all outbound watches
         // Determine what work needs doing if any
         for (k, v) in &inner.outbound_watch_state.outbound_watches {
             let k = *k;
+
+            // Check states
             if v.is_dead() {
                 // Outbound watch is dead
                 let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
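The task-routine hunk above garbage-collects per-node watch state with a mark-and-sweep pass: every entry that is expired (`cur_ts >= pns.expiration_ts`) or exhausted (`pns.count == 0`) is marked dead, anything still referenced by a live watch's current nodes is unmarked, and the remainder is dropped with `retain`. The same pattern in isolation, with simplified stand-in types (`Pnk` and this `PerNodeState` are illustrative, not the patch's real definitions):

    use std::collections::{HashMap, HashSet};

    // Simplified stand-ins for the patch's per-node key and state types.
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct Pnk(u64);

    struct PerNodeState {
        expiration_ts: u64,
        count: u32,
    }

    fn gc_per_node_state(
        per_node_state: &mut HashMap<Pnk, PerNodeState>,
        live_refs: &[Pnk], // keys still referenced by some watch's current nodes
        cur_ts: u64,
    ) {
        // Mark: expired or exhausted entries are candidates for removal.
        let mut dead_pnks: HashSet<Pnk> = per_node_state
            .iter()
            .filter(|(_, pns)| cur_ts >= pns.expiration_ts || pns.count == 0)
            .map(|(pnk, _)| *pnk)
            .collect();

        // Unmark: anything a live watch still points at survives this pass.
        for pnk in live_refs {
            dead_pnks.remove(pnk);
        }

        // Sweep everything that is both dead and unreferenced.
        per_node_state.retain(|pnk, _| !dead_pnks.contains(pnk));
    }

Unmarking before the sweep is what lets a watch keep an expired-looking entry alive until the watch itself releases it.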
diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index 5806e4d8..502b6fbb 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -29,56 +29,6 @@ pub(super) struct OutboundWatchValueResult {
 }
 
 impl StorageManager {
-    /// Perform a 'watch value cancel' on a set of nodes without fanout
-    /// Returns the list of successfully cancelled ids and just logs failures
-    pub(super) async fn outbound_watch_value_cancel_set(
-        &self,
-        key: TypedKey,
-        safety_selection: SafetySelection,
-        opt_watcher: Option<KeyPair>,
-        outbound_watch: &OutboundWatch,
-    ) -> Vec<u64> {
-        let mut unord = FuturesUnordered::new();
-        for pn in &outbound_watch.per_node {
-            unord.push(async {
-
-                let cancelled = match self.outbound_watch_value_cancel(
-                    key,
-                    safety_selection,
-                    opt_watcher,
-                    pn.watch_node.clone(),
-                    pn.id,
-                ).await {
-                    Ok(_) => {
-                        // Either watch was cancelled, or it didn't exist, but it's not there now
-                        true
-                    }
-                    Err(e) => {
-                        veilid_log!(self debug "Outbound watch value (id={}) cancel to {} failed: {}", pn.id, pn.watch_node, e);
-                        false
-                    }
-                };
-
-                if cancelled {
-                    Some(pn.id)
-                } else {
-                    None
-                }
-            });
-        }
-        let mut cancelled = vec![];
-        while let Some(x) = unord.next().await {
-            match x {
-                Some(id) => {
-                    cancelled.push(id);
-                }
-                None => {}
-            }
-        }
-
-        cancelled
-    }
-
     /// Perform a 'watch value cancel' on the network without fanout
     #[instrument(level = "trace", target = "dht", skip_all, err)]
     pub(super) async fn outbound_watch_value_cancel(
@@ -149,7 +99,7 @@ impl StorageManager {
                 .with_safety(safety_selection),
             key,
             params.subkeys,
-            params.expiration,
+            params.expiration_ts,
             params.count,
             watcher,
             Some(watch_id),
@@ -447,6 +397,8 @@ impl StorageManager {
         inbound_node_id: TypedKey,
         watch_id: u64,
     ) -> VeilidAPIResult<NetworkResult<()>> {
+        // xxx remember to update per_node_state with lower count
+
         // Update local record store with new value
         let (is_value_seq_newer, value) = {
             let mut inner = self.inner.lock().await;
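The final hunk leaves an `xxx` reminder to update `per_node_state` with a lower count when an inbound value change arrives: each notification means the remote node has spent one change out of the count it was asked to watch for, so the local bookkeeping should drop to match. A hedged sketch of that decrement on a simplified stand-in type (only the `count` field is taken from the patch; the method name is illustrative):

    // Simplified stand-in; the real per-node state carries more fields.
    struct PerNodeState {
        count: u32,
    }

    impl PerNodeState {
        // A value-changed notification means the remote node consumed one
        // change from the count it was asked to watch for; mirror that
        // locally. Saturating so a duplicate or late notification cannot
        // underflow the counter.
        fn on_value_changed(&mut self) {
            self.count = self.count.saturating_sub(1);
        }
    }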