checkpoint

Christien Rioux 2025-04-10 19:38:34 -04:00
parent 39cb380474
commit c108fab262
10 changed files with 351 additions and 352 deletions

View File

@ -39,6 +39,7 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp
}
}
#[expect(dead_code)]
pub fn unlocked(&self) -> N {
self.nr.clone()
}
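Aside: the `#[expect(dead_code)]` added above is the newer alternative to `#[allow(dead_code)]`: it suppresses the lint now, but warns again if the lint ever stops firing, so the attribute cannot silently outlive the unused code. A minimal standalone sketch (hypothetical function, not Veilid code):

// Compiles cleanly while the helper is unused; once something calls it,
// rustc reports an unfulfilled lint expectation, prompting removal of the attribute.
#[expect(dead_code)]
fn currently_unused_helper() -> u32 {
    42
}

fn main() {}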

View File

@ -7,8 +7,9 @@ struct FanoutContext<'a> {
done: bool,
}
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Default)]
pub enum FanoutResultKind {
#[default]
Incomplete,
Timeout,
Consensus,
@ -19,11 +20,6 @@ impl FanoutResultKind {
matches!(self, Self::Incomplete)
}
}
impl Default for FanoutResultKind {
fn default() -> Self {
return FanoutResultKind::Incomplete;
}
}
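Aside: the hunk above replaces a hand-written `Default` impl with the derived form, using the `#[default]` attribute to mark the default variant. A minimal sketch of the idiom with a hypothetical enum:

// Deriving Default on an enum requires tagging exactly one unit variant;
// this is equivalent to the manual `impl Default` block removed above.
#[derive(Debug, Copy, Clone, Default, PartialEq)]
enum Phase {
    #[default]
    Incomplete,
    Timeout,
    Consensus,
}

fn main() {
    assert_eq!(Phase::default(), Phase::Incomplete);
    println!("{:?} {:?}", Phase::Timeout, Phase::Consensus);
}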
#[derive(Clone, Debug, Default)]
pub struct FanoutResult {
@ -433,9 +429,7 @@ impl<'a> FanoutCall<'a> {
// Initialize closest nodes list
{
let context_locked = &mut *context.lock();
if let Err(e) = self.init_closest_nodes(context_locked) {
return Err(e);
}
self.init_closest_nodes(context_locked)?;
// Ensure we include the most recent nodes
context_locked.fanout_queue.add(&init_fanout_queue);

View File

@ -42,7 +42,7 @@ pub struct FanoutQueue<'a> {
receiver: flume::Receiver<flume::Sender<NodeRef>>,
}
impl<'a> fmt::Debug for FanoutQueue<'a> {
impl fmt::Debug for FanoutQueue<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FanoutQueue")
.field("crypto_kind", &self.crypto_kind)
@ -119,11 +119,11 @@ impl<'a> FanoutQueue<'a> {
// Get the next work and send it along
for x in &mut self.sorted_nodes {
// If there are no work receivers left then we should stop trying to send
if self.receiver.len() == 0 {
if self.receiver.is_empty() {
break;
}
let node = self.nodes.get_mut(&x).unwrap();
let node = self.nodes.get_mut(x).unwrap();
if matches!(node.status, FanoutNodeStatus::Queued) {
// Send node to a work request
while let Ok(work_sender) = self.receiver.try_recv() {
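Aside: the changes in this file are lint-driven cleanups: the unused impl lifetime becomes `'_`, `len() == 0` becomes `is_empty()`, and the needless borrow in `get_mut(&x)` is dropped. A small self-contained sketch of the same three rewrites on a hypothetical type:

use std::collections::BTreeMap;
use std::fmt;

struct Queue<'a> {
    name: &'a str,
    nodes: BTreeMap<u64, String>,
}

// The lifetime is not used by name in the body, so the impl can elide it with '_.
impl fmt::Debug for Queue<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Queue").field("name", &self.name).finish()
    }
}

fn main() {
    let q = Queue { name: "q", nodes: BTreeMap::new() };
    // Clippy prefers is_empty() over len() == 0.
    if q.nodes.is_empty() {
        println!("{:?}", q);
    }
    // Keys yielded by iteration are already references, so get(x) needs no extra borrow.
    for x in q.nodes.keys() {
        let _node = q.nodes.get(x);
    }
}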

View File

@ -280,7 +280,7 @@ impl RPCProcessor {
// See if we have this record ourselves, if so, accept the watch
let storage_manager = self.storage_manager();
let watch_result = network_result_try!(storage_manager
.inbound_watch_value(key, params, watch_id,)
.inbound_watch_value(key, params, watch_id)
.await
.map_err(RPCError::internal)?);

View File

@ -211,7 +211,7 @@ impl StorageManager {
match fanout_result.kind {
FanoutResultKind::Incomplete => {
// Send partial update if desired, if we've gotten at least one consensus node
if ctx.send_partial_update && fanout_result.consensus_nodes.len() >= 1 {
if ctx.send_partial_update && !fanout_result.consensus_nodes.is_empty() {
ctx.send_partial_update = false;
// Return partial result

View File

@ -268,8 +268,7 @@ impl OutboundWatchState {
pub fn get_min_expiration(&self, record_key: TypedKey) -> Option<Timestamp> {
self.outbound_watches
.get(&record_key)
.map(|x| x.current.as_ref().map(|y| y.min_expiration_ts))
.flatten()
.and_then(|x| x.current.as_ref().map(|y| y.min_expiration_ts))
}
}
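Aside: `.map(..).flatten()` on an `Option` collapses into `.and_then(..)` when the closure itself returns an `Option`, which is what the rewrite above does. A tiny sketch with hypothetical types:

use std::collections::HashMap;

struct Watch {
    min_expiration_ts: Option<u64>,
}

fn get_min_expiration(watches: &HashMap<String, Watch>, key: &str) -> Option<u64> {
    // Equivalent to watches.get(key).map(|w| w.min_expiration_ts).flatten()
    watches.get(key).and_then(|w| w.min_expiration_ts)
}

fn main() {
    let mut watches = HashMap::new();
    watches.insert("rec".to_string(), Watch { min_expiration_ts: Some(123) });
    assert_eq!(get_min_expiration(&watches, "rec"), Some(123));
    assert_eq!(get_min_expiration(&watches, "missing"), None);
}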
@ -330,7 +329,7 @@ impl StorageManager {
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
inner.outbound_watch_state.per_node_state.get(pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
@ -346,10 +345,11 @@ impl StorageManager {
// Now reach out to each node and cancel their watch ids
let mut unord = FuturesUnordered::new();
for (pnk, pns) in per_node_states {
let watch_lock = watch_lock.clone();
unord.push(async move {
let res = self
.outbound_watch_value_cancel(
pnk.record_key,
watch_lock,
pns.opt_watcher,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
@ -442,7 +442,7 @@ impl StorageManager {
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
inner.outbound_watch_state.per_node_state.get(pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
@ -461,41 +461,25 @@ impl StorageManager {
// Now reach out to each node and renew their watches
let mut unord = FuturesUnordered::new();
for (pnk, pns) in per_node_states {
for (_pnk, pns) in per_node_states {
let params = renew_params.clone();
let watch_lock = watch_lock.clone();
unord.push(async move {
let res = self
.outbound_watch_value_change(
pnk.record_key,
params,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await;
(pnk, res)
self.outbound_watch_value_change(
watch_lock,
params,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await
});
}
let mut renewed = vec![];
let mut rejected = vec![];
let mut unanswered = vec![];
while let Some((pnk, res)) = unord.next().await {
let mut owvresults = vec![];
while let Some(res) = unord.next().await {
match res {
Ok(Some(r)) => {
// Note per node states we should keep vs throw away
renewed.push((pnk, r));
}
Ok(None) => {
rejected.push(pnk);
}
Ok(r) => owvresults.push(r),
Err(e) => {
veilid_log!(self debug "outbound watch change error: {}", e);
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch.
// xxx should do something different for network unreachable vs host unreachable
unanswered.push(pnk);
}
}
}
@ -503,55 +487,9 @@ impl StorageManager {
// Update state
{
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being renewed should have current state");
return;
};
let mut dead_pnks = BTreeSet::new();
// Perform renewals
for (pnk, r) in renewed {
let watch_node = r.watch_nodes.first().cloned().unwrap();
let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(pnk);
continue;
};
per_node_state.count = renew_params.count;
per_node_state.expiration_ts = watch_node.expiration_ts;
per_node_state.watch_id = watch_node.watch_id;
for owvresult in owvresults {
self.process_outbound_watch_value_result_inner(inner, record_key, owvresult);
}
// Eliminate rejected
for pnk in rejected {
if inner
.outbound_watch_state
.per_node_state
.remove(&pnk)
.is_none()
{
veilid_log!(self warn "per-node watch being renewed should have still been in the table");
}
dead_pnks.insert(pnk);
}
// Drop unanswered but leave in per node state
for pnk in unanswered {
dead_pnks.insert(pnk);
}
current.nodes.retain(|x| !dead_pnks.contains(x));
// Update outbound watch
current.update(&inner.outbound_watch_state.per_node_state);
}
}
@ -570,7 +508,7 @@ impl StorageManager {
// Get the nodes already active on this watch,
// and the parameters to fanout with for the rest
let (active_nodes, reconcile_params) = {
let (per_node_state, reconcile_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
@ -587,32 +525,68 @@ impl StorageManager {
return;
};
let active_nodes = if let Some(current) = &mut outbound_watch.current {
// Get active per node states
let mut per_node_state = if let Some(current) = &mut outbound_watch.current {
// Assert matching parameters
if &current.params != desired {
veilid_log!(self warn "watch being reconciled should have had matching current and desired parameters");
return;
}
current.nodes.iter().map(|x| x.node_id).collect()
current
.nodes
.iter()
.map(|pnk| {
(
*pnk,
inner
.outbound_watch_state
.per_node_state
.get(pnk)
.cloned()
.unwrap(),
)
})
.collect()
} else {
vec![]
HashMap::new()
};
// Add in any inactive per node states
for (pnk, pns) in &inner.outbound_watch_state.per_node_state {
// Skip any we have already
if per_node_state.contains_key(pnk) {
continue;
}
// Add inactive per node state if the record key matches
if pnk.record_key == record_key {
per_node_state.insert(*pnk, pns.clone());
}
}
let reconcile_params = desired.clone();
(active_nodes, reconcile_params)
(per_node_state, reconcile_params)
};
// Now fan out with parameters and get new per node watches
let cur_ts = Timestamp::now();
let res = self
.outbound_watch_value(record_key, reconcile_params, active_nodes)
.outbound_watch_value(watch_lock.clone(), reconcile_params, per_node_state)
.await;
// Regardless of result, set our next possible reconciliation time
{
let inner = &mut *self.inner.lock().await;
match res {
Ok(owvresult) => {
// Update state
self.process_outbound_watch_value_result_inner(inner, record_key, owvresult);
}
Err(e) => {
veilid_log!(self debug "outbound watch fanout error: {}", e);
}
}
// Regardless of result, set our next possible reconciliation time
if let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
@ -623,20 +597,6 @@ impl StorageManager {
outbound_watch.set_next_reconcile_ts(next_ts);
}
}
match res {
Ok(v) => {
//
}
Err(e) => {
veilid_log!(self debug "outbound watch fanout error: {}", e);
// ??? Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch.
// xxx should do something different for network unreachable vs host unreachable
//unanswered.push(pnk);
}
}
}
fn process_outbound_watch_value_result_inner(
@ -659,53 +619,70 @@ impl StorageManager {
};
let current = {
if outbound_watch.current.is_none() {
outbound_watch.current = Some(OutboundWatchCurrent {
params: desired.clone(),
nodes: vec![],
min_expiration_ts: desired.expiration_ts,
remaining_count: desired.count,
opt_next_reconcile_ts: None,
});
outbound_watch.current = Some(OutboundWatchCurrent::new(desired.clone()));
}
outbound_watch.current.as_mut().unwrap()
};
// let mut dead_pnks = BTreeSet::new();
let mut dead_pnks = BTreeSet::new();
// // Perform renewals
// for (pnk, r) in renewed {
// let watch_node = r.watch_nodes.first().cloned().unwrap();
// let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
// else {
// veilid_log!(self warn "missing per-node state for watch");
// dead_pnks.insert(pnk);
// continue;
// };
// per_node_state.count = renew_params.count;
// per_node_state.expiration_ts = watch_node.expiration_ts;
// per_node_state.watch_id = watch_node.watch_id;
// }
// // Eliminate rejected
// for pnk in rejected {
// if inner
// .outbound_watch_state
// .per_node_state
// .remove(&pnk)
// .is_none()
// {
// veilid_log!(self warn "per-node watch being renewed should have still been in the table");
// }
// dead_pnks.insert(pnk);
// }
// // Drop unanswered but leave in per node state
// for pnk in unanswered {
// dead_pnks.insert(pnk);
// }
// Handle accepted
for accepted_watch in owvresult.accepted {
let node_id = accepted_watch
.node_ref
.node_ids()
.get(record_key.kind)
.unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
// current.nodes.retain(|x| !dead_pnks.contains(x));
let watch_id = accepted_watch.watch_id;
let opt_watcher = desired.opt_watcher;
let safety_selection = desired.safety_selection;
let expiration_ts = accepted_watch.expiration_ts;
let count = current.remaining_count;
let watch_node_ref = Some(accepted_watch.node_ref);
let opt_value_changed_route = accepted_watch.opt_value_changed_route;
// // Update outbound watch
// current.update(&inner.outbound_watch_state.per_node_state);
inner.outbound_watch_state.per_node_state.insert(
pnk,
PerNodeState {
watch_id,
safety_selection,
opt_watcher,
expiration_ts,
count,
watch_node_ref,
opt_value_changed_route,
},
);
}
// Eliminate rejected
for rejected_node_ref in owvresult.rejected {
let node_id = rejected_node_ref.node_ids().get(record_key.kind).unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
inner.outbound_watch_state.per_node_state.remove(&pnk);
dead_pnks.insert(pnk);
}
// Drop unanswered but leave in per node state
for ignored_node_ref in owvresult.ignored {
let node_id = ignored_node_ref.node_ids().get(record_key.kind).unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
dead_pnks.insert(pnk);
}
current.nodes.retain(|x| !dead_pnks.contains(x));
// Update outbound watch
current.update(&inner.outbound_watch_state.per_node_state);
}
/// Get the next operation for a particular watch's state machine
@ -725,11 +702,8 @@ impl StorageManager {
// Check states
if outbound_watch.is_dead() {
// Outbound watch is dead
let Some(watch_lock) =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))
else {
return None;
};
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
@ -743,11 +717,8 @@ impl StorageManager {
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_cancel(&registry, cur_ts) {
// Outbound watch needs to be cancelled
let Some(watch_lock) =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))
else {
return None;
};
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
@ -761,11 +732,8 @@ impl StorageManager {
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_renew(&registry, cur_ts) {
// Outbound watch expired but can be renewed
let Some(watch_lock) =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))
else {
return None;
};
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
@ -779,11 +747,8 @@ impl StorageManager {
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_reconcile(&registry, consensus_count, cur_ts) {
// Outbound watch parameters have changed or it needs more nodes
let Some(watch_lock) =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))
else {
return None;
};
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
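Aside: each of the four branches above shrinks a `let Some(..) = .. else { return None; }` block to the `?` operator, which works on `Option` in any function returning `Option`. A minimal sketch with hypothetical lock types:

struct LockTable;

impl LockTable {
    fn try_lock_tag(&self, _key: u64) -> Option<String> {
        Some("guard".to_string())
    }
}

// Returns None early if no lock can be acquired, exactly like the let-else form it replaces.
fn next_operation(table: &LockTable, opt_lock: Option<String>, key: u64) -> Option<String> {
    let watch_lock = opt_lock.or_else(|| table.try_lock_tag(key))?;
    Some(format!("operate under {watch_lock}"))
}

fn main() {
    let table = LockTable;
    assert_eq!(
        next_operation(&table, None, 1).as_deref(),
        Some("operate under guard")
    );
}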

View File

@ -191,7 +191,7 @@ impl StorageManager {
match fanout_result.kind {
FanoutResultKind::Incomplete => {
// Send partial update if desired, if we've gotten at least one consensus node
if ctx.send_partial_update && fanout_result.consensus_nodes.len() >= 1 {
if ctx.send_partial_update && !fanout_result.consensus_nodes.is_empty() {
ctx.send_partial_update = false;
// Return partial result

View File

@ -20,7 +20,7 @@ impl StorageManager {
dead_pnks.insert(*pnk);
}
}
for (_, v) in &inner.outbound_watch_state.outbound_watches {
for v in inner.outbound_watch_state.outbound_watches.values() {
// If it's still referenced, keep it
let Some(current) = &v.current else {
continue;

View File

@ -1,12 +1,12 @@
use super::*;
use futures_util::StreamExt as _;
impl_veilid_log_facility!("stor");
/// The context of the outbound_watch_value operation
#[derive(Debug, Default)]
struct OutboundWatchValueContext {
/// A successful watch
pub opt_watch_value_result: Option<OutboundWatchValueResult>,
pub watch_value_result: OutboundWatchValueResult,
}
/// The record of a node accepting a watch
@ -22,11 +22,11 @@ pub(super) struct AcceptedWatch {
}
/// The result of the outbound_watch_value operation
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub(super) struct OutboundWatchValueResult {
/// Which nodes accepted the watch
pub accepted: Vec<AcceptedWatch>,
/// Which nodes rejected the watch
/// Which nodes rejected or cancelled the watch
pub rejected: Vec<NodeRef>,
/// Which nodes ignored the watch
pub ignored: Vec<NodeRef>,
@ -37,25 +37,31 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_watch_value_cancel(
&self,
key: TypedKey,
watch_lock: AsyncTagLockGuard<TypedKey>,
opt_watcher: Option<KeyPair>,
safety_selection: SafetySelection,
watch_node: NodeRef,
watch_id: u64,
) -> VeilidAPIResult<bool> {
let record_key = watch_lock.tag();
let routing_domain = RoutingDomain::PublicInternet;
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
// which lives for the duration of the app's runtime
let watcher =
opt_watcher.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value);
let watcher = opt_watcher.unwrap_or_else(|| {
self.anonymous_watch_keys
.get(record_key.kind)
.unwrap()
.value
});
let wva = VeilidAPIError::from_network_result(
self.rpc_processor()
.rpc_call_watch_value(
Destination::direct(watch_node.routing_domain_filtered(routing_domain))
.with_safety(safety_selection),
key,
record_key,
ValueSubkeyRangeSet::new(),
Timestamp::default(),
0,
@ -79,12 +85,12 @@ impl StorageManager {
#[instrument(target = "dht", level = "debug", skip_all, err)]
pub(super) async fn outbound_watch_value_change(
&self,
key: TypedKey,
watch_lock: AsyncTagLockGuard<TypedKey>,
params: OutboundWatchParameters,
safety_selection: SafetySelection,
watch_node: NodeRef,
watch_id: u64,
) -> VeilidAPIResult<OutboundWatchValueResult> {
let record_key = watch_lock.tag();
let routing_domain = RoutingDomain::PublicInternet;
if params.count == 0 {
@ -93,15 +99,18 @@ impl StorageManager {
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
// which lives for the duration of the app's runtime
let watcher = params
.opt_watcher
.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value);
let watcher = params.opt_watcher.unwrap_or_else(|| {
self.anonymous_watch_keys
.get(record_key.kind)
.unwrap()
.value
});
let wva = VeilidAPIError::from_network_result(
pin_future!(self.rpc_processor().rpc_call_watch_value(
Destination::direct(watch_node.routing_domain_filtered(routing_domain))
.with_safety(safety_selection),
key,
.with_safety(params.safety_selection),
record_key,
params.subkeys,
params.expiration_ts,
params.count,
@ -144,10 +153,11 @@ impl StorageManager {
//#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_watch_value(
&self,
key: TypedKey,
watch_lock: AsyncTagLockGuard<TypedKey>,
params: OutboundWatchParameters,
active_nodes: Vec<TypedKey>,
per_node_state: HashMap<PerNodeKey, PerNodeState>,
) -> VeilidAPIResult<OutboundWatchValueResult> {
let record_key = watch_lock.tag();
let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'WatchValue', some of which are the same for 'GetValue' operations
@ -162,13 +172,16 @@ impl StorageManager {
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
// which lives for the duration of the app's runtime
let watcher = params
.opt_watcher
.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value);
let watcher = params.opt_watcher.unwrap_or_else(|| {
self.anonymous_watch_keys
.get(record_key.kind)
.unwrap()
.value
});
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
self.get_value_nodes(key)
self.get_value_nodes(record_key)
.await?
.unwrap_or_default()
.into_iter()
@ -181,76 +194,105 @@ impl StorageManager {
};
// Make do-watch-value answer context
let context = Arc::new(Mutex::new(OutboundWatchValueContext {
opt_watch_value_result: None,
}));
let context = Arc::new(Mutex::new(OutboundWatchValueContext::default()));
// Routine to call to generate fanout
let call_routine = {
let context = context.clone();
let registry = self.registry();
let params = params.clone();
Arc::new(
move |next_node: NodeRef| -> PinBoxFutureStatic<FanoutCallResult> {
let context = context.clone();
let registry = registry.clone();
let params = params.clone();
let subkeys = subkeys.clone();
// See if we have an existing watch id for this node
let node_id = next_node.node_ids().get(record_key.kind).unwrap();
let pnk = PerNodeKey {
record_key,
node_id,
};
let watch_id = per_node_state.get(&pnk).map(|pns| pns.watch_id);
Box::pin(async move {
let rpc_processor = registry.rpc_processor();
let wva = network_result_try!(
rpc_processor
.rpc_call_watch_value(
Destination::direct(next_node.routing_domain_filtered(routing_domain)).with_safety(safety_selection),
key,
subkeys,
expiration,
count,
watcher,
None
)
.await?
);
let rpc_processor = registry.rpc_processor();
// Keep answer if we got one
// (accepted means the node could provide an answer, not that the watch is active)
if wva.answer.accepted {
let mut done = false;
if wva.answer.expiration_ts.as_u64() > 0 {
// If the expiration time is greater than zero this watch is active
veilid_log!(registry debug "Watch created: id={} expiration_ts={} ({})", wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node);
done = true;
let wva = match
rpc_processor
.rpc_call_watch_value(
Destination::direct(next_node.routing_domain_filtered(routing_domain)).with_safety(params.safety_selection),
record_key,
params.subkeys,
params.expiration_ts,
params.count,
watcher,
watch_id
)
.await? {
NetworkResult::Timeout => {
return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Timeout});
}
NetworkResult::ServiceUnavailable(_) |
NetworkResult::NoConnection(_) |
NetworkResult::AlreadyExists(_) |
NetworkResult::InvalidMessage(_) => {
return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid});
}
NetworkResult::Value(v) => v
};
// Keep answer if we got one
// (accepted means the node could provide an answer, not that the watch is active)
let disposition = if wva.answer.accepted {
if wva.answer.expiration_ts.as_u64() > 0 {
// If the expiration time is greater than zero this watch is active
veilid_log!(registry debug "WatchValue accepted: id={} expiration_ts={} ({})", wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node);
// Add to accepted watches
let mut ctx = context.lock();
ctx.watch_value_result.accepted.push(AcceptedWatch{
watch_id: wva.answer.watch_id,
node_ref: next_node.clone(),
expiration_ts: wva.answer.expiration_ts,
opt_value_changed_route: wva.reply_private_route,
});
FanoutCallDisposition::Accepted
} else {
// If the returned expiration time is zero, this watch was cancelled or rejected
veilid_log!(registry debug "WatchValue rejected: id={} expiration_ts={} ({})", wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node);
// Add to rejected watches
let mut ctx = context.lock();
ctx.watch_value_result.rejected.push(next_node.clone());
// Treat as accepted but do not add to consensus
FanoutCallDisposition::Stale
}
} else {
// If the returned expiration time is zero, this watch was cancelled or rejected
// If we are asking to cancel then check_done will stop after the first node
}
if done {
// Add to rejected watches
let mut ctx = context.lock();
ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
expiration_ts: wva.answer.expiration_ts,
watch_id: wva.answer.watch_id,
watch_node: next_node.clone(),
opt_value_changed_route: wva.reply_private_route,
});
}
}
ctx.watch_value_result.rejected.push(next_node.clone());
// Return peers if we have some
veilid_log!(registry debug target:"network_result", "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node);
// Treat as rejected and do not add to consensus
FanoutCallDisposition::Rejected
};
Ok(NetworkResult::value(FanoutCallOutput{peer_info_list: wva.answer.peers}))
}.instrument(tracing::trace_span!("outbound_watch_value call routine"))) as PinBoxFuture<FanoutCallResult>
// Return peers if we have some
veilid_log!(registry debug target:"network_result", "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node);
Ok(FanoutCallOutput{peer_info_list: wva.answer.peers, disposition})
}.instrument(tracing::trace_span!("outbound_watch_value call routine"))) as PinBoxFuture<FanoutCallResult>
},
)
};
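Aside: the rewritten call routine no longer stashes a single winning answer; it classifies every RPC reply into a `FanoutCallDisposition` (Accepted, Stale, Rejected, Timeout, Invalid) and lets the fanout machinery tally consensus. A toy sketch of that pattern with a hypothetical enum (the variant meanings here are assumptions drawn from the comments above):

#[derive(Debug, Clone, Copy)]
enum Disposition {
    Accepted, // counts toward consensus
    Stale,    // answered, but not counted toward consensus
    Rejected, // answered negatively, not counted
    Timeout,
    Invalid,
}

fn count_consensus(dispositions: &[Disposition]) -> usize {
    dispositions
        .iter()
        .filter(|d| matches!(d, Disposition::Accepted))
        .count()
}

fn main() {
    let calls = [
        Disposition::Accepted,
        Disposition::Stale,
        Disposition::Rejected,
        Disposition::Timeout,
        Disposition::Invalid,
        Disposition::Accepted,
    ];
    assert_eq!(count_consensus(&calls), 2);
}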
// Routine to call to check if we're done at each step
let check_done = {
// let context = context.clone();
// let registry = self.registry();
Arc::new(move |fanout_result: &FanoutResult| -> bool {
// let mut ctx = context.lock();
match fanout_result.kind {
FanoutResultKind::Incomplete => {
// Keep going
@ -261,10 +303,6 @@ impl StorageManager {
true
}
FanoutResultKind::Consensus => {
// assert!(
// ctx.value.is_some() && ctx.descriptor.is_some(),
// "should have gotten a value if we got consensus"
// );
// Signal we're done
true
}
@ -278,7 +316,7 @@ impl StorageManager {
let routing_table = self.routing_table();
let fanout_call = FanoutCall::new(
&routing_table,
key,
record_key,
key_count,
fanout,
consensus_count,
@ -294,52 +332,8 @@ impl StorageManager {
})?;
veilid_log!(self debug "WatchValue Fanout: {:?}", fanout_result);
Ok(Some(OutboundWatchValueResult {
watch_nodes: todo!(),
opt_value_changed_route: todo!(),
}))
// match fanout_call.run(init_fanout_queue).await {
// // If we don't finish in the timeout (too much time passed without a successful watch)
// TimeoutOr::Timeout => {
// // Return the best answer we've got
// let ctx = context.lock();
// if ctx.opt_watch_value_result.is_some() {
// veilid_log!(self debug "WatchValue Fanout Timeout Success");
// } else {
// veilid_log!(self debug "WatchValue Fanout Timeout Failure");
// }
// Ok(ctx.opt_watch_value_result.clone())
// }
// // If we finished with done
// TimeoutOr::Value(Ok(Some(()))) => {
// // Return the best answer we've got
// let ctx = context.lock();
// if ctx.opt_watch_value_result.is_some() {
// veilid_log!(self debug "WatchValue Fanout Success");
// } else {
// veilid_log!(self debug "WatchValue Fanout Failure");
// }
// Ok(ctx.opt_watch_value_result.clone())
// }
// // If we ran out of nodes
// TimeoutOr::Value(Ok(None)) => {
// // Return the best answer we've got
// let ctx = context.lock();
// if ctx.opt_watch_value_result.is_some() {
// veilid_log!(self debug "WatchValue Fanout Exhausted Success");
// } else {
// veilid_log!(self debug "WatchValue Fanout Exhausted Failure");
// }
// Ok(ctx.opt_watch_value_result.clone())
// }
// // Failed
// TimeoutOr::Value(Err(e)) => {
// // If we finished with an error, return that
// veilid_log!(self debug "WatchValue Fanout Error: {}", e);
// Err(e.into())
// }
// }
let owvresult = context.lock().watch_value_result.clone();
Ok(owvresult)
}
/// Handle a received 'Watch Value' query
@ -388,57 +382,81 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_value_changed(
&self,
key: TypedKey,
record_key: TypedKey,
subkeys: ValueSubkeyRangeSet,
mut count: u32,
count: u32,
value: Option<Arc<SignedValueData>>,
inbound_node_id: TypedKey,
watch_id: u64,
) -> VeilidAPIResult<NetworkResult<()>> {
// xxx remember to update per_node_state with lower count
// Operate on the watch for this record
let _watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await;
// Update local record store with new value
let (is_value_seq_newer, value) = {
let mut inner = self.inner.lock().await;
let (is_value_seq_newer, value, remaining_count) = {
let inner = &mut *self.inner.lock().await;
// Don't process update if the record is closed
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
return Ok(NetworkResult::value(()));
};
// No active watch means no callback
let Some(mut active_watch) = opened_record.active_watch_by_id(watch_id) else {
return Ok(NetworkResult::value(()));
};
// If the reporting node is not the same as our watch, don't process the value change
if !active_watch
.watch_node
.node_ids()
.contains(&inbound_node_id)
{
return Ok(NetworkResult::value(()));
}
// Get the outbound watch
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
// No outbound watch means no callback
return Ok(NetworkResult::value(()));
};
if count > active_watch.count {
// If count is greater than our requested count then this is invalid, cancel the watch
veilid_log!(self debug "watch count went backward: {}: {}/{}", key, count, active_watch.count);
// Force count to zero
count = 0;
opened_record.remove_active_watch(watch_id);
} else if count == 0 {
// If count is zero, we're done, cancel the watch and the app can renew it if it wants
veilid_log!(self debug "watch count finished: {}", key);
opened_record.clear_active_watches();
} else {
veilid_log!(self debug
"watch count decremented: {}: {}/{}",
key,
count,
active_watch.count
);
active_watch.count = count;
opened_record.set_outbound_watch(active_watch);
let Some(current) = &mut outbound_watch.current else {
// No outbound watch current state means no callback
return Ok(NetworkResult::value(()));
};
// If the reporting node is not part of our current watch, don't process the value change
let pnk = PerNodeKey {
record_key,
node_id: inbound_node_id,
};
if !current.nodes.contains(&pnk) {
return Ok(NetworkResult::value(()));
}
// Get per node state
let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk)
else {
// No per node state means no callback
veilid_log!(self warn "missing per node state in outbound watch: {:?}", pnk);
return Ok(NetworkResult::value(()));
};
// If watch id doesn't match it's for an older watch and should be ignored
if per_node_state.watch_id != watch_id {
// A mismatched watch id means no callback
veilid_log!(self warn "incorrect watch id for per node state in outbound watch: {:?} {} != {}", pnk, per_node_state.watch_id, watch_id);
return Ok(NetworkResult::value(()));
}
// Update per node state
if count > per_node_state.count {
// If count is greater than our requested count then this is invalid, cancel the watch
veilid_log!(self debug "watch count went backward: {}: {} > {}", record_key, count, per_node_state.count);
// Force count to zero for this node id so it gets cancelled out by the background process
per_node_state.count = 0;
return Ok(NetworkResult::value(()));
} else if count == 0 {
// If count is zero, the per-node watch is done and will get purged by the background process
veilid_log!(self debug "watch count finished: {}", record_key);
} else {
// Decrement the overall watch count and update the per-node watch count
veilid_log!(self debug
"watch count decremented: {}: {} < {}",
record_key,
count,
per_node_state.count
);
per_node_state.count = count;
}
}
// Null out default value
@ -452,7 +470,8 @@ impl StorageManager {
};
let last_get_result =
Self::handle_get_local_value_inner(&mut inner, key, first_subkey, true).await?;
Self::handle_get_local_value_inner(inner, record_key, first_subkey, true)
.await?;
let descriptor = last_get_result.opt_descriptor.unwrap();
let schema = descriptor.schema()?;
@ -481,8 +500,8 @@ impl StorageManager {
}
if is_value_seq_newer {
Self::handle_set_local_value_inner(
&mut inner,
key,
inner,
record_key,
first_subkey,
value.clone(),
InboundWatchUpdateMode::NoUpdate,
@ -491,22 +510,36 @@ impl StorageManager {
}
}
(is_value_seq_newer, value)
// If we got an actual update, decrement the total remaining watch count
// Get the outbound watch
let outbound_watch = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
.unwrap();
let current = outbound_watch.current.as_mut().unwrap();
if is_value_seq_newer {
current.remaining_count -= 1;
}
(is_value_seq_newer, value, current.remaining_count)
};
// Announce ValueChanged VeilidUpdate
// * if the value in the update had a newer sequence number
// * if more than a single subkeys has changed
// * if more than a single subkey has changed
// * if the count was zero meaning cancelled
let do_update = is_value_seq_newer || subkeys.len() > 1 || count == 0;
let do_update = is_value_seq_newer || subkeys.len() > 1 || remaining_count == 0;
if do_update {
let value = if is_value_seq_newer {
Some(value.unwrap().value_data().clone())
} else {
None
};
self.update_callback_value_change(key, subkeys, count, value);
self.update_callback_value_change(record_key, subkeys, remaining_count, value);
}
Ok(NetworkResult::value(()))
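Aside: the reworked `inbound_value_changed` above keeps a per-node remaining count and never lets a node report a larger count than we stored: a larger value cancels the per-node watch, zero means it is finished, and anything else simply lowers the stored count. A small sketch of that bookkeeping with hypothetical types:

// Hypothetical distillation of the count handling above, not the Veilid types.
#[derive(Debug, PartialEq)]
enum CountAction {
    CancelInvalid,     // reported count grew: force to zero so the background task purges it
    Finished,          // reported count hit zero: this node's watch is done
    Decremented(u32),  // normal case: remember the lower count
}

fn apply_reported_count(stored: &mut u32, reported: u32) -> CountAction {
    if reported > *stored {
        *stored = 0;
        CountAction::CancelInvalid
    } else if reported == 0 {
        *stored = 0;
        CountAction::Finished
    } else {
        *stored = reported;
        CountAction::Decremented(reported)
    }
}

fn main() {
    let mut stored = 5;
    assert_eq!(apply_reported_count(&mut stored, 4), CountAction::Decremented(4));
    assert_eq!(apply_reported_count(&mut stored, 0), CountAction::Finished);
    assert_eq!(apply_reported_count(&mut stored, 9), CountAction::CancelInvalid);
}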

View File

@ -395,7 +395,7 @@ impl ClientApi {
// Request receive processor future
// Receives from socket and enqueues RequestLines
// Completes when the connection is closed or there is a failure
unord.push(Box::pin(self.clone().receive_requests(
unord.push(pin_dyn_future!(self.clone().receive_requests(
reader,
requests_tx,
responses_tx,
@ -404,10 +404,14 @@ impl ClientApi {
// Response send processor
// Sends finished response strings out the socket
// Completes when the responses channel is closed
unord.push(Box::pin(self.clone().send_responses(responses_rx, writer)));
unord.push(pin_dyn_future!(self
.clone()
.send_responses(responses_rx, writer)));
// Add future to process first request
unord.push(Box::pin(Self::next_request_line(requests_rx.clone())));
unord.push(pin_dyn_future!(Self::next_request_line(
requests_rx.clone()
)));
// Send and receive until we're done or a stop is requested
while let Ok(Some(r)) = unord.next().timeout_at(stop_token.clone()).await {
@ -415,7 +419,9 @@ impl ClientApi {
let request_line = match r {
Ok(Some(request_line)) => {
// Add future to process next request
unord.push(Box::pin(Self::next_request_line(requests_rx.clone())));
unord.push(pin_dyn_future!(Self::next_request_line(
requests_rx.clone()
)));
// Socket receive future returned something to process
request_line
@ -432,9 +438,9 @@ impl ClientApi {
};
// Enqueue unordered future to process request line in parallel
unord.push(Box::pin(
self.clone().process_request_line(jrp.clone(), request_line),
));
unord.push(pin_dyn_future!(self
.clone()
.process_request_line(jrp.clone(), request_line)));
}
// Stop sending updates
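Aside: the `Box::pin(..)` to `pin_dyn_future!(..)` swap above presumably routes the same boxing/pinning through a project macro; either way the point is that differently-typed futures must be erased to one pinned, boxed trait-object type before they can share a `FuturesUnordered`. A standalone sketch (assumes only the `futures` crate; `pin_dyn_future!` itself is Veilid-internal and not reproduced here):

use futures::{executor::block_on, stream::FuturesUnordered, StreamExt};
use std::future::Future;
use std::pin::Pin;

type PinBoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;

async fn fetch(n: u32) -> u32 {
    n * 2
}

async fn compute() -> u32 {
    40
}

fn main() {
    block_on(async {
        // fetch(1) and compute() are distinct anonymous future types; boxing and
        // pinning erase them to the one PinBoxFuture<u32> the queue is typed over.
        let mut unord: FuturesUnordered<PinBoxFuture<u32>> = FuturesUnordered::new();
        unord.push(Box::pin(fetch(1)));
        unord.push(Box::pin(compute()));

        let mut total = 0;
        while let Some(v) = unord.next().await {
            total += v;
        }
        assert_eq!(total, 42);
    });
}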