[ci skip] updating watch logic

This commit is contained in:
Christien Rioux 2025-04-02 15:27:35 -05:00
parent d0a76652f3
commit 6d41039a5b
4 changed files with 594 additions and 171 deletions

View File

@ -64,7 +64,7 @@ struct StorageManagerInner {
/// Record subkeys that are currently being written to in the foreground
pub active_subkey_writes: HashMap<TypedKey, ValueSubkeyRangeSet>,
/// State management for outbound watches
pub outbound_watches: HashMap<TypedKey, OutboundWatch>,
pub outbound_watch_state: OutboundWatchState,
/// Storage manager metadata that is persistent, including copy of offline subkey writes
pub metadata_db: Option<TableDB>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
@ -80,6 +80,7 @@ impl fmt::Debug for StorageManagerInner {
.field("remote_record_store", &self.remote_record_store)
.field("offline_subkey_writes", &self.offline_subkey_writes)
.field("active_subkey_writes", &self.active_subkey_writes)
.field("outbound_watch_state", &self.outbound_watch_state)
//.field("metadata_db", &self.metadata_db)
//.field("tick_future", &self.tick_future)
.finish()
@ -870,18 +871,22 @@ impl StorageManager {
expiration: Timestamp,
count: u32,
) -> VeilidAPIResult<Timestamp> {
// Obtain the watch change lock
// (may need to wait for background operations to complete on the watch)
let watch_lock = self.outbound_watch_lock_table.lock_tag(key).await;
// Obtain the inner state lock
let inner = self.inner.lock().await;
// Get the safety selection and the writer we opened this record
// and whatever active watch we may have in case this is a watch update
let (safety_selection, opt_writer, opt_active_watch) = {
let (safety_selection, opt_writer) = {
let Some(opened_record) = inner.opened_records.get(&key) else {
// Record must be opened already to change watch
apibail_generic!("record not open");
};
(
opened_record.safety_selection(),
opened_record.writer().cloned(),
opened_record.outbound_watch().cloned(),
)
};
@ -903,14 +908,44 @@ impl StorageManager {
};
let subkeys = schema.truncate_subkeys(&subkeys, None);
// Get rpc processor and drop mutex so we don't block while requesting the watch from the network
if !self.dht_is_online() {
apibail_try_again!("offline, try again later");
// Calculate desired watch parameters
let desired = if count == 0 {
// Cancel
None
} else {
// Get the minimum and maximum expiration timestamp we will accept
let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| {
(
TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)),
)
});
let cur_ts = get_timestamp();
let min_expiration_ts = Timestamp::new(cur_ts + rpc_timeout_us.as_u64());
let expiration_ts = if expiration.as_u64() == 0 {
expiration
} else if expiration < min_expiration_ts {
apibail_invalid_argument!("expiration is too soon", "expiration", expiration);
} else {
expiration
};
// Create or modify
Some(OutboundWatchParameters {
expiration_ts,
count,
subkeys,
})
};
// Modify the 'desired' state of the watch or add one if it does not exist
inner.outbound_watch_state.set_desired_watch(key, desired);
// Drop the lock for network access
drop(inner);
// xxx continue here, make a 'reconcile outbound watch' routine that can be called imperatively, wait for it etc.
// Use the safety selection we opened the record with
// Use the writer we opened with as the 'watcher' as well
let opt_owvresult = self

View File

@ -1,37 +1,27 @@
use super::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeOutboundWatch {
/// The watch id returned from the watch node
pub id: u64,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which node accepted the watch
pub watch_node_id: TypedKey,
/// Resolved watch node reference
#[serde(skip)]
pub watch_node_ref: Option<NodeRef>,
/// How many value change notifications are left
pub count: u32,
}
impl_veilid_log_facility!("stor");
/// Requested parameters for watch
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchParameters {
/// Requested expiration timestamp
/// Requested expiration timestamp. A zero timestamp here indicates
/// that the watch it to be renewed indefinitely
pub expiration_ts: Timestamp,
/// How many notifications the requestor asked for
pub count: u32,
/// Subkeys requested for this watch
pub subkeys: ValueSubkeyRangeSet,
/// What key to use to perform the watch
pub opt_watcher: Option<KeyPair>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchCurrent {
/// Requested parameters
pub params: OutboundWatchParameters,
/// Outbound watches per node
pub per_node: Vec<PerNodeOutboundWatch>,
/// Nodes that have an active watch on our behalf
pub nodes: Vec<PerNodeKey>,
/// Minimum expiration time for all our nodes
pub min_expiration_ts: Timestamp,
/// How many value change updates remain
@ -51,7 +41,98 @@ pub(in crate::storage_manager) struct OutboundWatch {
pub desired: Option<OutboundWatchParameters>,
}
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
impl OutboundWatch {
/// Returns true if this outbound watch can be removed from the table
pub fn is_dead(&self) -> bool {
self.desired.is_none() && self.current.is_none()
}
/// Returns true if this outbound watch needs to be cancelled
pub fn needs_cancel(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
if self.is_dead() {
veilid_log!(registry warn "should have checked for is_dead first");
return false;
}
let Some(current) = self.current.as_ref() else {
return false;
};
// If the total number of changes has been reached
// then we're done and should cancel
if current.remaining_count == 0 {
return true;
}
// If we have expired and can't renew, then cancel
if cur_ts >= current.params.expiration_ts {
return true;
}
// If the desired parameters is None then cancel
let Some(desired) = self.desired.as_ref() else {
return true;
};
// If the desired parameters is different than the current parameters
// then cancel so we can eventually reconcile to the new parameters
current.params != *desired
}
/// Returns true if this outbound watch can be renewed
pub fn needs_renew(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
if self.is_dead() || self.needs_cancel(registry, cur_ts) {
veilid_log!(registry warn "should have checked for is_dead and needs_cancel first");
return false;
}
// If there is no current watch then there is nothing to renew
let Some(current) = self.current.as_ref() else {
return false;
};
cur_ts >= current.min_expiration_ts && cur_ts < current.params.expiration_ts
}
/// Returns true if there is work to be done on getting the outbound
/// watch to its desired state
pub fn needs_reconcile(
&self,
registry: &VeilidComponentRegistry,
consensus_count: usize,
cur_ts: Timestamp,
) -> bool {
if self.is_dead()
|| self.needs_cancel(registry, cur_ts)
|| self.needs_renew(registry, cur_ts)
{
veilid_log!(registry warn "should have checked for is_dead, needs_cancel first");
return false;
}
// If desired is none, then is_dead() or needs_cancel() should have been true
let Some(desired) = self.desired.as_ref() else {
veilid_log!(registry warn "is_dead() or needs_cancel() should have been true");
return false;
};
// If there is a desired watch but no current watch, then reconcile
let Some(current) = self.current.as_ref() else {
return true;
};
// If the params are different, then needs_cancel() should have returned true
if current.params != *desired {
veilid_log!(registry warn "needs_cancel() should have returned true");
return false;
}
// If we are still working on getting the 'current' state to match
// the 'desired' state, then
if current.nodes.len() != consensus_count {
return true;
}
// No work to do on this watch
false
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeKey {
/// Watched record key
pub record_key: TypedKey,
@ -63,13 +144,14 @@ pub(in crate::storage_manager) struct PerNodeKey {
pub(in crate::storage_manager) struct PerNodeState {
/// Watch Id
pub watch_id: u64,
/// SafetySpec used to contact the node
pub safety_spec: SafetySpec,
/// SafetySelection used to contact the node
pub safety_selection: SafetySelection,
/// What key was used to perform the watch
pub opt_watcher: Option<KeyPair>,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// How many value change notifications are left
pub count: u32,
/// Resolved watch node reference
#[serde(skip)]
pub watch_node_ref: Option<NodeRef>,
@ -83,49 +165,94 @@ pub(in crate::storage_manager) struct OutboundWatchState {
pub per_node_state: HashMap<PerNodeKey, PerNodeState>,
}
impl OutboundWatchCurrent {
pub fn new(
params: OutboundWatchParameters,
opt_value_changed_route: Option<CryptoKey>,
) -> Self {
let remaining_count = params.count;
let min_expiration_ts = params.expiration_ts;
impl Default for OutboundWatchState {
fn default() -> Self {
Self::new()
}
}
impl OutboundWatchState {
pub fn new() -> Self {
Self {
params,
per_node: vec![],
min_expiration_ts,
remaining_count,
opt_value_changed_route,
outbound_watches: HashMap::new(),
per_node_state: HashMap::new(),
}
}
pub fn per_node_outbound_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeOutboundWatch> {
self.per_node.iter().find(|x| x.id == watch_id)
}
pub fn per_node_outbound_watch_by_id_mut(
pub fn set_desired_watch(
&mut self,
watch_id: u64,
) -> Option<&mut PerNodeOutboundWatch> {
self.per_node.iter_mut().find(|x| x.id == watch_id)
}
record_key: TypedKey,
desired_watch: Option<OutboundWatchParameters>,
) {
match self.outbound_watches.get_mut(&record_key) {
Some(w) => {
// Replace desired watch
w.desired = desired_watch;
pub fn remove_per_node_outbound_watch_by_id(&mut self, watch_id: u64) {
let Some(n) = self.per_node.iter().position(|x| x.id == watch_id) else {
return;
};
self.per_node.remove(n);
self.update_min_expiration_ts();
}
fn update_min_expiration_ts(&mut self) {
self.min_expiration_ts = self
.per_node
.iter()
.map(|x| x.expiration_ts)
.reduce(|a, b| a.min(b))
.unwrap_or(self.params.expiration_ts);
// Remove if the watch is done
if w.current.is_none() && w.desired.is_none() {
self.outbound_watches.remove(&record_key);
}
}
None => {
// Watch does not exist, add one if that's what is desired
if desired_watch.is_some() {
self.outbound_watches.insert(
record_key,
OutboundWatch {
current: None,
desired: desired_watch,
},
);
}
}
}
}
}
// impl OutboundWatchCurrent {
// pub fn new(
// params: OutboundWatchParameters,
// opt_value_changed_route: Option<CryptoKey>,
// ) -> Self {
// let remaining_count = params.count;
// let min_expiration_ts = params.expiration_ts;
// Self {
// params,
// per_node: vec![],
// min_expiration_ts,
// remaining_count,
// opt_value_changed_route,
// }
// }
// pub fn per_node_outbound_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeOutboundWatch> {
// self.per_node.iter().find(|x| x.id == watch_id)
// }
// pub fn per_node_outbound_watch_by_id_mut(
// &mut self,
// watch_id: u64,
// ) -> Option<&mut PerNodeOutboundWatch> {
// self.per_node.iter_mut().find(|x| x.id == watch_id)
// }
// pub fn remove_per_node_outbound_watch_by_id(&mut self, watch_id: u64) {
// let Some(n) = self.per_node.iter().position(|x| x.id == watch_id) else {
// return;
// };
// self.per_node.remove(n);
// self.update_min_expiration_ts();
// }
// fn update_min_expiration_ts(&mut self) {
// self.min_expiration_ts = self
// .per_node
// .iter()
// .map(|x| x.expiration_ts)
// .reduce(|a, b| a.min(b))
// .unwrap_or(self.params.expiration_ts);
// }
// }

View File

@ -1,114 +1,377 @@
use futures_util::StreamExt as _;
use super::*;
impl StorageManager {
async fn background_outbound_watch_cancel(
self,
watch_locked_key: AsyncTagLockGuard<TypedKey>,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
mut outbound_watch: OutboundWatch,
/// Remove dead watches from the table
pub(super) async fn process_outbound_watch_dead(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let key = watch_locked_key.tag();
let record_key = watch_lock.tag();
// Last ditch cancellation of per-node watches that may not be fully exhausted
let cancelled_ids = self
.outbound_watch_value_cancel_set(key, safety_selection, opt_watcher, &outbound_watch)
.await;
// Remove any fully cancelled watch ids
for cancelled_id in cancelled_ids {
outbound_watch.remove_per_node_outbound_watch_by_id(cancelled_id);
}
// Ensure the watch is put into cancelled state
outbound_watch.remaining_count = 0;
// Update the opened record
let mut inner = self.inner.lock().await;
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
// Already closed
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.remove(&record_key)
else {
veilid_log!(self warn "dead watch should have still been in the table");
return;
};
opened_record.clear_outbound_watch();
if outbound_watch.current.is_some() {
veilid_log!(self warn "dead watch still had current state");
}
if outbound_watch.desired.is_some() {
veilid_log!(self warn "dead watch still had desired params");
}
}
/// Get the list of remaining active watch ids
/// and call their nodes to cancel the watch
pub(super) async fn process_outbound_watch_cancel(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
let per_node_states = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
let mut per_node_states = vec![];
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
current.nodes.retain(|x| !dead_pnks.contains(x));
per_node_states
};
// Now reach out to each node and cancel their watch ids
let mut unord = FuturesUnordered::new();
for (pnk, pns) in per_node_states {
unord.push(async move {
let res = self
.outbound_watch_value_cancel(
pnk.record_key,
pns.safety_selection,
pns.opt_watcher,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await;
(pnk, res)
});
}
let mut cancelled = vec![];
while let Some((pnk, res)) = unord.next().await {
match res {
Ok(_) => {
// Remove from 'per node states' because we got some response
cancelled.push(pnk);
}
Err(e) => {
veilid_log!(self debug "outbound watch cancel error: {}", e);
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch. We'll try the cancel again if we reach this node again during fanout.
}
}
}
// Update state
{
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
// Mark as dead now that we cancelled
outbound_watch.current = None;
}
}
pub(super) async fn process_outbound_watch_renew(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
// If we can't do this operation right now, don't try
if !self.dht_is_online() {
return;
}
let (per_node_states, params, safety_selection) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_state
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "watch being cancelled should have still been in the table");
return;
};
let Some(current) = &mut outbound_watch.current else {
veilid_log!(self warn "watch being cancelled should have current state");
return;
};
let mut per_node_states = vec![];
let mut dead_pnks = BTreeSet::new();
for pnk in &current.nodes {
let Some(per_node_state) =
inner.outbound_watch_state.per_node_state.get(&pnk).cloned()
else {
veilid_log!(self warn "missing per-node state for watch");
dead_pnks.insert(*pnk);
continue;
};
per_node_states.push((*pnk, per_node_state));
}
current.nodes.retain(|x| !dead_pnks.contains(x));
(per_node_states, current.params.clone())
};
// Now reach out to each node and renew their watches
let mut unord = FuturesUnordered::new();
let cur_ts = Timestamp::now();
for (pnk, pns) in per_node_states {
unord.push(async move {
let res = self
.outbound_watch_value_change(
pnk.record_key,
params,
pns.safety_selection,
pns.watch_node_ref.unwrap(),
pns.watch_id,
)
.await;
(pnk, res)
});
}
let mut renewed = vec![];
let mut rejected = vec![];
while let Some((pnk, res)) = unord.next().await {
match res {
Ok(accepted) => {
// Note per node states we should keep vs throw away
if accepted {
renewed.push(pnk);
} else {
rejected.push(pnk);
}
}
Err(e) => {
veilid_log!(self debug "outbound watch cancel error: {}", e);
// Leave in the 'per node states' for now because we couldn't contact the node
// but remove from this watch.
rejected.push(pnk);
}
}
}
// // Update state
// {
// let inner = &mut *self.inner.lock().await;
// let Some(outbound_watch) = inner
// .outbound_watch_state
// .outbound_watches
// .get_mut(&record_key)
// else {
// veilid_log!(self warn "watch being cancelled should have still been in the table");
// return;
// };
// let Some(current) = &mut outbound_watch.current else {
// veilid_log!(self warn "watch being cancelled should have current state");
// return;
// };
// // Mark as dead now that we cancelled
// outbound_watch.current = None;
// }
}
pub(super) async fn process_outbound_watch_reconcile(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
) {
let record_key = watch_lock.tag();
//
}
// Check if client-side watches on opened records either have dead nodes or if the watch has expired
#[instrument(level = "trace", target = "stor", skip_all, err)]
//#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_outbound_watches_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let mut inner = self.inner.lock().await;
let routing_table = self.routing_table();
//let update_callback = self.update_callback();
let inner = self.inner.lock().await;
// Iterate all outbound watches
let registry = self.registry();
let cur_ts = Timestamp::now();
for (k, v) in inner.opened_records.iter_mut() {
let Some(outbound_watch) = v.outbound_watch() else {
continue;
};
let consensus_count = self
.config()
.with(|c| c.network.dht.get_value_count as usize);
// See if the watch is expired or out of updates
if outbound_watch.min_expiration_ts <= cur_ts || outbound_watch.remaining_count == 0 {
// See if we can lock the outbound watch
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(*k) else {
// Watch is busy, come back later
// Determine what work needs doing if any
for (k, v) in &inner.outbound_watch_state.outbound_watches {
let k = *k;
if v.is_dead() {
// Outbound watch is dead
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
continue;
};
let outbound_watch = outbound_watch.clone();
let safety_selection = v.safety_selection();
let opt_watcher = v.writer().cloned();
self.background_operation_processor.add_future(
self.clone().background_outbound_watch_cancel(
watch_lock,
safety_selection,
opt_watcher,
outbound_watch,
),
);
// Clear active watch
v.remove_active_watch(outbound_watch.id);
// // Send valuechange with dead count and no subkeys
// update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
// key: *k,
// subkeys: ValueSubkeyRangeSet::new(),
// count: 0,
// value: None,
// })));
}
// See if the private route we're using is dead
let mut is_dead = false;
if !is_dead {
if let Some(value_changed_route) = outbound_watch.opt_value_changed_route {
if routing_table
.route_spec_store()
.get_route_id_for_key(&value_changed_route)
.is_none()
{
// Route we would receive value changes on is dead
is_dead = true;
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_dead(watch_lock)
.await
}
}
}
for outbound_watch in &outbound_watch.per_node {
// See if the active watch's node is dead
if !outbound_watch.watch_node.state(cur_ts).is_alive() {
// Watched node is dead
is_dead = true;
}
};
self.background_operation_processor.add_future(fut);
} else if v.needs_cancel(&registry, cur_ts) {
// Outbound watch needs to be cancelled
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
continue;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_cancel(watch_lock)
.await
}
};
self.background_operation_processor.add_future(fut);
} else if v.needs_renew(&registry, cur_ts) {
// Outbound watch expired but can be renewed
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
continue;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_renew(watch_lock)
.await
}
};
self.background_operation_processor.add_future(fut);
} else if v.needs_reconcile(&registry, consensus_count, cur_ts) {
// Outbound watch parameters have changed or it needs more nodes
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(k) else {
continue;
};
let fut = {
let registry = self.registry();
async move {
registry
.storage_manager()
.process_outbound_watch_reconcile(watch_lock)
.await
}
};
self.background_operation_processor.add_future(fut);
}
}
// // See if the watch is expired or out of updates
// if outbound_watch.min_expiration_ts <= cur_ts || outbound_watch.remaining_count == 0 {
// // See if we can lock the outbound watch
// let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(*k) else {
// // Watch is busy, come back later
// continue;
// };
// let outbound_watch = outbound_watch.clone();
// let safety_selection = v.safety_selection();
// let opt_watcher = v.writer().cloned();
// self.background_operation_processor.add_future(
// self.clone().background_outbound_watch_cancel(
// watch_lock,
// safety_selection,
// opt_watcher,
// outbound_watch,
// ),
// );
// // Send valuechange with dead count and no subkeys
// update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
// key: *k,
// subkeys: ValueSubkeyRangeSet::new(),
// count: 0,
// value: None,
// })));
// // See if the private route we're using is dead
// let mut is_dead = false;
// if !is_dead {
// if let Some(value_changed_route) = outbound_watch.opt_value_changed_route {
// if routing_table
// .route_spec_store()
// .get_route_id_for_key(&value_changed_route)
// .is_none()
// {
// // Route we would receive value changes on is dead
// is_dead = true;
// }
// }
// }
// for outbound_watch in &outbound_watch.per_node {
// // See if the active watch's node is dead
// if !outbound_watch.watch_node.state(cur_ts).is_alive() {
// // Watched node is dead
// is_dead = true;
// }
// }
// }
Ok(())
}
}

View File

@ -84,8 +84,8 @@ impl StorageManager {
pub(super) async fn outbound_watch_value_cancel(
&self,
key: TypedKey,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
safety_selection: SafetySelection,
watch_node: NodeRef,
watch_id: u64,
) -> VeilidAPIResult<bool> {
@ -126,32 +126,31 @@ impl StorageManager {
pub(super) async fn outbound_watch_value_change(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
params: OutboundWatchParameters,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
active_watch: &OutboundWatch,
watch_node: NodeRef,
watch_id: u64,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
let routing_domain = RoutingDomain::PublicInternet;
if count == 0 {
if params.count == 0 {
apibail_internal!("cancel should be done with outbound_watch_value_cancel");
}
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
// which lives for the duration of the app's runtime
let watcher =
opt_watcher.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value);
let watcher = params
.opt_watcher
.unwrap_or_else(|| self.anonymous_watch_keys.get(key.kind).unwrap().value);
let wva = VeilidAPIError::from_network_result(
pin_future!(self.rpc_processor().rpc_call_watch_value(
Destination::direct(watch_node.routing_domain_filtered(routing_domain))
.with_safety(safety_selection),
key,
subkeys,
expiration,
count,
params.subkeys,
params.expiration,
params.count,
watcher,
Some(watch_id),
))
@ -166,9 +165,11 @@ impl StorageManager {
}
Ok(Some(OutboundWatchValueResult {
expiration_ts: wva.answer.expiration_ts,
watch_id: wva.answer.watch_id,
watch_node,
watch_nodes: vec![WatchNode {
watch_id: wva.answer.watch_id,
node_ref: watch_node,
expiration_ts: wva.answer.expiration_ts,
}],
opt_value_changed_route: wva.reply_private_route,
}))
} else {
@ -177,18 +178,15 @@ impl StorageManager {
}
}
/// Perform a 'watch value' query on the network using fanout
/// Perform a 'watch value' query on the network using fanout XXX rewrite this so api-based cancel/change/new make sense
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_watch_value(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
params: OutboundWatchParameters,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
opt_active_watch: Option<&OutboundWatch>,
active_nodes: Vec<TypedKey>,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
// if the count is zero, we are cancelling
if count == 0 {