diff --git a/veilid-core/src/rpc_processor/fanout/fanout_call.rs b/veilid-core/src/rpc_processor/fanout/fanout_call.rs index fd2d1763..9b50163b 100644 --- a/veilid-core/src/rpc_processor/fanout/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout/fanout_call.rs @@ -29,8 +29,9 @@ impl Default for FanoutResultKind { pub struct FanoutResult { /// How the fanout completed pub kind: FanoutResultKind, - /// How many nodes counted toward consensus - pub count: usize, + /// The set of nodes that counted toward consensus + /// (for example, had the most recent value for this subkey) + pub consensus_nodes: Vec, /// Which nodes accepted the request pub value_nodes: Vec, } @@ -38,7 +39,7 @@ pub struct FanoutResult { impl fmt::Display for FanoutResult { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let kc = match self.kind { - FanoutResultKind::Incomplete => "P", + FanoutResultKind::Incomplete => "I", FanoutResultKind::Timeout => "T", FanoutResultKind::Consensus => "C", FanoutResultKind::Exhausted => "E", @@ -48,15 +49,15 @@ impl fmt::Display for FanoutResult { f, "{}:{}[{}]", kc, - self.value_nodes.len(), - self.value_nodes + self.consensus_nodes.len(), + self.consensus_nodes .iter() .map(|x| x.to_string()) .collect::>() .join(","), ) } else { - write!(f, "{}:{}", kc, self.value_nodes.len()) + write!(f, "{}:{}", kc, self.consensus_nodes.len()) } } } @@ -194,8 +195,8 @@ impl<'a> FanoutCall<'a> { let fanout_result = ctx.fanout_queue.with_nodes(|nodes, sorted_nodes| { // Count up nodes we have seen in order and see if our closest nodes have a consensus let mut consensus: Option = None; + let mut consensus_nodes: Vec = vec![]; let mut value_nodes: Vec = vec![]; - let mut count: usize = 0; for sn in sorted_nodes { let node = nodes.get(sn).unwrap(); match node.status { @@ -216,8 +217,9 @@ impl<'a> FanoutCall<'a> { FanoutNodeStatus::Accepted => { // Node counts toward consensus and value node list value_nodes.push(node.node_ref.clone()); - count += 1; - if consensus.is_none() && count >= self.consensus_count { + + consensus_nodes.push(node.node_ref.clone()); + if consensus.is_none() && consensus_nodes.len() >= self.consensus_count { consensus = Some(true); } } @@ -228,17 +230,17 @@ impl<'a> FanoutCall<'a> { match consensus { Some(true) => FanoutResult { kind: FanoutResultKind::Consensus, - count, + consensus_nodes, value_nodes, }, Some(false) => FanoutResult { kind: FanoutResultKind::Incomplete, - count, + consensus_nodes, value_nodes, }, None => FanoutResult { kind: FanoutResultKind::Exhausted, - count, + consensus_nodes, value_nodes, }, } @@ -419,7 +421,7 @@ impl<'a> FanoutCall<'a> { fanout_queue: FanoutQueue::new(self.node_id.kind, node_sort), result: FanoutResult { kind: FanoutResultKind::Incomplete, - count: 0, + consensus_nodes: vec![], value_nodes: vec![], }, done: false, diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 4bf829c1..211682a9 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -210,9 +210,8 @@ impl StorageManager { match fanout_result.kind { FanoutResultKind::Incomplete => { - // Send partial update if desired, - // if we've gotten at least one value node before consensus - if ctx.send_partial_update && fanout_result.value_nodes.len() >= 1 { + // Send partial update if desired, if we've gotten at least one consensus node + if ctx.send_partial_update && fanout_result.consensus_nodes.len() >= 1 { ctx.send_partial_update = false; // Return partial result @@ -370,11 +369,18 @@ impl StorageManager { return Ok(None); }; + // Get cryptosystem + let crypto = self.crypto(); + let Some(vcrypto) = crypto.get(key.kind) else { + apibail_generic!("unsupported cryptosystem"); + }; + // Keep the list of nodes that returned a value for later reference let mut inner = self.inner.lock().await; Self::process_fanout_results_inner( &mut inner, + &vcrypto, key, core::iter::once((subkey, &result.fanout_result)), false, diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index 17397760..0a2bcd5c 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -29,13 +29,10 @@ impl DescriptorInfo { struct SubkeySeqCount { /// The newest sequence number found for a subkey pub seq: ValueSeqNum, - /// The consensus count for the subkey - pub count: usize, - // Maybe add per-subkey value nodes someday? Is this even useful? - // /// The set of nodes that had the most recent value for this subkey - // pub value_nodes: Vec, - // /// The set of nodes that had any value for this subkey - // pub all_nodes: Vec, + /// The set of nodes that had the most recent value for this subkey + pub consensus_nodes: Vec, + /// The set of nodes that had any value for this subkey + pub value_nodes: Vec, } /// The context of the outbound_get_value operation @@ -49,7 +46,7 @@ struct OutboundInspectValueContext { /// The result of the outbound_get_value operation pub(super) struct OutboundInspectValueResult { /// Fanout results for each subkey - pub fanout_results: Vec, + pub subkey_fanout_results: Vec, /// The inspection that was retrieved pub inspect_result: InspectResult, } @@ -113,7 +110,11 @@ impl StorageManager { seqcounts: local_inspect_result .seqs .iter() - .map(|s| SubkeySeqCount { seq: *s, count: 0 }) + .map(|s| SubkeySeqCount { + seq: *s, + consensus_nodes: vec![], + value_nodes: vec![], + }) .collect(), opt_descriptor_info, })); @@ -209,7 +210,8 @@ impl StorageManager { .map(|s| SubkeySeqCount { seq: *s, // One node has shown us the newest sequence numbers so far - count: 1, + consensus_nodes: vec![next_node.clone()], + value_nodes: vec![next_node.clone()], }) .collect(); } else { @@ -227,7 +229,7 @@ impl StorageManager { // If we already have consensus for this subkey, don't bother updating it any more // While we may find a better sequence number if we keep looking, this does not mimic the behavior // of get and set unless we stop here - if ctx_seqcnt.count >= consensus_count { + if ctx_seqcnt.consensus_nodes.len() >= consensus_count { continue; } @@ -240,12 +242,13 @@ impl StorageManager { { // One node has shown us the latest sequence numbers so far ctx_seqcnt.seq = answer_seq; - ctx_seqcnt.count = 1; + ctx_seqcnt.consensus_nodes = vec![next_node.clone()]; } else if answer_seq == ctx_seqcnt.seq { // Keep the nodes that showed us the latest values - ctx_seqcnt.count += 1; + ctx_seqcnt.consensus_nodes.push(next_node.clone()); } } + ctx_seqcnt.value_nodes.push(next_node.clone()); } } @@ -261,15 +264,17 @@ impl StorageManager { }; // Routine to call to check if we're done at each step - + // For inspect, we are tracking consensus externally from the FanoutCall, + // for each subkey, rather than a single consensus, so the single fanoutresult + // that is passed in here is ignored in favor of our own per-subkey tracking let check_done = { let context = context.clone(); - Arc::new(move |fanout_result: &FanoutResult| { + Arc::new(move |_: &FanoutResult| { // If we have reached sufficient consensus on all subkeys, return done let ctx = context.lock(); let mut has_consensus = true; for cs in ctx.seqcounts.iter() { - if cs.count < consensus_count { + if cs.consensus_nodes.len() < consensus_count { has_consensus = false; break; } @@ -295,27 +300,26 @@ impl StorageManager { let fanout_result = fanout_call.run(init_fanout_queue).await?; -xxx continue applying array of count/value_nodes to inspect output, make sure it lines up with fanout behavior - let ctx = context.lock(); - let mut fanout_results = vec![]; + let mut subkey_fanout_results = vec![]; for cs in &ctx.seqcounts { - let has_consensus = cs.count >= consensus_count; - let fanout_result = FanoutResult { + let has_consensus = cs.consensus_nodes.len() >= consensus_count; + let subkey_fanout_result = FanoutResult { kind: if has_consensus { FanoutResultKind::Consensus } else { - kind + fanout_result.kind }, + consensus_nodes: cs.consensus_nodes.clone(), value_nodes: cs.value_nodes.clone(), }; - fanout_results.push(fanout_result); + subkey_fanout_results.push(subkey_fanout_result); } - veilid_log!(self debug "InspectValue Fanout ({:?}):\n{}", kind, debug_fanout_results(&fanout_results)); + veilid_log!(self debug "InspectValue Fanout ({:?}):\n{}", fanout_result.kind, debug_fanout_results(&subkey_fanout_results)); Ok(OutboundInspectValueResult { - fanout_results, + subkey_fanout_results, inspect_result: InspectResult { subkeys: ctx .opt_descriptor_info diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index d3ceb345..1e4c3d4d 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -364,19 +364,21 @@ impl StorageManager { /// Get the set of nodes in our active watches pub async fn get_active_watch_nodes(&self) -> Vec { let inner = self.inner.lock().await; - inner - .opened_records - .values() - .filter_map(|v| { - v.active_watch().map(|aw| { - Destination::direct( - aw.watch_node - .routing_domain_filtered(RoutingDomain::PublicInternet), - ) - .with_safety(v.safety_selection()) - }) - }) - .collect() + let mut out = vec![]; + for opened_record in inner.opened_records.values() { + if let Some(aw) = opened_record.active_watch() { + for pn in &aw.per_node { + out.push( + Destination::direct( + pn.watch_node + .routing_domain_filtered(RoutingDomain::PublicInternet), + ) + .with_safety(opened_record.safety_selection()), + ); + } + } + } + out } /// Builds the record key for a given schema and owner @@ -857,16 +859,17 @@ impl StorageManager { let inner = self.inner.lock().await; // Get the safety selection and the writer we opened this record - // and whatever active watch id and watch node we may have in case this is a watch update - let (safety_selection, opt_writer, opt_watch_id, opt_watch_node) = { + // and whatever active watch we may have in case this is a watch update + let (safety_selection, opt_writer, opt_watch_nodes) = { let Some(opened_record) = inner.opened_records.get(&key) else { apibail_generic!("record not open"); }; ( opened_record.safety_selection(), opened_record.writer().cloned(), - opened_record.active_watch().map(|aw| aw.id), - opened_record.active_watch().map(|aw| aw.watch_node.clone()), + opened_record + .active_watch() + .map(|aw| aw.watch_nodes.clone()), ) }; @@ -1038,6 +1041,12 @@ impl StorageManager { subkeys }; + // Get cryptosystem + let crypto = self.crypto(); + let Some(vcrypto) = crypto.get(key.kind) else { + apibail_generic!("unsupported cryptosystem"); + }; + let mut inner = self.inner.lock().await; let safety_selection = { let Some(opened_record) = inner.opened_records.get(&key) else { @@ -1122,7 +1131,7 @@ impl StorageManager { { assert_eq!( result.inspect_result.subkeys.len() as u64, - result.fanout_results.len() as u64, + result.subkey_fanout_results.len() as u64, "mismatch between subkeys returned and fanout results returned" ); } @@ -1140,10 +1149,11 @@ impl StorageManager { .inspect_result .subkeys .iter() - .zip(result.fanout_results.iter()); + .zip(result.subkey_fanout_results.iter()); Self::process_fanout_results_inner( &mut inner, + &vcrypto, key, results_iter, false, @@ -1215,7 +1225,7 @@ impl StorageManager { let get_consensus = self .config() .with(|c| c.network.dht.get_value_count as usize); - let value_node_count = fanout_result.value_nodes.len(); + let value_node_count = fanout_result.consensus_nodes.len(); if value_node_count < get_consensus { veilid_log!(self debug "timeout with insufficient consensus ({}<{}), adding offline subkey: {}:{}", value_node_count, get_consensus, @@ -1232,7 +1242,7 @@ impl StorageManager { let get_consensus = self .config() .with(|c| c.network.dht.get_value_count as usize); - let value_node_count = fanout_result.value_nodes.len(); + let value_node_count = fanout_result.consensus_nodes.len(); if value_node_count < get_consensus { veilid_log!(self debug "exhausted with insufficient consensus ({}<{}), adding offline subkey: {}:{}", value_node_count, get_consensus, @@ -1543,6 +1553,7 @@ impl StorageManager { I: IntoIterator, >( inner: &mut StorageManagerInner, + vcrypto: &CryptoSystemGuard<'_>, key: TypedKey, subkey_results_iter: I, is_set: bool, @@ -1576,7 +1587,17 @@ impl StorageManager { .iter() .map(|kv| (*kv.0, kv.1.last_seen)) .collect::>(); - nodes_ts.sort_by(|a, b| b.1.cmp(&a.1)); + nodes_ts.sort_by(|a, b| { + // Timestamp is first metric + let res = b.1.cmp(&a.1); + if res != cmp::Ordering::Equal { + return res; + } + // Distance is the next metric, closer nodes first + let da = vcrypto.distance(&a.0, &key.value); + let db = vcrypto.distance(&b.0, &key.value); + da.cmp(&db) + }); for dead_node_key in nodes_ts.iter().skip(consensus_count) { d.nodes.remove(&dead_node_key.0); diff --git a/veilid-core/src/storage_manager/record_store/opened_record.rs b/veilid-core/src/storage_manager/record_store/opened_record.rs index acde0a08..7d235e42 100644 --- a/veilid-core/src/storage_manager/record_store/opened_record.rs +++ b/veilid-core/src/storage_manager/record_store/opened_record.rs @@ -1,19 +1,40 @@ use super::*; #[derive(Clone, Debug)] -pub(in crate::storage_manager) struct ActiveWatch { +pub(in crate::storage_manager) struct PerNodeActiveWatch { /// The watch id returned from the watch node pub id: u64, /// The expiration of a successful watch pub expiration_ts: Timestamp, /// Which node accepted the watch pub watch_node: NodeRef, + /// How many value change notifications are left + pub count: u32, +} + +/// Requested parameters for watch +#[derive(Clone, Debug)] +pub(in crate::storage_manager) struct ActiveWatchParameters { + /// Requested expiration timestamp + pub expiration_ts: Timestamp, + /// How many notifications the requestor asked for + pub count: u32, + /// Subkeys requested for this watch + pub subkeys: ValueSubkeyRangeSet, +} + +#[derive(Clone, Debug)] +pub(in crate::storage_manager) struct ActiveWatch { + /// Requested parameters + pub params: ActiveWatchParameters, + /// Active watches per node + pub per_node: Vec, + /// Minimum expiration time for all our nodes + pub min_expiration_ts: Timestamp, + /// How many value change updates remain + pub remaining_count: u32, /// Which private route is responsible for receiving ValueChanged notifications pub opt_value_changed_route: Option, - /// Which subkeys we are watching - pub subkeys: ValueSubkeyRangeSet, - /// How many notifications are left - pub count: u32, } /// The state associated with a local record when it is opened @@ -28,7 +49,7 @@ pub(in crate::storage_manager) struct OpenedRecord { /// The safety selection in current use safety_selection: SafetySelection, - /// Active watch we have on this record + /// Active watches we have on this record active_watch: Option, } @@ -55,15 +76,76 @@ impl OpenedRecord { self.safety_selection = safety_selection; } - pub fn set_active_watch(&mut self, active_watch: ActiveWatch) { - self.active_watch = Some(active_watch); + fn calculate_min_expiration_ts(per_node: &[PerNodeActiveWatch]) -> Option { + per_node + .iter() + .map(|x| x.expiration_ts) + .reduce(|a, b| a.min(b)) + } + + pub fn new_active_watch( + &mut self, + params: ActiveWatchParameters, + opt_value_changed_route: Option, + per_node: Vec, + ) { + assert!( + self.active_watch.is_none(), + "should have cleared active watch first" + ); + assert!(!per_node.is_empty(), "must have at least one watch node"); + + let min_expiration_ts = Self::calculate_min_expiration_ts(&per_node).unwrap(); + let remaining_count = params.count; + + self.active_watch = Some(ActiveWatch { + params, + per_node, + min_expiration_ts, + remaining_count, + opt_value_changed_route, + }); } pub fn clear_active_watch(&mut self) { self.active_watch = None; } - pub fn active_watch(&self) -> Option { - self.active_watch.clone() + pub fn active_watch(&self) -> Option<&ActiveWatch> { + self.active_watch.as_ref() + } + + pub fn active_watch_mut(&mut self) -> Option<&mut ActiveWatch> { + self.active_watch.as_mut() + } + + pub fn per_node_active_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeActiveWatch> { + self.active_watch + .as_ref() + .map(|x| x.per_node.iter().find(|x| x.id == watch_id)) + .flatten() + } + + pub fn per_node_active_watch_by_id_mut( + &mut self, + watch_id: u64, + ) -> Option<&mut PerNodeActiveWatch> { + self.active_watch + .as_mut() + .map(|x| x.per_node.iter_mut().find(|x| x.id == watch_id)) + .flatten() + } + + pub fn remove_per_node_active_watch_by_id(&mut self, watch_id: u64) { + let Some(active_watch) = self.active_watch.as_mut() else { + return; + }; + let Some(n) = active_watch.per_node.iter().position(|x| x.id == watch_id) else { + return; + }; + active_watch.per_node.remove(n); + + active_watch.min_expiration_ts = + Self::calculate_min_expiration_ts(&active_watch.per_node).unwrap(); } } diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index b7deb39a..73e88e11 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -190,9 +190,8 @@ impl StorageManager { match fanout_result.kind { FanoutResultKind::Incomplete => { - // Send partial update if desired, - // if we've gotten at least one value node before consensus - if ctx.send_partial_update && fanout_result.value_nodes.len() >= 1 { + // Send partial update if desired, if we've gotten at least consensus node + if ctx.send_partial_update && fanout_result.consensus_nodes.len() >= 1 { ctx.send_partial_update = false; // Return partial result @@ -350,6 +349,12 @@ impl StorageManager { safety_selection: SafetySelection, result: set_value::OutboundSetValueResult, ) -> Result, VeilidAPIError> { + // Get cryptosystem + let crypto = self.crypto(); + let Some(vcrypto) = crypto.get(key.kind) else { + apibail_generic!("unsupported cryptosystem"); + }; + // Regain the lock after network access let mut inner = self.inner.lock().await; @@ -363,6 +368,7 @@ impl StorageManager { // Keep the list of nodes that returned a value for later reference Self::process_fanout_results_inner( &mut inner, + &vcrypto, key, core::iter::once((subkey, &result.fanout_result)), true, diff --git a/veilid-core/src/storage_manager/tasks/check_active_watches.rs b/veilid-core/src/storage_manager/tasks/check_active_watches.rs index 5d6b3545..5b13b929 100644 --- a/veilid-core/src/storage_manager/tasks/check_active_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_active_watches.rs @@ -13,51 +13,48 @@ impl StorageManager { let mut inner = self.inner.lock().await; let routing_table = self.routing_table(); - let update_callback = self.update_callback(); + //let update_callback = self.update_callback(); let cur_ts = Timestamp::now(); for (k, v) in inner.opened_records.iter_mut() { - // If no active watch, then skip this - let Some(active_watch) = v.active_watch() else { - continue; - }; + for active_watch in v.active_watches() { + // See if the active watch's node is dead + let mut is_dead = false; + if !active_watch.watch_node.state(cur_ts).is_alive() { + // Watched node is dead + is_dead = true; + } - // See if the active watch's node is dead - let mut is_dead = false; - if !active_watch.watch_node.state(cur_ts).is_alive() { - // Watched node is dead - is_dead = true; - } - - // See if the private route we're using is dead - if !is_dead { - if let Some(value_changed_route) = active_watch.opt_value_changed_route { - if routing_table - .route_spec_store() - .get_route_id_for_key(&value_changed_route) - .is_none() - { - // Route we would receive value changes on is dead - is_dead = true; + // See if the private route we're using is dead + if !is_dead { + if let Some(value_changed_route) = active_watch.opt_value_changed_route { + if routing_table + .route_spec_store() + .get_route_id_for_key(&value_changed_route) + .is_none() + { + // Route we would receive value changes on is dead + is_dead = true; + } } } - } - // See if the watch is expired - if !is_dead && active_watch.expiration_ts <= cur_ts { - // Watch has expired - is_dead = true; - } + // See if the watch is expired + if !is_dead && active_watch.expiration_ts <= cur_ts { + // Watch has expired + is_dead = true; + } - if is_dead { - v.clear_active_watch(); + if is_dead { + v.remove_active_watch(active_watch.id); - // Send valuechange with dead count and no subkeys - update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { - key: *k, - subkeys: ValueSubkeyRangeSet::new(), - count: 0, - value: None, - }))); + // // Send valuechange with dead count and no subkeys + // update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { + // key: *k, + // subkeys: ValueSubkeyRangeSet::new(), + // count: 0, + // value: None, + // }))); + } } } } diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index cf2346a7..fcb6f280 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -218,8 +218,12 @@ impl StorageManager { } // Keep the list of nodes that returned a value for later reference + let crypto = self.crypto(); + let vcrypto = crypto.get(result.key.kind).unwrap(); + Self::process_fanout_results_inner( &mut inner, + &vcrypto, result.key, result.fanout_results.iter().map(|x| (x.0, &x.1)), true, diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 378630fe..1059d62a 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -8,15 +8,21 @@ struct OutboundWatchValueContext { pub opt_watch_value_result: Option, } +/// The record of a node accepting a watch +#[derive(Debug, Clone)] +pub(super) struct WatchNode { + pub watch_id: u64, + /// The node that + pub node_ref: NodeRef, + /// The expiration of a successful watch + pub expiration_ts: Timestamp, +} + /// The result of the outbound_watch_value operation #[derive(Debug, Clone)] pub(super) struct OutboundWatchValueResult { - /// The expiration of a successful watch - pub expiration_ts: Timestamp, - /// What watch id was returned - pub watch_id: u64, - /// Which node accepted the watch - pub watch_node: NodeRef, + /// Which nodes accepted the watch + pub watch_nodes: Vec, /// Which private route is responsible for receiving ValueChanged notifications pub opt_value_changed_route: Option, } @@ -195,12 +201,13 @@ impl StorageManager { let routing_domain = RoutingDomain::PublicInternet; - // Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations - let (key_count, timeout_us, set_value_count) = self.config().with(|c| { + // Get the DHT parameters for 'WatchValue', some of which are the same for 'GetValue' operations + let (key_count, consensus_count, fanout, timeout_us) = self.config().with(|c| { ( c.network.dht.max_find_node_count as usize, - TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)), - c.network.dht.set_value_count as usize, + c.network.dht.get_value_count as usize, + c.network.dht.get_value_fanout as usize, + TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)), ) }); @@ -301,16 +308,16 @@ impl StorageManager { }; // Call the fanout - // Use a fixed fanout concurrency of 1 because we only want one watch - // Use a longer timeout (timeout_us * set_value_count) because we may need to try multiple nodes + // Use the same fanout parameters as a get // and each one might take timeout_us time. let routing_table = self.routing_table(); let fanout_call = FanoutCall::new( &routing_table, key, key_count, - 1, - TimestampDuration::new(timeout_us.as_u64() * (set_value_count as u64)), + fanout, + consensus_count, + timeout_us, capability_fanout_node_info_filter(vec![CAP_DHT, CAP_DHT_WATCH]), call_routine, check_done, @@ -422,15 +429,10 @@ impl StorageManager { }; // No active watch means no callback - let Some(mut active_watch) = opened_record.active_watch() else { + let Some(mut active_watch) = opened_record.active_watch_by_id(watch_id) else { return Ok(NetworkResult::value(())); }; - // If the watch id doesn't match, then don't process this - if active_watch.id != watch_id { - return Ok(NetworkResult::value(())); - } - // If the reporting node is not the same as our watch, don't process the value change if !active_watch .watch_node @@ -445,11 +447,11 @@ impl StorageManager { veilid_log!(self debug "watch count went backward: {}: {}/{}", key, count, active_watch.count); // Force count to zero count = 0; - opened_record.clear_active_watch(); + opened_record.remove_active_watch(watch_id); } else if count == 0 { // If count is zero, we're done, cancel the watch and the app can renew it if it wants veilid_log!(self debug "watch count finished: {}", key); - opened_record.clear_active_watch(); + opened_record.clear_active_watches(); } else { veilid_log!(self debug "watch count decremented: {}: {}/{}",