Christien Rioux 2025-04-09 09:07:00 -04:00
parent 031d6463fa
commit 39cb380474
3 changed files with 221 additions and 73 deletions

View File

@ -32,6 +32,8 @@ const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 5;
const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
/// Frequency to check for dead nodes and routes for client-side outbound watches
const CHECK_OUTBOUND_WATCHES_INTERVAL_SECS: u32 = 1;
/// Frequency to retry reconciliation of watches that are not at consensus
const RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS: u32 = 30;
/// Frequency to check for expired server-side watched records
const CHECK_WATCHED_RECORDS_INTERVAL_SECS: u32 = 1;
/// Table store table for storage manager metadata
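The new RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS interval feeds the retry schedule introduced later in this commit. As a minimal sketch (assuming the Timestamp and TimestampDuration arithmetic already used elsewhere in this diff; the helper name is hypothetical), the constant converts to a next-reconcile deadline like this:

/// Sketch only: derive the earliest time a non-consensus watch may be
/// reconciled again, mirroring the pattern used by the reconcile task below.
fn next_reconcile_time(cur_ts: Timestamp) -> Timestamp {
    cur_ts + TimestampDuration::new_secs(RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS)
}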

View File

@ -1,6 +1,6 @@
use futures_util::StreamExt as _;
use super::*;
use super::{watch_value::OutboundWatchValueResult, *};
impl_veilid_log_facility!("stor");
@ -30,15 +30,12 @@ pub(in crate::storage_manager) struct OutboundWatchCurrent {
pub min_expiration_ts: Timestamp,
/// How many value change updates remain
pub remaining_count: u32,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
/// The next earliest time we are willing to try to reconcile and improve the watch
pub opt_next_reconcile_ts: Option<Timestamp>,
}
impl OutboundWatchCurrent {
pub fn new(
params: OutboundWatchParameters,
opt_value_changed_route: Option<CryptoKey>,
) -> Self {
pub fn new(params: OutboundWatchParameters) -> Self {
let remaining_count = params.count;
let min_expiration_ts = params.expiration_ts;
@ -47,7 +44,7 @@ impl OutboundWatchCurrent {
nodes: vec![],
min_expiration_ts,
remaining_count,
opt_value_changed_route,
opt_next_reconcile_ts: None,
}
}
@ -59,6 +56,7 @@ impl OutboundWatchCurrent {
.reduce(|a, b| a.min(b))
.unwrap_or(self.params.expiration_ts);
}
pub fn watch_node_refs(
&self,
per_node_state: &HashMap<PerNodeKey, PerNodeState>,
@ -89,6 +87,13 @@ pub(in crate::storage_manager) struct OutboundWatch {
}
impl OutboundWatch {
/// Note next time to try reconciliation
pub fn set_next_reconcile_ts(&mut self, next_ts: Timestamp) {
if let Some(current) = self.current.as_mut() {
current.opt_next_reconcile_ts = Some(next_ts);
}
}
/// Returns true if this outbound watch can be removed from the table
pub fn is_dead(&self) -> bool {
self.desired.is_none() && self.current.is_none()
@ -170,11 +175,13 @@ impl OutboundWatch {
return false;
}
// If we are still working on getting the 'current' state to match
// the 'desired' state, then
if current.nodes.len() != consensus_count {
// the 'desired' state, then do the reconcile if we are within the timeframe for it
if current.nodes.len() != consensus_count
&& cur_ts >= current.opt_next_reconcile_ts.unwrap_or_default()
{
return true;
}
// No work to do on this watch
// No work to do on this watch at this time
false
}
}
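The gate above retries reconciliation only once cur_ts reaches opt_next_reconcile_ts, treating an unset value as "retry immediately". A standalone sketch of that predicate, assuming the types shown in this diff (the free function itself is hypothetical):

/// Sketch only: a watch short of consensus is reconciled no sooner than its
/// recorded opt_next_reconcile_ts; None defaults to zero, i.e. retry now.
fn reconcile_due(
    current: &OutboundWatchCurrent,
    cur_ts: Timestamp,
    consensus_count: usize,
) -> bool {
    current.nodes.len() != consensus_count
        && cur_ts >= current.opt_next_reconcile_ts.unwrap_or_default()
}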
@ -202,6 +209,8 @@ pub(in crate::storage_manager) struct PerNodeState {
/// Resolved watch node reference
#[serde(skip)]
pub watch_node_ref: Option<NodeRef>,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -595,8 +604,108 @@ impl StorageManager {
};
// Now fan out with parameters and get new per node watches
self.outbound_watch_value(record_key, reconcile_params, active_nodes)
let cur_ts = Timestamp::now();
let res = self
.outbound_watch_value(record_key, reconcile_params, active_nodes)
.await;
// Regardless of result, set our next possible reconciliation time
{
let inner = &mut *self.inner.lock().await;
if let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
{
let next_ts =
cur_ts + TimestampDuration::new_secs(RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS);
outbound_watch.set_next_reconcile_ts(next_ts);
}
}
match res {
Ok(v) => {
//
}
Err(e) => {
veilid_log!(self debug "outbound watch fanout error: {}", e);
// ??? Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch.
// xxx should do something different for network unreachable vs host unreachable
//unanswered.push(pnk);
}
}
}
fn process_outbound_watch_value_result_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedKey,
owvresult: OutboundWatchValueResult,
) {
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "outbound watch should have still been in the table");
return;
};
let Some(desired) = &mut outbound_watch.desired else {
veilid_log!(self warn "watch with result should have desired params");
return;
};
let current = {
if outbound_watch.current.is_none() {
outbound_watch.current = Some(OutboundWatchCurrent {
params: desired.clone(),
nodes: vec![],
min_expiration_ts: desired.expiration_ts,
remaining_count: desired.count,
opt_next_reconcile_ts: None,
});
}
outbound_watch.current.as_mut().unwrap()
};
// let mut dead_pnks = BTreeSet::new();
// // Perform renewals
// for (pnk, r) in renewed {
// let watch_node = r.watch_nodes.first().cloned().unwrap();
// let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
// else {
// veilid_log!(self warn "missing per-node state for watch");
// dead_pnks.insert(pnk);
// continue;
// };
// per_node_state.count = renew_params.count;
// per_node_state.expiration_ts = watch_node.expiration_ts;
// per_node_state.watch_id = watch_node.watch_id;
// }
// // Eliminate rejected
// for pnk in rejected {
// if inner
// .outbound_watch_state
// .per_node_state
// .remove(&pnk)
// .is_none()
// {
// veilid_log!(self warn "per-node watch being renewed should have still been in the table");
// }
// dead_pnks.insert(pnk);
// }
// // Drop unanswered but leave in per node state
// for pnk in unanswered {
// dead_pnks.insert(pnk);
// }
// current.nodes.retain(|x| !dead_pnks.contains(x));
// // Update outbound watch
// current.update(&inner.outbound_watch_state.per_node_state);
}
/// Get the next operation for a particular watch's state machine

View File

@ -11,21 +11,25 @@ struct OutboundWatchValueContext {
/// The record of a node accepting a watch
#[derive(Debug, Clone)]
pub(super) struct WatchNode {
pub(super) struct AcceptedWatch {
pub watch_id: u64,
/// The node that
/// The node that accepted the watch
pub node_ref: NodeRef,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
}
/// The result of the outbound_watch_value operation
#[derive(Debug, Clone)]
pub(super) struct OutboundWatchValueResult {
/// Which nodes accepted the watch
pub watch_nodes: Vec<WatchNode>,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
pub accepted: Vec<AcceptedWatch>,
/// Which nodes rejected the watch
pub rejected: Vec<NodeRef>,
/// Which nodes ignored the watch
pub ignored: Vec<NodeRef>,
}
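With the result reshaped into accepted/rejected/ignored buckets (and the ValueChanged route now carried per accepted node), a caller can summarize consensus directly from the accepted list. A minimal sketch, assuming Timestamp is ordered as its use with min() elsewhere in this file suggests; the helper is hypothetical:

/// Sketch only: count accepting nodes and find the earliest expiration among
/// them, analogous to min_expiration_ts maintained in OutboundWatchCurrent.
fn accepted_summary(result: &OutboundWatchValueResult) -> (usize, Option<Timestamp>) {
    let earliest = result.accepted.iter().map(|a| a.expiration_ts).min();
    (result.accepted.len(), earliest)
}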
impl StorageManager {
@ -80,7 +84,7 @@ impl StorageManager {
safety_selection: SafetySelection,
watch_node: NodeRef,
watch_id: u64,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
) -> VeilidAPIResult<OutboundWatchValueResult> {
let routing_domain = RoutingDomain::PublicInternet;
if params.count == 0 {
@ -114,29 +118,36 @@ impl StorageManager {
veilid_log!(self debug "WatchValue renewed: id={} expiration_ts={} ({})", watch_id, display_ts(wva.answer.expiration_ts.as_u64()), watch_node);
}
Ok(Some(OutboundWatchValueResult {
watch_nodes: vec![WatchNode {
Ok(OutboundWatchValueResult {
accepted: vec![AcceptedWatch {
watch_id: wva.answer.watch_id,
node_ref: watch_node,
expiration_ts: wva.answer.expiration_ts,
opt_value_changed_route: wva.reply_private_route,
}],
opt_value_changed_route: wva.reply_private_route,
}))
rejected: vec![],
ignored: vec![],
})
} else {
veilid_log!(self debug "WatchValue change failed: id={} ({})", wva.answer.watch_id, watch_node);
Ok(None)
Ok(OutboundWatchValueResult {
accepted: vec![],
rejected: vec![watch_node],
ignored: vec![],
})
}
}
/// Perform a 'watch value' query on the network using fanout
///
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", target = "dht", skip_all, err)]
//#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_watch_value(
&self,
key: TypedKey,
params: OutboundWatchParameters,
active_nodes: Vec<TypedKey>,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
) -> VeilidAPIResult<OutboundWatchValueResult> {
let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'WatchValue', some of which are the same for 'GetValue' operations
@ -235,14 +246,29 @@ impl StorageManager {
// Routine to call to check if we're done at each step
let check_done = {
let context = context.clone();
Arc::new(move |_closest_nodes: &[NodeRef]| {
// If a watch has succeeded, return done
let ctx = context.lock();
if ctx.opt_watch_value_result.is_some() {
return Some(());
// let context = context.clone();
// let registry = self.registry();
Arc::new(move |fanout_result: &FanoutResult| -> bool {
// let mut ctx = context.lock();
match fanout_result.kind {
FanoutResultKind::Incomplete => {
// Keep going
false
}
FanoutResultKind::Timeout | FanoutResultKind::Exhausted => {
// Signal we're done
true
}
FanoutResultKind::Consensus => {
// assert!(
// ctx.value.is_some() && ctx.descriptor.is_some(),
// "should have gotten a value if we got consensus"
// );
// Signal we're done
true
}
}
None
})
};
@ -262,47 +288,58 @@ impl StorageManager {
check_done,
);
match fanout_call.run(init_fanout_queue).await {
// If we don't finish in the timeout (too much time passed without a successful watch)
TimeoutOr::Timeout => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.opt_watch_value_result.is_some() {
veilid_log!(self debug "WatchValue Fanout Timeout Success");
} else {
veilid_log!(self debug "WatchValue Fanout Timeout Failure");
}
Ok(ctx.opt_watch_value_result.clone())
}
// If we finished with done
TimeoutOr::Value(Ok(Some(()))) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.opt_watch_value_result.is_some() {
veilid_log!(self debug "WatchValue Fanout Success");
} else {
veilid_log!(self debug "WatchValue Fanout Failure");
}
Ok(ctx.opt_watch_value_result.clone())
}
// If we ran out of nodes
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.opt_watch_value_result.is_some() {
veilid_log!(self debug "WatchValue Fanout Exhausted Success");
} else {
veilid_log!(self debug "WatchValue Fanout Exhausted Failure");
}
Ok(ctx.opt_watch_value_result.clone())
}
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
veilid_log!(self debug "WatchValue Fanout Error: {}", e);
Err(e.into())
}
}
let fanout_result = fanout_call.run(init_fanout_queue).await.inspect_err(|e| {
// If we finished with an error, return that
veilid_log!(self debug "WatchValue fanout error: {}", e);
})?;
veilid_log!(self debug "WatchValue Fanout: {:?}", fanout_result);
Ok(Some(OutboundWatchValueResult {
watch_nodes: todo!(),
opt_value_changed_route: todo!(),
}))
// match fanout_call.run(init_fanout_queue).await {
// // If we don't finish in the timeout (too much time passed without a successful watch)
// TimeoutOr::Timeout => {
// // Return the best answer we've got
// let ctx = context.lock();
// if ctx.opt_watch_value_result.is_some() {
// veilid_log!(self debug "WatchValue Fanout Timeout Success");
// } else {
// veilid_log!(self debug "WatchValue Fanout Timeout Failure");
// }
// Ok(ctx.opt_watch_value_result.clone())
// }
// // If we finished with done
// TimeoutOr::Value(Ok(Some(()))) => {
// // Return the best answer we've got
// let ctx = context.lock();
// if ctx.opt_watch_value_result.is_some() {
// veilid_log!(self debug "WatchValue Fanout Success");
// } else {
// veilid_log!(self debug "WatchValue Fanout Failure");
// }
// Ok(ctx.opt_watch_value_result.clone())
// }
// // If we ran out of nodes
// TimeoutOr::Value(Ok(None)) => {
// // Return the best answer we've got
// let ctx = context.lock();
// if ctx.opt_watch_value_result.is_some() {
// veilid_log!(self debug "WatchValue Fanout Exhausted Success");
// } else {
// veilid_log!(self debug "WatchValue Fanout Exhausted Failure");
// }
// Ok(ctx.opt_watch_value_result.clone())
// }
// // Failed
// TimeoutOr::Value(Err(e)) => {
// // If we finished with an error, return that
// veilid_log!(self debug "WatchValue Fanout Error: {}", e);
// Err(e.into())
// }
// }
}
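Because outbound_watch_value now returns a plain OutboundWatchValueResult instead of an Option, callers branch on the outcome buckets rather than on Some/None. A hedged call-site sketch (names other than those shown in this diff are hypothetical):

// Sketch only: fan out a watch and react to the per-node outcomes.
let owvresult = self
    .outbound_watch_value(record_key, reconcile_params, active_nodes)
    .await?;
if owvresult.accepted.is_empty() {
    // Nothing accepted the watch; leave the per-node state alone and let the
    // reconcile task retry after RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS.
}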
/// Handle a received 'Watch Value' query