refactor and cleanup

Christien Rioux 2025-04-11 20:36:37 -04:00
parent c108fab262
commit 1024d26c14
11 changed files with 987 additions and 940 deletions

View File

@ -268,7 +268,6 @@ impl RPCProcessor {
(false, 0, watch_id.unwrap_or_default())
} else {
// Accepted, let's try to watch or cancel it
let params = InboundWatchParameters {
subkeys: subkeys.clone(),
expiration: Timestamp::new(expiration),

View File

@ -31,7 +31,7 @@ impl StorageManager {
pub async fn debug_watched_records(&self) -> String {
let inner = self.inner.lock().await;
let mut out = "[\n".to_owned();
for (k, v) in &inner.outbound_watch_state.outbound_watches {
for (k, v) in &inner.outbound_watch_manager.outbound_watches {
out += &format!(" {} {:?}\n", k, v);
}
format!("{}]\n", out)

View File

@ -1,7 +1,7 @@
mod debug;
mod get_value;
mod inspect_value;
mod outbound_watch;
mod outbound_watch_manager;
mod record_store;
mod set_value;
mod tasks;
@ -9,7 +9,7 @@ mod types;
mod watch_value;
use super::*;
use outbound_watch::*;
use outbound_watch_manager::*;
use record_store::*;
use routing_table::*;
use rpc_processor::*;
@ -66,7 +66,7 @@ struct StorageManagerInner {
/// Record subkeys that are currently being written to in the foreground
pub active_subkey_writes: HashMap<TypedKey, ValueSubkeyRangeSet>,
/// State management for outbound watches
pub outbound_watch_state: OutboundWatchState,
pub outbound_watch_manager: OutboundWatchManager,
/// Storage manager metadata that is persistent, including copy of offline subkey writes
pub metadata_db: Option<TableDB>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
@ -82,7 +82,7 @@ impl fmt::Debug for StorageManagerInner {
.field("remote_record_store", &self.remote_record_store)
.field("offline_subkey_writes", &self.offline_subkey_writes)
.field("active_subkey_writes", &self.active_subkey_writes)
.field("outbound_watch_state", &self.outbound_watch_state)
.field("outbound_watch_manager", &self.outbound_watch_manager)
//.field("metadata_db", &self.metadata_db)
//.field("tick_future", &self.tick_future)
.finish()
@ -384,9 +384,10 @@ impl StorageManager {
let mut out = vec![];
let mut node_set = HashSet::new();
for v in inner.outbound_watch_state.outbound_watches.values() {
if let Some(current) = &v.current {
let node_refs = current.watch_node_refs(&inner.outbound_watch_state.per_node_state);
for v in inner.outbound_watch_manager.outbound_watches.values() {
if let Some(current) = v.state() {
let node_refs =
current.watch_node_refs(&inner.outbound_watch_manager.per_node_state);
for node_ref in &node_refs {
let mut found = false;
for nid in node_ref.node_ids().iter() {
@ -404,7 +405,7 @@ impl StorageManager {
Destination::direct(
node_ref.routing_domain_filtered(RoutingDomain::PublicInternet),
)
.with_safety(current.params.safety_selection),
.with_safety(current.params().safety_selection),
)
}
}
@ -555,7 +556,7 @@ impl StorageManager {
// Set the watch to cancelled if we have one
// Will process cancellation in the background
inner.outbound_watch_state.set_desired_watch(key, None);
inner.outbound_watch_manager.set_desired_watch(key, None);
Ok(())
}
@ -930,7 +931,7 @@ impl StorageManager {
// Modify the 'desired' state of the watch or add one if it does not exist
inner
.outbound_watch_state
.outbound_watch_manager
.set_desired_watch(key, desired_params);
// Drop the lock for network access
@ -940,7 +941,7 @@ impl StorageManager {
loop {
let opt_op_fut = {
let inner = self.inner.lock().await;
let Some(outbound_watch) = inner.outbound_watch_state.outbound_watches.get(&key)
let Some(outbound_watch) = inner.outbound_watch_manager.outbound_watches.get(&key)
else {
// Watch is gone
return Ok(Timestamp::new(0));
@ -960,89 +961,10 @@ impl StorageManager {
let inner = self.inner.lock().await;
let expiration_ts = inner
.outbound_watch_state
.outbound_watch_manager
.get_min_expiration(key)
.unwrap_or_default();
Ok(expiration_ts)
// // Use the safety selection we opened the record with
// // Use the writer we opened with as the 'watcher' as well
// let opt_owvresult = self
// .outbound_watch_value(
// key,
// subkeys.clone(),
// expiration,
// count,
// safety_selection,
// opt_watcher,
// opt_active_watch.as_ref(),
// )
// .await?;
// // If we did not get a valid response assume nothing changed
// let Some(owvresult) = opt_owvresult else {
// apibail_try_again!("did not get a valid response");
// };
// // Clear any existing watch if the watch succeeded or got cancelled
// let mut inner = self.inner.lock().await;
// let Some(opened_record) = inner.opened_records.get_mut(&key) else {
// apibail_generic!("record not open");
// };
// opened_record.clear_outbound_watch();
// // Get the minimum expiration timestamp we will accept
// let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| {
// (
// TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
// TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)),
// )
// });
// let cur_ts = get_timestamp();
// let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
// let max_expiration_ts = if expiration.as_u64() == 0 {
// cur_ts + max_watch_expiration_us.as_u64()
// } else {
// expiration.as_u64()
// };
// // Build a new active watch from the watchvalue result
// // If the expiration time is less than our minimum expiration time (or zero) consider this watch inactive
// let mut expiration_ts = owvresult.expiration_ts;
// if expiration_ts.as_u64() < min_expiration_ts {
// // Try to fire out a last-chance watch cancellation, so the
// if let Some(active_watch) = opt_active_watch.as_ref() {
// self.last_change_cancel_watch(active_watch).await;
// }
// return Ok(Timestamp::new(0));
// }
// // If the expiration time is greater than our maximum expiration time, clamp our local watch so we ignore extra valuechanged messages
// if expiration_ts.as_u64() > max_expiration_ts {
// expiration_ts = Timestamp::new(max_expiration_ts);
// }
// // If we requested a cancellation, then consider this watch cancelled
// if count == 0 {
// // Expiration returned should be zero if we requested a cancellation
// if expiration_ts.as_u64() != 0 {
// veilid_log!(self debug "got active watch despite asking for a cancellation");
// }
// return Ok(Timestamp::new(0));
// }
// // Keep a record of the watch
// opened_record.set_outbound_watch(OutboundWatch {
// id: owvresult.watch_id,
// expiration_ts,
// watch_node: owvresult.watch_node,
// opt_value_changed_route: owvresult.opt_value_changed_route,
// subkeys,
// count,
// });
// Ok(owvresult.expiration_ts)
}
#[instrument(level = "trace", target = "stor", skip_all)]
@ -1063,15 +985,16 @@ impl StorageManager {
};
// See what watch we have currently if any
let Some(outbound_watch) = inner.outbound_watch_state.outbound_watches.get(&key) else {
let Some(outbound_watch) = inner.outbound_watch_manager.outbound_watches.get(&key)
else {
// If we didn't have an active watch, then we can just return false because there's nothing to do here
return Ok(false);
};
// Ensure we have a 'desired' watch state
let Some(desired) = &outbound_watch.desired else {
let Some(desired) = outbound_watch.desired() else {
// If we didn't have a desired watch, then we're already cancelling
let still_active = outbound_watch.current.is_some();
let still_active = outbound_watch.state().is_some();
return Ok(still_active);
};

View File

@ -1,766 +0,0 @@
use futures_util::StreamExt as _;
use super::{watch_value::OutboundWatchValueResult, *};
impl_veilid_log_facility!("stor");
/// Requested parameters for watch
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchParameters {
/// Requested expiration timestamp. A zero timestamp here indicates
/// that the watch it to be renewed indefinitely
pub expiration_ts: Timestamp,
/// How many notifications the requestor asked for
pub count: u32,
/// Subkeys requested for this watch
pub subkeys: ValueSubkeyRangeSet,
/// What key to use to perform the watch
pub opt_watcher: Option<KeyPair>,
/// What safety selection to use on the network
pub safety_selection: SafetySelection,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchCurrent {
/// Requested parameters
pub params: OutboundWatchParameters,
/// Nodes that have an active watch on our behalf
pub nodes: Vec<PerNodeKey>,
/// Minimum expiration time for all our nodes
pub min_expiration_ts: Timestamp,
/// How many value change updates remain
pub remaining_count: u32,
/// The next earliest time we are willing to try to reconcile and improve the watch
pub opt_next_reconcile_ts: Option<Timestamp>,
}
impl OutboundWatchCurrent {
pub fn new(params: OutboundWatchParameters) -> Self {
let remaining_count = params.count;
let min_expiration_ts = params.expiration_ts;
Self {
params,
nodes: vec![],
min_expiration_ts,
remaining_count,
opt_next_reconcile_ts: None,
}
}
pub fn update(&mut self, per_node_state: &HashMap<PerNodeKey, PerNodeState>) {
self.min_expiration_ts = self
.nodes
.iter()
.map(|x| per_node_state.get(x).unwrap().expiration_ts)
.reduce(|a, b| a.min(b))
.unwrap_or(self.params.expiration_ts);
}
pub fn watch_node_refs(
&self,
per_node_state: &HashMap<PerNodeKey, PerNodeState>,
) -> Vec<NodeRef> {
self.nodes
.iter()
.map(|x| {
per_node_state
.get(x)
.unwrap()
.watch_node_ref
.clone()
.unwrap()
})
.collect()
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatch {
/// Current state
/// None means inactive/cancelled
pub current: Option<OutboundWatchCurrent>,
/// Desired parameters
/// None means cancelled
pub desired: Option<OutboundWatchParameters>,
}
impl OutboundWatch {
/// Note next time to try reconciliation
pub fn set_next_reconcile_ts(&mut self, next_ts: Timestamp) {
if let Some(current) = self.current.as_mut() {
current.opt_next_reconcile_ts = Some(next_ts);
}
}
/// Returns true if this outbound watch can be removed from the table
pub fn is_dead(&self) -> bool {
self.desired.is_none() && self.current.is_none()
}
/// Returns true if this outbound watch needs to be cancelled
pub fn needs_cancel(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
if self.is_dead() {
veilid_log!(registry warn "should have checked for is_dead first");
return false;
}
let Some(current) = self.current.as_ref() else {
return false;
};
// If the total number of changes has been reached
// then we're done and should cancel
if current.remaining_count == 0 {
return true;
}
// If we have expired and can't renew, then cancel
if cur_ts >= current.params.expiration_ts {
return true;
}
// If the desired parameters is None then cancel
let Some(desired) = self.desired.as_ref() else {
return true;
};
// If the desired parameters is different than the current parameters
// then cancel so we can eventually reconcile to the new parameters
current.params != *desired
}
/// Returns true if this outbound watch can be renewed
pub fn needs_renew(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
if self.is_dead() || self.needs_cancel(registry, cur_ts) {
veilid_log!(registry warn "should have checked for is_dead and needs_cancel first");
return false;
}
// If there is no current watch then there is nothing to renew
let Some(current) = self.current.as_ref() else {
return false;
};
cur_ts >= current.min_expiration_ts && cur_ts < current.params.expiration_ts
}
/// Returns true if there is work to be done on getting the outbound
/// watch to its desired state
pub fn needs_reconcile(
&self,
registry: &VeilidComponentRegistry,
consensus_count: usize,
cur_ts: Timestamp,
) -> bool {
if self.is_dead()
|| self.needs_cancel(registry, cur_ts)
|| self.needs_renew(registry, cur_ts)
{
veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first");
return false;
}
// If desired is none, then is_dead() or needs_cancel() should have been true
let Some(desired) = self.desired.as_ref() else {
veilid_log!(registry warn "is_dead() or needs_cancel() should have been true");
return false;
};
// If there is a desired watch but no current watch, then reconcile
let Some(current) = self.current.as_ref() else {
return true;
};
// If the params are different, then needs_cancel() should have returned true
if current.params != *desired {
veilid_log!(registry warn "needs_cancel() should have returned true");
return false;
}
// If we are still working on getting the 'current' state to match
// the 'desired' state, then do the reconcile if we are within the timeframe for it
if current.nodes.len() != consensus_count
&& cur_ts >= current.opt_next_reconcile_ts.unwrap_or_default()
{
return true;
}
// No work to do on this watch at this time
false
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeKey {
/// Watched record key
pub record_key: TypedKey,
/// Watching node id
pub node_id: TypedKey,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeState {
/// Watch Id
pub watch_id: u64,
/// SafetySelection used to contact the node
pub safety_selection: SafetySelection,
/// What key was used to perform the watch
pub opt_watcher: Option<KeyPair>,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// How many value change notifications are left
pub count: u32,
/// Resolved watch node reference
#[serde(skip)]
pub watch_node_ref: Option<NodeRef>,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchState {
/// Each watch per record key
pub outbound_watches: HashMap<TypedKey, OutboundWatch>,
/// Last known active watch per node+record
pub per_node_state: HashMap<PerNodeKey, PerNodeState>,
}
impl Default for OutboundWatchState {
fn default() -> Self {
Self::new()
}
}
impl OutboundWatchState {
pub fn new() -> Self {
Self {
outbound_watches: HashMap::new(),
per_node_state: HashMap::new(),
}
}
pub fn set_desired_watch(
&mut self,
record_key: TypedKey,
desired_watch: Option<OutboundWatchParameters>,
) {
match self.outbound_watches.get_mut(&record_key) {
Some(w) => {
// Replace desired watch
w.desired = desired_watch;
// Remove if the watch is done
if w.current.is_none() && w.desired.is_none() {
self.outbound_watches.remove(&record_key);
}
}
None => {
// Watch does not exist, add one if that's what is desired
if desired_watch.is_some() {
self.outbound_watches.insert(
record_key,
OutboundWatch {
current: None,
desired: desired_watch,
},
);
}
}
}
}
pub fn get_min_expiration(&self, record_key: TypedKey) -> Option<Timestamp> {
self.outbound_watches
.get(&record_key)
.and_then(|x| x.current.as_ref().map(|y| y.min_expiration_ts))
}
}
impl StorageManager {
/// Remove dead watches from the table
pub(super) async fn process_outbound_watch_dead(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
let mut inner = self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.remove(&record_key)
else {
veilid_log!(self warn "dead watch should have still been in the table");
return;
};
if outbound_watch.current.is_some() {
veilid_log!(self warn "dead watch still had current state");
}
if outbound_watch.desired.is_some() {
veilid_log!(self warn "dead watch still had desired params");
}
}
/// Get the list of remaining active watch ids
/// and call their nodes to cancel the watch
pub(super) async fn process_outbound_watch_cancel(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
let per_node_states = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
let mut per_node_states = vec![];
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
current.nodes.retain(|x| !dead_pnks.contains(x));
per_node_states
};
// Now reach out to each node and cancel their watch ids
let mut unord = FuturesUnordered::new();
for (pnk, pns) in per_node_states {
let watch_lock = watch_lock.clone();
unord.push(async move {
let res = self
.outbound_watch_value_cancel(
watch_lock,
pns.opt_watcher,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await;
(pnk, res)
});
}
let mut cancelled = vec![];
while let Some((pnk, res)) = unord.next().await {
match res {
Ok(_) => {
// Remove from 'per node states' because we got some response
cancelled.push(pnk);
}
Err(e) => {
veilid_log!(self debug "outbound watch cancel error: {}", e);
// xxx should do something different for network unreachable vs host unreachable
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch. We'll try the cancel again if we reach this node again during fanout.
}
}
}
// Update state
{
let inner = &mut *self.inner.lock().await;
// Remove per node watches we cancelled
for pnk in cancelled {
if inner
.outbound_watch_state
.per_node_state
.remove(&pnk)
.is_none()
{
veilid_log!(self warn "per-node watch being cancelled should have still been in the table");
};
}
// Remove outbound watch we've cancelled
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
// Mark as dead now that we cancelled
let Some(_current) = outbound_watch.current.take() else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
}
}
/// See which existing per-node watches can be renewed
/// and drop the ones that can't be or are dead
pub(super) async fn process_outbound_watch_renew(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
let (per_node_states, renew_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being renewed should have current state");
return;
};
let mut per_node_states = vec![];
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
current.nodes.retain(|x| !dead_pnks.contains(x));
// Change the params to update count
let mut renew_params = current.params.clone();
renew_params.count = current.remaining_count;
(per_node_states, renew_params)
};
// Now reach out to each node and renew their watches
let mut unord = FuturesUnordered::new();
for (_pnk, pns) in per_node_states {
let params = renew_params.clone();
let watch_lock = watch_lock.clone();
unord.push(async move {
self.outbound_watch_value_change(
watch_lock,
params,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await
});
}
let mut owvresults = vec![];
while let Some(res) = unord.next().await {
match res {
Ok(r) => owvresults.push(r),
Err(e) => {
veilid_log!(self debug "outbound watch change error: {}", e);
}
}
}
// Update state
{
let inner = &mut *self.inner.lock().await;
for owvresult in owvresults {
self.process_outbound_watch_value_result_inner(inner, record_key, owvresult);
}
}
}
/// Perform fanout to add per-node watches to an outbound watch
/// Must have no current state, or have a match to desired parameters
pub(super) async fn process_outbound_watch_reconcile(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
// Get the nodes already active on this watch,
// and the parameters to fanout with for the rest
let (per_node_state, reconcile_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being reconciled should have still been in the table");
return;
};
// Get params to reconcile
let Some(desired) = &outbound_watch.desired else {
veilid_log!(self warn "watch being reconciled should have had desired parameters");
return;
};
// Get active per node states
let mut per_node_state = if let Some(current) = &mut outbound_watch.current {
// Assert matching parameters
if &current.params != desired {
veilid_log!(self warn "watch being reconciled should have had matching current and desired parameters");
return;
}
current
.nodes
.iter()
.map(|pnk| {
(
*pnk,
inner
.outbound_watch_state
.per_node_state
.get(pnk)
.cloned()
.unwrap(),
)
})
.collect()
} else {
HashMap::new()
};
// Add in any inactive per node states
for (pnk, pns) in &inner.outbound_watch_state.per_node_state {
// Skip any we have already
if per_node_state.contains_key(pnk) {
continue;
}
// Add inactive per node state if the record key matches
if pnk.record_key == record_key {
per_node_state.insert(*pnk, pns.clone());
}
}
let reconcile_params = desired.clone();
(per_node_state, reconcile_params)
};
// Now fan out with parameters and get new per node watches
let cur_ts = Timestamp::now();
let res = self
.outbound_watch_value(watch_lock.clone(), reconcile_params, per_node_state)
.await;
{
let inner = &mut *self.inner.lock().await;
match res {
Ok(owvresult) => {
// Update state
self.process_outbound_watch_value_result_inner(inner, record_key, owvresult);
}
Err(e) => {
veilid_log!(self debug "outbound watch fanout error: {}", e);
}
}
// Regardless of result, set our next possible reconciliation time
if let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
{
let next_ts =
cur_ts + TimestampDuration::new_secs(RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS);
outbound_watch.set_next_reconcile_ts(next_ts);
}
}
}
fn process_outbound_watch_value_result_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedKey,
owvresult: OutboundWatchValueResult,
) {
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "outbound watch should have still been in the table");
return;
};
let Some(desired) = &mut outbound_watch.desired else {
veilid_log!(self warn "watch with result should have desired params");
return;
};
let current = {
if outbound_watch.current.is_none() {
outbound_watch.current = Some(OutboundWatchCurrent::new(desired.clone()));
}
outbound_watch.current.as_mut().unwrap()
};
let mut dead_pnks = BTreeSet::new();
// Handle accepted
for accepted_watch in owvresult.accepted {
let node_id = accepted_watch
.node_ref
.node_ids()
.get(record_key.kind)
.unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
let watch_id = accepted_watch.watch_id;
let opt_watcher = desired.opt_watcher;
let safety_selection = desired.safety_selection;
let expiration_ts = accepted_watch.expiration_ts;
let count = current.remaining_count;
let watch_node_ref = Some(accepted_watch.node_ref);
let opt_value_changed_route = accepted_watch.opt_value_changed_route;
inner.outbound_watch_state.per_node_state.insert(
pnk,
PerNodeState {
watch_id,
safety_selection,
opt_watcher,
expiration_ts,
count,
watch_node_ref,
opt_value_changed_route,
},
);
}
// Eliminate rejected
for rejected_node_ref in owvresult.rejected {
let node_id = rejected_node_ref.node_ids().get(record_key.kind).unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
inner.outbound_watch_state.per_node_state.remove(&pnk);
dead_pnks.insert(pnk);
}
// Drop unanswered but leave in per node state
for ignored_node_ref in owvresult.ignored {
let node_id = ignored_node_ref.node_ids().get(record_key.kind).unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
dead_pnks.insert(pnk);
}
current.nodes.retain(|x| !dead_pnks.contains(x));
// Update outbound watch
current.update(&inner.outbound_watch_state.per_node_state);
}
/// Get the next operation for a particular watch's state machine
/// Can be processed in the foreground, or by the bacgkround operation queue
pub(super) fn get_next_outbound_watch_operation(
&self,
key: TypedKey,
opt_watch_lock: Option<AsyncTagLockGuard<TypedKey>>,
cur_ts: Timestamp,
outbound_watch: &OutboundWatch,
) -> Option<PinBoxFutureStatic<()>> {
let registry = self.registry();
let consensus_count = self
.config()
.with(|c| c.network.dht.get_value_count as usize);
// Check states
if outbound_watch.is_dead() {
// Outbound watch is dead
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_dead(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_cancel(&registry, cur_ts) {
// Outbound watch needs to be cancelled
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_cancel(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_renew(&registry, cur_ts) {
// Outbound watch expired but can be renewed
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_renew(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_reconcile(&registry, consensus_count, cur_ts) {
// Outbound watch parameters have changed or it needs more nodes
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_reconcile(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
}
None
}
}

View File

@ -0,0 +1,77 @@
mod outbound_watch;
mod outbound_watch_parameters;
mod outbound_watch_state;
mod per_node_state;
pub(in crate::storage_manager) use outbound_watch::*;
pub(in crate::storage_manager) use outbound_watch_parameters::*;
pub(in crate::storage_manager) use outbound_watch_state::*;
pub(in crate::storage_manager) use per_node_state::*;
use super::*;
impl_veilid_log_facility!("stor");
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchManager {
/// Each watch per record key
pub outbound_watches: HashMap<TypedKey, OutboundWatch>,
/// Last known active watch per node+record
pub per_node_state: HashMap<PerNodeKey, PerNodeState>,
}
impl Default for OutboundWatchManager {
fn default() -> Self {
Self::new()
}
}
impl OutboundWatchManager {
pub fn new() -> Self {
Self {
outbound_watches: HashMap::new(),
per_node_state: HashMap::new(),
}
}
pub fn set_desired_watch(
&mut self,
record_key: TypedKey,
desired_watch: Option<OutboundWatchParameters>,
) {
match self.outbound_watches.get_mut(&record_key) {
Some(w) => {
// Replace desired watch
w.set_desired(desired_watch);
// Remove if the watch is done (shortcut the dead state)
if w.state().is_none() && w.desired().is_none() {
self.outbound_watches.remove(&record_key);
}
}
None => {
// Watch does not exist, add one if that's what is desired
if let Some(desired) = desired_watch {
self.outbound_watches
.insert(record_key, OutboundWatch::new(desired));
}
}
}
}
pub fn set_next_reconcile_ts(&mut self, record_key: TypedKey, next_ts: Timestamp) {
if let Some(outbound_watch) = self.outbound_watches.get_mut(&record_key) {
if let Some(state) = outbound_watch.state_mut() {
state.edit(&self.per_node_state, |editor| {
editor.set_next_reconcile_ts(next_ts);
});
}
}
}
pub fn get_min_expiration(&self, record_key: TypedKey) -> Option<Timestamp> {
self.outbound_watches
.get(&record_key)
.and_then(|x| x.state().map(|y| y.min_expiration_ts()))
}
}
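The desired-watch upsert above acts as the entry point to the watch state machine: Some(params) inserts or replaces the desired half, and None removes the entry only when no reconciled state remains. A minimal self-contained sketch of that contract, using simplified stand-in types (u64 keys and a trimmed Params struct) rather than the real TypedKey/OutboundWatchParameters/OutboundWatch:

use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the real veilid types; only the
// shape of the upsert logic is carried over.
type RecordKey = u64;

#[derive(Clone, Debug, PartialEq)]
struct Params {
    expiration_ts: u64,
    count: u32,
}

#[derive(Default)]
struct Watch {
    state: Option<Params>,   // reconciled state ("current"), None = inactive
    desired: Option<Params>, // requested state, None = cancelled
}

#[derive(Default)]
struct Manager {
    outbound_watches: HashMap<RecordKey, Watch>,
}

impl Manager {
    // Mirrors OutboundWatchManager::set_desired_watch: upsert on Some,
    // drop the entry on None when there is no reconciled state left.
    fn set_desired_watch(&mut self, key: RecordKey, desired: Option<Params>) {
        match self.outbound_watches.get_mut(&key) {
            Some(w) => {
                w.desired = desired;
                if w.state.is_none() && w.desired.is_none() {
                    self.outbound_watches.remove(&key);
                }
            }
            None => {
                if let Some(d) = desired {
                    self.outbound_watches
                        .insert(key, Watch { state: None, desired: Some(d) });
                }
            }
        }
    }
}

fn main() {
    let mut mgr = Manager::default();
    mgr.set_desired_watch(1, Some(Params { expiration_ts: 100, count: 3 }));
    assert_eq!(mgr.outbound_watches.len(), 1);
    // Cancelling before anything was reconciled removes the entry outright;
    // with reconciled state present it would instead stay and be cancelled
    // by the background state machine.
    mgr.set_desired_watch(1, None);
    assert!(mgr.outbound_watches.is_empty());
}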

View File

@ -0,0 +1,168 @@
use super::*;
impl_veilid_log_facility!("stor");
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatch {
/// Current state
/// None means inactive/cancelled
state: Option<OutboundWatchState>,
/// Desired parameters
/// None means cancelled
desired: Option<OutboundWatchParameters>,
}
impl OutboundWatch {
/// Create new outbound watch with desired parameters
pub fn new(desired: OutboundWatchParameters) -> Self {
Self {
state: None,
desired: Some(desired),
}
}
/// Get current watch state if it exists
pub fn state(&self) -> Option<&OutboundWatchState> {
self.state.as_ref()
}
/// Get mutable current watch state if it exists
pub fn state_mut(&mut self) -> Option<&mut OutboundWatchState> {
self.state.as_mut()
}
/// Clear current watch state
pub fn clear_state(&mut self) {
self.state = None;
}
/// Get or create current watch state
pub fn state_mut_or_create<F: FnOnce() -> OutboundWatchParameters>(
&mut self,
make_parameters: F,
) -> &mut OutboundWatchState {
if self.state.is_none() {
self.state = Some(OutboundWatchState::new(make_parameters()));
}
self.state.as_mut().unwrap()
}
/// Get desired watch parameters if it exists
pub fn desired(&self) -> Option<OutboundWatchParameters> {
self.desired.clone()
}
/// Set desired watch parameters
pub fn set_desired(&mut self, desired: Option<OutboundWatchParameters>) {
self.desired = desired;
}
/// Returns true if this outbound watch can be removed from the table
pub fn is_dead(&self) -> bool {
self.desired.is_none() && self.state.is_none()
}
/// Returns true if this outbound watch needs to be cancelled
pub fn needs_cancel(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
if self.is_dead() {
veilid_log!(registry warn "should have checked for is_dead first");
return false;
}
let Some(state) = self.state() else {
return false;
};
// If the total number of changes has been reached
// then we're done and should cancel
if state.remaining_count() == 0 {
return true;
}
// If we have expired and can't renew, then cancel
if cur_ts >= state.params().expiration_ts {
return true;
}
// If the desired parameters is None then cancel
let Some(desired) = self.desired.as_ref() else {
return true;
};
// If the desired parameters is different than the current parameters
// then cancel so we can eventually reconcile to the new parameters
state.params() != desired
}
/// Returns true if this outbound watch can be renewed
pub fn needs_renew(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
if self.is_dead() || self.needs_cancel(registry, cur_ts) {
veilid_log!(registry warn "should have checked for is_dead and needs_cancel first");
return false;
}
// If there is no current watch then there is nothing to renew
let Some(state) = self.state() else {
return false;
};
// If the watch has per-node watches that have expired,
// but we can extend our watch, then renew it
if cur_ts >= state.min_expiration_ts() && cur_ts < state.params().expiration_ts {
return true;
}
let routing_table = registry.routing_table();
let rss = routing_table.route_spec_store();
// See if any of our per node watches have a dead value changed route
// if so, speculatively renew them
for vcr in state.value_changed_routes() {
if rss.get_route_id_for_key(vcr).is_none() {
// Route we would receive value changes on is dead
return true;
}
}
false
}
/// Returns true if there is work to be done on getting the outbound
/// watch to its desired state
pub fn needs_reconcile(
&self,
registry: &VeilidComponentRegistry,
consensus_count: usize,
cur_ts: Timestamp,
) -> bool {
if self.is_dead()
|| self.needs_cancel(registry, cur_ts)
|| self.needs_renew(registry, cur_ts)
{
veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first");
return false;
}
// If desired is none, then is_dead() or needs_cancel() should have been true
let Some(desired) = self.desired.as_ref() else {
veilid_log!(registry warn "is_dead() or needs_cancel() should have been true");
return false;
};
// If there is a desired watch but no current watch, then reconcile
let Some(state) = self.state() else {
return true;
};
// If the params are different, then needs_cancel() should have returned true
if state.params() != desired {
veilid_log!(registry warn "needs_cancel() should have returned true");
return false;
}
// If we are still working on getting the 'current' state to match
// the 'desired' state, then do the reconcile if we are within the timeframe for it
if state.nodes().len() != consensus_count
&& cur_ts >= state.next_reconcile_ts().unwrap_or_default()
{
return true;
}
// No work to do on this watch at this time
false
}
}
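The four predicates above are meant to be checked in strict priority order (is_dead, then needs_cancel, then needs_renew, then needs_reconcile); each later check assumes the earlier ones returned false, which is why out-of-order calls only log a warning. A minimal sketch of that dispatch order, with plain booleans standing in for the real checks (the actual dispatcher, get_next_outbound_watch_operation later in this commit, returns a pinned future rather than an enum):

// Minimal sketch of the priority order only; the booleans stand in for the
// predicates defined above.
#[derive(Debug, PartialEq)]
enum WatchOp {
    Dead,
    Cancel,
    Renew,
    Reconcile,
}

fn next_watch_op(
    is_dead: bool,
    needs_cancel: bool,
    needs_renew: bool,
    needs_reconcile: bool,
) -> Option<WatchOp> {
    if is_dead {
        Some(WatchOp::Dead)
    } else if needs_cancel {
        Some(WatchOp::Cancel)
    } else if needs_renew {
        Some(WatchOp::Renew)
    } else if needs_reconcile {
        Some(WatchOp::Reconcile)
    } else {
        None // nothing to do for this watch right now
    }
}

fn main() {
    // A watch whose change budget is exhausted cancels before it is ever renewed.
    assert_eq!(next_watch_op(false, true, true, false), Some(WatchOp::Cancel));
}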

View File

@ -0,0 +1,19 @@
use super::*;
impl_veilid_log_facility!("stor");
/// Requested parameters for watch
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct OutboundWatchParameters {
/// Requested expiration timestamp. A zero timestamp here indicates
/// that the watch is to be renewed indefinitely
pub expiration_ts: Timestamp,
/// How many notifications the requestor asked for
pub count: u32,
/// Subkeys requested for this watch
pub subkeys: ValueSubkeyRangeSet,
/// What key to use to perform the watch
pub opt_watcher: Option<KeyPair>,
/// What safety selection to use on the network
pub safety_selection: SafetySelection,
}
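As a quick illustration of the field semantics documented above, here is a self-contained sketch using simplified stand-in types (plain integers replace Timestamp, ValueSubkeyRangeSet, and KeyPair, and safety selection is omitted); the field names and meanings follow the doc comments, but the types are not the real veilid-core ones:

// Self-contained stand-in so the sketch compiles on its own.
#[derive(Clone, Debug)]
struct WatchParams {
    expiration_ts: u64,            // 0 = keep renewing the watch indefinitely
    count: u32,                    // how many change notifications are requested
    subkeys: Vec<u32>,             // which subkeys to watch
    opt_watcher: Option<[u8; 32]>, // watcher key, None = anonymous watch
}

fn main() {
    // Watch subkey 0 indefinitely, asking for up to ten notifications.
    let params = WatchParams {
        expiration_ts: 0,
        count: 10,
        subkeys: vec![0],
        opt_watcher: None,
    };
    println!("{:?}", params);
}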

View File

@ -0,0 +1,118 @@
use super::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchState {
/// Requested parameters
params: OutboundWatchParameters,
/// Nodes that have an active watch on our behalf
nodes: Vec<PerNodeKey>,
/// How many value change updates remain
remaining_count: u32,
/// The next earliest time we are willing to try to reconcile and improve the watch
opt_next_reconcile_ts: Option<Timestamp>,
/// Calculated field: minimum expiration time for all our nodes
min_expiration_ts: Timestamp,
/// Calculated field: the set of value changed routes for this watch from all per node watches
value_changed_routes: BTreeSet<PublicKey>,
}
pub(in crate::storage_manager) struct OutboundWatchStateEditor<'a> {
state: &'a mut OutboundWatchState,
}
impl OutboundWatchStateEditor<'_> {
#[expect(dead_code)]
pub fn set_params(&mut self, params: OutboundWatchParameters) {
self.state.params = params;
}
pub fn add_nodes<I: IntoIterator<Item = PerNodeKey>>(&mut self, nodes: I) {
self.state.nodes.extend(nodes);
}
pub fn retain_nodes<F: FnMut(&PerNodeKey) -> bool>(&mut self, f: F) {
self.state.nodes.retain(f);
}
pub fn set_remaining_count(&mut self, remaining_count: u32) {
self.state.remaining_count = remaining_count;
}
pub fn set_next_reconcile_ts(&mut self, next_reconcile_ts: Timestamp) {
self.state.opt_next_reconcile_ts = Some(next_reconcile_ts);
}
}
impl OutboundWatchState {
pub fn new(params: OutboundWatchParameters) -> Self {
let remaining_count = params.count;
let min_expiration_ts = params.expiration_ts;
Self {
params,
nodes: vec![],
remaining_count,
opt_next_reconcile_ts: None,
min_expiration_ts,
value_changed_routes: BTreeSet::new(),
}
}
pub fn params(&self) -> &OutboundWatchParameters {
&self.params
}
pub fn nodes(&self) -> &Vec<PerNodeKey> {
&self.nodes
}
pub fn remaining_count(&self) -> u32 {
self.remaining_count
}
pub fn next_reconcile_ts(&self) -> Option<Timestamp> {
self.opt_next_reconcile_ts
}
pub fn min_expiration_ts(&self) -> Timestamp {
self.min_expiration_ts
}
pub fn value_changed_routes(&self) -> &BTreeSet<PublicKey> {
&self.value_changed_routes
}
pub fn edit<R, F: FnOnce(&mut OutboundWatchStateEditor) -> R>(
&mut self,
per_node_state: &HashMap<PerNodeKey, PerNodeState>,
closure: F,
) -> R {
let mut editor = OutboundWatchStateEditor { state: self };
let res = closure(&mut editor);
// Update calculated fields
self.min_expiration_ts = self
.nodes
.iter()
.map(|x| per_node_state.get(x).unwrap().expiration_ts)
.reduce(|a, b| a.min(b))
.unwrap_or(self.params.expiration_ts);
self.value_changed_routes = self
.nodes
.iter()
.filter_map(|x| per_node_state.get(x).unwrap().opt_value_changed_route)
.collect();
res
}
pub fn watch_node_refs(
&self,
per_node_state: &HashMap<PerNodeKey, PerNodeState>,
) -> Vec<NodeRef> {
self.nodes
.iter()
.map(|x| {
per_node_state
.get(x)
.unwrap()
.watch_node_ref
.clone()
.unwrap()
})
.collect()
}
}
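The edit()/editor pattern above is what keeps the calculated fields (min_expiration_ts, value_changed_routes) consistent: every mutation of the node list goes through the closure, and the derived values are recomputed once afterwards from per-node state. A self-contained sketch of the same pattern with simplified types (a single derived field and u64 timestamps in place of the real Timestamp/PerNodeKey/PerNodeState):

use std::collections::HashMap;

// Simplified stand-ins; only the editor pattern itself is shown.
type NodeKey = u32;

#[derive(Default)]
struct State {
    nodes: Vec<NodeKey>,
    // Calculated field: only ever updated inside edit().
    min_expiration_ts: u64,
}

struct Editor<'a> {
    state: &'a mut State,
}

impl Editor<'_> {
    fn add_node(&mut self, n: NodeKey) {
        self.state.nodes.push(n);
    }
}

impl State {
    // All mutation funnels through this method, so the derived field is
    // recomputed exactly once per batch of edits.
    fn edit<R, F: FnOnce(&mut Editor) -> R>(
        &mut self,
        expirations: &HashMap<NodeKey, u64>,
        closure: F,
    ) -> R {
        let mut editor = Editor { state: self };
        let res = closure(&mut editor);
        self.min_expiration_ts = self
            .nodes
            .iter()
            .filter_map(|n| expirations.get(n).copied())
            .min()
            .unwrap_or(u64::MAX);
        res
    }
}

fn main() {
    let expirations = HashMap::from([(1u32, 50u64), (2, 30)]);
    let mut state = State::default();
    state.edit(&expirations, |editor| {
        editor.add_node(1);
        editor.add_node(2);
    });
    assert_eq!(state.min_expiration_ts, 30);
}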

View File

@ -0,0 +1,30 @@
use super::*;
impl_veilid_log_facility!("stor");
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeKey {
/// Watched record key
pub record_key: TypedKey,
/// Watching node id
pub node_id: TypedKey,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeState {
/// Watch Id
pub watch_id: u64,
/// SafetySelection used to contact the node
pub safety_selection: SafetySelection,
/// What key was used to perform the watch
pub opt_watcher: Option<KeyPair>,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// How many value change notifications are left
pub count: u32,
/// Resolved watch node reference
#[serde(skip)]
pub watch_node_ref: Option<NodeRef>,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
}
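Since PerNodeKey pairs the record key with the node id, per-node watch state lives in one flat map and can be re-found by record key during reconciliation. A small self-contained sketch of that lookup pattern, with u64 ids standing in for TypedKey and a bare watch id standing in for PerNodeState:

use std::collections::HashMap;

// Simplified stand-in for PerNodeKey: the real type pairs two TypedKeys.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
struct PerNodeKey {
    record_key: u64,
    node_id: u64,
}

fn main() {
    // Last-known watch id per (record, node), as in OutboundWatchManager::per_node_state.
    let mut per_node_state: HashMap<PerNodeKey, u64 /* watch_id */> = HashMap::new();
    per_node_state.insert(PerNodeKey { record_key: 7, node_id: 1 }, 1001);
    per_node_state.insert(PerNodeKey { record_key: 7, node_id: 2 }, 1002);
    per_node_state.insert(PerNodeKey { record_key: 9, node_id: 1 }, 2001);

    // Reconciliation gathers all entries for a record key, across nodes.
    let for_record_7: Vec<_> = per_node_state
        .keys()
        .filter(|pnk| pnk.record_key == 7)
        .collect();
    assert_eq!(for_record_7.len(), 2);
}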

View File

@ -9,33 +9,56 @@ impl StorageManager {
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let mut inner = self.inner.lock().await;
let inner = &mut *self.inner.lock().await;
let cur_ts = Timestamp::now();
// Iterate all per-node watches and remove expired ones that are unreferenced
// Iterate all per-node watches and remove dead ones from outbound watches
let mut dead_pnks = HashSet::new();
for (pnk, pns) in &inner.outbound_watch_state.per_node_state {
if cur_ts >= pns.expiration_ts || pns.count == 0 {
for (pnk, pns) in &inner.outbound_watch_manager.per_node_state {
if !pns
.watch_node_ref
.as_ref()
.unwrap()
.state(cur_ts)
.is_alive()
{
dead_pnks.insert(*pnk);
}
}
for v in inner.outbound_watch_state.outbound_watches.values() {
// If it's still referenced, keep it
let Some(current) = &v.current else {
for v in inner.outbound_watch_manager.outbound_watches.values_mut() {
let Some(current) = v.state_mut() else {
continue;
};
for pnk in &current.nodes {
dead_pnks.remove(pnk);
current.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
editor.retain_nodes(|x| !dead_pnks.contains(x));
});
}
// Iterate all per-node watches and remove expired ones that are unreferenced
let mut expired_pnks = HashSet::new();
for (pnk, pns) in &inner.outbound_watch_manager.per_node_state {
if cur_ts >= pns.expiration_ts || pns.count == 0 {
expired_pnks.insert(*pnk);
}
}
for v in inner.outbound_watch_manager.outbound_watches.values() {
// If it's still referenced, keep it
let Some(current) = v.state() else {
continue;
};
for pnk in current.nodes() {
expired_pnks.remove(pnk);
}
}
inner
.outbound_watch_state
.outbound_watch_manager
.per_node_state
.retain(|k, _| !dead_pnks.contains(k));
.retain(|k, _| !expired_pnks.contains(k));
// Iterate all outbound watches and determine what work needs doing if any
for (k, v) in &inner.outbound_watch_state.outbound_watches {
for (k, v) in &inner.outbound_watch_manager.outbound_watches {
let k = *k;
// Get next work on watch and queue it if we have something to do
@ -44,60 +67,6 @@ impl StorageManager {
};
}
// // See if the watch is expired or out of updates
// if outbound_watch.min_expiration_ts <= cur_ts || outbound_watch.remaining_count == 0 {
// // See if we can lock the outbound watch
// let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(*k) else {
// // Watch is busy, come back later
// continue;
// };
// let outbound_watch = outbound_watch.clone();
// let safety_selection = v.safety_selection();
// let opt_watcher = v.writer().cloned();
// self.background_operation_processor.add_future(
// self.clone().background_outbound_watch_cancel(
// watch_lock,
// safety_selection,
// opt_watcher,
// outbound_watch,
// ),
// );
// // Send valuechange with dead count and no subkeys
// update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
// key: *k,
// subkeys: ValueSubkeyRangeSet::new(),
// count: 0,
// value: None,
// })));
// // See if the private route we're using is dead
// let mut is_dead = false;
// if !is_dead {
// if let Some(value_changed_route) = outbound_watch.opt_value_changed_route {
// if routing_table
// .route_spec_store()
// .get_route_id_for_key(&value_changed_route)
// .is_none()
// {
// // Route we would receive value changes on is dead
// is_dead = true;
// }
// }
// }
// for outbound_watch in &outbound_watch.per_node {
// // See if the active watch's node is dead
// if !outbound_watch.watch_node.state(cur_ts).is_alive() {
// // Watched node is dead
// is_dead = true;
// }
// }
// }
Ok(())
}
}

View File

@ -1,3 +1,5 @@
use futures_util::StreamExt as _;
use super::*;
impl_veilid_log_facility!("stor");
@ -336,6 +338,502 @@ impl StorageManager {
Ok(owvresult)
}
/// Remove dead watches from the table
pub(super) async fn process_outbound_watch_dead(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
let mut inner = self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_manager
.outbound_watches
.remove(&record_key)
else {
veilid_log!(self warn "dead watch should have still been in the table");
return;
};
if outbound_watch.state().is_some() {
veilid_log!(self warn "dead watch still had current state");
}
if outbound_watch.desired().is_some() {
veilid_log!(self warn "dead watch still had desired params");
}
}
/// Get the list of remaining active watch ids
/// and call their nodes to cancel the watch
pub(super) async fn process_outbound_watch_cancel(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
let per_node_states = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_manager
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
let Some(state) = outbound_watch.state_mut() else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
let mut per_node_states = vec![];
let mut missing_pnks = BTreeSet::new();
for pnk in state.nodes() {
let Some(per_node_state) = inner
.outbound_watch_manager
.per_node_state
.get(pnk)
.cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
missing_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
editor.retain_nodes(|x| !missing_pnks.contains(x));
});
per_node_states
};
// Now reach out to each node and cancel their watch ids
let mut unord = FuturesUnordered::new();
for (pnk, pns) in per_node_states {
let watch_lock = watch_lock.clone();
unord.push(async move {
let res = self
.outbound_watch_value_cancel(
watch_lock,
pns.opt_watcher,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await;
(pnk, res)
});
}
let mut cancelled = vec![];
while let Some((pnk, res)) = unord.next().await {
match res {
Ok(_) => {
// Remove from 'per node states' because we got some response
cancelled.push(pnk);
}
Err(e) => {
veilid_log!(self debug "outbound watch cancel error: {}", e);
// xxx should do something different for network unreachable vs host unreachable
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch. We'll try the cancel again if we reach this node again during fanout.
}
}
}
// Update state
{
let inner = &mut *self.inner.lock().await;
// Remove per node watches we cancelled
for pnk in cancelled {
if inner
.outbound_watch_manager
.per_node_state
.remove(&pnk)
.is_none()
{
veilid_log!(self warn "per-node watch being cancelled should have still been in the table");
};
}
// Remove outbound watch we've cancelled
let Some(outbound_watch) = inner
.outbound_watch_manager
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
// Mark as dead now that we cancelled
outbound_watch.clear_state();
}
// Send a ValueChanged update with a dead count and no subkeys to inform the API that this watch was cancelled
self.update_callback_value_change(record_key, ValueSubkeyRangeSet::new(), 0, None);
}
/// See which existing per-node watches can be renewed
/// and drop the ones that can't be or are dead
pub(super) async fn process_outbound_watch_renew(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
let (per_node_states, renew_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_manager
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(state) = outbound_watch.state_mut() else {
veilid_log!(self warn "watch being renewed should have current state");
return;
};
let mut per_node_states = vec![];
let mut missing_pnks = BTreeSet::new();
for pnk in state.nodes() {
let Some(per_node_state) = inner
.outbound_watch_manager
.per_node_state
.get(pnk)
.cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
missing_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
editor.retain_nodes(|x| !missing_pnks.contains(x));
});
// Change the params to update count
let mut renew_params = state.params().clone();
renew_params.count = state.remaining_count();
(per_node_states, renew_params)
};
// Now reach out to each node and renew their watches
let mut unord = FuturesUnordered::new();
for (_pnk, pns) in per_node_states {
let params = renew_params.clone();
let watch_lock = watch_lock.clone();
unord.push(async move {
self.outbound_watch_value_change(
watch_lock,
params,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await
});
}
let mut owvresults = vec![];
while let Some(res) = unord.next().await {
match res {
Ok(r) => owvresults.push(r),
Err(e) => {
veilid_log!(self debug "outbound watch change error: {}", e);
}
}
}
// Update state
{
let inner = &mut *self.inner.lock().await;
for owvresult in owvresults {
self.process_outbound_watch_value_result_inner(inner, record_key, owvresult);
}
}
}
/// Perform fanout to add per-node watches to an outbound watch
/// Must have no current state, or have a match to desired parameters
pub(super) async fn process_outbound_watch_reconcile(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
// Get the nodes already active on this watch,
// and the parameters to fanout with for the rest
let (per_node_state, reconcile_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_manager
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being reconciled should have still been in the table");
return;
};
// Get params to reconcile
let Some(desired) = outbound_watch.desired() else {
veilid_log!(self warn "watch being reconciled should have had desired parameters");
return;
};
// Get active per node states
let mut per_node_state = if let Some(state) = outbound_watch.state() {
// Assert matching parameters
if state.params() != &desired {
veilid_log!(self warn "watch being reconciled should have had matching current and desired parameters");
return;
}
state
.nodes()
.iter()
.map(|pnk| {
(
*pnk,
inner
.outbound_watch_manager
.per_node_state
.get(pnk)
.cloned()
.unwrap(),
)
})
.collect()
} else {
HashMap::new()
};
// Add in any inactive per node states
for (pnk, pns) in &inner.outbound_watch_manager.per_node_state {
// Skip any we have already
if per_node_state.contains_key(pnk) {
continue;
}
// Add inactive per node state if the record key matches
if pnk.record_key == record_key {
per_node_state.insert(*pnk, pns.clone());
}
}
let reconcile_params = desired.clone();
(per_node_state, reconcile_params)
};
// Now fan out with parameters and get new per node watches
let cur_ts = Timestamp::now();
let res = self
.outbound_watch_value(watch_lock.clone(), reconcile_params, per_node_state)
.await;
{
let inner = &mut *self.inner.lock().await;
match res {
Ok(owvresult) => {
// Update state
self.process_outbound_watch_value_result_inner(inner, record_key, owvresult);
}
Err(e) => {
veilid_log!(self debug "outbound watch fanout error: {}", e);
}
}
// Regardless of result, set our next possible reconciliation time
let next_ts =
cur_ts + TimestampDuration::new_secs(RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS);
inner
.outbound_watch_manager
.set_next_reconcile_ts(record_key, next_ts);
}
}
fn process_outbound_watch_value_result_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedKey,
owvresult: OutboundWatchValueResult,
) {
let Some(outbound_watch) = inner
.outbound_watch_manager
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "outbound watch should have still been in the table");
return;
};
let Some(desired) = outbound_watch.desired() else {
veilid_log!(self warn "watch with result should have desired params");
return;
};
let state = outbound_watch.state_mut_or_create(|| desired.clone());
let mut added_nodes = Vec::new();
let mut remove_nodes = BTreeSet::new();
// Handle accepted
for accepted_watch in owvresult.accepted {
let node_id = accepted_watch
.node_ref
.node_ids()
.get(record_key.kind)
.unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
let watch_id = accepted_watch.watch_id;
let opt_watcher = desired.opt_watcher;
let safety_selection = desired.safety_selection;
let expiration_ts = accepted_watch.expiration_ts;
let count = state.remaining_count();
let watch_node_ref = Some(accepted_watch.node_ref);
let opt_value_changed_route = accepted_watch.opt_value_changed_route;
inner.outbound_watch_manager.per_node_state.insert(
pnk,
PerNodeState {
watch_id,
safety_selection,
opt_watcher,
expiration_ts,
count,
watch_node_ref,
opt_value_changed_route,
},
);
added_nodes.push(pnk);
}
// Eliminate rejected
for rejected_node_ref in owvresult.rejected {
let node_id = rejected_node_ref.node_ids().get(record_key.kind).unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
inner.outbound_watch_manager.per_node_state.remove(&pnk);
remove_nodes.insert(pnk);
}
// Drop unanswered but leave in per node state
for ignored_node_ref in owvresult.ignored {
let node_id = ignored_node_ref.node_ids().get(record_key.kind).unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
remove_nodes.insert(pnk);
}
state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
editor.retain_nodes(|x| !remove_nodes.contains(x));
editor.add_nodes(added_nodes);
});
}
/// Get the next operation for a particular watch's state machine
/// Can be processed in the foreground, or by the background operation queue
pub(super) fn get_next_outbound_watch_operation(
&self,
key: TypedKey,
opt_watch_lock: Option<AsyncTagLockGuard<TypedKey>>,
cur_ts: Timestamp,
outbound_watch: &OutboundWatch,
) -> Option<PinBoxFutureStatic<()>> {
let registry = self.registry();
let consensus_count = self
.config()
.with(|c| c.network.dht.get_value_count as usize);
// Check states
if outbound_watch.is_dead() {
// Outbound watch is dead
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_dead(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_cancel(&registry, cur_ts) {
// Outbound watch needs to be cancelled
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_cancel(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_renew(&registry, cur_ts) {
// Outbound watch expired but can be renewed
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_renew(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_reconcile(&registry, consensus_count, cur_ts) {
// Outbound watch parameters have changed or it needs more nodes
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_reconcile(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
}
None
}
/// Handle a received 'Watch Value' query
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", target = "dht", skip_all)]
@ -399,7 +897,7 @@ impl StorageManager {
{
// Get the outbound watch
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watch_manager
.outbound_watches
.get_mut(&record_key)
else {
@ -407,7 +905,7 @@ impl StorageManager {
return Ok(NetworkResult::value(()));
};
let Some(current) = &mut outbound_watch.current else {
let Some(state) = outbound_watch.state() else {
// No outbound watch current state means no callback
return Ok(NetworkResult::value(()));
};
@ -417,12 +915,13 @@ impl StorageManager {
record_key,
node_id: inbound_node_id,
};
if !current.nodes.contains(&pnk) {
if !state.nodes().contains(&pnk) {
return Ok(NetworkResult::value(()));
}
// Get per node state
let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
let Some(per_node_state) =
inner.outbound_watch_manager.per_node_state.get_mut(&pnk)
else {
// No per node state means no callback
veilid_log!(self warn "missing per node state in outbound watch: {:?}", pnk);
@ -439,16 +938,24 @@ impl StorageManager {
// Update per node state
if count > per_node_state.count {
// If count is greater than our requested count then this is invalid, cancel the watch
// XXX: Should this be a punishment?
veilid_log!(self debug "watch count went backward: {}: {} > {}", record_key, count, per_node_state.count);
// Force count to zero for this node id so it gets cancelled out by the background process
per_node_state.count = 0;
return Ok(NetworkResult::value(()));
} else if count == 0 {
// If count is zero, the per-node watch is done and will get purged by the background process
veilid_log!(self debug "watch count finished: {}", record_key);
} else if count == per_node_state.count {
// If a packet gets delivered twice or something else is wrong, report a non-decremented watch count
// Log this because watch counts should always be decrementing on a per-node basis.
// XXX: Should this be a punishment?
veilid_log!(self debug
"watch count duplicate: {}: {} == {}",
record_key,
count,
per_node_state.count
);
} else {
// Decrement the overall watch count and update the per-node watch count
// Reduce the per-node watch count
veilid_log!(self debug
"watch count decremented: {}: {} < {}",
record_key,
@ -513,18 +1020,21 @@ impl StorageManager {
// If we got an actual update, decrement the total remaining watch count
// Get the outbound watch
let outbound_watch = inner
.outbound_watch_state
.outbound_watch_manager
.outbound_watches
.get_mut(&record_key)
.unwrap();
let current = outbound_watch.current.as_mut().unwrap();
let state = outbound_watch.state_mut().unwrap();
if is_value_seq_newer {
current.remaining_count -= 1;
let remaining_count = state.remaining_count() - 1;
state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
editor.set_remaining_count(remaining_count);
});
}
(is_value_seq_newer, value, current.remaining_count)
(is_value_seq_newer, value, state.remaining_count())
};
// Announce ValueChanged VeilidUpdate