From 031d6463fad4215def3c992371f13fac326db486 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 8 Apr 2025 12:52:55 -0400 Subject: [PATCH] wip --- .../src/routing_table/tasks/ping_validator.rs | 2 +- veilid-core/src/storage_manager/debug.rs | 16 +- veilid-core/src/storage_manager/mod.rs | 323 ++++++----- .../src/storage_manager/outbound_watch.rs | 516 ++++++++++++++++-- .../tasks/check_outbound_watches.rs | 347 +----------- .../src/storage_manager/watch_value.rs | 47 +- veilid-core/src/veilid_api/debug.rs | 5 + veilid-core/src/veilid_api/routing_context.rs | 2 +- veilid-tools/src/async_tag_lock.rs | 34 +- 9 files changed, 704 insertions(+), 588 deletions(-) diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index 6824beff..8e76e9c1 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -236,7 +236,7 @@ impl RoutingTable { } // Get all the active watches from the storage manager - let watch_destinations = self.storage_manager().get_active_watch_nodes().await; + let watch_destinations = self.storage_manager().get_outbound_watch_nodes().await; for watch_destination in watch_destinations { let registry = self.registry(); diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs index e7493fc0..ebdf1959 100644 --- a/veilid-core/src/storage_manager/debug.rs +++ b/veilid-core/src/storage_manager/debug.rs @@ -24,16 +24,18 @@ impl StorageManager { } else { "".to_owned() }; - let watch = if let Some(w) = v.outbound_watch() { - format!(" watch: {:?}\n", w) - } else { - "".to_owned() - }; - out += &format!(" {} {}{}\n", k, writer, watch); + out += &format!(" {} {}\n", k, writer); + } + format!("{}]\n", out) + } + pub async fn debug_watched_records(&self) -> String { + let inner = self.inner.lock().await; + let mut out = "[\n".to_owned(); + for (k, v) in &inner.outbound_watch_state.outbound_watches { + out += &format!(" {} {:?}\n", k, v); } format!("{}]\n", out) } - pub async fn debug_offline_records(&self) -> String { let inner = self.inner.lock().await; let Some(local_record_store) = &inner.local_record_store else { diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index d04684f2..c60ffb86 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -377,22 +377,37 @@ impl StorageManager { } /// Get the set of nodes in our active watches - pub async fn get_active_watch_nodes(&self) -> Vec { + pub async fn get_outbound_watch_nodes(&self) -> Vec { let inner = self.inner.lock().await; + let mut out = vec![]; - for opened_record in inner.opened_records.values() { - if let Some(aw) = opened_record.outbound_watch() { - for pn in &aw.per_node { + let mut node_set = HashSet::new(); + for v in inner.outbound_watch_state.outbound_watches.values() { + if let Some(current) = &v.current { + let node_refs = current.watch_node_refs(&inner.outbound_watch_state.per_node_state); + for node_ref in &node_refs { + let mut found = false; + for nid in node_ref.node_ids().iter() { + if node_set.contains(nid) { + found = true; + break; + } + } + if found { + continue; + } + + node_set.insert(node_ref.best_node_id()); out.push( Destination::direct( - pn.watch_node - .routing_domain_filtered(RoutingDomain::PublicInternet), + node_ref.routing_domain_filtered(RoutingDomain::PublicInternet), ) - .with_safety(opened_record.safety_selection()), - ); + 
.with_safety(current.params.safety_selection), + ) } } } + out } @@ -531,54 +546,14 @@ impl StorageManager { #[instrument(level = "trace", target = "stor", skip_all)] pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> { // Attempt to close the record, returning the opened record if it wasn't already closed - let opened_record = { - let mut inner = self.inner.lock().await; - let Some(opened_record) = Self::close_record_inner(&mut inner, key)? else { - return Ok(()); - }; - opened_record - }; - - // See if we have an active watch on the closed record - let Some(active_watch) = opened_record.outbound_watch() else { + let mut inner = self.inner.lock().await; + let Some(_opened_record) = Self::close_record_inner(&mut inner, key)? else { return Ok(()); }; - // Send a one-time cancel request for the watch if we have one and we're online - if !self.dht_is_online() { - veilid_log!(self debug "skipping last-ditch watch cancel because we are offline"); - return Ok(()); - } - // Use the safety selection we opened the record with - // Use the writer we opened with as the 'watcher' as well - let opt_owvresult = match self - .outbound_watch_value_cancel( - key, - ValueSubkeyRangeSet::full(), - opened_record.safety_selection(), - opened_record.writer().cloned(), - active_watch.id, - active_watch.watch_node, - ) - .await - { - Ok(v) => v, - Err(e) => { - veilid_log!(self debug - "close record watch cancel failed: {}", e - ); - None - } - }; - if let Some(owvresult) = opt_owvresult { - if owvresult.expiration_ts.as_u64() != 0 { - veilid_log!(self debug - "close record watch cancel should have zero expiration" - ); - } - } else { - veilid_log!(self debug "close record watch cancel unsuccessful"); - } + // Set the watch to cancelled if we have one + // Will process cancellation in the background + inner.outbound_watch_state.set_desired_watch(key, None); Ok(()) } @@ -875,11 +850,25 @@ impl StorageManager { // (may need to wait for background operations to complete on the watch) let watch_lock = self.outbound_watch_lock_table.lock_tag(key).await; + self.watch_values_inner(watch_lock, subkeys, expiration, count) + .await + } + + //#[instrument(level = "trace", target = "stor", skip_all)] + async fn watch_values_inner( + &self, + watch_lock: AsyncTagLockGuard, + subkeys: ValueSubkeyRangeSet, + expiration: Timestamp, + count: u32, + ) -> VeilidAPIResult { + let key = watch_lock.tag(); + // Obtain the inner state lock - let inner = self.inner.lock().await; + let mut inner = self.inner.lock().await; // Get the safety selection and the writer we opened this record - let (safety_selection, opt_writer) = { + let (safety_selection, opt_watcher) = { let Some(opened_record) = inner.opened_records.get(&key) else { // Record must be opened already to change watch apibail_generic!("record not open"); @@ -909,17 +898,14 @@ impl StorageManager { let subkeys = schema.truncate_subkeys(&subkeys, None); // Calculate desired watch parameters - let desired = if count == 0 { + let desired_params = if count == 0 { // Cancel None } else { - // Get the minimum and maximum expiration timestamp we will accept - let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| { - ( - TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)), - TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)), - ) - }); + // Get the minimum expiration timestamp we will accept + let rpc_timeout_us = self + .config() + .with(|c| TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms))); let cur_ts = 
get_timestamp(); let min_expiration_ts = Timestamp::new(cur_ts + rpc_timeout_us.as_u64()); let expiration_ts = if expiration.as_u64() == 0 { @@ -935,95 +921,126 @@ impl StorageManager { expiration_ts, count, subkeys, + opt_watcher, + safety_selection, }) }; // Modify the 'desired' state of the watch or add one if it does not exist - inner.outbound_watch_state.set_desired_watch(key, desired); + inner + .outbound_watch_state + .set_desired_watch(key, desired_params); // Drop the lock for network access drop(inner); - // xxx continue here, make a 'reconcile outbound watch' routine that can be called imperatively, wait for it etc. - - // Use the safety selection we opened the record with - // Use the writer we opened with as the 'watcher' as well - let opt_owvresult = self - .outbound_watch_value( - key, - subkeys.clone(), - expiration, - count, - safety_selection, - opt_writer, - opt_active_watch.as_ref(), - ) - .await?; - - // If we did not get a valid response assume nothing changed - let Some(owvresult) = opt_owvresult else { - apibail_try_again!("did not get a valid response"); - }; - - // Clear any existing watch if the watch succeeded or got cancelled - let mut inner = self.inner.lock().await; - let Some(opened_record) = inner.opened_records.get_mut(&key) else { - apibail_generic!("record not open"); - }; - opened_record.clear_outbound_watch(); - - // Get the minimum expiration timestamp we will accept - let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| { - ( - TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)), - TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)), - ) - }); - let cur_ts = get_timestamp(); - let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64(); - let max_expiration_ts = if expiration.as_u64() == 0 { - cur_ts + max_watch_expiration_us.as_u64() - } else { - expiration.as_u64() - }; - - // Build a new active watch from the watchvalue result - - // If the expiration time is less than our minimum expiration time (or zero) consider this watch inactive - let mut expiration_ts = owvresult.expiration_ts; - if expiration_ts.as_u64() < min_expiration_ts { - // Try to fire out a last-chance watch cancellation, so the - if let Some(active_watch) = opt_active_watch.as_ref() { - self.last_change_cancel_watch(active_watch).await; - } - return Ok(Timestamp::new(0)); + // Process this watch's state machine operations until we are done + loop { + let opt_op_fut = { + let inner = self.inner.lock().await; + let Some(outbound_watch) = inner.outbound_watch_state.outbound_watches.get(&key) + else { + // Watch is gone + return Ok(Timestamp::new(0)); + }; + self.get_next_outbound_watch_operation( + key, + Some(watch_lock.clone()), + Timestamp::now(), + outbound_watch, + ) + }; + let Some(op_fut) = opt_op_fut else { + break; + }; + op_fut.await; } - // If the expiration time is greater than our maximum expiration time, clamp our local watch so we ignore extra valuechanged messages - if expiration_ts.as_u64() > max_expiration_ts { - expiration_ts = Timestamp::new(max_expiration_ts); - } + let inner = self.inner.lock().await; + let expiration_ts = inner + .outbound_watch_state + .get_min_expiration(key) + .unwrap_or_default(); + Ok(expiration_ts) - // If we requested a cancellation, then consider this watch cancelled - if count == 0 { - // Expiration returned should be zero if we requested a cancellation - if expiration_ts.as_u64() != 0 { - veilid_log!(self debug "got active watch despite asking for a cancellation"); - } - return 
Ok(Timestamp::new(0)); - } + // // Use the safety selection we opened the record with + // // Use the writer we opened with as the 'watcher' as well + // let opt_owvresult = self + // .outbound_watch_value( + // key, + // subkeys.clone(), + // expiration, + // count, + // safety_selection, + // opt_watcher, + // opt_active_watch.as_ref(), + // ) + // .await?; - // Keep a record of the watch - opened_record.set_outbound_watch(OutboundWatch { - id: owvresult.watch_id, - expiration_ts, - watch_node: owvresult.watch_node, - opt_value_changed_route: owvresult.opt_value_changed_route, - subkeys, - count, - }); + // // If we did not get a valid response assume nothing changed + // let Some(owvresult) = opt_owvresult else { + // apibail_try_again!("did not get a valid response"); + // }; - Ok(owvresult.expiration_ts) + // // Clear any existing watch if the watch succeeded or got cancelled + // let mut inner = self.inner.lock().await; + // let Some(opened_record) = inner.opened_records.get_mut(&key) else { + // apibail_generic!("record not open"); + // }; + // opened_record.clear_outbound_watch(); + + // // Get the minimum expiration timestamp we will accept + // let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| { + // ( + // TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)), + // TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)), + // ) + // }); + // let cur_ts = get_timestamp(); + // let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64(); + // let max_expiration_ts = if expiration.as_u64() == 0 { + // cur_ts + max_watch_expiration_us.as_u64() + // } else { + // expiration.as_u64() + // }; + + // // Build a new active watch from the watchvalue result + + // // If the expiration time is less than our minimum expiration time (or zero) consider this watch inactive + // let mut expiration_ts = owvresult.expiration_ts; + // if expiration_ts.as_u64() < min_expiration_ts { + // // Try to fire out a last-chance watch cancellation, so the + // if let Some(active_watch) = opt_active_watch.as_ref() { + // self.last_change_cancel_watch(active_watch).await; + // } + // return Ok(Timestamp::new(0)); + // } + + // // If the expiration time is greater than our maximum expiration time, clamp our local watch so we ignore extra valuechanged messages + // if expiration_ts.as_u64() > max_expiration_ts { + // expiration_ts = Timestamp::new(max_expiration_ts); + // } + + // // If we requested a cancellation, then consider this watch cancelled + // if count == 0 { + // // Expiration returned should be zero if we requested a cancellation + // if expiration_ts.as_u64() != 0 { + // veilid_log!(self debug "got active watch despite asking for a cancellation"); + // } + // return Ok(Timestamp::new(0)); + // } + + // // Keep a record of the watch + // opened_record.set_outbound_watch(OutboundWatch { + // id: owvresult.watch_id, + // expiration_ts, + // watch_node: owvresult.watch_node, + // opt_value_changed_route: owvresult.opt_value_changed_route, + // subkeys, + // count, + // }); + + // Ok(owvresult.expiration_ts) } #[instrument(level = "trace", target = "stor", skip_all)] @@ -1032,18 +1049,30 @@ impl StorageManager { key: TypedKey, subkeys: ValueSubkeyRangeSet, ) -> VeilidAPIResult { + // Obtain the watch change lock + // (may need to wait for background operations to complete on the watch) + let watch_lock = self.outbound_watch_lock_table.lock_tag(key).await; + + // Calculate change to existing watch let (subkeys, count, expiration_ts) = { let inner = 
self.inner.lock().await; - let Some(opened_record) = inner.opened_records.get(&key) else { + let Some(_opened_record) = inner.opened_records.get(&key) else { apibail_generic!("record not open"); }; // See what watch we have currently if any - let Some(active_watch) = opened_record.outbound_watch() else { + let Some(outbound_watch) = inner.outbound_watch_state.outbound_watches.get(&key) else { // If we didn't have an active watch, then we can just return false because there's nothing to do here return Ok(false); }; + // Ensure we have a 'desired' watch state + let Some(desired) = &outbound_watch.desired else { + // If we didn't have a desired watch, then we're already cancelling + let still_active = outbound_watch.current.is_some(); + return Ok(still_active); + }; + // Rewrite subkey range if empty to full let subkeys = if subkeys.is_empty() { ValueSubkeyRangeSet::full() @@ -1052,10 +1081,10 @@ impl StorageManager { }; // Reduce the subkey range - let new_subkeys = active_watch.params.subkeys.difference(&subkeys); + let new_subkeys = desired.subkeys.difference(&subkeys); // If no change is happening return false - if new_subkeys == active_watch.params.subkeys { + if new_subkeys == desired.subkeys { return Ok(false); } @@ -1063,16 +1092,16 @@ impl StorageManager { let count = if new_subkeys.is_empty() { 0 } else { - active_watch.params.count + desired.count }; - (new_subkeys, count, active_watch.params.expiration_ts) + (new_subkeys, count, desired.expiration_ts) }; // Update the watch. This just calls through to the above watch_values() function // This will update the active_watch so we don't need to do that in this routine. let expiration_ts = - pin_future!(self.watch_values(key, subkeys, expiration_ts, count)).await?; + pin_future!(self.watch_values_inner(watch_lock, subkeys, expiration_ts, count)).await?; // A zero expiration time returned from watch_value() means the watch is done // or no subkeys are left, and the watch is no longer active @@ -1081,7 +1110,7 @@ impl StorageManager { return Ok(false); } - // Return true because the the watch was changed + // Return true because the the watch was changed, but is not completely gone Ok(true) } diff --git a/veilid-core/src/storage_manager/outbound_watch.rs b/veilid-core/src/storage_manager/outbound_watch.rs index 4d1e3643..4d647b3b 100644 --- a/veilid-core/src/storage_manager/outbound_watch.rs +++ b/veilid-core/src/storage_manager/outbound_watch.rs @@ -1,3 +1,5 @@ +use futures_util::StreamExt as _; + use super::*; impl_veilid_log_facility!("stor"); @@ -14,6 +16,8 @@ pub(in crate::storage_manager) struct OutboundWatchParameters { pub subkeys: ValueSubkeyRangeSet, /// What key to use to perform the watch pub opt_watcher: Option, + /// What safety selection to use on the network + pub safety_selection: SafetySelection, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -30,6 +34,49 @@ pub(in crate::storage_manager) struct OutboundWatchCurrent { pub opt_value_changed_route: Option, } +impl OutboundWatchCurrent { + pub fn new( + params: OutboundWatchParameters, + opt_value_changed_route: Option, + ) -> Self { + let remaining_count = params.count; + let min_expiration_ts = params.expiration_ts; + + Self { + params, + nodes: vec![], + min_expiration_ts, + remaining_count, + opt_value_changed_route, + } + } + + pub fn update(&mut self, per_node_state: &HashMap) { + self.min_expiration_ts = self + .nodes + .iter() + .map(|x| per_node_state.get(x).unwrap().expiration_ts) + .reduce(|a, b| a.min(b)) + .unwrap_or(self.params.expiration_ts); + } + pub 
fn watch_node_refs( + &self, + per_node_state: &HashMap, + ) -> Vec { + self.nodes + .iter() + .map(|x| { + per_node_state + .get(x) + .unwrap() + .watch_node_ref + .clone() + .unwrap() + }) + .collect() + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub(in crate::storage_manager) struct OutboundWatch { /// Current state @@ -102,7 +149,7 @@ impl OutboundWatch { || self.needs_cancel(registry, cur_ts) || self.needs_renew(registry, cur_ts) { - veilid_log!(registry warn "should have checked for is_dead, needs_cancel first"); + veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first"); return false; } @@ -208,51 +255,438 @@ impl OutboundWatchState { } } } + + pub fn get_min_expiration(&self, record_key: TypedKey) -> Option { + self.outbound_watches + .get(&record_key) + .map(|x| x.current.as_ref().map(|y| y.min_expiration_ts)) + .flatten() + } } -// impl OutboundWatchCurrent { -// pub fn new( -// params: OutboundWatchParameters, -// opt_value_changed_route: Option, -// ) -> Self { -// let remaining_count = params.count; -// let min_expiration_ts = params.expiration_ts; +impl StorageManager { + /// Remove dead watches from the table + pub(super) async fn process_outbound_watch_dead( + &self, + watch_lock: AsyncTagLockGuard, + ) { + let record_key = watch_lock.tag(); -// Self { -// params, -// per_node: vec![], -// min_expiration_ts, -// remaining_count, -// opt_value_changed_route, -// } -// } + let mut inner = self.inner.lock().await; + let Some(outbound_watch) = inner + .outbound_watch_state + .outbound_watches + .remove(&record_key) + else { + veilid_log!(self warn "dead watch should have still been in the table"); + return; + }; -// pub fn per_node_outbound_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeOutboundWatch> { -// self.per_node.iter().find(|x| x.id == watch_id) -// } + if outbound_watch.current.is_some() { + veilid_log!(self warn "dead watch still had current state"); + } + if outbound_watch.desired.is_some() { + veilid_log!(self warn "dead watch still had desired params"); + } + } -// pub fn per_node_outbound_watch_by_id_mut( -// &mut self, -// watch_id: u64, -// ) -> Option<&mut PerNodeOutboundWatch> { -// self.per_node.iter_mut().find(|x| x.id == watch_id) -// } + /// Get the list of remaining active watch ids + /// and call their nodes to cancel the watch + pub(super) async fn process_outbound_watch_cancel( + &self, + watch_lock: AsyncTagLockGuard, + ) { + let record_key = watch_lock.tag(); -// pub fn remove_per_node_outbound_watch_by_id(&mut self, watch_id: u64) { -// let Some(n) = self.per_node.iter().position(|x| x.id == watch_id) else { -// return; -// }; -// self.per_node.remove(n); + // If we can't do this operation right now, don't try + if !self.dht_is_online() { + return; + } -// self.update_min_expiration_ts(); -// } + let per_node_states = { + let inner = &mut *self.inner.lock().await; + let Some(outbound_watch) = inner + .outbound_watch_state + .outbound_watches + .get_mut(&record_key) + else { + veilid_log!(self warn "watch being cancelled should have still been in the table"); + return; + }; + let Some(current) = &mut outbound_watch.current else { + veilid_log!(self warn "watch being cancelled should have current state"); + return; + }; + let mut per_node_states = vec![]; + let mut dead_pnks = BTreeSet::new(); + for pnk in ¤t.nodes { + let Some(per_node_state) = + inner.outbound_watch_state.per_node_state.get(&pnk).cloned() + else { + veilid_log!(self warn "missing per-node state for watch"); + 
dead_pnks.insert(*pnk); + continue; + }; + per_node_states.push((*pnk, per_node_state)); + } + current.nodes.retain(|x| !dead_pnks.contains(x)); -// fn update_min_expiration_ts(&mut self) { -// self.min_expiration_ts = self -// .per_node -// .iter() -// .map(|x| x.expiration_ts) -// .reduce(|a, b| a.min(b)) -// .unwrap_or(self.params.expiration_ts); -// } -// } + per_node_states + }; + + // Now reach out to each node and cancel their watch ids + let mut unord = FuturesUnordered::new(); + for (pnk, pns) in per_node_states { + unord.push(async move { + let res = self + .outbound_watch_value_cancel( + pnk.record_key, + pns.opt_watcher, + pns.safety_selection, + pns.watch_node_ref.unwrap(), + pns.watch_id, + ) + .await; + (pnk, res) + }); + } + + let mut cancelled = vec![]; + while let Some((pnk, res)) = unord.next().await { + match res { + Ok(_) => { + // Remove from 'per node states' because we got some response + cancelled.push(pnk); + } + Err(e) => { + veilid_log!(self debug "outbound watch cancel error: {}", e); + + // xxx should do something different for network unreachable vs host unreachable + // Leave in the 'per node states' for now because we couldn't contact the node + // but remove from this watch. We'll try the cancel again if we reach this node again during fanout. + } + } + } + + // Update state + { + let inner = &mut *self.inner.lock().await; + + // Remove per node watches we cancelled + for pnk in cancelled { + if inner + .outbound_watch_state + .per_node_state + .remove(&pnk) + .is_none() + { + veilid_log!(self warn "per-node watch being cancelled should have still been in the table"); + }; + } + + // Remove outbound watch we've cancelled + let Some(outbound_watch) = inner + .outbound_watch_state + .outbound_watches + .get_mut(&record_key) + else { + veilid_log!(self warn "watch being cancelled should have still been in the table"); + return; + }; + + // Mark as dead now that we cancelled + let Some(_current) = outbound_watch.current.take() else { + veilid_log!(self warn "watch being cancelled should have current state"); + return; + }; + } + } + + /// See which existing per-node watches can be renewed + /// and drop the ones that can't be or are dead + pub(super) async fn process_outbound_watch_renew( + &self, + watch_lock: AsyncTagLockGuard, + ) { + let record_key = watch_lock.tag(); + + // If we can't do this operation right now, don't try + if !self.dht_is_online() { + return; + } + + let (per_node_states, renew_params) = { + let inner = &mut *self.inner.lock().await; + let Some(outbound_watch) = inner + .outbound_watch_state + .outbound_watches + .get_mut(&record_key) + else { + veilid_log!(self warn "watch being renewed should have still been in the table"); + return; + }; + let Some(current) = &mut outbound_watch.current else { + veilid_log!(self warn "watch being renewed should have current state"); + return; + }; + let mut per_node_states = vec![]; + let mut dead_pnks = BTreeSet::new(); + for pnk in ¤t.nodes { + let Some(per_node_state) = + inner.outbound_watch_state.per_node_state.get(&pnk).cloned() + else { + veilid_log!(self warn "missing per-node state for watch"); + dead_pnks.insert(*pnk); + continue; + }; + per_node_states.push((*pnk, per_node_state)); + } + current.nodes.retain(|x| !dead_pnks.contains(x)); + + // Change the params to update count + let mut renew_params = current.params.clone(); + renew_params.count = current.remaining_count; + + (per_node_states, renew_params) + }; + + // Now reach out to each node and renew their watches + let mut unord = 
FuturesUnordered::new(); + for (pnk, pns) in per_node_states { + let params = renew_params.clone(); + unord.push(async move { + let res = self + .outbound_watch_value_change( + pnk.record_key, + params, + pns.safety_selection, + pns.watch_node_ref.unwrap(), + pns.watch_id, + ) + .await; + (pnk, res) + }); + } + + let mut renewed = vec![]; + let mut rejected = vec![]; + let mut unanswered = vec![]; + while let Some((pnk, res)) = unord.next().await { + match res { + Ok(Some(r)) => { + // Note per node states we should keep vs throw away + renewed.push((pnk, r)); + } + Ok(None) => { + rejected.push(pnk); + } + Err(e) => { + veilid_log!(self debug "outbound watch change error: {}", e); + // Leave in the 'per node states' for now because we couldn't contact the node + // but remove from this watch. + + // xxx should do something different for network unreachable vs host unreachable + unanswered.push(pnk); + } + } + } + + // Update state + { + let inner = &mut *self.inner.lock().await; + let Some(outbound_watch) = inner + .outbound_watch_state + .outbound_watches + .get_mut(&record_key) + else { + veilid_log!(self warn "watch being renewed should have still been in the table"); + return; + }; + let Some(current) = &mut outbound_watch.current else { + veilid_log!(self warn "watch being renewed should have current state"); + return; + }; + + let mut dead_pnks = BTreeSet::new(); + + // Perform renewals + for (pnk, r) in renewed { + let watch_node = r.watch_nodes.first().cloned().unwrap(); + let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk) + else { + veilid_log!(self warn "missing per-node state for watch"); + dead_pnks.insert(pnk); + continue; + }; + per_node_state.count = renew_params.count; + per_node_state.expiration_ts = watch_node.expiration_ts; + per_node_state.watch_id = watch_node.watch_id; + } + // Eliminate rejected + for pnk in rejected { + if inner + .outbound_watch_state + .per_node_state + .remove(&pnk) + .is_none() + { + veilid_log!(self warn "per-node watch being renewed should have still been in the table"); + } + dead_pnks.insert(pnk); + } + // Drop unanswered but leave in per node state + for pnk in unanswered { + dead_pnks.insert(pnk); + } + + current.nodes.retain(|x| !dead_pnks.contains(x)); + + // Update outbound watch + current.update(&inner.outbound_watch_state.per_node_state); + } + } + + /// Perform fanout to add per-node watches to an outbound watch + /// Must have no current state, or have a match to desired parameters + pub(super) async fn process_outbound_watch_reconcile( + &self, + watch_lock: AsyncTagLockGuard, + ) { + let record_key = watch_lock.tag(); + + // If we can't do this operation right now, don't try + if !self.dht_is_online() { + return; + } + + // Get the nodes already active on this watch, + // and the parameters to fanout with for the rest + let (active_nodes, reconcile_params) = { + let inner = &mut *self.inner.lock().await; + let Some(outbound_watch) = inner + .outbound_watch_state + .outbound_watches + .get_mut(&record_key) + else { + veilid_log!(self warn "watch being reconciled should have still been in the table"); + return; + }; + + // Get params to reconcile + let Some(desired) = &outbound_watch.desired else { + veilid_log!(self warn "watch being reconciled should have had desired parameters"); + return; + }; + + let active_nodes = if let Some(current) = &mut outbound_watch.current { + // Assert matching parameters + if ¤t.params != desired { + veilid_log!(self warn "watch being reconciled should have had 
matching current and desired parameters"); + return; + } + current.nodes.iter().map(|x| x.node_id).collect() + } else { + vec![] + }; + + let reconcile_params = desired.clone(); + + (active_nodes, reconcile_params) + }; + + // Now fan out with parameters and get new per node watches + self.outbound_watch_value(record_key, reconcile_params, active_nodes) + .await; + } + + /// Get the next operation for a particular watch's state machine + /// Can be processed in the foreground, or by the bacgkround operation queue + pub(super) fn get_next_outbound_watch_operation( + &self, + key: TypedKey, + opt_watch_lock: Option>, + cur_ts: Timestamp, + outbound_watch: &OutboundWatch, + ) -> Option> { + let registry = self.registry(); + let consensus_count = self + .config() + .with(|c| c.network.dht.get_value_count as usize); + + // Check states + if outbound_watch.is_dead() { + // Outbound watch is dead + let Some(watch_lock) = + opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key)) + else { + return None; + }; + + let fut = { + let registry = self.registry(); + async move { + registry + .storage_manager() + .process_outbound_watch_dead(watch_lock) + .await + } + }; + return Some(pin_dyn_future!(fut)); + } else if outbound_watch.needs_cancel(®istry, cur_ts) { + // Outbound watch needs to be cancelled + let Some(watch_lock) = + opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key)) + else { + return None; + }; + + let fut = { + let registry = self.registry(); + async move { + registry + .storage_manager() + .process_outbound_watch_cancel(watch_lock) + .await + } + }; + return Some(pin_dyn_future!(fut)); + } else if outbound_watch.needs_renew(®istry, cur_ts) { + // Outbound watch expired but can be renewed + let Some(watch_lock) = + opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key)) + else { + return None; + }; + + let fut = { + let registry = self.registry(); + async move { + registry + .storage_manager() + .process_outbound_watch_renew(watch_lock) + .await + } + }; + return Some(pin_dyn_future!(fut)); + } else if outbound_watch.needs_reconcile(®istry, consensus_count, cur_ts) { + // Outbound watch parameters have changed or it needs more nodes + let Some(watch_lock) = + opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key)) + else { + return None; + }; + + let fut = { + let registry = self.registry(); + async move { + registry + .storage_manager() + .process_outbound_watch_reconcile(watch_lock) + .await + } + }; + return Some(pin_dyn_future!(fut)); + } + None + } +} diff --git a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs index e3dc0845..acb7fe97 100644 --- a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs @@ -1,279 +1,6 @@ -use futures_util::StreamExt as _; - use super::*; impl StorageManager { - /// Remove dead watches from the table - pub(super) async fn process_outbound_watch_dead( - &self, - watch_lock: AsyncTagLockGuard, - ) { - let record_key = watch_lock.tag(); - - let mut inner = self.inner.lock().await; - let Some(outbound_watch) = inner - .outbound_watch_state - .outbound_watches - .remove(&record_key) - else { - veilid_log!(self warn "dead watch should have still been in the table"); - return; - }; - - if outbound_watch.current.is_some() { - veilid_log!(self warn "dead watch still had current state"); - } - if 
outbound_watch.desired.is_some() { - veilid_log!(self warn "dead watch still had desired params"); - } - } - - /// Get the list of remaining active watch ids - /// and call their nodes to cancel the watch - pub(super) async fn process_outbound_watch_cancel( - &self, - watch_lock: AsyncTagLockGuard, - ) { - let record_key = watch_lock.tag(); - - // If we can't do this operation right now, don't try - if !self.dht_is_online() { - return; - } - - let per_node_states = { - let inner = &mut *self.inner.lock().await; - let Some(outbound_watch) = inner - .outbound_watch_state - .outbound_watches - .get_mut(&record_key) - else { - veilid_log!(self warn "watch being cancelled should have still been in the table"); - return; - }; - let Some(current) = &mut outbound_watch.current else { - veilid_log!(self warn "watch being cancelled should have current state"); - return; - }; - let mut per_node_states = vec![]; - let mut dead_pnks = BTreeSet::new(); - for pnk in ¤t.nodes { - let Some(per_node_state) = - inner.outbound_watch_state.per_node_state.get(&pnk).cloned() - else { - veilid_log!(self warn "missing per-node state for watch"); - dead_pnks.insert(*pnk); - continue; - }; - per_node_states.push((*pnk, per_node_state)); - } - current.nodes.retain(|x| !dead_pnks.contains(x)); - - per_node_states - }; - - // Now reach out to each node and cancel their watch ids - let mut unord = FuturesUnordered::new(); - for (pnk, pns) in per_node_states { - unord.push(async move { - let res = self - .outbound_watch_value_cancel( - pnk.record_key, - pns.opt_watcher, - pns.safety_selection, - pns.watch_node_ref.unwrap(), - pns.watch_id, - ) - .await; - (pnk, res) - }); - } - - let mut cancelled = vec![]; - while let Some((pnk, res)) = unord.next().await { - match res { - Ok(_) => { - // Remove from 'per node states' because we got some response - cancelled.push(pnk); - } - Err(e) => { - veilid_log!(self debug "outbound watch cancel error: {}", e); - // Leave in the 'per node states' for now because we couldn't contact the node - // but remove from this watch. We'll try the cancel again if we reach this node again during fanout. 
- } - } - } - - // Update state - { - let inner = &mut *self.inner.lock().await; - for pnk in cancelled { - let Some(outbound_watch) = inner - .outbound_watch_state - .outbound_watches - .get_mut(&pnk.record_key) - else { - veilid_log!(self warn "watch being cancelled should have still been in the table"); - return; - }; - - // Mark as dead now that we cancelled - let Some(_current) = outbound_watch.current.take() else { - veilid_log!(self warn "watch being cancelled should have current state"); - return; - }; - } - } - } - - /// See which existing per-node watches can be renewed - /// and drop the ones that can't be or are dead - pub(super) async fn process_outbound_watch_renew( - &self, - watch_lock: AsyncTagLockGuard, - ) { - let record_key = watch_lock.tag(); - - // If we can't do this operation right now, don't try - if !self.dht_is_online() { - return; - } - - let (per_node_states, renew_params) = { - let inner = &mut *self.inner.lock().await; - let Some(outbound_watch) = inner - .outbound_watch_state - .outbound_watches - .get_mut(&record_key) - else { - veilid_log!(self warn "watch being renewed should have still been in the table"); - return; - }; - let Some(current) = &mut outbound_watch.current else { - veilid_log!(self warn "watch being renewed should have current state"); - return; - }; - let mut per_node_states = vec![]; - let mut dead_pnks = BTreeSet::new(); - for pnk in ¤t.nodes { - let Some(per_node_state) = - inner.outbound_watch_state.per_node_state.get(&pnk).cloned() - else { - veilid_log!(self warn "missing per-node state for watch"); - dead_pnks.insert(*pnk); - continue; - }; - per_node_states.push((*pnk, per_node_state)); - } - current.nodes.retain(|x| !dead_pnks.contains(x)); - - // Change the params to update count - let mut renew_params = current.params.clone(); - renew_params.count = current.remaining_count; - - (per_node_states, renew_params) - }; - - // Now reach out to each node and renew their watches - let mut unord = FuturesUnordered::new(); - for (pnk, pns) in per_node_states { - let params = renew_params.clone(); - unord.push(async move { - let res = self - .outbound_watch_value_change( - pnk.record_key, - params, - pns.safety_selection, - pns.watch_node_ref.unwrap(), - pns.watch_id, - ) - .await; - (pnk, res) - }); - } - - let mut renewed = vec![]; - let mut rejected = vec![]; - let mut unanswered = vec![]; - while let Some((pnk, res)) = unord.next().await { - match res { - Ok(Some(r)) => { - // Note per node states we should keep vs throw away - renewed.push((pnk, r)); - } - Ok(None) => { - rejected.push(pnk); - } - Err(e) => { - veilid_log!(self debug "outbound watch change error: {}", e); - // Leave in the 'per node states' for now because we couldn't contact the node - // but remove from this watch. 
- - // xxx should do something different for network unreachable vs host unreachable - unanswered.push(pnk); - } - } - } - - // Update state - { - let inner = &mut *self.inner.lock().await; - let Some(outbound_watch) = inner - .outbound_watch_state - .outbound_watches - .get_mut(&record_key) - else { - veilid_log!(self warn "watch being renewed should have still been in the table"); - return; - }; - let Some(current) = &mut outbound_watch.current else { - veilid_log!(self warn "watch being renewed should have current state"); - return; - }; - - let mut dead_pnks = BTreeSet::new(); - - // Perform renewals - for (pnk, r) in renewed { - let watch_node = r.watch_nodes.first().cloned().unwrap(); - let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk) - else { - veilid_log!(self warn "missing per-node state for watch"); - dead_pnks.insert(pnk); - continue; - }; - per_node_state.count = renew_params.count; - per_node_state.expiration_ts = watch_node.expiration_ts; - per_node_state.watch_id = watch_node.watch_id; - } - // Eliminate rejected - for pnk in rejected { - inner.outbound_watch_state.per_node_state.remove(&pnk); - dead_pnks.insert(pnk); - } - // Drop unanswered but leave in per node state - for pnk in unanswered { - dead_pnks.insert(pnk); - } - - current.nodes.retain(|x| !dead_pnks.contains(x)); - } - } - - pub(super) async fn process_outbound_watch_reconcile( - &self, - watch_lock: AsyncTagLockGuard, - ) { - let record_key = watch_lock.tag(); - - // If we can't do this operation right now, don't try - if !self.dht_is_online() { - return; - } - - xxx continue here - } - // Check if client-side watches on opened records either have dead nodes or if the watch has expired //#[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn check_outbound_watches_task_routine( @@ -284,11 +11,7 @@ impl StorageManager { ) -> EyreResult<()> { let mut inner = self.inner.lock().await; - let registry = self.registry(); let cur_ts = Timestamp::now(); - let consensus_count = self - .config() - .with(|c| c.network.dht.get_value_count as usize); // Iterate all per-node watches and remove expired ones that are unreferenced let mut dead_pnks = HashSet::new(); @@ -311,74 +34,14 @@ impl StorageManager { .per_node_state .retain(|k, _| !dead_pnks.contains(k)); - // Iterate all outbound watches - // Determine what work needs doing if any + // Iterate all outbound watches and determine what work needs doing if any for (k, v) in &inner.outbound_watch_state.outbound_watches { let k = *k; - // Check states - if v.is_dead() { - // Outbound watch is dead - let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else { - continue; - }; - - let fut = { - let registry = self.registry(); - async move { - registry - .storage_manager() - .process_outbound_watch_dead(watch_lock) - .await - } - }; - self.background_operation_processor.add_future(fut); - } else if v.needs_cancel(®istry, cur_ts) { - // Outbound watch needs to be cancelled - let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else { - continue; - }; - let fut = { - let registry = self.registry(); - async move { - registry - .storage_manager() - .process_outbound_watch_cancel(watch_lock) - .await - } - }; - self.background_operation_processor.add_future(fut); - } else if v.needs_renew(®istry, cur_ts) { - // Outbound watch expired but can be renewed - let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else { - continue; - }; - let fut = { - let registry = 
self.registry(); - async move { - registry - .storage_manager() - .process_outbound_watch_renew(watch_lock) - .await - } - }; - self.background_operation_processor.add_future(fut); - } else if v.needs_reconcile(®istry, consensus_count, cur_ts) { - // Outbound watch parameters have changed or it needs more nodes - let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else { - continue; - }; - let fut = { - let registry = self.registry(); - async move { - registry - .storage_manager() - .process_outbound_watch_reconcile(watch_lock) - .await - } - }; - self.background_operation_processor.add_future(fut); - } + // Get next work on watch and queue it if we have something to do + if let Some(op_fut) = self.get_next_outbound_watch_operation(k, None, cur_ts, v) { + self.background_operation_processor.add_future(op_fut); + }; } // // See if the watch is expired or out of updates diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 502b6fbb..dcb78125 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -128,55 +128,15 @@ impl StorageManager { } } - /// Perform a 'watch value' query on the network using fanout XXX rewrite this so api-based cancel/change/new make sense + /// Perform a 'watch value' query on the network using fanout #[allow(clippy::too_many_arguments)] #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_watch_value( &self, key: TypedKey, params: OutboundWatchParameters, - safety_selection: SafetySelection, active_nodes: Vec, ) -> VeilidAPIResult> { - // if the count is zero, we are cancelling - if count == 0 { - // Ensure active watch is specified - let Some(active_watch) = opt_active_watch else { - apibail_internal!("Must specify an active watch in order to cancel it"); - }; - return self - .outbound_watch_value_cancel( - key, - subkeys, - safety_selection, - opt_watcher, - active_watch, - ) - .await; - } - - // if the watch id and watch node are specified, then we're trying to change an existing watch - // first try to do that, then fall back to fanout for a new watch id - if let Some(active_watch) = opt_active_watch { - if let Some(res) = self - .outbound_watch_value_change( - key, - subkeys.clone(), - expiration, - count, - safety_selection, - opt_watcher, - active_watch, - ) - .await? 
- { - // If a change was successful then return immediately - return Ok(Some(res)); - } - - // Otherwise, treat this like a new watch - } - let routing_domain = RoutingDomain::PublicInternet; // Get the DHT parameters for 'WatchValue', some of which are the same for 'GetValue' operations @@ -191,8 +151,9 @@ impl StorageManager { // Get the appropriate watcher key, if anonymous use a static anonymous watch key // which lives for the duration of the app's runtime - let watcher = - opt_watcher.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value); + let watcher = params + .opt_watcher + .unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value); // Get the nodes we know are caching this value to seed the fanout let init_fanout_queue = { diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 34f58d55..c6df3800 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -1474,6 +1474,11 @@ impl VeilidAPI { out += &storage_manager.debug_opened_records().await; out } + "watched" => { + let mut out = "Watched Records:\n".to_string(); + out += &storage_manager.debug_watched_records().await; + out + } "offline" => { let mut out = "Offline Records:\n".to_string(); out += &storage_manager.debug_offline_records().await; diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index acdfe783..d07808c3 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -404,7 +404,7 @@ impl RoutingContext { /// /// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future, /// and the returned timestamp will be no later than the requested expiration, but -may- be before the requested expiration. - /// If the returned timestamp is zero it indicates that the watch creation or update has failed. In the case of a failed update, the watch is considered cancelled. + /// If the returned timestamp is zero it indicates that the watch is considered cancelled, either from a failed update or due to `count` being zero /// /// DHT watches are accepted with the following conditions: /// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record. 
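The routing_context.rs hunk above only adjusts the doc comment on the watch API: a zero expiration timestamp now explicitly means the watch is considered cancelled, whether from a failed update or from requesting a count of zero. A hedged usage sketch of that contract, assuming the veilid-core public API as shown in this file (the key, subkey range, and count values are illustrative only):

    // Illustrative only; assumes RoutingContext::watch_dht_values as documented above.
    use veilid_core::*;

    async fn watch_example(rc: &RoutingContext, key: TypedKey) -> VeilidAPIResult<()> {
        // Ask for up to 10 change notifications on all subkeys; a zero expiration
        // lets the network choose the actual expiration time.
        let expiration = rc
            .watch_dht_values(key, ValueSubkeyRangeSet::full(), Timestamp::new(0), 10)
            .await?;
        if expiration.as_u64() == 0 {
            // Per the doc comment in this patch: the watch is considered cancelled,
            // either because the update failed or because count was zero.
        }
        Ok(())
    }
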
diff --git a/veilid-tools/src/async_tag_lock.rs b/veilid-tools/src/async_tag_lock.rs index a3faeeab..685b2461 100644 --- a/veilid-tools/src/async_tag_lock.rs +++ b/veilid-tools/src/async_tag_lock.rs @@ -3,8 +3,26 @@ use super::*; use core::fmt::Debug; use core::hash::Hash; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct AsyncTagLockGuard +where + T: Hash + Eq + Clone + Debug, +{ + inner: Arc>, +} + +impl AsyncTagLockGuard +where + T: Hash + Eq + Clone + Debug, +{ + #[must_use] + pub fn tag(&self) -> T { + self.inner.tag() + } +} + +#[derive(Debug)] +struct AsyncTagLockGuardInner where T: Hash + Eq + Clone + Debug, { @@ -13,7 +31,7 @@ where guard: Option>, } -impl AsyncTagLockGuard +impl AsyncTagLockGuardInner where T: Hash + Eq + Clone + Debug, { @@ -25,12 +43,12 @@ where } } - pub fn tag(&self) -> T { + fn tag(&self) -> T { self.tag.clone() } } -impl Drop for AsyncTagLockGuard +impl Drop for AsyncTagLockGuardInner where T: Hash + Eq + Clone + Debug, { @@ -137,7 +155,9 @@ where let guard = asyncmutex_lock_arc!(mutex); // Return the locked guard - AsyncTagLockGuard::new(self.clone(), tag, guard) + AsyncTagLockGuard { + inner: Arc::new(AsyncTagLockGuardInner::new(self.clone(), tag, guard)), + } } pub fn try_lock_tag(&self, tag: T) -> Option> { @@ -164,7 +184,9 @@ where } }; // Return guard - Some(AsyncTagLockGuard::new(self.clone(), tag, guard)) + Some(AsyncTagLockGuard { + inner: Arc::new(AsyncTagLockGuardInner::new(self.clone(), tag, guard)), + }) } }
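
The async_tag_lock.rs change makes AsyncTagLockGuard cloneable by moving the tag, the table reference, and the held mutex guard into a private AsyncTagLockGuardInner wrapped in an Arc, so the watch state-machine futures introduced elsewhere in this patch can share one guard and the tag lock is released only when the last clone drops. A minimal sketch of that pattern (not the veilid-tools implementation; names are illustrative):

    use std::sync::{Arc, Mutex, MutexGuard};

    #[derive(Debug)]
    struct GuardInner<'a> {
        tag: u64,
        // The real RAII state lives in the inner struct; dropping it releases the lock.
        _guard: MutexGuard<'a, ()>,
    }

    impl Drop for GuardInner<'_> {
        fn drop(&mut self) {
            println!("tag {} released", self.tag);
        }
    }

    #[derive(Clone, Debug)]
    struct CloneableGuard<'a> {
        inner: Arc<GuardInner<'a>>,
    }

    impl CloneableGuard<'_> {
        fn tag(&self) -> u64 {
            self.inner.tag
        }
    }

    fn main() {
        let mutex = Mutex::new(());
        let guard = CloneableGuard {
            inner: Arc::new(GuardInner {
                tag: 42,
                _guard: mutex.lock().unwrap(),
            }),
        };
        let guard2 = guard.clone(); // both clones refer to the same held lock
        assert_eq!(guard2.tag(), 42);
        drop(guard);  // lock still held through guard2
        drop(guard2); // inner drops here and the lock is released
    }

The design choice mirrors the patch: cloning is cheap (an Arc bump), the tag accessor is forwarded from the inner struct, and callers such as get_next_outbound_watch_operation can hand the same guard to a background future without re-acquiring the tag lock.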