Christien Rioux 2025-04-08 12:52:55 -04:00
parent 922f4d9e15
commit 031d6463fa
9 changed files with 704 additions and 588 deletions

View File

@ -236,7 +236,7 @@ impl RoutingTable {
}
// Get all the active watches from the storage manager
let watch_destinations = self.storage_manager().get_active_watch_nodes().await;
let watch_destinations = self.storage_manager().get_outbound_watch_nodes().await;
for watch_destination in watch_destinations {
let registry = self.registry();

View File

@ -24,16 +24,18 @@ impl StorageManager {
} else {
"".to_owned()
};
let watch = if let Some(w) = v.outbound_watch() {
format!(" watch: {:?}\n", w)
} else {
"".to_owned()
};
out += &format!(" {} {}{}\n", k, writer, watch);
out += &format!(" {} {}\n", k, writer);
}
format!("{}]\n", out)
}
pub async fn debug_watched_records(&self) -> String {
let inner = self.inner.lock().await;
let mut out = "[\n".to_owned();
for (k, v) in &inner.outbound_watch_state.outbound_watches {
out += &format!(" {} {:?}\n", k, v);
}
format!("{}]\n", out)
}
pub async fn debug_offline_records(&self) -> String {
let inner = self.inner.lock().await;
let Some(local_record_store) = &inner.local_record_store else {

View File

@ -377,22 +377,37 @@ impl StorageManager {
}
/// Get the set of nodes in our active watches
pub async fn get_active_watch_nodes(&self) -> Vec<Destination> {
pub async fn get_outbound_watch_nodes(&self) -> Vec<Destination> {
let inner = self.inner.lock().await;
let mut out = vec![];
for opened_record in inner.opened_records.values() {
if let Some(aw) = opened_record.outbound_watch() {
for pn in &aw.per_node {
let mut node_set = HashSet::new();
for v in inner.outbound_watch_state.outbound_watches.values() {
if let Some(current) = &v.current {
let node_refs = current.watch_node_refs(&inner.outbound_watch_state.per_node_state);
for node_ref in &node_refs {
let mut found = false;
for nid in node_ref.node_ids().iter() {
if node_set.contains(nid) {
found = true;
break;
}
}
if found {
continue;
}
node_set.insert(node_ref.best_node_id());
out.push(
Destination::direct(
pn.watch_node
.routing_domain_filtered(RoutingDomain::PublicInternet),
node_ref.routing_domain_filtered(RoutingDomain::PublicInternet),
)
.with_safety(opened_record.safety_selection()),
);
.with_safety(current.params.safety_selection),
)
}
}
}
out
}
@ -531,54 +546,14 @@ impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
// Attempt to close the record, returning the opened record if it wasn't already closed
let opened_record = {
let mut inner = self.inner.lock().await;
let Some(opened_record) = Self::close_record_inner(&mut inner, key)? else {
return Ok(());
};
opened_record
};
// See if we have an active watch on the closed record
let Some(active_watch) = opened_record.outbound_watch() else {
let mut inner = self.inner.lock().await;
let Some(_opened_record) = Self::close_record_inner(&mut inner, key)? else {
return Ok(());
};
// Send a one-time cancel request for the watch if we have one and we're online
if !self.dht_is_online() {
veilid_log!(self debug "skipping last-ditch watch cancel because we are offline");
return Ok(());
}
// Use the safety selection we opened the record with
// Use the writer we opened with as the 'watcher' as well
let opt_owvresult = match self
.outbound_watch_value_cancel(
key,
ValueSubkeyRangeSet::full(),
opened_record.safety_selection(),
opened_record.writer().cloned(),
active_watch.id,
active_watch.watch_node,
)
.await
{
Ok(v) => v,
Err(e) => {
veilid_log!(self debug
"close record watch cancel failed: {}", e
);
None
}
};
if let Some(owvresult) = opt_owvresult {
if owvresult.expiration_ts.as_u64() != 0 {
veilid_log!(self debug
"close record watch cancel should have zero expiration"
);
}
} else {
veilid_log!(self debug "close record watch cancel unsuccessful");
}
// Set the watch to cancelled if we have one
// Will process cancellation in the background
inner.outbound_watch_state.set_desired_watch(key, None);
Ok(())
}
@ -875,11 +850,25 @@ impl StorageManager {
// (may need to wait for background operations to complete on the watch)
let watch_lock = self.outbound_watch_lock_table.lock_tag(key).await;
self.watch_values_inner(watch_lock, subkeys, expiration, count)
.await
}
//#[instrument(level = "trace", target = "stor", skip_all)]
async fn watch_values_inner(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
) -> VeilidAPIResult<Timestamp> {
let key = watch_lock.tag();
// Obtain the inner state lock
let inner = self.inner.lock().await;
let mut inner = self.inner.lock().await;
// Get the safety selection and the writer we opened this record
let (safety_selection, opt_writer) = {
let (safety_selection, opt_watcher) = {
let Some(opened_record) = inner.opened_records.get(&key) else {
// Record must be opened already to change watch
apibail_generic!("record not open");
@ -909,17 +898,14 @@ impl StorageManager {
let subkeys = schema.truncate_subkeys(&subkeys, None);
// Calculate desired watch parameters
let desired = if count == 0 {
let desired_params = if count == 0 {
// Cancel
None
} else {
// Get the minimum and maximum expiration timestamp we will accept
let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| {
(
TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)),
)
});
// Get the minimum expiration timestamp we will accept
let rpc_timeout_us = self
.config()
.with(|c| TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)));
let cur_ts = get_timestamp();
let min_expiration_ts = Timestamp::new(cur_ts + rpc_timeout_us.as_u64());
let expiration_ts = if expiration.as_u64() == 0 {
@ -935,95 +921,126 @@ impl StorageManager {
expiration_ts,
count,
subkeys,
opt_watcher,
safety_selection,
})
};
// Modify the 'desired' state of the watch or add one if it does not exist
inner.outbound_watch_state.set_desired_watch(key, desired);
inner
.outbound_watch_state
.set_desired_watch(key, desired_params);
// Drop the lock for network access
drop(inner);
// xxx continue here, make a 'reconcile outbound watch' routine that can be called imperatively, wait for it etc.
// Use the safety selection we opened the record with
// Use the writer we opened with as the 'watcher' as well
let opt_owvresult = self
.outbound_watch_value(
key,
subkeys.clone(),
expiration,
count,
safety_selection,
opt_writer,
opt_active_watch.as_ref(),
)
.await?;
// If we did not get a valid response assume nothing changed
let Some(owvresult) = opt_owvresult else {
apibail_try_again!("did not get a valid response");
};
// Clear any existing watch if the watch succeeded or got cancelled
let mut inner = self.inner.lock().await;
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
apibail_generic!("record not open");
};
opened_record.clear_outbound_watch();
// Get the minimum expiration timestamp we will accept
let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| {
(
TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)),
)
});
let cur_ts = get_timestamp();
let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
let max_expiration_ts = if expiration.as_u64() == 0 {
cur_ts + max_watch_expiration_us.as_u64()
} else {
expiration.as_u64()
};
// Build a new active watch from the watchvalue result
// If the expiration time is less than our minimum expiration time (or zero) consider this watch inactive
let mut expiration_ts = owvresult.expiration_ts;
if expiration_ts.as_u64() < min_expiration_ts {
// Try to fire out a last-chance watch cancellation, so the
if let Some(active_watch) = opt_active_watch.as_ref() {
self.last_change_cancel_watch(active_watch).await;
}
return Ok(Timestamp::new(0));
// Process this watch's state machine operations until we are done
loop {
let opt_op_fut = {
let inner = self.inner.lock().await;
let Some(outbound_watch) = inner.outbound_watch_state.outbound_watches.get(&key)
else {
// Watch is gone
return Ok(Timestamp::new(0));
};
self.get_next_outbound_watch_operation(
key,
Some(watch_lock.clone()),
Timestamp::now(),
outbound_watch,
)
};
let Some(op_fut) = opt_op_fut else {
break;
};
op_fut.await;
}
// If the expiration time is greater than our maximum expiration time, clamp our local watch so we ignore extra valuechanged messages
if expiration_ts.as_u64() > max_expiration_ts {
expiration_ts = Timestamp::new(max_expiration_ts);
}
let inner = self.inner.lock().await;
let expiration_ts = inner
.outbound_watch_state
.get_min_expiration(key)
.unwrap_or_default();
Ok(expiration_ts)
// If we requested a cancellation, then consider this watch cancelled
if count == 0 {
// Expiration returned should be zero if we requested a cancellation
if expiration_ts.as_u64() != 0 {
veilid_log!(self debug "got active watch despite asking for a cancellation");
}
return Ok(Timestamp::new(0));
}
// // Use the safety selection we opened the record with
// // Use the writer we opened with as the 'watcher' as well
// let opt_owvresult = self
// .outbound_watch_value(
// key,
// subkeys.clone(),
// expiration,
// count,
// safety_selection,
// opt_watcher,
// opt_active_watch.as_ref(),
// )
// .await?;
// Keep a record of the watch
opened_record.set_outbound_watch(OutboundWatch {
id: owvresult.watch_id,
expiration_ts,
watch_node: owvresult.watch_node,
opt_value_changed_route: owvresult.opt_value_changed_route,
subkeys,
count,
});
// // If we did not get a valid response assume nothing changed
// let Some(owvresult) = opt_owvresult else {
// apibail_try_again!("did not get a valid response");
// };
Ok(owvresult.expiration_ts)
// // Clear any existing watch if the watch succeeded or got cancelled
// let mut inner = self.inner.lock().await;
// let Some(opened_record) = inner.opened_records.get_mut(&key) else {
// apibail_generic!("record not open");
// };
// opened_record.clear_outbound_watch();
// // Get the minimum expiration timestamp we will accept
// let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| {
// (
// TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
// TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)),
// )
// });
// let cur_ts = get_timestamp();
// let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
// let max_expiration_ts = if expiration.as_u64() == 0 {
// cur_ts + max_watch_expiration_us.as_u64()
// } else {
// expiration.as_u64()
// };
// // Build a new active watch from the watchvalue result
// // If the expiration time is less than our minimum expiration time (or zero) consider this watch inactive
// let mut expiration_ts = owvresult.expiration_ts;
// if expiration_ts.as_u64() < min_expiration_ts {
// // Try to fire out a last-chance watch cancellation, so the
// if let Some(active_watch) = opt_active_watch.as_ref() {
// self.last_change_cancel_watch(active_watch).await;
// }
// return Ok(Timestamp::new(0));
// }
// // If the expiration time is greater than our maximum expiration time, clamp our local watch so we ignore extra valuechanged messages
// if expiration_ts.as_u64() > max_expiration_ts {
// expiration_ts = Timestamp::new(max_expiration_ts);
// }
// // If we requested a cancellation, then consider this watch cancelled
// if count == 0 {
// // Expiration returned should be zero if we requested a cancellation
// if expiration_ts.as_u64() != 0 {
// veilid_log!(self debug "got active watch despite asking for a cancellation");
// }
// return Ok(Timestamp::new(0));
// }
// // Keep a record of the watch
// opened_record.set_outbound_watch(OutboundWatch {
// id: owvresult.watch_id,
// expiration_ts,
// watch_node: owvresult.watch_node,
// opt_value_changed_route: owvresult.opt_value_changed_route,
// subkeys,
// count,
// });
// Ok(owvresult.expiration_ts)
}
#[instrument(level = "trace", target = "stor", skip_all)]
@ -1032,18 +1049,30 @@ impl StorageManager {
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
) -> VeilidAPIResult<bool> {
// Obtain the watch change lock
// (may need to wait for background operations to complete on the watch)
let watch_lock = self.outbound_watch_lock_table.lock_tag(key).await;
// Calculate change to existing watch
let (subkeys, count, expiration_ts) = {
let inner = self.inner.lock().await;
let Some(opened_record) = inner.opened_records.get(&key) else {
let Some(_opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
};
// See what watch we have currently if any
let Some(active_watch) = opened_record.outbound_watch() else {
let Some(outbound_watch) = inner.outbound_watch_state.outbound_watches.get(&key) else {
// If we didn't have an active watch, then we can just return false because there's nothing to do here
return Ok(false);
};
// Ensure we have a 'desired' watch state
let Some(desired) = &outbound_watch.desired else {
// If we didn't have a desired watch, then we're already cancelling
let still_active = outbound_watch.current.is_some();
return Ok(still_active);
};
// Rewrite subkey range if empty to full
let subkeys = if subkeys.is_empty() {
ValueSubkeyRangeSet::full()
@ -1052,10 +1081,10 @@ impl StorageManager {
};
// Reduce the subkey range
let new_subkeys = active_watch.params.subkeys.difference(&subkeys);
let new_subkeys = desired.subkeys.difference(&subkeys);
// If no change is happening return false
if new_subkeys == active_watch.params.subkeys {
if new_subkeys == desired.subkeys {
return Ok(false);
}
@ -1063,16 +1092,16 @@ impl StorageManager {
let count = if new_subkeys.is_empty() {
0
} else {
active_watch.params.count
desired.count
};
(new_subkeys, count, active_watch.params.expiration_ts)
(new_subkeys, count, desired.expiration_ts)
};
// Update the watch. This just calls through to the watch_values_inner() function above.
// That routine updates the outbound watch state, so we don't need to do that in this routine.
let expiration_ts =
pin_future!(self.watch_values(key, subkeys, expiration_ts, count)).await?;
pin_future!(self.watch_values_inner(watch_lock, subkeys, expiration_ts, count)).await?;
// A zero expiration time returned from watch_value() means the watch is done
// or no subkeys are left, and the watch is no longer active
@ -1081,7 +1110,7 @@ impl StorageManager {
return Ok(false);
}
// Return true because the the watch was changed
// Return true because the watch was changed, but is not completely gone
Ok(true)
}
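The reworked watch_values_inner() above no longer talks to the network directly: it records the desired parameters via set_desired_watch(), then repeatedly asks get_next_outbound_watch_operation() for the watch's next state machine step and awaits it until nothing remains, finally returning the minimum expiration across the per-node watches. Below is a minimal, self-contained sketch of that foreground drive loop; the types and the next_operation() helper are illustrative stand-ins, not the actual Veilid code.

// Hypothetical stand-ins for the outbound watch state machine; not the real types.
#[derive(Debug)]
enum WatchState {
    NeedsReconcile,
    NeedsRenew,
    Settled,
}

struct ToyWatch {
    state: WatchState,
}

// Mirrors the shape of get_next_outbound_watch_operation(): hand back the next
// piece of work for this watch, or None once the watch is settled.
fn next_operation(watch: &mut ToyWatch) -> Option<&'static str> {
    match watch.state {
        WatchState::NeedsReconcile => {
            watch.state = WatchState::NeedsRenew;
            Some("reconcile: fan out and add per-node watches")
        }
        WatchState::NeedsRenew => {
            watch.state = WatchState::Settled;
            Some("renew: extend the existing per-node watches")
        }
        WatchState::Settled => None,
    }
}

fn main() {
    let mut watch = ToyWatch { state: WatchState::NeedsReconcile };
    // Drive the state machine in the foreground until there is nothing left to do,
    // just as watch_values_inner() loops over get_next_outbound_watch_operation().
    while let Some(op) = next_operation(&mut watch) {
        println!("running: {op}");
    }
    println!("watch is now {:?}", watch.state);
}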

View File

@ -1,3 +1,5 @@
use futures_util::StreamExt as _;
use super::*;
impl_veilid_log_facility!("stor");
@ -14,6 +16,8 @@ pub(in crate::storage_manager) struct OutboundWatchParameters {
pub subkeys: ValueSubkeyRangeSet,
/// What key to use to perform the watch
pub opt_watcher: Option<KeyPair>,
/// What safety selection to use on the network
pub safety_selection: SafetySelection,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -30,6 +34,49 @@ pub(in crate::storage_manager) struct OutboundWatchCurrent {
pub opt_value_changed_route: Option<PublicKey>,
}
impl OutboundWatchCurrent {
pub fn new(
params: OutboundWatchParameters,
opt_value_changed_route: Option<CryptoKey>,
) -> Self {
let remaining_count = params.count;
let min_expiration_ts = params.expiration_ts;
Self {
params,
nodes: vec![],
min_expiration_ts,
remaining_count,
opt_value_changed_route,
}
}
pub fn update(&mut self, per_node_state: &HashMap<PerNodeKey, PerNodeState>) {
self.min_expiration_ts = self
.nodes
.iter()
.map(|x| per_node_state.get(x).unwrap().expiration_ts)
.reduce(|a, b| a.min(b))
.unwrap_or(self.params.expiration_ts);
}
pub fn watch_node_refs(
&self,
per_node_state: &HashMap<PerNodeKey, PerNodeState>,
) -> Vec<NodeRef> {
self.nodes
.iter()
.map(|x| {
per_node_state
.get(x)
.unwrap()
.watch_node_ref
.clone()
.unwrap()
})
.collect()
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatch {
/// Current state
@ -102,7 +149,7 @@ impl OutboundWatch {
|| self.needs_cancel(registry, cur_ts)
|| self.needs_renew(registry, cur_ts)
{
veilid_log!(registry warn "should have checked for is_dead, needs_cancel first");
veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first");
return false;
}
@ -208,51 +255,438 @@ impl OutboundWatchState {
}
}
}
pub fn get_min_expiration(&self, record_key: TypedKey) -> Option<Timestamp> {
self.outbound_watches
.get(&record_key)
.map(|x| x.current.as_ref().map(|y| y.min_expiration_ts))
.flatten()
}
}
// impl OutboundWatchCurrent {
// pub fn new(
// params: OutboundWatchParameters,
// opt_value_changed_route: Option<CryptoKey>,
// ) -> Self {
// let remaining_count = params.count;
// let min_expiration_ts = params.expiration_ts;
impl StorageManager {
/// Remove dead watches from the table
pub(super) async fn process_outbound_watch_dead(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// Self {
// params,
// per_node: vec![],
// min_expiration_ts,
// remaining_count,
// opt_value_changed_route,
// }
// }
let mut inner = self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.remove(&record_key)
else {
veilid_log!(self warn "dead watch should have still been in the table");
return;
};
// pub fn per_node_outbound_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeOutboundWatch> {
// self.per_node.iter().find(|x| x.id == watch_id)
// }
if outbound_watch.current.is_some() {
veilid_log!(self warn "dead watch still had current state");
}
if outbound_watch.desired.is_some() {
veilid_log!(self warn "dead watch still had desired params");
}
}
// pub fn per_node_outbound_watch_by_id_mut(
// &mut self,
// watch_id: u64,
// ) -> Option<&mut PerNodeOutboundWatch> {
// self.per_node.iter_mut().find(|x| x.id == watch_id)
// }
/// Get the list of remaining active watch ids
/// and call their nodes to cancel the watch
pub(super) async fn process_outbound_watch_cancel(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// pub fn remove_per_node_outbound_watch_by_id(&mut self, watch_id: u64) {
// let Some(n) = self.per_node.iter().position(|x| x.id == watch_id) else {
// return;
// };
// self.per_node.remove(n);
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
// self.update_min_expiration_ts();
// }
let per_node_states = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
let mut per_node_states = vec![];
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
current.nodes.retain(|x| !dead_pnks.contains(x));
// fn update_min_expiration_ts(&mut self) {
// self.min_expiration_ts = self
// .per_node
// .iter()
// .map(|x| x.expiration_ts)
// .reduce(|a, b| a.min(b))
// .unwrap_or(self.params.expiration_ts);
// }
// }
per_node_states
};
// Now reach out to each node and cancel their watch ids
let mut unord = FuturesUnordered::new();
for (pnk, pns) in per_node_states {
unord.push(async move {
let res = self
.outbound_watch_value_cancel(
pnk.record_key,
pns.opt_watcher,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await;
(pnk, res)
});
}
let mut cancelled = vec![];
while let Some((pnk, res)) = unord.next().await {
match res {
Ok(_) => {
// Remove from 'per node states' because we got some response
cancelled.push(pnk);
}
Err(e) => {
veilid_log!(self debug "outbound watch cancel error: {}", e);
// xxx should do something different for network unreachable vs host unreachable
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch. We'll try the cancel again if we reach this node again during fanout.
}
}
}
// Update state
{
let inner = &mut *self.inner.lock().await;
// Remove per node watches we cancelled
for pnk in cancelled {
if inner
.outbound_watch_state
.per_node_state
.remove(&pnk)
.is_none()
{
veilid_log!(self warn "per-node watch being cancelled should have still been in the table");
};
}
// Remove outbound watch we've cancelled
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
// Mark as dead now that we cancelled
let Some(_current) = outbound_watch.current.take() else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
}
}
/// See which existing per-node watches can be renewed
/// and drop the ones that can't be or are dead
pub(super) async fn process_outbound_watch_renew(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
let (per_node_states, renew_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being renewed should have current state");
return;
};
let mut per_node_states = vec![];
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
current.nodes.retain(|x| !dead_pnks.contains(x));
// Change the params to update count
let mut renew_params = current.params.clone();
renew_params.count = current.remaining_count;
(per_node_states, renew_params)
};
// Now reach out to each node and renew their watches
let mut unord = FuturesUnordered::new();
for (pnk, pns) in per_node_states {
let params = renew_params.clone();
unord.push(async move {
let res = self
.outbound_watch_value_change(
pnk.record_key,
params,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await;
(pnk, res)
});
}
let mut renewed = vec![];
let mut rejected = vec![];
let mut unanswered = vec![];
while let Some((pnk, res)) = unord.next().await {
match res {
Ok(Some(r)) => {
// Note per node states we should keep vs throw away
renewed.push((pnk, r));
}
Ok(None) => {
rejected.push(pnk);
}
Err(e) => {
veilid_log!(self debug "outbound watch change error: {}", e);
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch.
// xxx should do something different for network unreachable vs host unreachable
unanswered.push(pnk);
}
}
}
// Update state
{
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being renewed should have current state");
return;
};
let mut dead_pnks = BTreeSet::new();
// Perform renewals
for (pnk, r) in renewed {
let watch_node = r.watch_nodes.first().cloned().unwrap();
let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(pnk);
continue;
};
per_node_state.count = renew_params.count;
per_node_state.expiration_ts = watch_node.expiration_ts;
per_node_state.watch_id = watch_node.watch_id;
}
// Eliminate rejected
for pnk in rejected {
if inner
.outbound_watch_state
.per_node_state
.remove(&pnk)
.is_none()
{
veilid_log!(self warn "per-node watch being renewed should have still been in the table");
}
dead_pnks.insert(pnk);
}
// Drop unanswered but leave in per node state
for pnk in unanswered {
dead_pnks.insert(pnk);
}
current.nodes.retain(|x| !dead_pnks.contains(x));
// Update outbound watch
current.update(&inner.outbound_watch_state.per_node_state);
}
}
/// Perform fanout to add per-node watches to an outbound watch
/// Must have no current state, or have a match to desired parameters
pub(super) async fn process_outbound_watch_reconcile(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
// Get the nodes already active on this watch,
// and the parameters to fanout with for the rest
let (active_nodes, reconcile_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being reconciled should have still been in the table");
return;
};
// Get params to reconcile
let Some(desired) = &outbound_watch.desired else {
veilid_log!(self warn "watch being reconciled should have had desired parameters");
return;
};
let active_nodes = if let Some(current) = &mut outbound_watch.current {
// Assert matching parameters
if &current.params != desired {
veilid_log!(self warn "watch being reconciled should have had matching current and desired parameters");
return;
}
current.nodes.iter().map(|x| x.node_id).collect()
} else {
vec![]
};
let reconcile_params = desired.clone();
(active_nodes, reconcile_params)
};
// Now fan out with parameters and get new per node watches
self.outbound_watch_value(record_key, reconcile_params, active_nodes)
.await;
}
/// Get the next operation for a particular watch's state machine
/// Can be processed in the foreground, or by the background operation queue
pub(super) fn get_next_outbound_watch_operation(
&self,
key: TypedKey,
opt_watch_lock: Option<AsyncTagLockGuard<TypedKey>>,
cur_ts: Timestamp,
outbound_watch: &OutboundWatch,
) -> Option<PinBoxFutureStatic<()>> {
let registry = self.registry();
let consensus_count = self
.config()
.with(|c| c.network.dht.get_value_count as usize);
// Check states
if outbound_watch.is_dead() {
// Outbound watch is dead
let Some(watch_lock) =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))
else {
return None;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_dead(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_cancel(&registry, cur_ts) {
// Outbound watch needs to be cancelled
let Some(watch_lock) =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))
else {
return None;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_cancel(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_renew(&registry, cur_ts) {
// Outbound watch expired but can be renewed
let Some(watch_lock) =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))
else {
return None;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_renew(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_reconcile(&registry, consensus_count, cur_ts) {
// Outbound watch parameters have changed or it needs more nodes
let Some(watch_lock) =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))
else {
return None;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_reconcile(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
}
None
}
}
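process_outbound_watch_cancel() and process_outbound_watch_renew() above share the same shape: snapshot the per-node state while holding the inner lock, fan the per-node RPCs out with FuturesUnordered, then reacquire the lock and fold the results back into the watch. A minimal sketch of that fan-out-and-collect pattern follows, using the futures crate with a fake per-node call in place of the real outbound_watch_value_cancel() RPC.

use futures::executor::block_on;
use futures::stream::{FuturesUnordered, StreamExt};

// Fake per-node cancel call; the real code issues outbound_watch_value_cancel().
async fn cancel_on_node(node_id: u32) -> Result<(), String> {
    if node_id % 2 == 0 {
        Ok(())
    } else {
        Err(format!("node {node_id} unreachable"))
    }
}

fn main() {
    block_on(async {
        // Snapshot of per-node states, gathered while holding the inner lock.
        let per_node_states = vec![1u32, 2, 3, 4];

        // Fan out one request per node and collect results as they complete.
        let mut unord = FuturesUnordered::new();
        for node_id in per_node_states {
            unord.push(async move { (node_id, cancel_on_node(node_id).await) });
        }

        let mut cancelled = vec![];
        while let Some((node_id, res)) = unord.next().await {
            match res {
                Ok(()) => cancelled.push(node_id),
                // Unreachable nodes keep their per-node state so the cancel can
                // be retried later, mirroring the real routine's behavior.
                Err(e) => eprintln!("cancel failed: {e}"),
            }
        }
        println!("cancelled per-node watches: {cancelled:?}");
    });
}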

View File

@ -1,279 +1,6 @@
use futures_util::StreamExt as _;
use super::*;
impl StorageManager {
/// Remove dead watches from the table
pub(super) async fn process_outbound_watch_dead(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
let mut inner = self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.remove(&record_key)
else {
veilid_log!(self warn "dead watch should have still been in the table");
return;
};
if outbound_watch.current.is_some() {
veilid_log!(self warn "dead watch still had current state");
}
if outbound_watch.desired.is_some() {
veilid_log!(self warn "dead watch still had desired params");
}
}
/// Get the list of remaining active watch ids
/// and call their nodes to cancel the watch
pub(super) async fn process_outbound_watch_cancel(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
let per_node_states = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
let mut per_node_states = vec![];
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
current.nodes.retain(|x| !dead_pnks.contains(x));
per_node_states
};
// Now reach out to each node and cancel their watch ids
let mut unord = FuturesUnordered::new();
for (pnk, pns) in per_node_states {
unord.push(async move {
let res = self
.outbound_watch_value_cancel(
pnk.record_key,
pns.opt_watcher,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await;
(pnk, res)
});
}
let mut cancelled = vec![];
while let Some((pnk, res)) = unord.next().await {
match res {
Ok(_) => {
// Remove from 'per node states' because we got some response
cancelled.push(pnk);
}
Err(e) => {
veilid_log!(self debug "outbound watch cancel error: {}", e);
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch. We'll try the cancel again if we reach this node again during fanout.
}
}
}
// Update state
{
let inner = &mut *self.inner.lock().await;
for pnk in cancelled {
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&pnk.record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
// Mark as dead now that we cancelled
let Some(_current) = outbound_watch.current.take() else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
}
}
}
/// See which existing per-node watches can be renewed
/// and drop the ones that can't be or are dead
pub(super) async fn process_outbound_watch_renew(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
let (per_node_states, renew_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being renewed should have current state");
return;
};
let mut per_node_states = vec![];
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
current.nodes.retain(|x| !dead_pnks.contains(x));
// Change the params to update count
let mut renew_params = current.params.clone();
renew_params.count = current.remaining_count;
(per_node_states, renew_params)
};
// Now reach out to each node and renew their watches
let mut unord = FuturesUnordered::new();
for (pnk, pns) in per_node_states {
let params = renew_params.clone();
unord.push(async move {
let res = self
.outbound_watch_value_change(
pnk.record_key,
params,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await;
(pnk, res)
});
}
let mut renewed = vec![];
let mut rejected = vec![];
let mut unanswered = vec![];
while let Some((pnk, res)) = unord.next().await {
match res {
Ok(Some(r)) => {
// Note per node states we should keep vs throw away
renewed.push((pnk, r));
}
Ok(None) => {
rejected.push(pnk);
}
Err(e) => {
veilid_log!(self debug "outbound watch change error: {}", e);
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch.
// xxx should do something different for network unreachable vs host unreachable
unanswered.push(pnk);
}
}
}
// Update state
{
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being renewed should have current state");
return;
};
let mut dead_pnks = BTreeSet::new();
// Perform renewals
for (pnk, r) in renewed {
let watch_node = r.watch_nodes.first().cloned().unwrap();
let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(pnk);
continue;
};
per_node_state.count = renew_params.count;
per_node_state.expiration_ts = watch_node.expiration_ts;
per_node_state.watch_id = watch_node.watch_id;
}
// Eliminate rejected
for pnk in rejected {
inner.outbound_watch_state.per_node_state.remove(&pnk);
dead_pnks.insert(pnk);
}
// Drop unanswered but leave in per node state
for pnk in unanswered {
dead_pnks.insert(pnk);
}
current.nodes.retain(|x| !dead_pnks.contains(x));
}
}
pub(super) async fn process_outbound_watch_reconcile(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
xxx continue here
}
// Check if client-side watches on opened records either have dead nodes or if the watch has expired
//#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_outbound_watches_task_routine(
@ -284,11 +11,7 @@ impl StorageManager {
) -> EyreResult<()> {
let mut inner = self.inner.lock().await;
let registry = self.registry();
let cur_ts = Timestamp::now();
let consensus_count = self
.config()
.with(|c| c.network.dht.get_value_count as usize);
// Iterate all per-node watches and remove expired ones that are unreferenced
let mut dead_pnks = HashSet::new();
@ -311,74 +34,14 @@ impl StorageManager {
.per_node_state
.retain(|k, _| !dead_pnks.contains(k));
// Iterate all outbound watches
// Determine what work needs doing if any
// Iterate all outbound watches and determine what work needs doing if any
for (k, v) in &inner.outbound_watch_state.outbound_watches {
let k = *k;
// Check states
if v.is_dead() {
// Outbound watch is dead
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
continue;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_dead(watch_lock)
.await
}
};
self.background_operation_processor.add_future(fut);
} else if v.needs_cancel(&registry, cur_ts) {
// Outbound watch needs to be cancelled
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
continue;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_cancel(watch_lock)
.await
}
};
self.background_operation_processor.add_future(fut);
} else if v.needs_renew(&registry, cur_ts) {
// Outbound watch expired but can be renewed
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
continue;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_renew(watch_lock)
.await
}
};
self.background_operation_processor.add_future(fut);
} else if v.needs_reconcile(&registry, consensus_count, cur_ts) {
// Outbound watch parameters have changed or it needs more nodes
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
continue;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_reconcile(watch_lock)
.await
}
};
self.background_operation_processor.add_future(fut);
}
// Get next work on watch and queue it if we have something to do
if let Some(op_fut) = self.get_next_outbound_watch_operation(k, None, cur_ts, v) {
self.background_operation_processor.add_future(op_fut);
};
}
// // See if the watch is expired or out of updates
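In the task routine above, the per-watch branching has collapsed into a single call to get_next_outbound_watch_operation(), whose returned future is handed to the background operation processor. Complementing the foreground drive loop sketched earlier, here is a minimal sketch of that queue-and-forget shape, with futures::executor::LocalPool standing in for the background processor and a hypothetical Watch type.

use futures::executor::LocalPool;
use futures::future::{BoxFuture, FutureExt};
use futures::task::LocalSpawnExt;

// Hypothetical watch record; the real code checks is_dead/needs_cancel/needs_renew/needs_reconcile.
struct Watch {
    needs_reconcile: bool,
}

// Returns the watch's next operation as a boxed future, or None if it is settled.
fn next_operation(watch: &Watch) -> Option<BoxFuture<'static, ()>> {
    if watch.needs_reconcile {
        Some(async { println!("reconciling watch"); }.boxed())
    } else {
        None
    }
}

fn main() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let watches = [Watch { needs_reconcile: true }, Watch { needs_reconcile: false }];

    for watch in &watches {
        // Queue the watch's next state machine step, if any, for background processing.
        if let Some(op) = next_operation(watch) {
            spawner.spawn_local(op).expect("spawn");
        }
    }

    // Run the queued operations; the real background operation processor does this continuously.
    pool.run_until_stalled();
}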

View File

@ -128,55 +128,15 @@ impl StorageManager {
}
}
/// Perform a 'watch value' query on the network using fanout XXX rewrite this so api-based cancel/change/new make sense
/// Perform a 'watch value' query on the network using fanout
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_watch_value(
&self,
key: TypedKey,
params: OutboundWatchParameters,
safety_selection: SafetySelection,
active_nodes: Vec<TypedKey>,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
// if the count is zero, we are cancelling
if count == 0 {
// Ensure active watch is specified
let Some(active_watch) = opt_active_watch else {
apibail_internal!("Must specify an active watch in order to cancel it");
};
return self
.outbound_watch_value_cancel(
key,
subkeys,
safety_selection,
opt_watcher,
active_watch,
)
.await;
}
// if the watch id and watch node are specified, then we're trying to change an existing watch
// first try to do that, then fall back to fanout for a new watch id
if let Some(active_watch) = opt_active_watch {
if let Some(res) = self
.outbound_watch_value_change(
key,
subkeys.clone(),
expiration,
count,
safety_selection,
opt_watcher,
active_watch,
)
.await?
{
// If a change was successful then return immediately
return Ok(Some(res));
}
// Otherwise, treat this like a new watch
}
let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'WatchValue', some of which are the same for 'GetValue' operations
@ -191,8 +151,9 @@ impl StorageManager {
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
// which lives for the duration of the app's runtime
let watcher =
opt_watcher.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value);
let watcher = params
.opt_watcher
.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value);
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {

View File

@ -1474,6 +1474,11 @@ impl VeilidAPI {
out += &storage_manager.debug_opened_records().await;
out
}
"watched" => {
let mut out = "Watched Records:\n".to_string();
out += &storage_manager.debug_watched_records().await;
out
}
"offline" => {
let mut out = "Offline Records:\n".to_string();
out += &storage_manager.debug_offline_records().await;

View File

@ -404,7 +404,7 @@ impl RoutingContext {
///
/// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future,
/// and the returned timestamp will be no later than the requested expiration, but -may- be before the requested expiration.
/// If the returned timestamp is zero it indicates that the watch creation or update has failed. In the case of a failed update, the watch is considered cancelled.
/// If the returned timestamp is zero it indicates that the watch is considered cancelled, either from a failed update or due to `count` being zero.
///
/// DHT watches are accepted with the following conditions:
/// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record.

View File

@ -3,8 +3,26 @@ use super::*;
use core::fmt::Debug;
use core::hash::Hash;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct AsyncTagLockGuard<T>
where
T: Hash + Eq + Clone + Debug,
{
inner: Arc<AsyncTagLockGuardInner<T>>,
}
impl<T> AsyncTagLockGuard<T>
where
T: Hash + Eq + Clone + Debug,
{
#[must_use]
pub fn tag(&self) -> T {
self.inner.tag()
}
}
#[derive(Debug)]
struct AsyncTagLockGuardInner<T>
where
T: Hash + Eq + Clone + Debug,
{
@ -13,7 +31,7 @@ where
guard: Option<AsyncMutexGuardArc<()>>,
}
impl<T> AsyncTagLockGuard<T>
impl<T> AsyncTagLockGuardInner<T>
where
T: Hash + Eq + Clone + Debug,
{
@ -25,12 +43,12 @@ where
}
}
pub fn tag(&self) -> T {
fn tag(&self) -> T {
self.tag.clone()
}
}
impl<T> Drop for AsyncTagLockGuard<T>
impl<T> Drop for AsyncTagLockGuardInner<T>
where
T: Hash + Eq + Clone + Debug,
{
@ -137,7 +155,9 @@ where
let guard = asyncmutex_lock_arc!(mutex);
// Return the locked guard
AsyncTagLockGuard::new(self.clone(), tag, guard)
AsyncTagLockGuard {
inner: Arc::new(AsyncTagLockGuardInner::new(self.clone(), tag, guard)),
}
}
pub fn try_lock_tag(&self, tag: T) -> Option<AsyncTagLockGuard<T>> {
@ -164,7 +184,9 @@ where
}
};
// Return guard
Some(AsyncTagLockGuard::new(self.clone(), tag, guard))
Some(AsyncTagLockGuard {
inner: Arc::new(AsyncTagLockGuardInner::new(self.clone(), tag, guard)),
})
}
}
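The AsyncTagLockGuard change above makes the guard cloneable by moving the tag, table reference, and owned mutex guard into an Arc'd inner struct, so the tag lock is only released when the last clone is dropped; this is what allows the watch state machine to move the same lock into a spawned future. A minimal sketch of the pattern, assuming a simplified synchronous tag table rather than the real async one:

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Simplified stand-in for the async tag lock table: just a set of held tags.
struct TagTable {
    held: Mutex<HashSet<u32>>,
}

struct TagLockGuardInner {
    table: Arc<TagTable>,
    tag: u32,
}

impl Drop for TagLockGuardInner {
    // Only runs when the last clone of the outer guard is dropped.
    fn drop(&mut self) {
        self.table.held.lock().unwrap().remove(&self.tag);
    }
}

// Cloneable guard: all clones share the same inner state via Arc.
#[derive(Clone)]
struct TagLockGuard {
    inner: Arc<TagLockGuardInner>,
}

impl TagLockGuard {
    fn tag(&self) -> u32 {
        self.inner.tag
    }
}

fn try_lock_tag(table: &Arc<TagTable>, tag: u32) -> Option<TagLockGuard> {
    let mut held = table.held.lock().unwrap();
    if !held.insert(tag) {
        return None; // tag is already locked
    }
    Some(TagLockGuard {
        inner: Arc::new(TagLockGuardInner { table: table.clone(), tag }),
    })
}

fn main() {
    let table = Arc::new(TagTable { held: Mutex::new(HashSet::new()) });

    let guard = try_lock_tag(&table, 42).expect("first lock succeeds");
    let clone = guard.clone(); // both clones hold the same tag lock
    assert!(try_lock_tag(&table, 42).is_none());

    drop(guard);
    // Still locked: the clone keeps the inner state alive.
    assert!(try_lock_tag(&table, 42).is_none());

    drop(clone);
    // Last clone gone; the tag is released and can be locked again.
    assert!(try_lock_tag(&table, 42).is_some());
    println!("tag {} released after last clone dropped", guard_tag_example());
}

fn guard_tag_example() -> u32 {
    42
}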