watch cleanup

Christien Rioux 2024-03-06 15:39:01 -05:00
parent 86f7473727
commit 2dc2f97dfc
15 changed files with 347 additions and 302 deletions

View File

@@ -269,24 +269,35 @@ impl RPCProcessor {
#[cfg(feature = "debug-dht")]
log_rpc!(debug "Not close enough for watch value");
(false, Timestamp::default(), watch_id.unwrap_or_default())
(false, 0, watch_id.unwrap_or_default())
} else {
// Accepted, lets try to watch or cancel it
let params = WatchParameters {
subkeys,
expiration: Timestamp::new(expiration),
count,
watcher,
target,
};
// See if we have this record ourselves, if so, accept the watch
let storage_manager = self.storage_manager();
let (ret_expiration, ret_watch_id) = network_result_try!(storage_manager
.inbound_watch_value(
key,
subkeys.clone(),
Timestamp::new(expiration),
count,
watch_id,
target,
watcher
)
let watch_result = network_result_try!(storage_manager
.inbound_watch_value(key, params, watch_id,)
.await
.map_err(RPCError::internal)?);
// Encode the watch result
// Rejections and cancellations are treated the same way by clients
let (ret_expiration, ret_watch_id) = match watch_result {
WatchResult::Created { id, expiration } => (expiration.as_u64(), id),
WatchResult::Changed { expiration } => {
(expiration.as_u64(), watch_id.unwrap_or_default())
}
WatchResult::Cancelled => (0, watch_id.unwrap_or_default()),
WatchResult::Rejected => (0, watch_id.unwrap_or_default()),
};
(true, ret_expiration, ret_watch_id)
};
@@ -309,7 +320,7 @@ impl RPCProcessor {
// Make WatchValue answer
let watch_value_a = RPCOperationWatchValueA::new(
ret_accepted,
ret_expiration.as_u64(),
ret_expiration,
closer_to_key_peers,
ret_watch_id,
)?;
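For reference, the hunk above folds the new WatchResult enum into the flat (accepted, expiration, watch_id) answer encoding. A standalone sketch of that fold, using u64 expirations in place of Veilid's Timestamp type:

// Stand-in for the WatchResult enum; u64 replaces Veilid's Timestamp.
enum WatchResult {
    Created { id: u64, expiration: u64 },
    Changed { expiration: u64 },
    Cancelled,
    Rejected,
}

// Fold a WatchResult into the (expiration, watch_id) pair carried by the
// answer. Cancellations and rejections both come out as a zero expiration,
// which is why clients treat them the same way.
fn encode_answer(result: WatchResult, requested_watch_id: Option<u64>) -> (u64, u64) {
    match result {
        WatchResult::Created { id, expiration } => (expiration, id),
        WatchResult::Changed { expiration } => {
            (expiration, requested_watch_id.unwrap_or_default())
        }
        WatchResult::Cancelled | WatchResult::Rejected => {
            (0, requested_watch_id.unwrap_or_default())
        }
    }
}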

View File

@@ -1,27 +1,21 @@
mod debug;
mod get_value;
mod keys;
mod limited_size;
mod record_store;
mod record_store_limits;
mod set_value;
mod storage_manager_inner;
mod tasks;
mod types;
mod watch_value;
use keys::*;
use limited_size::*;
use record_store::*;
use record_store_limits::*;
use storage_manager_inner::*;
pub use types::*;
use super::*;
use network_manager::*;
use record_store::*;
use routing_table::*;
use rpc_processor::*;
use storage_manager_inner::*;
pub use record_store::{WatchParameters, WatchResult};
pub use types::*;
/// The maximum size of a single subkey
const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN;
@@ -317,9 +311,9 @@ impl StorageManager {
)
.await?;
if let Some(owvresult) = opt_owvresult {
if owvresult.expiration_ts.as_u64() == 0 {
if owvresult.expiration_ts.as_u64() != 0 {
log_stor!(debug
"close record watch cancel should have old expiration, but got zero"
"close record watch cancel should have zero expiration"
);
}
} else {
@@ -649,7 +643,7 @@ impl StorageManager {
expiration.as_u64()
};
// If the expiration time is less than our minimum expiration time (or zero) consider this watch cancelled
// If the expiration time is less than our minimum expiration time (or zero) consider this watch inactive
let mut expiration_ts = owvresult.expiration_ts;
if expiration_ts.as_u64() < min_expiration_ts {
return Ok(Timestamp::new(0));
@@ -664,7 +658,7 @@ impl StorageManager {
if count == 0 {
// Expiration returned should be zero if we requested a cancellation
if expiration_ts.as_u64() != 0 {
log_stor!(debug "got unexpired watch despite asking for a cancellation");
log_stor!(debug "got active watch despite asking for a cancellation");
}
return Ok(Timestamp::new(0));
}
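The two rules in this hunk are easy to restate: a returned expiration below the requested minimum (or zero) means the watch is inactive, and a cancellation request should always come back with zero. A hypothetical classifier capturing both, with u64 timestamps standing in for Timestamp:

/// Hypothetical classifier for the expiration returned by a watch request.
/// Returns None when the watch should be considered inactive.
fn returned_watch_expiration(
    returned_ts: u64,
    min_expiration_ts: u64,
    requested_count: u32,
) -> Option<u64> {
    if requested_count == 0 {
        // We asked for a cancellation; a well-behaved node returns zero here
        return None;
    }
    if returned_ts < min_expiration_ts {
        // Below the minimum (or zero): treat the watch as inactive
        return None;
    }
    Some(returned_ts)
}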
@@ -719,12 +713,14 @@ impl StorageManager {
active_watch.count
};
// Update the watch
// Update the watch. This just calls through to the above watch_values() function
// This will update the active_watch so we don't need to do that in this routine.
let expiration_ts = self
.watch_values(key, subkeys, active_watch.expiration_ts, count)
.await?;
// A zero expiration time means the watch is done or nothing is left, and the watch is no longer active
// A zero expiration time returned from watch_value() means the watch is done
// or no subkeys are left, and the watch is no longer active
if expiration_ts.as_u64() == 0 {
// Return false indicating the watch is completely gone
return Ok(false);
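Renewal therefore reduces to: re-issue the watch, keep the state if a nonzero expiration comes back, and drop it otherwise. A stand-in sketch of that outcome handling (not Veilid's API; simplified types):

// Stand-in for the active watch state kept by the storage manager.
struct ActiveWatch {
    expiration_ts: u64,
}

// Apply the result of a renewal. Returns false when the watch is completely
// gone (zero expiration), mirroring the Ok(false) return in the hunk above.
fn apply_renewal(active: &mut Option<ActiveWatch>, new_expiration_ts: u64) -> bool {
    if new_expiration_ts == 0 {
        // Watch is done or no subkeys are left
        *active = None;
        return false;
    }
    if let Some(w) = active.as_mut() {
        w.expiration_ts = new_expiration_ts;
    }
    true
}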

View File

@@ -4,6 +4,27 @@
/// This store does not perform any validation on the schema, and all ValueRecordData passed in must have been previously validated.
/// Uses an in-memory store for the records, backed by the TableStore. Subkey data is LRU cached and rotated out by a limits policy,
/// and backed to the TableStore for persistence.
mod keys;
mod limited_size;
mod local_record_detail;
mod opened_record;
mod record;
mod record_data;
mod record_store_limits;
mod remote_record_detail;
mod watch;
pub(super) use keys::*;
pub(super) use limited_size::*;
pub(super) use local_record_detail::*;
pub(super) use opened_record::*;
pub(super) use record::*;
pub(super) use record_data::*;
pub(super) use record_store_limits::*;
pub(super) use remote_record_detail::*;
pub(super) use watch::*;
pub use watch::{WatchParameters, WatchResult};
use super::*;
use hashlink::LruCache;
@@ -22,31 +43,6 @@ where
in_total_storage: bool,
}
/// An individual watch
#[derive(Debug, Clone)]
struct WatchedRecordWatch {
id: u64,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
target: Target,
watcher: CryptoKey,
changed: ValueSubkeyRangeSet,
}
#[derive(Debug, Default, Clone)]
/// A record being watched for changes
pub(super) struct WatchedRecord {
/// The list of active watchers
watchers: Vec<WatchedRecordWatch>,
}
pub(super) enum WatchUpdateMode {
NoUpdate,
UpdateAll,
ExcludeTarget(Target),
}
pub(super) struct RecordStore<D>
where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
@@ -72,10 +68,9 @@ where
/// The list of records that have changed since last flush to disk (optimization for batched writes)
changed_records: HashSet<RecordTableKey>,
/// The list of records being watched for changes
watched_records: HashMap<RecordTableKey, WatchedRecord>,
watched_records: HashMap<RecordTableKey, WatchList>,
/// The list of watched records that have changed values since last notification
changed_watched_values: HashSet<RecordTableKey>,
/// A mutex to ensure only one dead-record purge runs at a time
purge_dead_records_mutex: Arc<AsyncMutex<()>>,
}
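For orientation, the per-record watch bookkeeping after this change looks roughly like the sketch below; the types are simplified stand-ins (the real map key is RecordTableKey and WatchList is defined in the new watch.rs module later in this diff):

use std::collections::{HashMap, HashSet};

// Stand-in key; the real type is RecordTableKey.
type RecordKey = [u8; 32];

#[derive(Default)]
struct WatchList {
    watches: Vec<u64>, // the real code stores Watch structs, not bare ids
}

struct WatchIndex {
    // One watch list per watched record; the entry is removed when its last
    // watch is cancelled or expires.
    watched_records: HashMap<RecordKey, WatchList>,
    // Records whose watched values changed since the last notification pass.
    changed_watched_values: HashSet<RecordKey>,
}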
@@ -423,6 +418,11 @@ where
Ok(())
}
pub(super) fn contains_record(&mut self, key: TypedKey) -> bool {
let rtk = RecordTableKey { key };
self.record_index.contains_key(&rtk)
}
pub(super) fn with_record<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
@@ -626,20 +626,30 @@ where
&mut self,
key: TypedKey,
subkey: ValueSubkey,
opt_ignore_target: Option<Target>,
watch_update_mode: WatchUpdateMode,
) {
let (do_update, opt_ignore_target) = match watch_update_mode {
WatchUpdateMode::NoUpdate => (false, None),
WatchUpdateMode::UpdateAll => (true, None),
WatchUpdateMode::ExcludeTarget(target) => (true, Some(target)),
};
if !do_update {
return;
}
let rtk = RecordTableKey { key };
let Some(wr) = self.watched_records.get_mut(&rtk) else {
return;
};
// Update all watchers
let mut changed = false;
for w in &mut wr.watchers {
for w in &mut wr.watches {
// If this watcher is watching the changed subkey then add to the watcher's changed list
// Don't bother marking changes for value sets coming from the same watching node/target because they
// are already going to be aware of the changes in that case
if Some(&w.target) != opt_ignore_target.as_ref()
&& w.subkeys.contains(subkey)
if Some(&w.params.target) != opt_ignore_target.as_ref()
&& w.params.subkeys.contains(subkey)
&& w.changed.insert(subkey)
{
changed = true;
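The core of this hunk is that callers now express intent with WatchUpdateMode and the mode is resolved inside update_watched_value. A self-contained sketch of that resolution step, with a simplified Target:

// Simplified stand-in; the real Target identifies a destination node or route.
struct Target(u64);

// Mirrors the enum introduced in watch.rs later in this diff.
enum WatchUpdateMode {
    NoUpdate,
    UpdateAll,
    ExcludeTarget(Target),
}

// Resolve a mode into "update at all?" plus an optional target to skip, which
// is exactly the first step of update_watched_value above.
fn resolve(mode: WatchUpdateMode) -> (bool, Option<Target>) {
    match mode {
        WatchUpdateMode::NoUpdate => (false, None),
        WatchUpdateMode::UpdateAll => (true, None),
        WatchUpdateMode::ExcludeTarget(target) => (true, Some(target)),
    }
}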
@@ -743,69 +753,57 @@ where
// Update storage space
self.total_storage_space.commit().unwrap();
// Update watched value
let (do_update, opt_ignore_target) = match watch_update_mode {
WatchUpdateMode::NoUpdate => (false, None),
WatchUpdateMode::UpdateAll => (true, None),
WatchUpdateMode::ExcludeTarget(target) => (true, Some(target)),
};
if do_update {
self.update_watched_value(key, subkey, opt_ignore_target)
// Send updates to any watchers
self.update_watched_value(key, subkey, watch_update_mode)
.await;
}
Ok(())
}
/// Add an inbound record watch for changes
#[allow(clippy::too_many_arguments)]
pub async fn watch_record(
pub async fn _change_existing_watch(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
mut expiration: Timestamp,
count: u32,
watch_id: Option<u64>,
target: Target,
watcher: PublicKey,
) -> VeilidAPIResult<Option<(Timestamp, u64)>> {
// If subkeys is empty or count is zero then we're cancelling a watch completely
if subkeys.is_empty() || count == 0 {
if let Some(watch_id) = watch_id {
return self.cancel_watch(key, watch_id, watcher).await;
params: WatchParameters,
watch_id: u64,
) -> VeilidAPIResult<WatchResult> {
if params.count == 0 {
apibail_internal!("cancel watch should not have gotten here");
}
apibail_internal!("shouldn't have let a None watch id get here");
if params.expiration.as_u64() == 0 {
apibail_internal!("zero expiration should have been resolved to max by now");
}
// See if expiration timestamp is too far in the future or not far enough in the future
let cur_ts = get_timestamp();
let max_ts = cur_ts + self.limits.max_watch_expiration.as_u64();
let min_ts = cur_ts + self.limits.min_watch_expiration.as_u64();
if expiration.as_u64() == 0 || expiration.as_u64() > max_ts {
// Clamp expiration to the max time (or set a zero expiration to the max)
expiration = Timestamp::new(max_ts);
} else if expiration.as_u64() < min_ts {
// Don't add watches with too low of an expiration time
return Ok(None);
}
// Get the record being watched
let Some(is_member) = self.with_record(key, |record| {
// Check if the watcher specified is a schema member
let schema = record.schema();
(*record.owner()) == watcher || schema.is_member(&watcher)
}) else {
// Record not found
return Ok(None);
// Get the watch list for this record
let rtk = RecordTableKey { key };
let Some(watch_list) = self.watched_records.get_mut(&rtk) else {
// No watches, nothing to change
return Ok(WatchResult::Rejected);
};
// Generate a record-unique watch id > 0 if one is not specified
// Check each watch to see if we have an exact match for the id to change
for w in &mut watch_list.watches {
// If the watch id doesn't match, then we're not updating
// Also do not allow the watcher key to change
if w.id == watch_id && w.params.watcher == params.watcher {
// Updating an existing watch
w.params = params;
return Ok(WatchResult::Changed {
expiration: w.params.expiration,
});
}
}
// No existing watch found
Ok(WatchResult::Rejected)
}
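Put plainly, _change_existing_watch only modifies a watch when both the presented id and the watcher key match; the watcher key can never be changed by an update. A sketch of just that lookup, assuming the Watch and WatchParameters types from the new watch.rs module shown later in this diff:

// Find the watch a change request is allowed to modify: the id must match and
// the watcher key may not change.
fn find_changeable(watches: &mut [Watch], watch_id: u64, watcher: PublicKey) -> Option<&mut Watch> {
    watches
        .iter_mut()
        .find(|w| w.id == watch_id && w.params.watcher == watcher)
}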
pub async fn _create_new_watch(
&mut self,
key: TypedKey,
params: WatchParameters,
member_check: Box<dyn Fn(PublicKey) -> bool + Send>,
) -> VeilidAPIResult<WatchResult> {
// Generate a record-unique watch id > 0
let rtk = RecordTableKey { key };
let mut new_watch = false;
let watch_id = watch_id.unwrap_or_else(|| {
new_watch = true;
let mut id = 0;
while id == 0 {
id = get_random_u64();
@@ -813,7 +811,7 @@ where
if let Some(watched_record) = self.watched_records.get_mut(&rtk) {
// Make sure it doesn't match any other id (unlikely, but let's be certain)
'x: loop {
for w in &mut watched_record.watchers {
for w in &mut watched_record.watches {
if w.id == id {
loop {
id = id.overflowing_add(1).0;
@@ -827,117 +825,152 @@ where
break;
}
}
id
});
// See if we are updating an existing watch
// with the watcher matched on target
// Calculate watch limits
let mut watch_count = 0;
let mut target_watch_count = 0;
let is_member = member_check(params.watcher);
let rtk = RecordTableKey { key };
if let Some(watched_record) = self.watched_records.get_mut(&rtk) {
for w in &mut watched_record.watchers {
// Total up the number of watches for this key
for w in &mut watched_record.watches {
// See if this watch should be counted toward any limits
let count_watch = if is_member {
// If the watcher is a member of the schema, then consider the total per-watcher key
// If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-record key
if !is_member || w.watcher == watcher {
watch_count += 1;
w.params.watcher == params.watcher
} else {
// If the watcher is not a member of the schema, then check if this watch is an anonymous watch and contributes to the per-record key total
!member_check(w.params.watcher)
};
// For any watch, if the target matches ours, also tally that separately
// If the watcher is a member of the schema, then consider the total per-target-per-watcher key
// If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-target-per-record key
if w.target == target {
if count_watch {
watch_count += 1;
if w.params.target == params.target {
target_watch_count += 1;
}
}
// If this is not a new watch and the watch id doesn't match, then we're not updating
if !new_watch && w.id == watch_id {
// Updating an existing watch
// You can change a watcher key, or target via this update
// as well as the subkey range, expiration and count of the watch
w.watcher = watcher;
w.target = target;
w.subkeys = subkeys;
w.expiration = expiration;
w.count = count;
return Ok(Some((expiration, watch_id)));
}
}
}
// Only proceed here if this is a new watch
if !new_watch {
// Not a new watch
return Ok(None);
}
// For members, no more than one watch per target per watcher per record
// For anonymous, no more than one watch per target per record
if target_watch_count > 0 {
// Too many watches
return Ok(None);
return Ok(WatchResult::Rejected);
}
// Adding a new watcher to a watch
// Check watch table for limits
if is_member {
// Member watch
if watch_count >= self.limits.member_watch_limit {
// Too many watches
return Ok(None);
}
let watch_limit = if is_member {
self.limits.member_watch_limit
} else {
// Public watch
// No more than one
if watch_count >= self.limits.public_watch_limit {
// Too many watches
return Ok(None);
}
self.limits.public_watch_limit
};
if watch_count >= watch_limit {
return Ok(WatchResult::Rejected);
}
// Ok this is an acceptable new watch, add it
let watch = self.watched_records.entry(rtk).or_default();
watch.watchers.push(WatchedRecordWatch {
id: watch_id,
subkeys,
expiration,
count,
target,
watcher,
let watch_list = self.watched_records.entry(rtk).or_default();
let expiration = params.expiration;
watch_list.watches.push(Watch {
params,
id,
changed: ValueSubkeyRangeSet::new(),
});
Ok(Some((expiration, watch_id)))
Ok(WatchResult::Created { id, expiration })
}
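The admission arithmetic above is subtle enough to restate: schema members are counted per watcher key, anonymous watchers share a per-record pool, and within the counted set only one watch per target is allowed. A self-contained sketch of that rule under simplified stand-in types:

struct Target(u64);
impl PartialEq for Target {
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0
    }
}
type PublicKey = [u8; 32];

struct ExistingWatch {
    watcher: PublicKey,
    target: Target,
}

// Returns true if a new watch by `watcher` toward `target` would be admitted.
// `is_member` and `member_check` mirror the schema-membership closure above.
fn admit_new_watch(
    existing: &[ExistingWatch],
    watcher: PublicKey,
    target: Target,
    is_member: bool,
    member_check: impl Fn(&PublicKey) -> bool,
    member_watch_limit: usize,
    public_watch_limit: usize,
) -> bool {
    let mut watch_count = 0;
    for w in existing {
        let count_watch = if is_member {
            // Members are limited per watcher key
            w.watcher == watcher
        } else {
            // Anonymous watches share a per-record pool with other anonymous watches
            !member_check(&w.watcher)
        };
        if count_watch {
            watch_count += 1;
            // Within the counted set, at most one watch per target
            if w.target == target {
                return false;
            }
        }
    }
    let limit = if is_member { member_watch_limit } else { public_watch_limit };
    watch_count < limit
}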
/// Add or update an inbound record watch for changes
#[allow(clippy::too_many_arguments)]
pub async fn watch_record(
&mut self,
key: TypedKey,
mut params: WatchParameters,
opt_watch_id: Option<u64>,
) -> VeilidAPIResult<WatchResult> {
// If count is zero then we're cancelling a watch completely
if params.count == 0 {
if let Some(watch_id) = opt_watch_id {
let cancelled = self.cancel_watch(key, watch_id, params.watcher).await?;
if cancelled {
return Ok(WatchResult::Cancelled);
}
return Ok(WatchResult::Rejected);
}
apibail_internal!("shouldn't have let a None watch id get here");
}
// See if expiration timestamp is too far in the future or not far enough in the future
let cur_ts = get_timestamp();
let max_ts = cur_ts + self.limits.max_watch_expiration.as_u64();
let min_ts = cur_ts + self.limits.min_watch_expiration.as_u64();
if params.expiration.as_u64() == 0 || params.expiration.as_u64() > max_ts {
// Clamp expiration to the max time (or set a zero expiration to the max)
params.expiration = Timestamp::new(max_ts);
} else if params.expiration.as_u64() < min_ts {
// Don't add watches with too low of an expiration time
if let Some(watch_id) = opt_watch_id {
let cancelled = self.cancel_watch(key, watch_id, params.watcher).await?;
if cancelled {
return Ok(WatchResult::Cancelled);
}
}
return Ok(WatchResult::Rejected);
}
// Make a closure to check for member vs anonymous
let Some(member_check) = self.with_record(key, |record| {
let schema = record.schema();
let owner = *record.owner();
Box::new(move |watcher| owner == params.watcher || schema.is_member(&watcher))
}) else {
// Record not found
return Ok(WatchResult::Rejected);
};
// Create or update depending on if a watch id is specified or not
if let Some(watch_id) = opt_watch_id {
self._change_existing_watch(key, params, watch_id).await
} else {
self._create_new_watch(key, params, member_check).await
}
}
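The expiration handling in watch_record boils down to a three-way clamp: zero or too-far requests clamp to the maximum horizon, and too-near requests become cancellations or rejections. A sketch of just that arithmetic (all values in the units of get_timestamp(), microseconds in Veilid):

/// Outcome of validating a requested expiration against the store limits.
enum ExpirationCheck {
    Clamped(u64), // watch with this expiration
    TooSoon,      // treat as a cancellation/rejection instead
}

fn check_expiration(requested: u64, cur_ts: u64, min_duration: u64, max_duration: u64) -> ExpirationCheck {
    let max_ts = cur_ts + max_duration;
    let min_ts = cur_ts + min_duration;
    if requested == 0 || requested > max_ts {
        // Zero asks for the longest watch allowed; clamp either case to max
        ExpirationCheck::Clamped(max_ts)
    } else if requested < min_ts {
        ExpirationCheck::TooSoon
    } else {
        ExpirationCheck::Clamped(requested)
    }
}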
/// Clear a specific watch for a record
/// Returns true if the watch was found and cancelled
async fn cancel_watch(
&mut self,
key: TypedKey,
watch_id: u64,
watcher: PublicKey,
) -> VeilidAPIResult<Option<(Timestamp, u64)>> {
) -> VeilidAPIResult<bool> {
if watch_id == 0 {
apibail_internal!("should not have let a a zero watch id get here");
apibail_internal!("should not have let a zero watch id get here");
}
// See if we are cancelling an existing watch
// with the watch id and watcher key matched
let rtk = RecordTableKey { key };
let mut is_empty = false;
let mut ret = None;
if let Some(watch) = self.watched_records.get_mut(&rtk) {
let mut ret = false;
if let Some(watch_list) = self.watched_records.get_mut(&rtk) {
let mut dead_watcher = None;
for (wn, w) in watch.watchers.iter_mut().enumerate() {
for (wn, w) in watch_list.watches.iter_mut().enumerate() {
// Must match the watch id and the watcher key to cancel
if w.id == watch_id && w.watcher == watcher {
if w.id == watch_id && w.params.watcher == watcher {
// Canceling an existing watch
dead_watcher = Some(wn);
ret = Some((w.expiration, watch_id));
ret = true;
break;
}
}
if let Some(dw) = dead_watcher {
watch.watchers.remove(dw);
if watch.watchers.is_empty() {
watch_list.watches.remove(dw);
if watch_list.watches.is_empty() {
is_empty = true;
}
}
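Cancellation bookkeeping, restated: remove the first watch matching both the id and the watcher key, and report whether anything was removed (the caller drops the whole WatchList entry when it empties). A sketch assuming the Watch type from watch.rs:

// Remove the watch matching both id and watcher key; true if one was removed.
fn cancel_in_list(watches: &mut Vec<Watch>, watch_id: u64, watcher: PublicKey) -> bool {
    if let Some(pos) = watches
        .iter()
        .position(|w| w.id == watch_id && w.params.watcher == watcher)
    {
        watches.remove(pos);
        return true;
    }
    false
}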
@@ -953,8 +986,8 @@ where
pub fn move_watches(
&mut self,
key: TypedKey,
in_watch: Option<(WatchedRecord, bool)>,
) -> Option<(WatchedRecord, bool)> {
in_watch: Option<(WatchList, bool)>,
) -> Option<(WatchList, bool)> {
let rtk = RecordTableKey { key };
let out = self.watched_records.remove(&rtk);
if let Some(in_watch) = in_watch {
@@ -983,7 +1016,7 @@ where
if let Some(watch) = self.watched_records.get_mut(&rtk) {
// Process watch notifications
let mut dead_watchers = vec![];
for (wn, w) in watch.watchers.iter_mut().enumerate() {
for (wn, w) in watch.watches.iter_mut().enumerate() {
// Get the subkeys that have changed
let subkeys = w.changed.clone();
@@ -996,14 +1029,14 @@ where
// Reduce the count of changes sent
// if count goes to zero mark this watcher dead
w.count -= 1;
let count = w.count;
w.params.count -= 1;
let count = w.params.count;
if count == 0 {
dead_watchers.push(wn);
}
evcis.push(EarlyValueChangedInfo {
target: w.target,
target: w.params.target,
key: rtk.key,
subkeys,
count,
@@ -1013,8 +1046,8 @@ where
// Remove in reverse so we don't have to offset the index to remove the right key
for dw in dead_watchers.iter().rev().copied() {
watch.watchers.remove(dw);
if watch.watchers.is_empty() {
watch.watches.remove(dw);
if watch.watches.is_empty() {
empty_watched_records.push(rtk);
}
}
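The removal-in-reverse trick in this hunk deserves a note: indices collected during iteration stay valid only if higher indices are removed first. A stand-in sketch of the notification pass (simplified: the real code only decrements watches whose subkeys actually changed):

// Stand-in for a watch that has a remaining notification count.
struct NotifyWatch {
    count: u32,
}

// One notification pass: decrement each watch's remaining count, remember
// which watches just died, and remove the dead ones in reverse index order so
// earlier removals don't shift the indices of later ones. Returns how many
// watches were removed.
fn notify_pass(watches: &mut Vec<NotifyWatch>) -> usize {
    let mut dead = Vec::new();
    for (wn, w) in watches.iter_mut().enumerate() {
        w.count -= 1;
        if w.count == 0 {
            dead.push(wn);
        }
    }
    let dead_count = dead.len();
    for wn in dead.iter().rev().copied() {
        watches.remove(wn);
    }
    dead_count
}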

View File

@@ -0,0 +1,66 @@
use super::*;
/// Watch parameters used to configure a watch
#[derive(Debug, Clone)]
pub struct WatchParameters {
/// The range of subkeys being watched, empty meaning the full range
pub subkeys: ValueSubkeyRangeSet,
/// When this watch will expire
pub expiration: Timestamp,
/// How many updates are left before forced expiration
pub count: u32,
/// The watching schema member key, or an anonymous key
pub watcher: PublicKey,
/// The place where updates are sent
pub target: Target,
}
/// Watch result to return with answer
/// Default result is cancelled/expired/inactive/rejected
#[derive(Debug, Clone)]
pub enum WatchResult {
/// A new watch was created
Created {
/// The new id of the watch
id: u64,
/// The expiration timestamp of the watch. This should never be zero.
expiration: Timestamp,
},
/// An existing watch was modified
Changed {
/// The new expiration timestamp of the modified watch. This should never be zero.
expiration: Timestamp,
},
/// An existing watch was cancelled
Cancelled,
/// The request was rejected due to invalid parameters or a missing watch
Rejected,
}
/// An individual watch
#[derive(Debug, Clone)]
pub struct Watch {
/// The configuration of the watch
pub params: WatchParameters,
/// A unique id per record assigned at watch creation time. Used to disambiguate a client's version of a watch
pub id: u64,
/// What has changed since the last update
pub changed: ValueSubkeyRangeSet,
}
#[derive(Debug, Default, Clone)]
/// A record being watched for changes
pub struct WatchList {
/// The list of active watches
pub watches: Vec<Watch>,
}
/// How a watch gets updated when a value changes
pub enum WatchUpdateMode {
/// Update no watchers
NoUpdate,
/// Update all watchers
UpdateAll,
/// Update all watchers except ones that come from a specific target
ExcludeTarget(Target),
}
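As a usage illustration only (this helper is hypothetical, not part of the diff): building WatchParameters for a watch over every subkey that expires a minute out and fires at most ten times, using the types defined above:

// Hypothetical helper. An empty subkey set means the full range, and Veilid
// timestamps are in microseconds, so one minute is 60_000_000.
fn one_minute_watch(watcher: PublicKey, target: Target) -> WatchParameters {
    WatchParameters {
        subkeys: ValueSubkeyRangeSet::new(),
        expiration: Timestamp::new(get_timestamp() + 60_000_000u64),
        count: 10,
        watcher,
        target,
    }
}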

View File

@@ -537,26 +537,6 @@ impl StorageManagerInner {
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_watch_local_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
watch_id: Option<u64>,
target: Target,
watcher: PublicKey,
) -> VeilidAPIResult<Option<(Timestamp, u64)>> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
local_record_store
.watch_record(key, subkeys, expiration, count, watch_id, target, watcher)
.await
}
pub(super) async fn handle_get_remote_value(
&mut self,
key: TypedKey,
@@ -614,26 +594,6 @@ impl StorageManagerInner {
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_watch_remote_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
watch_id: Option<u64>,
target: Target,
watcher: PublicKey,
) -> VeilidAPIResult<Option<(Timestamp, u64)>> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
remote_record_store
.watch_record(key, subkeys, expiration, count, watch_id, target, watcher)
.await
}
/// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey
where

View File

@@ -1,17 +1,7 @@
mod local_record_detail;
mod opened_record;
mod record;
mod record_data;
mod remote_record_detail;
mod signed_value_data;
mod signed_value_descriptor;
use super::*;
pub(super) use local_record_detail::*;
pub(super) use opened_record::*;
pub(super) use record::*;
pub(super) use record_data::*;
pub(super) use remote_record_detail::*;
pub use signed_value_data::*;
pub use signed_value_descriptor::*;

View File

@@ -20,7 +20,7 @@ pub(super) struct OutboundWatchValueResult {
}
impl StorageManager {
/// Perform a 'watch value' query on the network
/// Perform a 'watch value' query on the network using fanout
#[allow(clippy::too_many_arguments)]
pub(super) async fn outbound_watch_value(
&self,
@@ -91,12 +91,12 @@ impl StorageManager {
// Keep answer if we got one
if wva.answer.accepted {
if expiration_ts.as_u64() > 0 {
if wva.answer.expiration_ts.as_u64() > 0 {
// If the expiration time is greater than zero this watch is active
log_stor!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
} else {
// If we asked for a zero notification count, then this is a cancelled watch
log_stor!(debug "Watch cancelled: id={}", wva.answer.watch_id);
// If the returned expiration time is zero, this watch was cancelled or is inactive
log_stor!(debug "Watch inactive: id={}", wva.answer.watch_id);
}
let mut ctx = context.lock();
ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
@@ -186,51 +186,40 @@ impl StorageManager {
pub async fn inbound_watch_value(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
params: WatchParameters,
watch_id: Option<u64>,
target: Target,
watcher: PublicKey,
) -> VeilidAPIResult<NetworkResult<(Timestamp, u64)>> {
) -> VeilidAPIResult<NetworkResult<WatchResult>> {
let mut inner = self.lock().await?;
// Validate input
if (subkeys.is_empty() || count == 0) && (watch_id.unwrap_or_default() == 0) {
if params.count == 0 && (watch_id.unwrap_or_default() == 0) {
// Can't cancel a watch without a watch id
return VeilidAPIResult::Ok(NetworkResult::invalid_message(
"can't cancel watch without id",
));
}
// See if this is a remote or local value
let (_is_local, opt_ret) = {
// See if the subkey we are watching has a local value
let opt_ret = inner
.handle_watch_local_value(
key,
subkeys.clone(),
expiration,
count,
watch_id,
target,
watcher,
)
.await?;
if opt_ret.is_some() {
(true, opt_ret)
} else {
// See if the subkey we are watching is a remote value
let opt_ret = inner
.handle_watch_remote_value(
key, subkeys, expiration, count, watch_id, target, watcher,
)
.await?;
(false, opt_ret)
}
// Try from local and remote record stores
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
Ok(NetworkResult::value(opt_ret.unwrap_or_default()))
if local_record_store.contains_record(key) {
return local_record_store
.watch_record(key, params, watch_id)
.await
.map(NetworkResult::value);
}
let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
if remote_record_store.contains_record(key) {
return remote_record_store
.watch_record(key, params, watch_id)
.await
.map(NetworkResult::value);
}
// No record found
Ok(NetworkResult::value(WatchResult::Rejected))
}
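The routing order here matters: the local record store is checked first so records this node hosts win over cached remote records. A minimal stand-in sketch of that precedence with plain HashMaps (not Veilid's API):

use std::collections::HashMap;

// Consult the local store first, then the remote cache; a record found in
// neither maps to WatchResult::Rejected in the code above.
fn route_watch_store<'a, V>(
    local: &'a mut HashMap<u64, V>,
    remote: &'a mut HashMap<u64, V>,
    key: u64,
) -> Option<&'a mut V> {
    if local.contains_key(&key) {
        return local.get_mut(&key);
    }
    // None here corresponds to returning WatchResult::Rejected
    remote.get_mut(&key)
}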
/// Handle a received 'Value Changed' statement