diff --git a/veilid-core/src/routing_table/node_ref/node_ref_lock.rs b/veilid-core/src/routing_table/node_ref/node_ref_lock.rs index f15df6e5..19cc3e09 100644 --- a/veilid-core/src/routing_table/node_ref/node_ref_lock.rs +++ b/veilid-core/src/routing_table/node_ref/node_ref_lock.rs @@ -39,6 +39,7 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp } } + #[expect(dead_code)] pub fn unlocked(&self) -> N { self.nr.clone() } diff --git a/veilid-core/src/rpc_processor/fanout/fanout_call.rs b/veilid-core/src/rpc_processor/fanout/fanout_call.rs index 9b50163b..b242d5e0 100644 --- a/veilid-core/src/rpc_processor/fanout/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout/fanout_call.rs @@ -7,8 +7,9 @@ struct FanoutContext<'a> { done: bool, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Default)] pub enum FanoutResultKind { + #[default] Incomplete, Timeout, Consensus, @@ -19,11 +20,6 @@ impl FanoutResultKind { matches!(self, Self::Incomplete) } } -impl Default for FanoutResultKind { - fn default() -> Self { - return FanoutResultKind::Incomplete; - } -} #[derive(Clone, Debug, Default)] pub struct FanoutResult { @@ -433,9 +429,7 @@ impl<'a> FanoutCall<'a> { // Initialize closest nodes list { let context_locked = &mut *context.lock(); - if let Err(e) = self.init_closest_nodes(context_locked) { - return Err(e); - } + self.init_closest_nodes(context_locked)?; // Ensure we include the most recent nodes context_locked.fanout_queue.add(&init_fanout_queue); diff --git a/veilid-core/src/rpc_processor/fanout/fanout_queue.rs b/veilid-core/src/rpc_processor/fanout/fanout_queue.rs index 43bf8739..ee5628fd 100644 --- a/veilid-core/src/rpc_processor/fanout/fanout_queue.rs +++ b/veilid-core/src/rpc_processor/fanout/fanout_queue.rs @@ -42,7 +42,7 @@ pub struct FanoutQueue<'a> { receiver: flume::Receiver>, } -impl<'a> fmt::Debug for FanoutQueue<'a> { +impl fmt::Debug for FanoutQueue<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FanoutQueue") .field("crypto_kind", &self.crypto_kind) @@ -119,11 +119,11 @@ impl<'a> FanoutQueue<'a> { // Get the next work and send it along for x in &mut self.sorted_nodes { // If there are no work receivers left then we should stop trying to send - if self.receiver.len() == 0 { + if self.receiver.is_empty() { break; } - let node = self.nodes.get_mut(&x).unwrap(); + let node = self.nodes.get_mut(x).unwrap(); if matches!(node.status, FanoutNodeStatus::Queued) { // Send node to a work request while let Ok(work_sender) = self.receiver.try_recv() { diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index 567105b8..62a344ca 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -280,7 +280,7 @@ impl RPCProcessor { // See if we have this record ourselves, if so, accept the watch let storage_manager = self.storage_manager(); let watch_result = network_result_try!(storage_manager - .inbound_watch_value(key, params, watch_id,) + .inbound_watch_value(key, params, watch_id) .await .map_err(RPCError::internal)?); diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index b62a41f6..31d221e6 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -211,7 +211,7 @@ impl StorageManager { match fanout_result.kind { FanoutResultKind::Incomplete => { // Send partial update if desired, if we've 
gotten at least one consensus node - if ctx.send_partial_update && fanout_result.consensus_nodes.len() >= 1 { + if ctx.send_partial_update && !fanout_result.consensus_nodes.is_empty() { ctx.send_partial_update = false; // Return partial result diff --git a/veilid-core/src/storage_manager/outbound_watch.rs b/veilid-core/src/storage_manager/outbound_watch.rs index a03f3ae8..3ff930a9 100644 --- a/veilid-core/src/storage_manager/outbound_watch.rs +++ b/veilid-core/src/storage_manager/outbound_watch.rs @@ -268,8 +268,7 @@ impl OutboundWatchState { pub fn get_min_expiration(&self, record_key: TypedKey) -> Option { self.outbound_watches .get(&record_key) - .map(|x| x.current.as_ref().map(|y| y.min_expiration_ts)) - .flatten() + .and_then(|x| x.current.as_ref().map(|y| y.min_expiration_ts)) } } @@ -330,7 +329,7 @@ impl StorageManager { let mut dead_pnks = BTreeSet::new(); for pnk in ¤t.nodes { let Some(per_node_state) = - inner.outbound_watch_state.per_node_state.get(&pnk).cloned() + inner.outbound_watch_state.per_node_state.get(pnk).cloned() else { veilid_log!(self warn "missing per-node state for watch"); dead_pnks.insert(*pnk); @@ -346,10 +345,11 @@ impl StorageManager { // Now reach out to each node and cancel their watch ids let mut unord = FuturesUnordered::new(); for (pnk, pns) in per_node_states { + let watch_lock = watch_lock.clone(); unord.push(async move { let res = self .outbound_watch_value_cancel( - pnk.record_key, + watch_lock, pns.opt_watcher, pns.safety_selection, pns.watch_node_ref.unwrap(), @@ -442,7 +442,7 @@ impl StorageManager { let mut dead_pnks = BTreeSet::new(); for pnk in ¤t.nodes { let Some(per_node_state) = - inner.outbound_watch_state.per_node_state.get(&pnk).cloned() + inner.outbound_watch_state.per_node_state.get(pnk).cloned() else { veilid_log!(self warn "missing per-node state for watch"); dead_pnks.insert(*pnk); @@ -461,41 +461,25 @@ impl StorageManager { // Now reach out to each node and renew their watches let mut unord = FuturesUnordered::new(); - for (pnk, pns) in per_node_states { + for (_pnk, pns) in per_node_states { let params = renew_params.clone(); + let watch_lock = watch_lock.clone(); unord.push(async move { - let res = self - .outbound_watch_value_change( - pnk.record_key, - params, - pns.safety_selection, - pns.watch_node_ref.unwrap(), - pns.watch_id, - ) - .await; - (pnk, res) + self.outbound_watch_value_change( + watch_lock, + params, + pns.watch_node_ref.unwrap(), + pns.watch_id, + ) + .await }); } - - let mut renewed = vec![]; - let mut rejected = vec![]; - let mut unanswered = vec![]; - while let Some((pnk, res)) = unord.next().await { + let mut owvresults = vec![]; + while let Some(res) = unord.next().await { match res { - Ok(Some(r)) => { - // Note per node states we should keep vs throw away - renewed.push((pnk, r)); - } - Ok(None) => { - rejected.push(pnk); - } + Ok(r) => owvresults.push(r), Err(e) => { veilid_log!(self debug "outbound watch change error: {}", e); - // Leave in the 'per node states' for now because we couldn't contact the node - // but remove from this watch. 
- - // xxx should do something different for network unreachable vs host unreachable - unanswered.push(pnk); } } } @@ -503,55 +487,9 @@ impl StorageManager { // Update state { let inner = &mut *self.inner.lock().await; - let Some(outbound_watch) = inner - .outbound_watch_state - .outbound_watches - .get_mut(&record_key) - else { - veilid_log!(self warn "watch being renewed should have still been in the table"); - return; - }; - let Some(current) = &mut outbound_watch.current else { - veilid_log!(self warn "watch being renewed should have current state"); - return; - }; - - let mut dead_pnks = BTreeSet::new(); - - // Perform renewals - for (pnk, r) in renewed { - let watch_node = r.watch_nodes.first().cloned().unwrap(); - let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk) - else { - veilid_log!(self warn "missing per-node state for watch"); - dead_pnks.insert(pnk); - continue; - }; - per_node_state.count = renew_params.count; - per_node_state.expiration_ts = watch_node.expiration_ts; - per_node_state.watch_id = watch_node.watch_id; + for owvresult in owvresults { + self.process_outbound_watch_value_result_inner(inner, record_key, owvresult); } - // Eliminate rejected - for pnk in rejected { - if inner - .outbound_watch_state - .per_node_state - .remove(&pnk) - .is_none() - { - veilid_log!(self warn "per-node watch being renewed should have still been in the table"); - } - dead_pnks.insert(pnk); - } - // Drop unanswered but leave in per node state - for pnk in unanswered { - dead_pnks.insert(pnk); - } - - current.nodes.retain(|x| !dead_pnks.contains(x)); - - // Update outbound watch - current.update(&inner.outbound_watch_state.per_node_state); } } @@ -570,7 +508,7 @@ impl StorageManager { // Get the nodes already active on this watch, // and the parameters to fanout with for the rest - let (active_nodes, reconcile_params) = { + let (per_node_state, reconcile_params) = { let inner = &mut *self.inner.lock().await; let Some(outbound_watch) = inner .outbound_watch_state @@ -587,32 +525,68 @@ impl StorageManager { return; }; - let active_nodes = if let Some(current) = &mut outbound_watch.current { + // Get active per node states + let mut per_node_state = if let Some(current) = &mut outbound_watch.current { // Assert matching parameters if ¤t.params != desired { veilid_log!(self warn "watch being reconciled should have had matching current and desired parameters"); return; } - current.nodes.iter().map(|x| x.node_id).collect() + current + .nodes + .iter() + .map(|pnk| { + ( + *pnk, + inner + .outbound_watch_state + .per_node_state + .get(pnk) + .cloned() + .unwrap(), + ) + }) + .collect() } else { - vec![] + HashMap::new() }; + // Add in any inactive per node states + for (pnk, pns) in &inner.outbound_watch_state.per_node_state { + // Skip any we have already + if per_node_state.contains_key(pnk) { + continue; + } + // Add inactive per node state if the record key matches + if pnk.record_key == record_key { + per_node_state.insert(*pnk, pns.clone()); + } + } + let reconcile_params = desired.clone(); - (active_nodes, reconcile_params) + (per_node_state, reconcile_params) }; // Now fan out with parameters and get new per node watches let cur_ts = Timestamp::now(); - let res = self - .outbound_watch_value(record_key, reconcile_params, active_nodes) + .outbound_watch_value(watch_lock.clone(), reconcile_params, per_node_state) .await; - // Regardless of result, set our next possible reconciliation time { let inner = &mut *self.inner.lock().await; + match res { + 
Ok(owvresult) => { + // Update state + self.process_outbound_watch_value_result_inner(inner, record_key, owvresult); + } + Err(e) => { + veilid_log!(self debug "outbound watch fanout error: {}", e); + } + } + + // Regardless of result, set our next possible reconciliation time if let Some(outbound_watch) = inner .outbound_watch_state .outbound_watches @@ -623,20 +597,6 @@ impl StorageManager { outbound_watch.set_next_reconcile_ts(next_ts); } } - - match res { - Ok(v) => { - // - } - Err(e) => { - veilid_log!(self debug "outbound watch fanout error: {}", e); - // ??? Leave in the 'per node states' for now because we couldn't contact the node - // but remove from this watch. - - // xxx should do something different for network unreachable vs host unreachable - //unanswered.push(pnk); - } - } } fn process_outbound_watch_value_result_inner( @@ -659,53 +619,70 @@ impl StorageManager { }; let current = { if outbound_watch.current.is_none() { - outbound_watch.current = Some(OutboundWatchCurrent { - params: desired.clone(), - nodes: vec![], - min_expiration_ts: desired.expiration_ts, - remaining_count: desired.count, - opt_next_reconcile_ts: None, - }); + outbound_watch.current = Some(OutboundWatchCurrent::new(desired.clone())); } outbound_watch.current.as_mut().unwrap() }; - // let mut dead_pnks = BTreeSet::new(); + let mut dead_pnks = BTreeSet::new(); - // // Perform renewals - // for (pnk, r) in renewed { - // let watch_node = r.watch_nodes.first().cloned().unwrap(); - // let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk) - // else { - // veilid_log!(self warn "missing per-node state for watch"); - // dead_pnks.insert(pnk); - // continue; - // }; - // per_node_state.count = renew_params.count; - // per_node_state.expiration_ts = watch_node.expiration_ts; - // per_node_state.watch_id = watch_node.watch_id; - // } - // // Eliminate rejected - // for pnk in rejected { - // if inner - // .outbound_watch_state - // .per_node_state - // .remove(&pnk) - // .is_none() - // { - // veilid_log!(self warn "per-node watch being renewed should have still been in the table"); - // } - // dead_pnks.insert(pnk); - // } - // // Drop unanswered but leave in per node state - // for pnk in unanswered { - // dead_pnks.insert(pnk); - // } + // Handle accepted + for accepted_watch in owvresult.accepted { + let node_id = accepted_watch + .node_ref + .node_ids() + .get(record_key.kind) + .unwrap(); + let pnk = PerNodeKey { + record_key, + node_id, + }; - // current.nodes.retain(|x| !dead_pnks.contains(x)); + let watch_id = accepted_watch.watch_id; + let opt_watcher = desired.opt_watcher; + let safety_selection = desired.safety_selection; + let expiration_ts = accepted_watch.expiration_ts; + let count = current.remaining_count; + let watch_node_ref = Some(accepted_watch.node_ref); + let opt_value_changed_route = accepted_watch.opt_value_changed_route; - // // Update outbound watch - // current.update(&inner.outbound_watch_state.per_node_state); + inner.outbound_watch_state.per_node_state.insert( + pnk, + PerNodeState { + watch_id, + safety_selection, + opt_watcher, + expiration_ts, + count, + watch_node_ref, + opt_value_changed_route, + }, + ); + } + // Eliminate rejected + for rejected_node_ref in owvresult.rejected { + let node_id = rejected_node_ref.node_ids().get(record_key.kind).unwrap(); + let pnk = PerNodeKey { + record_key, + node_id, + }; + inner.outbound_watch_state.per_node_state.remove(&pnk); + dead_pnks.insert(pnk); + } + // Drop unanswered but leave in per node state + for 
ignored_node_ref in owvresult.ignored { + let node_id = ignored_node_ref.node_ids().get(record_key.kind).unwrap(); + let pnk = PerNodeKey { + record_key, + node_id, + }; + dead_pnks.insert(pnk); + } + + current.nodes.retain(|x| !dead_pnks.contains(x)); + + // Update outbound watch + current.update(&inner.outbound_watch_state.per_node_state); } /// Get the next operation for a particular watch's state machine @@ -725,11 +702,8 @@ impl StorageManager { // Check states if outbound_watch.is_dead() { // Outbound watch is dead - let Some(watch_lock) = - opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key)) - else { - return None; - }; + let watch_lock = + opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?; let fut = { let registry = self.registry(); @@ -743,11 +717,8 @@ impl StorageManager { return Some(pin_dyn_future!(fut)); } else if outbound_watch.needs_cancel(®istry, cur_ts) { // Outbound watch needs to be cancelled - let Some(watch_lock) = - opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key)) - else { - return None; - }; + let watch_lock = + opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?; let fut = { let registry = self.registry(); @@ -761,11 +732,8 @@ impl StorageManager { return Some(pin_dyn_future!(fut)); } else if outbound_watch.needs_renew(®istry, cur_ts) { // Outbound watch expired but can be renewed - let Some(watch_lock) = - opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key)) - else { - return None; - }; + let watch_lock = + opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?; let fut = { let registry = self.registry(); @@ -779,11 +747,8 @@ impl StorageManager { return Some(pin_dyn_future!(fut)); } else if outbound_watch.needs_reconcile(®istry, consensus_count, cur_ts) { // Outbound watch parameters have changed or it needs more nodes - let Some(watch_lock) = - opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key)) - else { - return None; - }; + let watch_lock = + opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?; let fut = { let registry = self.registry(); diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index fb3df7b7..d2e3e8b6 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -191,7 +191,7 @@ impl StorageManager { match fanout_result.kind { FanoutResultKind::Incomplete => { // Send partial update if desired, if we've gotten at least consensus node - if ctx.send_partial_update && fanout_result.consensus_nodes.len() >= 1 { + if ctx.send_partial_update && !fanout_result.consensus_nodes.is_empty() { ctx.send_partial_update = false; // Return partial result diff --git a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs index acb7fe97..faf460a2 100644 --- a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs @@ -20,7 +20,7 @@ impl StorageManager { dead_pnks.insert(*pnk); } } - for (_, v) in &inner.outbound_watch_state.outbound_watches { + for v in inner.outbound_watch_state.outbound_watches.values() { // If it's still referenced, keep it let Some(current) = &v.current else { continue; diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index efbe482e..b59883e7 
100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -1,12 +1,12 @@ use super::*; -use futures_util::StreamExt as _; impl_veilid_log_facility!("stor"); /// The context of the outbound_watch_value operation +#[derive(Debug, Default)] struct OutboundWatchValueContext { /// A successful watch - pub opt_watch_value_result: Option, + pub watch_value_result: OutboundWatchValueResult, } /// The record of a node accepting a watch @@ -22,11 +22,11 @@ pub(super) struct AcceptedWatch { } /// The result of the outbound_watch_value operation -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub(super) struct OutboundWatchValueResult { /// Which nodes accepted the watch pub accepted: Vec, - /// Which nodes rejected the watch + /// Which nodes rejected or cancelled the watch pub rejected: Vec, /// Which nodes ignored the watch pub ignored: Vec, @@ -37,25 +37,31 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_watch_value_cancel( &self, - key: TypedKey, + watch_lock: AsyncTagLockGuard, opt_watcher: Option, safety_selection: SafetySelection, watch_node: NodeRef, watch_id: u64, ) -> VeilidAPIResult { + let record_key = watch_lock.tag(); + let routing_domain = RoutingDomain::PublicInternet; // Get the appropriate watcher key, if anonymous use a static anonymous watch key // which lives for the duration of the app's runtime - let watcher = - opt_watcher.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value); + let watcher = opt_watcher.unwrap_or_else(|| { + self.anonymous_watch_keys + .get(record_key.kind) + .unwrap() + .value + }); let wva = VeilidAPIError::from_network_result( self.rpc_processor() .rpc_call_watch_value( Destination::direct(watch_node.routing_domain_filtered(routing_domain)) .with_safety(safety_selection), - key, + record_key, ValueSubkeyRangeSet::new(), Timestamp::default(), 0, @@ -79,12 +85,12 @@ impl StorageManager { #[instrument(target = "dht", level = "debug", skip_all, err)] pub(super) async fn outbound_watch_value_change( &self, - key: TypedKey, + watch_lock: AsyncTagLockGuard, params: OutboundWatchParameters, - safety_selection: SafetySelection, watch_node: NodeRef, watch_id: u64, ) -> VeilidAPIResult { + let record_key = watch_lock.tag(); let routing_domain = RoutingDomain::PublicInternet; if params.count == 0 { @@ -93,15 +99,18 @@ impl StorageManager { // Get the appropriate watcher key, if anonymous use a static anonymous watch key // which lives for the duration of the app's runtime - let watcher = params - .opt_watcher - .unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value); + let watcher = params.opt_watcher.unwrap_or_else(|| { + self.anonymous_watch_keys + .get(record_key.kind) + .unwrap() + .value + }); let wva = VeilidAPIError::from_network_result( pin_future!(self.rpc_processor().rpc_call_watch_value( Destination::direct(watch_node.routing_domain_filtered(routing_domain)) - .with_safety(safety_selection), - key, + .with_safety(params.safety_selection), + record_key, params.subkeys, params.expiration_ts, params.count, @@ -144,10 +153,11 @@ impl StorageManager { //#[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_watch_value( &self, - key: TypedKey, + watch_lock: AsyncTagLockGuard, params: OutboundWatchParameters, - active_nodes: Vec, + per_node_state: HashMap, ) -> VeilidAPIResult { + let record_key = watch_lock.tag(); let routing_domain = 
RoutingDomain::PublicInternet; // Get the DHT parameters for 'WatchValue', some of which are the same for 'GetValue' operations @@ -162,13 +172,16 @@ impl StorageManager { // Get the appropriate watcher key, if anonymous use a static anonymous watch key // which lives for the duration of the app's runtime - let watcher = params - .opt_watcher - .unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value); + let watcher = params.opt_watcher.unwrap_or_else(|| { + self.anonymous_watch_keys + .get(record_key.kind) + .unwrap() + .value + }); // Get the nodes we know are caching this value to seed the fanout let init_fanout_queue = { - self.get_value_nodes(key) + self.get_value_nodes(record_key) .await? .unwrap_or_default() .into_iter() @@ -181,76 +194,105 @@ impl StorageManager { }; // Make do-watch-value answer context - let context = Arc::new(Mutex::new(OutboundWatchValueContext { - opt_watch_value_result: None, - })); + let context = Arc::new(Mutex::new(OutboundWatchValueContext::default())); // Routine to call to generate fanout let call_routine = { let context = context.clone(); let registry = self.registry(); + let params = params.clone(); Arc::new( move |next_node: NodeRef| -> PinBoxFutureStatic { let context = context.clone(); let registry = registry.clone(); + let params = params.clone(); - let subkeys = subkeys.clone(); + // See if we have an existing watch id for this node + let node_id = next_node.node_ids().get(record_key.kind).unwrap(); + let pnk = PerNodeKey { + record_key, + node_id, + }; + let watch_id = per_node_state.get(&pnk).map(|pns| pns.watch_id); Box::pin(async move { - let rpc_processor = registry.rpc_processor(); - let wva = network_result_try!( - rpc_processor - .rpc_call_watch_value( - Destination::direct(next_node.routing_domain_filtered(routing_domain)).with_safety(safety_selection), - key, - subkeys, - expiration, - count, - watcher, - None - ) - .await? - ); + let rpc_processor = registry.rpc_processor(); - // Keep answer if we got one - // (accepted means the node could provide an answer, not that the watch is active) - if wva.answer.accepted { - let mut done = false; - if wva.answer.expiration_ts.as_u64() > 0 { - // If the expiration time is greater than zero this watch is active - veilid_log!(registry debug "Watch created: id={} expiration_ts={} ({})", wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node); - done = true; + let wva = match + rpc_processor + .rpc_call_watch_value( + Destination::direct(next_node.routing_domain_filtered(routing_domain)).with_safety(params.safety_selection), + record_key, + params.subkeys, + params.expiration_ts, + params.count, + watcher, + watch_id + ) + .await? 
{ + NetworkResult::Timeout => { + return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Timeout}); + } + NetworkResult::ServiceUnavailable(_) | + NetworkResult::NoConnection(_) | + NetworkResult::AlreadyExists(_) | + NetworkResult::InvalidMessage(_) => { + return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid}); + } + NetworkResult::Value(v) => v + }; + + // Keep answer if we got one + // (accepted means the node could provide an answer, not that the watch is active) + let disposition = if wva.answer.accepted { + if wva.answer.expiration_ts.as_u64() > 0 { + // If the expiration time is greater than zero this watch is active + veilid_log!(registry debug "WatchValue accepted: id={} expiration_ts={} ({})", wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node); + + // Add to accepted watches + let mut ctx = context.lock(); + ctx.watch_value_result.accepted.push(AcceptedWatch{ + watch_id: wva.answer.watch_id, + node_ref: next_node.clone(), + expiration_ts: wva.answer.expiration_ts, + opt_value_changed_route: wva.reply_private_route, + }); + + FanoutCallDisposition::Accepted + } else { + // If the returned expiration time is zero, this watch was cancelled + + // If the expiration time is greater than zero this watch is active + veilid_log!(registry debug "WatchValue rejected: id={} expiration_ts={} ({})", wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node); + + // Add to rejected watches + let mut ctx = context.lock(); + ctx.watch_value_result.rejected.push(next_node.clone()); + + // Treat as accepted but do not add to consensus + FanoutCallDisposition::Stale + } } else { - // If the returned expiration time is zero, this watch was cancelled or rejected - // If we are asking to cancel then check_done will stop after the first node - } - if done { + // Add to rejected watches let mut ctx = context.lock(); - ctx.opt_watch_value_result = Some(OutboundWatchValueResult { - expiration_ts: wva.answer.expiration_ts, - watch_id: wva.answer.watch_id, - watch_node: next_node.clone(), - opt_value_changed_route: wva.reply_private_route, - }); - } - } + ctx.watch_value_result.rejected.push(next_node.clone()); - // Return peers if we have some - veilid_log!(registry debug target:"network_result", "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node); + // Treat as rejected and do not add to consensus + FanoutCallDisposition::Rejected + }; - Ok(NetworkResult::value(FanoutCallOutput{peer_info_list: wva.answer.peers})) - }.instrument(tracing::trace_span!("outbound_watch_value call routine"))) as PinBoxFuture + // Return peers if we have some + veilid_log!(registry debug target:"network_result", "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node); + + Ok(FanoutCallOutput{peer_info_list: wva.answer.peers, disposition}) + }.instrument(tracing::trace_span!("outbound_watch_value call routine"))) as PinBoxFuture }, ) }; // Routine to call to check if we're done at each step let check_done = { - // let context = context.clone(); - // let registry = self.registry(); Arc::new(move |fanout_result: &FanoutResult| -> bool { - // let mut ctx = context.lock(); - match fanout_result.kind { FanoutResultKind::Incomplete => { // Keep going @@ -261,10 +303,6 @@ impl StorageManager { true } FanoutResultKind::Consensus => { - // assert!( - // ctx.value.is_some() && ctx.descriptor.is_some(), - // "should have gotten a value if we got consensus" - // 
); // Signal we're done true } @@ -278,7 +316,7 @@ impl StorageManager { let routing_table = self.routing_table(); let fanout_call = FanoutCall::new( &routing_table, - key, + record_key, key_count, fanout, consensus_count, @@ -294,52 +332,8 @@ impl StorageManager { })?; veilid_log!(self debug "WatchValue Fanout: {:?}", fanout_result); - - Ok(Some(OutboundWatchValueResult { - watch_nodes: todo!(), - opt_value_changed_route: todo!(), - })) - // match fanout_call.run(init_fanout_queue).await { - // // If we don't finish in the timeout (too much time passed without a successful watch) - // TimeoutOr::Timeout => { - // // Return the best answer we've got - // let ctx = context.lock(); - // if ctx.opt_watch_value_result.is_some() { - // veilid_log!(self debug "WatchValue Fanout Timeout Success"); - // } else { - // veilid_log!(self debug "WatchValue Fanout Timeout Failure"); - // } - // Ok(ctx.opt_watch_value_result.clone()) - // } - // // If we finished with done - // TimeoutOr::Value(Ok(Some(()))) => { - // // Return the best answer we've got - // let ctx = context.lock(); - // if ctx.opt_watch_value_result.is_some() { - // veilid_log!(self debug "WatchValue Fanout Success"); - // } else { - // veilid_log!(self debug "WatchValue Fanout Failure"); - // } - // Ok(ctx.opt_watch_value_result.clone()) - // } - // // If we ran out of nodes - // TimeoutOr::Value(Ok(None)) => { - // // Return the best answer we've got - // let ctx = context.lock(); - // if ctx.opt_watch_value_result.is_some() { - // veilid_log!(self debug "WatchValue Fanout Exhausted Success"); - // } else { - // veilid_log!(self debug "WatchValue Fanout Exhausted Failure"); - // } - // Ok(ctx.opt_watch_value_result.clone()) - // } - // // Failed - // TimeoutOr::Value(Err(e)) => { - // // If we finished with an error, return that - // veilid_log!(self debug "WatchValue Fanout Error: {}", e); - // Err(e.into()) - // } - // } + let owvresult = context.lock().watch_value_result.clone(); + Ok(owvresult) } /// Handle a received 'Watch Value' query @@ -388,57 +382,81 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all)] pub async fn inbound_value_changed( &self, - key: TypedKey, + record_key: TypedKey, subkeys: ValueSubkeyRangeSet, - mut count: u32, + count: u32, value: Option>, inbound_node_id: TypedKey, watch_id: u64, ) -> VeilidAPIResult> { - // xxx remember to update per_node_state with lower count + // Operate on the watch for this record + let _watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await; // Update local record store with new value - let (is_value_seq_newer, value) = { - let mut inner = self.inner.lock().await; + let (is_value_seq_newer, value, remaining_count) = { + let inner = &mut *self.inner.lock().await; - // Don't process update if the record is closed - let Some(opened_record) = inner.opened_records.get_mut(&key) else { - return Ok(NetworkResult::value(())); - }; - - // No active watch means no callback - let Some(mut active_watch) = opened_record.active_watch_by_id(watch_id) else { - return Ok(NetworkResult::value(())); - }; - - // If the reporting node is not the same as our watch, don't process the value change - if !active_watch - .watch_node - .node_ids() - .contains(&inbound_node_id) { - return Ok(NetworkResult::value(())); - } + // Get the outbound watch + let Some(outbound_watch) = inner + .outbound_watch_state + .outbound_watches + .get_mut(&record_key) + else { + // No outbound watch means no callback + return Ok(NetworkResult::value(())); + }; - if count > 
active_watch.count { - // If count is greater than our requested count then this is invalid, cancel the watch - veilid_log!(self debug "watch count went backward: {}: {}/{}", key, count, active_watch.count); - // Force count to zero - count = 0; - opened_record.remove_active_watch(watch_id); - } else if count == 0 { - // If count is zero, we're done, cancel the watch and the app can renew it if it wants - veilid_log!(self debug "watch count finished: {}", key); - opened_record.clear_active_watches(); - } else { - veilid_log!(self debug - "watch count decremented: {}: {}/{}", - key, - count, - active_watch.count - ); - active_watch.count = count; - opened_record.set_outbound_watch(active_watch); + let Some(current) = &mut outbound_watch.current else { + // No outbound watch current state means no callback + return Ok(NetworkResult::value(())); + }; + + // If the reporting node is not part of our current watch, don't process the value change + let pnk = PerNodeKey { + record_key, + node_id: inbound_node_id, + }; + if !current.nodes.contains(&pnk) { + return Ok(NetworkResult::value(())); + } + + // Get per node state + let Some(per_node_state) = inner.outbound_watch_state.per_node_state.get_mut(&pnk) + else { + // No per node state means no callback + veilid_log!(self warn "missing per node state in outbound watch: {:?}", pnk); + return Ok(NetworkResult::value(())); + }; + + // If watch id doesn't match it's for an older watch and should be ignored + if per_node_state.watch_id != watch_id { + // No per node state means no callback + veilid_log!(self warn "incorred watch id for per node state in outbound watch: {:?} {} != {}", pnk, per_node_state.watch_id, watch_id); + return Ok(NetworkResult::value(())); + } + + // Update per node state + if count > per_node_state.count { + // If count is greater than our requested count then this is invalid, cancel the watch + veilid_log!(self debug "watch count went backward: {}: {} > {}", record_key, count, per_node_state.count); + + // Force count to zero for this node id so it gets cancelled out by the background process + per_node_state.count = 0; + return Ok(NetworkResult::value(())); + } else if count == 0 { + // If count is zero, the per-node watch is done and will get purged by the background process + veilid_log!(self debug "watch count finished: {}", record_key); + } else { + // Decrement the overall watch count and update the per-node watch count + veilid_log!(self debug + "watch count decremented: {}: {} < {}", + record_key, + count, + per_node_state.count + ); + per_node_state.count = count; + } } // Null out default value @@ -452,7 +470,8 @@ impl StorageManager { }; let last_get_result = - Self::handle_get_local_value_inner(&mut inner, key, first_subkey, true).await?; + Self::handle_get_local_value_inner(inner, record_key, first_subkey, true) + .await?; let descriptor = last_get_result.opt_descriptor.unwrap(); let schema = descriptor.schema()?; @@ -481,8 +500,8 @@ impl StorageManager { } if is_value_seq_newer { Self::handle_set_local_value_inner( - &mut inner, - key, + inner, + record_key, first_subkey, value.clone(), InboundWatchUpdateMode::NoUpdate, @@ -491,22 +510,36 @@ impl StorageManager { } } - (is_value_seq_newer, value) + // If we got an actual update, decrement the total remaining watch count + // Get the outbound watch + let outbound_watch = inner + .outbound_watch_state + .outbound_watches + .get_mut(&record_key) + .unwrap(); + + let current = outbound_watch.current.as_mut().unwrap(); + + if is_value_seq_newer { + 
current.remaining_count -= 1; + } + + (is_value_seq_newer, value, current.remaining_count) }; // Announce ValueChanged VeilidUpdate // * if the value in the update had a newer sequence number - // * if more than a single subkeys has changed + // * if more than a single subkey has changed // * if the count was zero meaning cancelled - let do_update = is_value_seq_newer || subkeys.len() > 1 || count == 0; + let do_update = is_value_seq_newer || subkeys.len() > 1 || remaining_count == 0; if do_update { let value = if is_value_seq_newer { Some(value.unwrap().value_data().clone()) } else { None }; - self.update_callback_value_change(key, subkeys, count, value); + self.update_callback_value_change(record_key, subkeys, remaining_count, value); } Ok(NetworkResult::value(())) diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index f8d230a1..fb7f0eb1 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -395,7 +395,7 @@ impl ClientApi { // Request receive processor future // Receives from socket and enqueues RequestLines // Completes when the connection is closed or there is a failure - unord.push(Box::pin(self.clone().receive_requests( + unord.push(pin_dyn_future!(self.clone().receive_requests( reader, requests_tx, responses_tx, @@ -404,10 +404,14 @@ impl ClientApi { // Response send processor // Sends finished response strings out the socket // Completes when the responses channel is closed - unord.push(Box::pin(self.clone().send_responses(responses_rx, writer))); + unord.push(pin_dyn_future!(self + .clone() + .send_responses(responses_rx, writer))); // Add future to process first request - unord.push(Box::pin(Self::next_request_line(requests_rx.clone()))); + unord.push(pin_dyn_future!(Self::next_request_line( + requests_rx.clone() + ))); // Send and receive until we're done or a stop is requested while let Ok(Some(r)) = unord.next().timeout_at(stop_token.clone()).await { @@ -415,7 +419,9 @@ impl ClientApi { let request_line = match r { Ok(Some(request_line)) => { // Add future to process next request - unord.push(Box::pin(Self::next_request_line(requests_rx.clone()))); + unord.push(pin_dyn_future!(Self::next_request_line( + requests_rx.clone() + ))); // Socket receive future returned something to process request_line @@ -432,9 +438,9 @@ impl ClientApi { }; // Enqueue unordered future to process request line in parallel - unord.push(Box::pin( - self.clone().process_request_line(jrp.clone(), request_line), - )); + unord.push(pin_dyn_future!(self + .clone() + .process_request_line(jrp.clone(), request_line))); } // Stop sending updates
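
Reviewer note: the patch applies two clippy-style cleanups in several places — the derived `Default` enum in `fanout_call.rs` and the `and_then` rewrite of `map(..).flatten()` in `outbound_watch.rs`. Below is a minimal, standalone sketch of both patterns; `ResultKind`, `Watch`, and `get_min_expiration` here are hypothetical placeholders, not the real Veilid types.

```rust
use std::collections::BTreeMap;

// `#[derive(Default)]` with a `#[default]` variant marker replaces the
// hand-written `impl Default` block (stable since Rust 1.62).
#[allow(dead_code)] // illustrative variants are never constructed here
#[derive(Debug, Copy, Clone, Default)]
enum ResultKind {
    #[default]
    Incomplete,
    Timeout,
    Consensus,
}

// Placeholder stand-in for a per-record watch state (not the real struct).
struct Watch {
    min_expiration_ts: Option<u64>,
}

// `Option::and_then` collapses the old `map(..).flatten()` chain into one call.
fn get_min_expiration(watches: &BTreeMap<u64, Watch>, key: u64) -> Option<u64> {
    watches.get(&key).and_then(|w| w.min_expiration_ts)
}

fn main() {
    // The derived default picks the `#[default]` variant.
    assert!(matches!(ResultKind::default(), ResultKind::Incomplete));

    let mut watches = BTreeMap::new();
    watches.insert(1u64, Watch { min_expiration_ts: Some(42) });
    assert_eq!(get_min_expiration(&watches, 1), Some(42));
    assert_eq!(get_min_expiration(&watches, 2), None);
}
```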