more watchvalue fixes

Christien Rioux 2025-04-12 21:37:07 -04:00
parent 3d88416baa
commit e2f750f207
7 changed files with 193 additions and 125 deletions

View File

@ -51,6 +51,9 @@ impl StorageManager {
pub async fn purge_local_records(&self, reclaim: Option<usize>) -> String {
let mut inner = self.inner.lock().await;
if !inner.opened_records.is_empty() {
return "records still opened".to_owned();
}
let Some(local_record_store) = &mut inner.local_record_store else {
return "not initialized".to_owned();
};
@ -62,6 +65,9 @@ impl StorageManager {
}
pub async fn purge_remote_records(&self, reclaim: Option<usize>) -> String {
let mut inner = self.inner.lock().await;
if !inner.opened_records.is_empty() {
return "records still opened".to_owned();
}
let Some(remote_record_store) = &mut inner.remote_record_store else {
return "not initialized".to_owned();
};
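Note: both purge paths now refuse to run while any record is open, so callers are expected to close everything first. A minimal sketch of the intended sequence, assuming crate-internal access to the `StorageManager` (as the debug commands that invoke these have); `purge_storage` is a hypothetical helper, not part of this commit:

```rust
async fn purge_storage(storage_manager: &StorageManager) -> VeilidAPIResult<(String, String)> {
    // Close everything first; purge returns "records still opened" otherwise.
    storage_manager.close_all_records().await?;
    // `None` means the default amount of space is reclaimed.
    let local = storage_manager.purge_local_records(None).await;
    let remote = storage_manager.purge_remote_records(None).await;
    Ok((local, remote))
}
```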

View File

@ -34,6 +34,8 @@ const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
const CHECK_OUTBOUND_WATCHES_INTERVAL_SECS: u32 = 1;
/// Frequency to retry reconciliation of watches that are not at consensus
const RECONCILE_OUTBOUND_WATCHES_INTERVAL_SECS: u32 = 30;
/// How long before expiration to try to renew per-node watches
const RENEW_OUTBOUND_WATCHES_DURATION_SECS: u32 = 30;
/// Frequency to check for expired server-side watched records
const CHECK_WATCHED_RECORDS_INTERVAL_SECS: u32 = 1;
/// Table store table for storage manager metadata
@ -387,7 +389,7 @@ impl StorageManager {
for v in inner.outbound_watch_manager.outbound_watches.values() {
if let Some(current) = v.state() {
let node_refs =
-                    current.watch_node_refs(&inner.outbound_watch_manager.per_node_state);
+                    current.watch_node_refs(&inner.outbound_watch_manager.per_node_states);
for node_ref in &node_refs {
let mut found = false;
for nid in node_ref.node_ids().iter() {
@ -554,9 +556,20 @@ impl StorageManager {
return Ok(());
};
// Set the watch to cancelled if we have one
// Will process cancellation in the background
inner.outbound_watch_manager.set_desired_watch(key, None);
Ok(())
}
/// Close all opened records
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn close_all_records(&self) -> VeilidAPIResult<()> {
// Attempt to close the record, returning the opened record if it wasn't already closed
let mut inner = self.inner.lock().await;
let keys = inner.opened_records.keys().copied().collect::<Vec<_>>();
for key in keys {
let Some(_opened_record) = Self::close_record_inner(&mut inner, key)? else {
return Ok(());
};
}
Ok(())
}
@ -565,10 +578,12 @@ impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn delete_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
// Ensure the record is closed
-        self.close_record(key).await?;
+        let mut inner = self.inner.lock().await;
+        let Some(_opened_record) = Self::close_record_inner(&mut inner, key)? else {
+            return Ok(());
+        };
// Get record from the local store
-        let mut inner = self.inner.lock().await;
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
@ -940,8 +955,9 @@ impl StorageManager {
// Process this watch's state machine operations until we are done
loop {
let opt_op_fut = {
-                let inner = self.inner.lock().await;
-                let Some(outbound_watch) = inner.outbound_watch_manager.outbound_watches.get(&key)
+                let mut inner = self.inner.lock().await;
+                let Some(outbound_watch) =
+                    inner.outbound_watch_manager.outbound_watches.get_mut(&key)
else {
// Watch is gone
return Ok(Timestamp::new(0));
@ -1633,6 +1649,10 @@ impl StorageManager {
return Err(VeilidAPIError::key_not_found(key));
}
// Set the watch to cancelled if we have one
// Will process cancellation in the background
inner.outbound_watch_manager.set_desired_watch(key, None);
Ok(inner.opened_records.remove(&key))
}
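Note: closing a record no longer performs the network cancellation inline; it only clears the desired watch and lets the background state machine notice the divergence. A self-contained sketch of that desired-versus-actual pattern (the names here are hypothetical stand-ins, not this crate's types):

```rust
// Hypothetical stand-ins for OutboundWatchParameters / OutboundWatchState.
struct WatchParams;
struct WatchState;

enum WatchAction {
    Nothing,   // no desire, no live watch
    Cancel,    // desire cleared but a watch is still live: cancel over the network
    Reconcile, // a desire exists: create/adjust per-node watches toward it
}

// Callers only mutate the desired side (set_desired_watch(key, None));
// a background tick derives the network work from the comparison.
fn next_action(desired: Option<&WatchParams>, state: Option<&WatchState>) -> WatchAction {
    match (desired, state) {
        (None, None) => WatchAction::Nothing,
        (None, Some(_)) => WatchAction::Cancel,
        (Some(_), _) => WatchAction::Reconcile,
    }
}
```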

View File

@ -17,7 +17,7 @@ pub(in crate::storage_manager) struct OutboundWatchManager {
/// Each watch per record key
pub outbound_watches: HashMap<TypedKey, OutboundWatch>,
/// Last known active watch per node+record
-    pub per_node_state: HashMap<PerNodeKey, PerNodeState>,
+    pub per_node_states: HashMap<PerNodeKey, PerNodeState>,
}
impl fmt::Display for OutboundWatchManager {
@ -33,13 +33,13 @@ impl fmt::Display for OutboundWatchManager {
}
}
out += "]\n";
out += "per_node_state: [\n";
out += "per_node_states: [\n";
{
-            let mut keys = self.per_node_state.keys().copied().collect::<Vec<_>>();
+            let mut keys = self.per_node_states.keys().copied().collect::<Vec<_>>();
keys.sort();
for k in keys {
-                let v = self.per_node_state.get(&k).unwrap();
+                let v = self.per_node_states.get(&k).unwrap();
out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.to_string()));
}
}
@ -59,7 +59,7 @@ impl OutboundWatchManager {
pub fn new() -> Self {
Self {
outbound_watches: HashMap::new(),
-            per_node_state: HashMap::new(),
+            per_node_states: HashMap::new(),
}
}
@ -91,7 +91,7 @@ impl OutboundWatchManager {
pub fn set_next_reconcile_ts(&mut self, record_key: TypedKey, next_ts: Timestamp) {
if let Some(outbound_watch) = self.outbound_watches.get_mut(&record_key) {
if let Some(state) = outbound_watch.state_mut() {
-                state.edit(&self.per_node_state, |editor| {
+                state.edit(&self.per_node_states, |editor| {
editor.set_next_reconcile_ts(next_ts);
});
}
@ -103,4 +103,58 @@ impl OutboundWatchManager {
.get(&record_key)
.and_then(|x| x.state().map(|y| y.min_expiration_ts()))
}
/// Iterate all per-node watches and remove ones with dead nodes from outbound watches
/// This may trigger reconciliation to increase the number of active per-node watches
/// for an outbound watch that is still alive
pub fn update_per_node_states(&mut self, cur_ts: Timestamp) {
// Node is unreachable
let mut dead_pnks = HashSet::new();
// Per-node expiration reached
let mut expired_pnks = HashSet::new();
// Count reached
let mut finished_pnks = HashSet::new();
for (pnk, pns) in &self.per_node_states {
if pns.count == 0 {
// If per-node watch is done, add to finished list
finished_pnks.insert(*pnk);
} else if !pns
.watch_node_ref
.as_ref()
.unwrap()
.state(cur_ts)
.is_alive()
{
// If node is unreachable add to dead list
dead_pnks.insert(*pnk);
} else if cur_ts >= pns.expiration_ts {
// If per-node watch has expired add to expired list
expired_pnks.insert(*pnk);
}
}
// Go through and remove nodes that are dead or finished from active states
// If an expired per-node watch is still referenced, it may be renewable
// so remove it from the expired list
for v in self.outbound_watches.values_mut() {
let Some(current) = v.state_mut() else {
continue;
};
// Don't drop expired per-node watches that could be renewed (still referenced by this watch)
for node in current.nodes() {
expired_pnks.remove(node);
}
// Remove dead and finished per-node watch nodes from this outbound watch
current.edit(&self.per_node_states, |editor| {
editor.retain_nodes(|x| !dead_pnks.contains(x) && !finished_pnks.contains(x));
});
}
// Drop finished per-node watches and unreferenced expired per-node watches
self.per_node_states
.retain(|k, _| !finished_pnks.contains(k) && !expired_pnks.contains(k));
}
}
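Note: `update_per_node_states` buckets every per-node watch before pruning, and the order of the checks matters: a spent count wins over a dead node, which wins over expiry, and expired entries survive if an outbound watch still references them (they may be renewable). The classification, condensed to a self-contained sketch with hypothetical types:

```rust
#[derive(Debug, PartialEq)]
enum PerNodeFate {
    Finished, // count reached zero: the watch delivered everything it will
    Dead,     // watch node is unreachable
    Expired,  // per-node expiration passed; renewable only if still referenced
    Active,
}

// Mirrors the if/else chain above; timestamps are microseconds as elsewhere.
fn classify(count: u32, node_alive: bool, cur_ts: u64, expiration_ts: u64) -> PerNodeFate {
    if count == 0 {
        PerNodeFate::Finished
    } else if !node_alive {
        PerNodeFate::Dead
    } else if cur_ts >= expiration_ts {
        PerNodeFate::Expired
    } else {
        PerNodeFate::Active
    }
}
```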

View File

@ -76,38 +76,51 @@ impl OutboundWatch {
self.desired = desired;
}
/// Check for desired state changes
pub fn update_desired_state(&mut self, cur_ts: Timestamp) {
let Some(desired) = self.desired.as_ref() else {
// No desired parameters means this is already done
return;
};
// Check if desired parameters have expired
if desired.expiration_ts.as_u64() != 0 && desired.expiration_ts <= cur_ts {
// Expired
self.set_desired(None);
return;
}
// Check if the existing state has no remaining count
if let Some(state) = self.state.as_ref() {
if state.remaining_count() == 0 {
// No remaining count
self.set_desired(None);
}
}
}
/// Returns true if this outbound watch can be removed from the table
pub fn is_dead(&self) -> bool {
self.desired.is_none() && self.state.is_none()
}
/// Returns true if this outbound watch needs to be cancelled
-    pub fn needs_cancel(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
+    pub fn needs_cancel(&self, registry: &VeilidComponentRegistry) -> bool {
if self.is_dead() {
veilid_log!(registry warn "should have checked for is_dead first");
return false;
}
// If there is no current watch then there is nothing to cancel
-        let Some(state) = self.state() else {
+        let Some(state) = self.state.as_ref() else {
return false;
};
// If the total number of changes has been reached
// then we're done and should cancel
if state.remaining_count() == 0 {
return true;
}
-        // If we have expired and can't renew, then cancel
-        if state.params().expiration_ts.as_u64() != 0 && cur_ts >= state.params().expiration_ts {
-            return true;
-        }
// If the desired parameters is None then cancel
let Some(desired) = self.desired.as_ref() else {
return true;
};
// If the desired parameters is different than the current parameters
// then cancel so we can eventually reconcile to the new parameters
state.params() != desired
@ -115,20 +128,26 @@ impl OutboundWatch {
/// Returns true if this outbound watch can be renewed
pub fn needs_renew(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
-        if self.is_dead() || self.needs_cancel(registry, cur_ts) {
+        if self.is_dead() || self.needs_cancel(registry) {
veilid_log!(registry warn "should have checked for is_dead and needs_cancel first");
return false;
}
// If there is no current watch then there is nothing to renew
-        let Some(state) = self.state() else {
+        let Some(state) = self.state.as_ref() else {
return false;
};
// If the watch has per node watches that have expired,
-        // but we can extend our watch then renew
-        if cur_ts >= state.min_expiration_ts()
-            && (state.params().expiration_ts.as_u64() == 0 || cur_ts < state.params().expiration_ts)
+        // but we can extend our watch then renew. Do this only within RENEW_OUTBOUND_WATCHES_DURATION_SECS
+        // of the actual expiration. If we're looking at this after the actual expiration, don't try because
+        // the watch id will have died
+        let renew_ts = cur_ts + TimestampDuration::new_secs(RENEW_OUTBOUND_WATCHES_DURATION_SECS);
+        if renew_ts >= state.min_expiration_ts()
+            && cur_ts < state.min_expiration_ts()
+            && (state.params().expiration_ts.as_u64() == 0
+                || renew_ts < state.params().expiration_ts)
{
return true;
}
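Note: the renewal predicate now opens a window `RENEW_OUTBOUND_WATCHES_DURATION_SECS` wide just before the earliest per-node expiration: renew when expiry is near but not yet past (a dead watch id cannot be renewed), and only if the renewal horizon still fits under the overall desired expiration (0 meaning unlimited). The same check on plain microsecond timestamps, as a worked sketch:

```rust
const RENEW_WINDOW_SECS: u64 = 30; // mirrors RENEW_OUTBOUND_WATCHES_DURATION_SECS

fn in_renew_window(cur_ts: u64, min_expiration_ts: u64, params_expiration_ts: u64) -> bool {
    let renew_ts = cur_ts + RENEW_WINDOW_SECS * 1_000_000;
    renew_ts >= min_expiration_ts                 // close enough to expiry to act
        && cur_ts < min_expiration_ts             // but the watch id is still alive
        && (params_expiration_ts == 0             // unlimited desired lifetime,
            || renew_ts < params_expiration_ts)   // or renewal still fits under it
}

fn main() {
    let now = 1_700_000_000_000_000u64;
    assert!(in_renew_window(now, now + 10_000_000, 0));  // expires in 10s: renew
    assert!(!in_renew_window(now, now + 60_000_000, 0)); // 60s away: too early
    assert!(!in_renew_window(now, now - 1_000_000, 0));  // already expired: too late
}
```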
@ -156,10 +175,7 @@ impl OutboundWatch {
consensus_count: usize,
cur_ts: Timestamp,
) -> bool {
-        if self.is_dead()
-            || self.needs_cancel(registry, cur_ts)
-            || self.needs_renew(registry, cur_ts)
-        {
+        if self.is_dead() || self.needs_cancel(registry) || self.needs_renew(registry, cur_ts) {
veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first");
return false;
}
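Note: the warn! guards encode a strict precedence among the predicates: `is_dead`, then `needs_cancel`, then `needs_renew`, then the reconcile check above. Each one assumes its predecessors returned false, which is why `needs_cancel` could drop its `cur_ts` parameter: expiry of the desired parameters is now handled up front by `update_desired_state`. A sketch of the dispatch order as the caller applies it; the reconcile predicate's name and argument order are inferred from the warn message and the parameter list above, so treat them as assumptions:

```rust
// Sketch of the caller's dispatch order; the real
// get_next_outbound_watch_operation returns futures rather than an enum.
enum WatchOp { Die, Cancel, Renew, Reconcile, Wait }

fn next_op(
    w: &mut OutboundWatch,
    registry: &VeilidComponentRegistry,
    cur_ts: Timestamp,
    consensus_count: usize,
) -> WatchOp {
    // Terminate stale desires first so the predicates below never see them.
    w.update_desired_state(cur_ts);
    if w.is_dead() {
        WatchOp::Die
    } else if w.needs_cancel(registry) {
        WatchOp::Cancel
    } else if w.needs_renew(registry, cur_ts) {
        WatchOp::Renew
    } else if w.needs_reconcile(registry, consensus_count, cur_ts) {
        WatchOp::Reconcile
    } else {
        WatchOp::Wait
    }
}
```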

View File

@ -13,56 +13,14 @@ impl StorageManager {
let cur_ts = Timestamp::now();
-        // Iterate all per-node watches and remove dead ones from outbound watches
-        let mut dead_pnks = HashSet::new();
-        for (pnk, pns) in &inner.outbound_watch_manager.per_node_state {
-            if !pns
-                .watch_node_ref
-                .as_ref()
-                .unwrap()
-                .state(cur_ts)
-                .is_alive()
-            {
-                dead_pnks.insert(*pnk);
-            }
-        }
-        for v in inner.outbound_watch_manager.outbound_watches.values_mut() {
-            let Some(current) = v.state_mut() else {
-                continue;
-            };
-            current.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
-                editor.retain_nodes(|x| !dead_pnks.contains(x));
-            });
-        }
-        // Iterate all per-node watches and remove expired ones that are unreferenced
-        let mut expired_pnks = HashSet::new();
-        for (pnk, pns) in &inner.outbound_watch_manager.per_node_state {
-            if cur_ts >= pns.expiration_ts || pns.count == 0 {
-                expired_pnks.insert(*pnk);
-            }
-        }
-        for v in inner.outbound_watch_manager.outbound_watches.values() {
-            // If it's still referenced, keep it
-            let Some(current) = v.state() else {
-                continue;
-            };
-            for pnk in current.nodes() {
-                expired_pnks.remove(pnk);
-            }
-        }
-        inner
-            .outbound_watch_manager
-            .per_node_state
-            .retain(|k, _| !expired_pnks.contains(k));
+        // Update per-node watch states
+        // Desired state updates are performed by get_next_outbound_watch_operation
+        inner.outbound_watch_manager.update_per_node_states(cur_ts);
// Iterate all outbound watches and determine what work needs doing if any
-        for (k, v) in &inner.outbound_watch_manager.outbound_watches {
-            let k = *k;
+        for (k, v) in &mut inner.outbound_watch_manager.outbound_watches {
            // Get next work on watch and queue it if we have something to do
-            if let Some(op_fut) = self.get_next_outbound_watch_operation(k, None, cur_ts, v) {
+            if let Some(op_fut) = self.get_next_outbound_watch_operation(*k, None, cur_ts, v) {
self.background_operation_processor.add_future(op_fut);
};
}

View File

@ -74,7 +74,7 @@ impl StorageManager {
)?;
if wva.answer.accepted {
veilid_log!(self debug "Outbound watch canceled: id={} ({})", wva.answer.watch_id, watch_node);
veilid_log!(self debug "Outbound watch cancelled: id={} ({})", wva.answer.watch_id, watch_node);
Ok(true)
} else {
veilid_log!(self debug "Outbound watch id did not exist: id={} ({})", watch_id, watch_node);
@ -125,6 +125,8 @@ impl StorageManager {
if wva.answer.accepted {
if watch_id != wva.answer.watch_id {
veilid_log!(self debug "WatchValue changed: id={}->{} expiration_ts={} ({})", watch_id, wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), watch_node);
} else if wva.answer.expiration_ts.as_u64() == 0 {
veilid_log!(self debug "WatchValue not renewed: id={} ({})", watch_id, watch_node);
} else {
veilid_log!(self debug "WatchValue renewed: id={} expiration_ts={} ({})", watch_id, display_ts(wva.answer.expiration_ts.as_u64()), watch_node);
}
@ -361,6 +363,9 @@ impl StorageManager {
if outbound_watch.desired().is_some() {
veilid_log!(self warn "dead watch still had desired params");
}
// Send valuechange with dead count and no subkeys to inform the api that this watch is now gone completely
self.update_callback_value_change(record_key, ValueSubkeyRangeSet::new(), 0, None);
}
/// Get the list of remaining active watch ids
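Note: with the dead-watch callback above, an application can treat a value-change update carrying `count == 0` and an empty subkey set as "this watch is gone for good", whether it expired, was cancelled, or used up its change count. A consumer-side sketch, assuming the `VeilidUpdate`/`VeilidValueChange` shape of veilid-core's public API:

```rust
fn on_update(update: VeilidUpdate) {
    if let VeilidUpdate::ValueChange(vc) = update {
        if vc.count == 0 && vc.subkeys.is_empty() {
            // The outbound watch is dead; re-issue watch_dht_values if still wanted.
            println!("watch on {} is gone", vc.key);
        } else if let Some(value_data) = &vc.value {
            println!(
                "subkeys {:?} changed ({} changes remain): {:?}",
                vc.subkeys, vc.count, value_data
            );
        }
    }
}
```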
@ -395,7 +400,7 @@ impl StorageManager {
for pnk in state.nodes() {
let Some(per_node_state) = inner
.outbound_watch_manager
-                .per_node_state
+                .per_node_states
.get(pnk)
.cloned()
else {
@ -406,7 +411,7 @@ impl StorageManager {
per_node_states.push((*pnk, per_node_state));
}
-        state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
+        state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
editor.retain_nodes(|x| !missing_pnks.contains(x));
});
@ -456,7 +461,7 @@ impl StorageManager {
for pnk in cancelled {
if inner
.outbound_watch_manager
-                .per_node_state
+                .per_node_states
.remove(&pnk)
.is_none()
{
@ -477,9 +482,6 @@ impl StorageManager {
// Mark as dead now that we cancelled
outbound_watch.clear_state();
}
-        // Send valuechange with dead count and no subkeys to inform the api that this was cancelled
-        self.update_callback_value_change(record_key, ValueSubkeyRangeSet::new(), 0, None);
}
/// See which existing per-node watches can be renewed
@ -514,7 +516,7 @@ impl StorageManager {
for pnk in state.nodes() {
let Some(per_node_state) = inner
.outbound_watch_manager
-                .per_node_state
+                .per_node_states
.get(pnk)
.cloned()
else {
@ -524,7 +526,7 @@ impl StorageManager {
};
per_node_states.push((*pnk, per_node_state));
}
-        state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
+        state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
editor.retain_nodes(|x| !missing_pnks.contains(x));
});
@ -616,7 +618,7 @@ impl StorageManager {
*pnk,
inner
.outbound_watch_manager
-                    .per_node_state
+                    .per_node_states
.get(pnk)
.cloned()
.unwrap(),
@ -628,7 +630,7 @@ impl StorageManager {
};
// Add in any inactive per node states
-        for (pnk, pns) in &inner.outbound_watch_manager.per_node_state {
+        for (pnk, pns) in &inner.outbound_watch_manager.per_node_states {
// Skip any we have already
if per_node_state.contains_key(pnk) {
continue;
@ -707,28 +709,37 @@ impl StorageManager {
node_id,
};
-            let watch_id = accepted_watch.watch_id;
-            let opt_watcher = desired.opt_watcher;
-            let safety_selection = desired.safety_selection;
            let expiration_ts = accepted_watch.expiration_ts;
            let count = state.remaining_count();
-            let watch_node_ref = Some(accepted_watch.node_ref);
-            let opt_value_changed_route = accepted_watch.opt_value_changed_route;
-            // Insert state, possibly overwriting an existing one
-            inner.outbound_watch_manager.per_node_state.insert(
-                pnk,
-                PerNodeState {
-                    watch_id,
-                    safety_selection,
-                    opt_watcher,
-                    expiration_ts,
-                    count,
-                    watch_node_ref,
-                    opt_value_changed_route,
-                },
-            );
-            added_nodes.push(pnk);
+            // Check for accepted watch that came back with a dead watch
+            // (non renewal, watch id didn't exist, didn't renew in time)
+            if expiration_ts.as_u64() != 0 && count > 0 {
+                // Insert state, possibly overwriting an existing one
+                let watch_id = accepted_watch.watch_id;
+                let opt_watcher = desired.opt_watcher;
+                let safety_selection = desired.safety_selection;
+                let watch_node_ref = Some(accepted_watch.node_ref);
+                let opt_value_changed_route = accepted_watch.opt_value_changed_route;
+                inner.outbound_watch_manager.per_node_states.insert(
+                    pnk,
+                    PerNodeState {
+                        watch_id,
+                        safety_selection,
+                        opt_watcher,
+                        expiration_ts,
+                        count,
+                        watch_node_ref,
+                        opt_value_changed_route,
+                    },
+                );
+                added_nodes.push(pnk);
+            } else {
+                // Remove per node state because this watch id was not renewed
+                inner.outbound_watch_manager.per_node_states.remove(&pnk);
+                remove_nodes.insert(pnk);
+            }
}
// Eliminate rejected
for rejected_node_ref in owvresult.rejected {
@ -737,7 +748,7 @@ impl StorageManager {
record_key,
node_id,
};
-            inner.outbound_watch_manager.per_node_state.remove(&pnk);
+            inner.outbound_watch_manager.per_node_states.remove(&pnk);
remove_nodes.insert(pnk);
}
// Drop unanswered but leave in per node state
@ -750,26 +761,30 @@ impl StorageManager {
remove_nodes.insert(pnk);
}
-        state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
+        state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
editor.retain_nodes(|x| !remove_nodes.contains(x));
editor.add_nodes(added_nodes);
});
}
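Note: an accepted answer can still describe a dead watch: the node may report `expiration_ts == 0` (it declined to renew, or the watch id no longer existed) or a spent count, and only answers passing both checks become per-node state; the rest are treated like removals. The predicate as a worked sketch on plain integer values:

```rust
/// Timestamps are microseconds; 0 is the "no expiration / not renewed" sentinel.
fn accepted_watch_is_live(expiration_ts: u64, remaining_count: u32) -> bool {
    expiration_ts != 0 && remaining_count > 0
}

fn main() {
    assert!(!accepted_watch_is_live(0, 5)); // not renewed / unknown watch id
    assert!(!accepted_watch_is_live(1_700_000_000_000_000, 0)); // count used up
    assert!(accepted_watch_is_live(1_700_000_000_000_000, 5)); // keep this node
}
```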
/// Get the next operation for a particular watch's state machine
-    /// Can be processed in the foreground, or by the bacgkround operation queue
+    /// Can be processed in the foreground, or by the background operation queue
pub(super) fn get_next_outbound_watch_operation(
&self,
key: TypedKey,
opt_watch_lock: Option<AsyncTagLockGuard<TypedKey>>,
cur_ts: Timestamp,
-        outbound_watch: &OutboundWatch,
+        outbound_watch: &mut OutboundWatch,
) -> Option<PinBoxFutureStatic<()>> {
let registry = self.registry();
let consensus_count = self
.config()
.with(|c| c.network.dht.get_value_count as usize);
// Terminate the 'desired' params for watches
// that have no remaining count or have expired
outbound_watch.update_desired_state(cur_ts);
// Check states
if outbound_watch.is_dead() {
// Outbound watch is dead
@ -786,7 +801,7 @@ impl StorageManager {
}
};
return Some(pin_dyn_future!(fut));
-        } else if outbound_watch.needs_cancel(&registry, cur_ts) {
+        } else if outbound_watch.needs_cancel(&registry) {
// Outbound watch needs to be cancelled
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
@ -922,7 +937,7 @@ impl StorageManager {
// Get per node state
let Some(per_node_state) =
-                inner.outbound_watch_manager.per_node_state.get_mut(&pnk)
+                inner.outbound_watch_manager.per_node_states.get_mut(&pnk)
else {
// No per node state means no callback
veilid_log!(self warn "missing per node state in outbound watch: {:?}", pnk);
@ -1040,8 +1055,8 @@ impl StorageManager {
let state = outbound_watch.state_mut().unwrap();
if is_value_seq_newer {
-            let remaining_count = state.remaining_count() - 1;
-            state.edit(&inner.outbound_watch_manager.per_node_state, |editor| {
+            let remaining_count = state.remaining_count().saturating_sub(1);
+            state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
editor.set_remaining_count(remaining_count);
});
}
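Note: the switch to `saturating_sub` guards against a late or duplicate value change arriving after the remaining count already reached zero. A two-line illustration of why plain subtraction is dangerous there:

```rust
fn main() {
    // `0u32 - 1` panics in debug builds and wraps to u32::MAX in release,
    // which would make a finished watch look nearly infinite.
    let remaining_count: u32 = 0;
    assert_eq!(remaining_count.saturating_sub(1), 0);
}
```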
@ -1052,9 +1067,8 @@ impl StorageManager {
// Announce ValueChanged VeilidUpdate
// * if the value in the update had a newer sequence number
// * if more than a single subkey has changed
-        // * if the count was zero meaning cancelled
-        let do_update = is_value_seq_newer || subkeys.len() > 1 || remaining_count == 0;
+        // * cancellations (count=0) are sent by process_outbound_watch_dead(), not here
+        let do_update = is_value_seq_newer || subkeys.len() > 1;
if do_update {
let value = if is_value_seq_newer {
Some(value.unwrap().value_data().clone())

View File

@ -1807,7 +1807,7 @@ impl VeilidAPI {
parse_duration,
)
.ok()
-            .map(|dur| dur + get_timestamp())
+            .map(|dur| if dur == 0 { 0 } else { dur + get_timestamp() })
.unwrap_or_else(|| {
rest_defaults = true;
Default::default()
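Note: this preserves 0 as the sentinel for "no expiration" in the debug-command parser: previously a literal `0` became `get_timestamp() + 0`, i.e. an already-expired watch. A sketch of the corrected mapping, assuming `parse_duration` yields microseconds:

```rust
fn expiration_from_duration(dur_us: u64, now_us: u64) -> u64 {
    if dur_us == 0 {
        0 // sentinel: watch never expires
    } else {
        now_us + dur_us
    }
}

fn main() {
    let now = 1_700_000_000_000_000u64;
    assert_eq!(expiration_from_duration(0, now), 0);
    assert_eq!(expiration_from_duration(30_000_000, now), now + 30_000_000);
}
```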