[ci skip] xfer

Christien Rioux 2025-04-07 15:04:26 -04:00
parent a84f95f2b7
commit 922f4d9e15
2 changed files with 115 additions and 100 deletions


@ -27,6 +27,7 @@ impl StorageManager {
veilid_log!(self warn "dead watch still had desired params");
}
}
/// Get the list of remaining active watch ids
/// and call their nodes to cancel the watch
pub(super) async fn process_outbound_watch_cancel(
@ -78,8 +79,8 @@ impl StorageManager {
let res = self
.outbound_watch_value_cancel(
pnk.record_key,
pns.safety_selection,
pns.opt_watcher,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
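Because this rendering strips the diff's removed/added markers, the argument list above shows both revisions interleaved and safety_selection appears twice. A hedged reading (assuming the usual delete-then-add ordering of the hunk) is that the call now passes the watcher before the safety selection:
let res = self
    .outbound_watch_value_cancel(
        pnk.record_key,
        pns.opt_watcher,
        pns.safety_selection,
        pns.watch_node_ref.unwrap(),
        pns.watch_id,
    )
    .await;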
@ -106,24 +107,27 @@ impl StorageManager {
// Update state
{
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
for pnk in cancelled {
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&pnk.record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
// Mark as dead now that we cancelled
outbound_watch.current = None;
// Mark as dead now that we cancelled
let Some(_current) = outbound_watch.current.take() else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
}
}
}
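With the markers stripped, the old single-record lookup (keyed on record_key and ending in the bare assignment of None) is interleaved above with the new per-node loop. A hedged reconstruction of how the state update reads after this hunk:
// Hedged reconstruction from the lines above; assumes the record_key-keyed lookup
// and the bare `outbound_watch.current = None;` are the removed side of the hunk.
let inner = &mut *self.inner.lock().await;
for pnk in cancelled {
    let Some(outbound_watch) = inner
        .outbound_watch_state
        .outbound_watches
        .get_mut(&pnk.record_key)
    else {
        veilid_log!(self warn "watch being cancelled should have still been in the table");
        return;
    };
    // Mark as dead now that we cancelled
    let Some(_current) = outbound_watch.current.take() else {
        veilid_log!(self warn "watch being cancelled should have current state");
        return;
    };
}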
/// See which existing per-node watches can be renewed
/// and drop the ones that can't be or are dead
pub(super) async fn process_outbound_watch_renew(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
@ -135,18 +139,18 @@ impl StorageManager {
return;
}
let (per_node_states, params, safety_selection) = {
let (per_node_states, renew_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being cancelled should have current state");
veilid_log!(self warn "watch being renewed should have current state");
return;
};
let mut per_node_states = vec![];
@ -163,13 +167,17 @@ impl StorageManager {
}
current.nodes.retain(|x| !dead_pnks.contains(x));
(per_node_states, current.params.clone())
// Change the params to update count
let mut renew_params = current.params.clone();
renew_params.count = current.remaining_count;
(per_node_states, renew_params)
};
// Now reach out to each node and renew their watches
let mut unord = FuturesUnordered::new();
let cur_ts = Timestamp::now();
for (pnk, pns) in per_node_states {
let params = renew_params.clone();
unord.push(async move {
let res = self
.outbound_watch_value_change(
@ -186,44 +194,70 @@ impl StorageManager {
let mut renewed = vec![];
let mut rejected = vec![];
let mut unanswered = vec![];
while let Some((pnk, res)) = unord.next().await {
match res {
Ok(accepted) => {
Ok(Some(r)) => {
// Note per node states we should keep vs throw away
if accepted {
renewed.push(pnk);
} else {
rejected.push(pnk);
}
renewed.push((pnk, r));
}
Ok(None) => {
rejected.push(pnk);
}
Err(e) => {
veilid_log!(self debug "outbound watch cancel error: {}", e);
veilid_log!(self debug "outbound watch change error: {}", e);
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch.
rejected.push(pnk);
// xxx should do something different for network unreachable vs host unreachable
unanswered.push(pnk);
}
}
}
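Reading past the stripped markers again: the old loop classified each node's reply with a plain accepted flag (renewed vs rejected), while the new loop appears to split the outcomes three ways and keep the accepted result for the bookkeeping below. A hedged reconstruction of the new match:
match res {
    Ok(Some(r)) => renewed.push((pnk, r)),  // node accepted the renewal
    Ok(None) => rejected.push(pnk),         // node answered but did not accept
    Err(e) => {
        veilid_log!(self debug "outbound watch change error: {}", e);
        // Could not contact the node: keep its per-node state for now,
        // but drop it from this watch's node list.
        unanswered.push(pnk);
    }
}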
// // Update state
// {
// let inner = &mut *self.inner.lock().await;
// let Some(outbound_watch) = inner
// .outbound_watch_state
// .outbound_watches
// .get_mut(&record_key)
// else {
// veilid_log!(self warn "watch being cancelled should have still been in the table");
// return;
// };
// let Some(current) = &mut outbound_watch.current else {
// veilid_log!(self warn "watch being cancelled should have current state");
// return;
// };
// Update state
{
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being renewed should have current state");
return;
};
// // Mark as dead now that we cancelled
// outbound_watch.current = None;
// }
let mut dead_pnks = BTreeSet::new();
// Perform renewals
for (pnk, r) in renewed {
let watch_node = r.watch_nodes.first().cloned().unwrap();
let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(pnk);
continue;
};
per_node_state.count = renew_params.count;
per_node_state.expiration_ts = watch_node.expiration_ts;
per_node_state.watch_id = watch_node.watch_id;
}
// Eliminate rejected
for pnk in rejected {
inner.outbound_watch_state.per_node_state.remove(&pnk);
dead_pnks.insert(pnk);
}
// Drop unanswered but leave in per node state
for pnk in unanswered {
dead_pnks.insert(pnk);
}
current.nodes.retain(|x| !dead_pnks.contains(x));
}
}
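A hedged sketch of the shapes the renewal bookkeeping above relies on. OutboundWatchValueResult is declared in the second file of this commit but its fields are not shown here; AcceptedWatch is a hypothetical name used only for illustration, with fields inferred from the accesses above (r.watch_nodes, watch_node.expiration_ts, watch_node.watch_id):
// Hypothetical shapes, not the commit's definitions.
struct AcceptedWatch {
    expiration_ts: Timestamp, // when the accepting node will expire the watch
    watch_id: u64,            // id the accepting node reports for the watch
}
struct OutboundWatchValueResultSketch {
    watch_nodes: Vec<AcceptedWatch>, // nodes that accepted the (renewed) watch
}
// Each accepted renewal then refreshes the matching per-node state:
//   per_node_state.count         = renew_params.count;       // reset to the requested count
//   per_node_state.expiration_ts = watch_node.expiration_ts; // new expiration from the node
//   per_node_state.watch_id      = watch_node.watch_id;      // id as reported back by the node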
pub(super) async fn process_outbound_watch_reconcile(
@ -231,7 +265,13 @@ impl StorageManager {
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
//
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
xxx continue here
}
// Check if client-side watches on opened records either have dead nodes or if the watch has expired
@ -242,18 +282,41 @@ impl StorageManager {
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let inner = self.inner.lock().await;
let mut inner = self.inner.lock().await;
// Iterate all outbound watches
let registry = self.registry();
let cur_ts = Timestamp::now();
let consensus_count = self
.config()
.with(|c| c.network.dht.get_value_count as usize);
// Iterate all per-node watches and remove expired ones that are unreferenced
let mut dead_pnks = HashSet::new();
for (pnk, pns) in &inner.outbound_watch_state.per_node_state {
if cur_ts >= pns.expiration_ts || pns.count == 0 {
dead_pnks.insert(*pnk);
}
}
for (_, v) in &inner.outbound_watch_state.outbound_watches {
// If it's still referenced, keep it
let Some(current) = &v.current else {
continue;
};
for pnk in &current.nodes {
dead_pnks.remove(pnk);
}
}
inner
.outbound_watch_state
.per_node_state
.retain(|k, _| !dead_pnks.contains(k));
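The expiry pass above is a two-phase mark/unmark: per-node states that are expired or have no count left are marked dead, anything still referenced by some watch's current node set is unmarked (so a referenced but expired state survives this pass), and only then is the map pruned. A minimal standalone sketch of the same pattern, with toy types standing in for the per-node key and state:
use std::collections::{BTreeMap, BTreeSet};

// Toy map: key -> (expiration_ts, remaining count). Illustration only.
fn prune_unreferenced(
    per_node: &mut BTreeMap<u64, (u64, u32)>,
    referenced: &BTreeSet<u64>,
    cur_ts: u64,
) {
    // Mark everything expired or out of remaining count
    let mut dead = BTreeSet::new();
    for (k, (expiration_ts, count)) in per_node.iter() {
        if cur_ts >= *expiration_ts || *count == 0 {
            dead.insert(*k);
        }
    }
    // Unmark anything a live watch still references
    for k in referenced {
        dead.remove(k);
    }
    // Prune what remains marked
    per_node.retain(|k, _| !dead.contains(k));
}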
// Iterate all outbound watches
// Determine what work needs doing if any
for (k, v) in &inner.outbound_watch_state.outbound_watches {
let k = *k;
// Check states
if v.is_dead() {
// Outbound watch is dead
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
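The hunk is cut off at the try_lock_tag line above, so what follows is only a hedged illustration of the per-record tag-lock pattern, not the commit's code: the processors shown earlier (process_outbound_watch_cancel, _renew, _reconcile) each take watch_lock: AsyncTagLockGuard<TypedKey>, and the tick presumably acquires that guard here before handing the work off.
// Illustration only; the real continuation is not shown in this diff.
if v.is_dead() {
    // Only one outbound-watch operation may run per record key at a time.
    let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
        // Another operation already holds this record's tag; revisit next tick.
        continue;
    };
    // The guard travels with the operation, for example:
    // self.process_outbound_watch_cancel(watch_lock).await;
}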


@ -29,56 +29,6 @@ pub(super) struct OutboundWatchValueResult {
}
impl StorageManager {
/// Perform a 'watch value cancel' on a set of nodes without fanout
/// Returns the list of successfully cancelled ids and just logs failures
pub(super) async fn outbound_watch_value_cancel_set(
&self,
key: TypedKey,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
outbound_watch: &OutboundWatch,
) -> Vec<u64> {
let mut unord = FuturesUnordered::new();
for pn in &outbound_watch.per_node {
unord.push(async {
let cancelled = match self.outbound_watch_value_cancel(
key,
safety_selection,
opt_watcher,
pn.watch_node.clone(),
pn.id,
).await {
Ok(_) => {
// Either watch was cancelled, or it didn't exist, but it's not there now
true
}
Err(e) => {
veilid_log!(self debug "Outbound watch value (id={}) cancel to {} failed: {}", pn.id, pn.watch_node, e);
false
}
};
if cancelled {
Some(pn.id)
} else {
None
}
});
}
let mut cancelled = vec![];
while let Some(x) = unord.next().await {
match x {
Some(id) => {
cancelled.push(id);
}
None => {}
}
}
cancelled
}
/// Perform a 'watch value cancel' on the network without fanout
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_watch_value_cancel(
@ -149,7 +99,7 @@ impl StorageManager {
.with_safety(safety_selection),
key,
params.subkeys,
params.expiration,
params.expiration_ts,
params.count,
watcher,
Some(watch_id),
@ -447,6 +397,8 @@ impl StorageManager {
inbound_node_id: TypedKey,
watch_id: u64,
) -> VeilidAPIResult<NetworkResult<()>> {
// xxx remember to update per_node_state with lower count
// Update local record store with new value
let (is_value_seq_newer, value) = {
let mut inner = self.inner.lock().await;