From 39cb38047462afdd93eca8af096a10acc738ec1c Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Wed, 9 Apr 2025 09:07:00 -0400
Subject: [PATCH] xfer

---
 veilid-core/src/storage_manager/mod.rs        |   2 +
 .../src/storage_manager/outbound_watch.rs     | 133 +++++++++++++--
 .../src/storage_manager/watch_value.rs        | 159 +++++++++++-------
 3 files changed, 221 insertions(+), 73 deletions(-)

diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index c60ffb86..6bef5a02 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -32,6 +32,8 @@ const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 5;
 const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
 /// Frequency to check for dead nodes and routes for client-side outbound watches
 const CHECK_OUTBOUND_WATCHES_INTERVAL_SECS: u32 = 1;
+/// Frequency to retry reconciliation of watches that are not at consensus
+const RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS: u32 = 30;
 /// Frequency to check for expired server-side watched records
 const CHECK_WATCHED_RECORDS_INTERVAL_SECS: u32 = 1;
 /// Table store table for storage manager metadata
diff --git a/veilid-core/src/storage_manager/outbound_watch.rs b/veilid-core/src/storage_manager/outbound_watch.rs
index 4d647b3b..a03f3ae8 100644
--- a/veilid-core/src/storage_manager/outbound_watch.rs
+++ b/veilid-core/src/storage_manager/outbound_watch.rs
@@ -1,6 +1,6 @@
 use futures_util::StreamExt as _;
 
-use super::*;
+use super::{watch_value::OutboundWatchValueResult, *};
 
 impl_veilid_log_facility!("stor");
 
@@ -30,15 +30,12 @@ pub(in crate::storage_manager) struct OutboundWatchCurrent {
     pub min_expiration_ts: Timestamp,
     /// How many value change updates remain
     pub remaining_count: u32,
-    /// Which private route is responsible for receiving ValueChanged notifications
-    pub opt_value_changed_route: Option<PublicKey>,
+    /// The next earliest time we are willing to try to reconcile and improve the watch
+    pub opt_next_reconcile_ts: Option<Timestamp>,
 }
 
 impl OutboundWatchCurrent {
-    pub fn new(
-        params: OutboundWatchParameters,
-        opt_value_changed_route: Option<PublicKey>,
-    ) -> Self {
+    pub fn new(params: OutboundWatchParameters) -> Self {
         let remaining_count = params.count;
         let min_expiration_ts = params.expiration_ts;
 
@@ -47,7 +44,7 @@ impl OutboundWatchCurrent {
             nodes: vec![],
             min_expiration_ts,
             remaining_count,
-            opt_value_changed_route,
+            opt_next_reconcile_ts: None,
         }
     }
 
@@ -59,6 +56,7 @@ impl OutboundWatchCurrent {
             .reduce(|a, b| a.min(b))
             .unwrap_or(self.params.expiration_ts);
     }
+
     pub fn watch_node_refs(
         &self,
         per_node_state: &HashMap<PerNodeKey, PerNodeState>,
@@ -89,6 +87,13 @@ pub(in crate::storage_manager) struct OutboundWatch {
 }
 
 impl OutboundWatch {
+    /// Note next time to try reconciliation
+    pub fn set_next_reconcile_ts(&mut self, next_ts: Timestamp) {
+        if let Some(current) = self.current.as_mut() {
+            current.opt_next_reconcile_ts = Some(next_ts);
+        }
+    }
+
     /// Returns true if this outbound watch can be removed from the table
     pub fn is_dead(&self) -> bool {
         self.desired.is_none() && self.current.is_none()
@@ -170,11 +175,13 @@
             return false;
         }
         // If we are still working on getting the 'current' state to match
-        // the 'desired' state, then
-        if current.nodes.len() != consensus_count {
+        // the 'desired' state, then do the reconcile if we are within the timeframe for it
+        if current.nodes.len() != consensus_count
+            && cur_ts >= current.opt_next_reconcile_ts.unwrap_or_default()
+        {
             return true;
         }
-        // No work to do on this watch
+        // No work to do on this watch at this time
         false
     }
 }
@@ -202,6 +209,8 @@ pub(in crate::storage_manager) struct PerNodeState {
     /// Resolved watch node reference
     #[serde(skip)]
     pub watch_node_ref: Option<NodeRef>,
+    /// Which private route is responsible for receiving ValueChanged notifications
+    pub opt_value_changed_route: Option<PublicKey>,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -595,8 +604,108 @@ impl StorageManager {
         };
 
         // Now fan out with parameters and get new per node watches
-        self.outbound_watch_value(record_key, reconcile_params, active_nodes)
+        let cur_ts = Timestamp::now();
+
+        let res = self
+            .outbound_watch_value(record_key, reconcile_params, active_nodes)
             .await;
+
+        // Regardless of result, set our next possible reconciliation time
+        {
+            let inner = &mut *self.inner.lock().await;
+            if let Some(outbound_watch) = inner
+                .outbound_watch_state
+                .outbound_watches
+                .get_mut(&record_key)
+            {
+                let next_ts =
+                    cur_ts + TimestampDuration::new_secs(RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS);
+                outbound_watch.set_next_reconcile_ts(next_ts);
+            }
+        }
+
+        match res {
+            Ok(v) => {
+                //
+            }
+            Err(e) => {
+                veilid_log!(self debug "outbound watch fanout error: {}", e);
+                // ??? Leave in the 'per node states' for now because we couldn't contact the node
+                // but remove from this watch.
+
+                // xxx should do something different for network unreachable vs host unreachable
+                //unanswered.push(pnk);
+            }
+        }
+    }
+
+    fn process_outbound_watch_value_result_inner(
+        &self,
+        inner: &mut StorageManagerInner,
+        record_key: TypedKey,
+        owvresult: OutboundWatchValueResult,
+    ) {
+        let Some(outbound_watch) = inner
+            .outbound_watch_state
+            .outbound_watches
+            .get_mut(&record_key)
+        else {
+            veilid_log!(self warn "outbound watch should have still been in the table");
+            return;
+        };
+        let Some(desired) = &mut outbound_watch.desired else {
+            veilid_log!(self warn "watch with result should have desired params");
+            return;
+        };
+        let current = {
+            if outbound_watch.current.is_none() {
+                outbound_watch.current = Some(OutboundWatchCurrent {
+                    params: desired.clone(),
+                    nodes: vec![],
+                    min_expiration_ts: desired.expiration_ts,
+                    remaining_count: desired.count,
+                    opt_next_reconcile_ts: None,
+                });
+            }
+            outbound_watch.current.as_mut().unwrap()
+        };
+
+        // let mut dead_pnks = BTreeSet::new();
+
+        // // Perform renewals
+        // for (pnk, r) in renewed {
+        //     let watch_node = r.watch_nodes.first().cloned().unwrap();
+        //     let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
+        //     else {
+        //         veilid_log!(self warn "missing per-node state for watch");
+        //         dead_pnks.insert(pnk);
+        //         continue;
+        //     };
+        //     per_node_state.count = renew_params.count;
+        //     per_node_state.expiration_ts = watch_node.expiration_ts;
+        //     per_node_state.watch_id = watch_node.watch_id;
+        // }
+        // // Eliminate rejected
+        // for pnk in rejected {
+        //     if inner
+        //         .outbound_watch_state
+        //         .per_node_state
+        //         .remove(&pnk)
+        //         .is_none()
+        //     {
+        //         veilid_log!(self warn "per-node watch being renewed should have still been in the table");
+        //     }
+        //     dead_pnks.insert(pnk);
+        // }
+        // // Drop unanswered but leave in per node state
+        // for pnk in unanswered {
+        //     dead_pnks.insert(pnk);
+        // }
+
+        // current.nodes.retain(|x| !dead_pnks.contains(x));
+
+        // // Update outbound watch
+        // current.update(&inner.outbound_watch_state.per_node_state);
     }
 
     /// Get the next operation for a particular watch's state machine
diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index dcb78125..efbe482e 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -11,21 +11,25 @@ struct OutboundWatchValueContext {
 
 /// The record of a node accepting a watch
 #[derive(Debug, Clone)]
-pub(super) struct WatchNode {
+pub(super) struct AcceptedWatch {
     pub watch_id: u64,
-    /// The node that
+    /// The node that accepted the watch
     pub node_ref: NodeRef,
     /// The expiration of a successful watch
    pub expiration_ts: Timestamp,
+    /// Which private route is responsible for receiving ValueChanged notifications
+    pub opt_value_changed_route: Option<PublicKey>,
 }
 
 /// The result of the outbound_watch_value operation
 #[derive(Debug, Clone)]
 pub(super) struct OutboundWatchValueResult {
     /// Which nodes accepted the watch
-    pub watch_nodes: Vec<WatchNode>,
-    /// Which private route is responsible for receiving ValueChanged notifications
-    pub opt_value_changed_route: Option<PublicKey>,
+    pub accepted: Vec<AcceptedWatch>,
+    /// Which nodes rejected the watch
+    pub rejected: Vec<NodeRef>,
+    /// Which nodes ignored the watch
+    pub ignored: Vec<NodeRef>,
 }
 
 impl StorageManager {
@@ -80,7 +84,7 @@ impl StorageManager {
         safety_selection: SafetySelection,
         watch_node: NodeRef,
         watch_id: u64,
-    ) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
+    ) -> VeilidAPIResult<OutboundWatchValueResult> {
         let routing_domain = RoutingDomain::PublicInternet;
 
         if params.count == 0 {
@@ -114,29 +118,36 @@ impl StorageManager {
                 veilid_log!(self debug "WatchValue renewed: id={} expiration_ts={} ({})", watch_id, display_ts(wva.answer.expiration_ts.as_u64()), watch_node);
             }
 
-            Ok(Some(OutboundWatchValueResult {
-                watch_nodes: vec![WatchNode {
+            Ok(OutboundWatchValueResult {
+                accepted: vec![AcceptedWatch {
                     watch_id: wva.answer.watch_id,
                     node_ref: watch_node,
                     expiration_ts: wva.answer.expiration_ts,
+                    opt_value_changed_route: wva.reply_private_route,
                 }],
-                opt_value_changed_route: wva.reply_private_route,
-            }))
+                rejected: vec![],
+                ignored: vec![],
+            })
         } else {
             veilid_log!(self debug "WatchValue change failed: id={} ({})", wva.answer.watch_id, watch_node);
-            Ok(None)
+            Ok(OutboundWatchValueResult {
+                accepted: vec![],
+                rejected: vec![watch_node],
+                ignored: vec![],
+            })
         }
     }
 
     /// Perform a 'watch value' query on the network using fanout
+    ///
     #[allow(clippy::too_many_arguments)]
-    #[instrument(level = "trace", target = "dht", skip_all, err)]
+    //#[instrument(level = "trace", target = "dht", skip_all, err)]
     pub(super) async fn outbound_watch_value(
         &self,
         key: TypedKey,
         params: OutboundWatchParameters,
         active_nodes: Vec<NodeRef>,
-    ) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
+    ) -> VeilidAPIResult<OutboundWatchValueResult> {
         let routing_domain = RoutingDomain::PublicInternet;
 
         // Get the DHT parameters for 'WatchValue', some of which are the same for 'GetValue' operations
@@ -235,14 +246,29 @@ impl StorageManager {
 
         // Routine to call to check if we're done at each step
         let check_done = {
-            let context = context.clone();
-            Arc::new(move |_closest_nodes: &[NodeRef]| {
-                // If a watch has succeeded, return done
-                let ctx = context.lock();
-                if ctx.opt_watch_value_result.is_some() {
-                    return Some(());
+            // let context = context.clone();
+            // let registry = self.registry();
+            Arc::new(move |fanout_result: &FanoutResult| -> bool {
+                // let mut ctx = context.lock();
+
+                match fanout_result.kind {
+                    FanoutResultKind::Incomplete => {
+                        // Keep going
+                        false
+                    }
+                    FanoutResultKind::Timeout | FanoutResultKind::Exhausted => {
+                        // Signal we're done
+                        true
+                    }
+                    FanoutResultKind::Consensus => {
+                        // assert!(
+                        //     ctx.value.is_some() && ctx.descriptor.is_some(),
+                        //     "should have gotten a value if we got consensus"
+                        // );
+                        // Signal we're done
+                        true
+                    }
                 }
-                None
             })
         };
 
@@ -262,47 +288,58 @@ impl StorageManager {
             check_done,
         );
 
-        match fanout_call.run(init_fanout_queue).await {
-            // If we don't finish in the timeout (too much time passed without a successful watch)
-            TimeoutOr::Timeout => {
-                // Return the best answer we've got
-                let ctx = context.lock();
-                if ctx.opt_watch_value_result.is_some() {
-                    veilid_log!(self debug "WatchValue Fanout Timeout Success");
-                } else {
-                    veilid_log!(self debug "WatchValue Fanout Timeout Failure");
-                }
-                Ok(ctx.opt_watch_value_result.clone())
-            }
-            // If we finished with done
-            TimeoutOr::Value(Ok(Some(()))) => {
-                // Return the best answer we've got
-                let ctx = context.lock();
-                if ctx.opt_watch_value_result.is_some() {
-                    veilid_log!(self debug "WatchValue Fanout Success");
-                } else {
-                    veilid_log!(self debug "WatchValue Fanout Failure");
-                }
-                Ok(ctx.opt_watch_value_result.clone())
-            }
-            // If we ran out of nodes
-            TimeoutOr::Value(Ok(None)) => {
-                // Return the best answer we've got
-                let ctx = context.lock();
-                if ctx.opt_watch_value_result.is_some() {
-                    veilid_log!(self debug "WatchValue Fanout Exhausted Success");
-                } else {
-                    veilid_log!(self debug "WatchValue Fanout Exhausted Failure");
-                }
-                Ok(ctx.opt_watch_value_result.clone())
-            }
-            // Failed
-            TimeoutOr::Value(Err(e)) => {
-                // If we finished with an error, return that
-                veilid_log!(self debug "WatchValue Fanout Error: {}", e);
-                Err(e.into())
-            }
-        }
+        let fanout_result = fanout_call.run(init_fanout_queue).await.inspect_err(|e| {
+            // If we finished with an error, return that
+            veilid_log!(self debug "WatchValue fanout error: {}", e);
+        })?;
+
+        veilid_log!(self debug "WatchValue Fanout: {:?}", fanout_result);
+
+        Ok(OutboundWatchValueResult {
+            accepted: todo!(),
+            rejected: todo!(),
+            ignored: todo!(),
+        })
+        // match fanout_call.run(init_fanout_queue).await {
+        //     // If we don't finish in the timeout (too much time passed without a successful watch)
+        //     TimeoutOr::Timeout => {
+        //         // Return the best answer we've got
+        //         let ctx = context.lock();
+        //         if ctx.opt_watch_value_result.is_some() {
+        //             veilid_log!(self debug "WatchValue Fanout Timeout Success");
+        //         } else {
+        //             veilid_log!(self debug "WatchValue Fanout Timeout Failure");
+        //         }
+        //         Ok(ctx.opt_watch_value_result.clone())
+        //     }
+        //     // If we finished with done
+        //     TimeoutOr::Value(Ok(Some(()))) => {
+        //         // Return the best answer we've got
+        //         let ctx = context.lock();
+        //         if ctx.opt_watch_value_result.is_some() {
+        //             veilid_log!(self debug "WatchValue Fanout Success");
+        //         } else {
+        //             veilid_log!(self debug "WatchValue Fanout Failure");
+        //         }
+        //         Ok(ctx.opt_watch_value_result.clone())
+        //     }
+        //     // If we ran out of nodes
+        //     TimeoutOr::Value(Ok(None)) => {
+        //         // Return the best answer we've got
+        //         let ctx = context.lock();
+        //         if ctx.opt_watch_value_result.is_some() {
+        //             veilid_log!(self debug "WatchValue Fanout Exhausted Success");
+        //         } else {
+        //             veilid_log!(self debug "WatchValue Fanout Exhausted Failure");
+        //         }
+        //         Ok(ctx.opt_watch_value_result.clone())
+        //     }
+        //     // Failed
+        //     TimeoutOr::Value(Err(e)) => {
+        //         // If we finished with an error, return that
+        //         veilid_log!(self debug "WatchValue Fanout Error: {}", e);
+        //         Err(e.into())
+        //     }
+        // }
     }
 
     /// Handle a received 'Watch Value' query
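
As a rough illustration of the new result shape introduced by this patch (not part of the patch itself): callers of outbound_watch_value() now receive separate accepted, rejected, and ignored lists instead of an Option. The standalone sketch below uses simplified stand-in types (u64 timestamps and String node ids in place of Timestamp and NodeRef), and the summarize_watch_result helper and consensus_count threshold are illustrative assumptions, not existing veilid-core API.

// Illustrative sketch only; simplified stand-ins, not veilid-core types.
#[derive(Debug, Clone)]
struct AcceptedWatch {
    watch_id: u64,
    node_id: String,    // stand-in for NodeRef
    expiration_ts: u64, // stand-in for Timestamp
}

#[derive(Debug, Default)]
struct OutboundWatchValueResult {
    accepted: Vec<AcceptedWatch>,
    rejected: Vec<String>, // stand-in for Vec<NodeRef>
    ignored: Vec<String>,  // stand-in for Vec<NodeRef>
}

/// Returns true when enough nodes accepted the watch to call it reconciled,
/// and reports the earliest expiration among the accepting nodes.
fn summarize_watch_result(res: &OutboundWatchValueResult, consensus_count: usize) -> bool {
    let min_expiration_ts = res.accepted.iter().map(|a| a.expiration_ts).min();
    println!(
        "accepted={} rejected={} ignored={} min_expiration_ts={:?}",
        res.accepted.len(),
        res.rejected.len(),
        res.ignored.len(),
        min_expiration_ts
    );
    // Rejected nodes would be dropped from per-node state; ignored nodes kept for a later retry.
    res.accepted.len() >= consensus_count
}

fn main() {
    let res = OutboundWatchValueResult {
        accepted: vec![AcceptedWatch {
            watch_id: 1,
            node_id: "node-a".into(),
            expiration_ts: 1_700_000_000,
        }],
        ..Default::default()
    };
    // With a consensus requirement of 2 nodes, a single acceptance is not yet enough.
    assert!(!summarize_watch_result(&res, 2));
}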