[ci skip] stash

This commit is contained in:
Christien Rioux 2025-03-30 15:42:00 -04:00
parent 965a6e2af7
commit 5d31192134
25 changed files with 485 additions and 612 deletions

View File

@ -173,7 +173,7 @@ impl ClientApiConnection {
let mut inner = this.inner.lock();
inner.request_sender = None;
};
unord.push(system_boxed(recv_messages_future));
unord.push(pin_dyn_future!(recv_messages_future));
// Requests send processor
let send_requests_future = async move {
@ -183,7 +183,7 @@ impl ClientApiConnection {
}
}
};
unord.push(system_boxed(send_requests_future));
unord.push(pin_dyn_future!(send_requests_future));
// Request initial server state
let capi = self.clone();

View File

@ -353,7 +353,7 @@ struct OperationSetValueA @0x9378d0732dc95be2 {
struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
key @0 :TypedKey; # key for value to watch
subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges), if empty this implies 0..=UINT32_MAX
subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges). An empty range here should not be specified unless cancelling a watch (count=0).
expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max)
count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous)
watchId @4 :UInt64; # if 0, request a new watch. if >0, existing watch id

File diff suppressed because it is too large Load Diff

View File

@ -691,10 +691,7 @@ impl ConnectionManager {
fn spawn_reconnector(&self, dial_info: DialInfo) {
let this = self.clone();
self.arc.reconnection_processor.add(
Box::pin(futures_util::stream::once(async { dial_info })),
move |dial_info| {
let this = this.clone();
self.arc.reconnection_processor.add_future(
Box::pin(async move {
match this.get_or_create_connection(dial_info.clone()).await {
Ok(NetworkResult::Value(conn)) => {
@ -706,15 +703,11 @@ impl ConnectionManager {
Err(e) => {
veilid_log!(this debug "Reconnection error to {}: {}", dial_info, e);
}
}
false
})
},
);
};
}));
}
pub fn debug_print(&self) -> String {
//let inner = self.arc.inner.lock();
format!(
"Connection Table:\n\n{}",
self.arc.connection_table.debug_print_table()

View File

@ -352,7 +352,7 @@ impl NetworkConnection {
};
let timer = MutableFuture::new(new_timer());
unord.push(system_boxed(timer.clone().in_current_span()));
unord.push(pin_dyn_future!(timer.clone().in_current_span()));
loop {
// Add another message sender future if necessary
@ -386,7 +386,7 @@ impl NetworkConnection {
}
}
}.in_current_span());
unord.push(system_boxed(sender_fut.in_current_span()));
unord.push(pin_dyn_future!(sender_fut.in_current_span()));
}
// Add another message receiver future if necessary
@ -445,7 +445,7 @@ impl NetworkConnection {
}
}.in_current_span());
unord.push(system_boxed(receiver_fut.in_current_span()));
unord.push(pin_dyn_future!(receiver_fut.in_current_span()));
}
// Process futures

View File

@ -269,7 +269,7 @@ impl RPCProcessor {
} else {
// Accepted, lets try to watch or cancel it
let params = WatchParameters {
let params = InboundWatchParameters {
subkeys: subkeys.clone(),
expiration: Timestamp::new(expiration),
count,
@ -287,12 +287,12 @@ impl RPCProcessor {
// Encode the watch result
// Rejections and cancellations are treated the same way by clients
let (ret_expiration, ret_watch_id) = match watch_result {
WatchResult::Created { id, expiration } => (expiration.as_u64(), id),
WatchResult::Changed { expiration } => {
InboundWatchResult::Created { id, expiration } => (expiration.as_u64(), id),
InboundWatchResult::Changed { expiration } => {
(expiration.as_u64(), watch_id.unwrap_or_default())
}
WatchResult::Cancelled => (0, watch_id.unwrap_or_default()),
WatchResult::Rejected => (0, watch_id.unwrap_or_default()),
InboundWatchResult::Cancelled => (0, watch_id.unwrap_or_default()),
InboundWatchResult::Rejected => (0, watch_id.unwrap_or_default()),
};
(true, ret_expiration, ret_watch_id)
};

View File

@ -24,7 +24,7 @@ impl StorageManager {
} else {
"".to_owned()
};
let watch = if let Some(w) = v.active_watch() {
let watch = if let Some(w) = v.outbound_watch() {
format!(" watch: {:?}\n", w)
} else {
"".to_owned()

View File

@ -395,7 +395,7 @@ impl StorageManager {
key,
subkey,
get_result_value.clone(),
WatchUpdateMode::UpdateAll,
InboundWatchUpdateMode::UpdateAll,
)
.await?;
}

View File

@ -1,6 +1,7 @@
mod debug;
mod get_value;
mod inspect_value;
mod outbound_watch;
mod record_store;
mod set_value;
mod tasks;
@ -8,11 +9,12 @@ mod types;
mod watch_value;
use super::*;
use outbound_watch::*;
use record_store::*;
use routing_table::*;
use rpc_processor::*;
pub use record_store::{WatchParameters, WatchResult};
pub use record_store::{InboundWatchParameters, InboundWatchResult};
pub use types::*;
@ -28,8 +30,8 @@ const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 5;
/// Frequency to send ValueChanged notifications to the network
const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
/// Frequency to check for dead nodes and routes for client-side active watches
const CHECK_ACTIVE_WATCHES_INTERVAL_SECS: u32 = 1;
/// Frequency to check for dead nodes and routes for client-side outbound watches
const CHECK_OUTBOUND_WATCHES_INTERVAL_SECS: u32 = 1;
/// Frequency to check for expired server-side watched records
const CHECK_WATCHED_RECORDS_INTERVAL_SECS: u32 = 1;
/// Table store table for storage manager metadata
@ -61,6 +63,8 @@ struct StorageManagerInner {
pub offline_subkey_writes: HashMap<TypedKey, tasks::offline_subkey_writes::OfflineSubkeyWrite>,
/// Record subkeys that are currently being written to in the foreground
pub active_subkey_writes: HashMap<TypedKey, ValueSubkeyRangeSet>,
/// State management for outbound watches
pub outbound_watches: HashMap<TypedKey, OutboundWatch>,
/// Storage manager metadata that is persistent, including copy of offline subkey writes
pub metadata_db: Option<TableDB>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
@ -90,14 +94,20 @@ pub(crate) struct StorageManager {
flush_record_stores_task: TickTask<EyreReport>,
offline_subkey_writes_task: TickTask<EyreReport>,
send_value_changes_task: TickTask<EyreReport>,
check_active_watches_task: TickTask<EyreReport>,
check_watched_records_task: TickTask<EyreReport>,
check_outbound_watches_task: TickTask<EyreReport>,
check_inbound_watches_task: TickTask<EyreReport>,
// Anonymous watch keys
anonymous_watch_keys: TypedKeyPairGroup,
/// Deferred result processor
deferred_result_processor: DeferredStreamProcessor,
// Outbound watch operation lock
// Keeps changes to watches to one-at-a-time per record
outbound_watch_lock_table: AsyncTagLockTable<TypedKey>,
// Background operation processor
// for offline subkey writes, watch changes, and any other
// background operations the storage manager wants to perform
background_operation_processor: DeferredStreamProcessor,
}
impl fmt::Debug for StorageManager {
@ -116,7 +126,11 @@ impl fmt::Debug for StorageManager {
// "check_watched_records_task",
// &self.check_watched_records_task,
// )
.field("deferred_result_processor", &self.deferred_result_processor)
.field("outbound_watch_lock_table", &self.outbound_watch_lock_table)
.field(
"background_operation_processor",
&self.background_operation_processor,
)
.field("anonymous_watch_keys", &self.anonymous_watch_keys)
.finish()
}
@ -157,17 +171,17 @@ impl StorageManager {
"send_value_changes_task",
SEND_VALUE_CHANGES_INTERVAL_SECS,
),
check_active_watches_task: TickTask::new(
check_outbound_watches_task: TickTask::new(
"check_active_watches_task",
CHECK_ACTIVE_WATCHES_INTERVAL_SECS,
CHECK_OUTBOUND_WATCHES_INTERVAL_SECS,
),
check_watched_records_task: TickTask::new(
check_inbound_watches_task: TickTask::new(
"check_watched_records_task",
CHECK_WATCHED_RECORDS_INTERVAL_SECS,
),
outbound_watch_lock_table: AsyncTagLockTable::new(),
anonymous_watch_keys,
deferred_result_processor: DeferredStreamProcessor::new(),
background_operation_processor: DeferredStreamProcessor::new(),
};
this.setup_tasks();
@ -240,7 +254,7 @@ impl StorageManager {
}
// Start deferred results processors
self.deferred_result_processor.init();
self.background_operation_processor.init();
Ok(())
}
@ -286,7 +300,7 @@ impl StorageManager {
veilid_log!(self debug "starting storage manager shutdown");
// Stop deferred result processor
self.deferred_result_processor.terminate().await;
self.background_operation_processor.terminate().await;
// Terminate and release the storage manager
{
@ -366,7 +380,7 @@ impl StorageManager {
let inner = self.inner.lock().await;
let mut out = vec![];
for opened_record in inner.opened_records.values() {
if let Some(aw) = opened_record.active_watch() {
if let Some(aw) = opened_record.outbound_watch() {
for pn in &aw.per_node {
out.push(
Destination::direct(
@ -525,7 +539,7 @@ impl StorageManager {
};
// See if we have an active watch on the closed record
let Some(active_watch) = opened_record.active_watch() else {
let Some(active_watch) = opened_record.outbound_watch() else {
return Ok(());
};
@ -737,7 +751,7 @@ impl StorageManager {
key,
subkey,
signed_value_data.clone(),
WatchUpdateMode::NoUpdate,
InboundWatchUpdateMode::NoUpdate,
)
.await?;
@ -867,7 +881,7 @@ impl StorageManager {
(
opened_record.safety_selection(),
opened_record.writer().cloned(),
opened_record.active_watch().cloned(),
opened_record.outbound_watch().cloned(),
)
};
@ -921,7 +935,7 @@ impl StorageManager {
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
apibail_generic!("record not open");
};
opened_record.clear_active_watch();
opened_record.clear_outbound_watch();
// Get the minimum expiration timestamp we will accept
let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| {
@ -965,7 +979,7 @@ impl StorageManager {
}
// Keep a record of the watch
opened_record.set_active_watch(ActiveWatch {
opened_record.set_outbound_watch(OutboundWatch {
id: owvresult.watch_id,
expiration_ts,
watch_node: owvresult.watch_node,
@ -983,14 +997,14 @@ impl StorageManager {
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
) -> VeilidAPIResult<bool> {
let (subkeys, active_watch) = {
let (subkeys, count, expiration_ts) = {
let inner = self.inner.lock().await;
let Some(opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
};
// See what watch we have currently if any
let Some(active_watch) = opened_record.active_watch() else {
let Some(active_watch) = opened_record.outbound_watch() else {
// If we didn't have an active watch, then we can just return false because there's nothing to do here
return Ok(false);
};
@ -1003,22 +1017,27 @@ impl StorageManager {
};
// Reduce the subkey range
let new_subkeys = active_watch.subkeys.difference(&subkeys);
let new_subkeys = active_watch.params.subkeys.difference(&subkeys);
(new_subkeys, active_watch)
};
// If no change is happening return false
if new_subkeys == active_watch.params.subkeys {
return Ok(false);
}
// If we have no subkeys left, then set the count to zero to indicate a full cancellation
let count = if subkeys.is_empty() {
0
} else {
active_watch.count
// If we have no subkeys left, then set the count to zero to indicate a full cancellation
let count = if new_subkeys.is_empty() {
0
} else {
active_watch.params.count
};
(new_subkeys, count, active_watch.params.expiration_ts)
};
// Update the watch. This just calls through to the above watch_values() function
// This will update the active_watch so we don't need to do that in this routine.
let expiration_ts =
pin_future!(self.watch_values(key, subkeys, active_watch.expiration_ts, count)).await?;
pin_future!(self.watch_values(key, subkeys, expiration_ts, count)).await?;
// A zero expiration time returned from watch_value() means the watch is done
// or no subkeys are left, and the watch is no longer active
@ -1375,7 +1394,7 @@ impl StorageManager {
continue;
};
local_record_store
.set_subkey(key, subkey, subkey_data, WatchUpdateMode::NoUpdate)
.set_subkey(key, subkey, subkey_data, InboundWatchUpdateMode::NoUpdate)
.await?;
}
@ -1509,7 +1528,12 @@ impl StorageManager {
if let Some(signed_value_data) = get_result.opt_value {
// Write subkey to local store
local_record_store
.set_subkey(key, subkey, signed_value_data, WatchUpdateMode::NoUpdate)
.set_subkey(
key,
subkey,
signed_value_data,
InboundWatchUpdateMode::NoUpdate,
)
.await?;
}
@ -1653,7 +1677,7 @@ impl StorageManager {
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
watch_update_mode: WatchUpdateMode,
watch_update_mode: InboundWatchUpdateMode,
) -> VeilidAPIResult<()> {
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
@ -1724,7 +1748,7 @@ impl StorageManager {
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
signed_value_descriptor: Arc<SignedValueDescriptor>,
watch_update_mode: WatchUpdateMode,
watch_update_mode: InboundWatchUpdateMode,
) -> VeilidAPIResult<()> {
// See if it's in the remote record store
let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
@ -1816,7 +1840,7 @@ impl StorageManager {
receiver: flume::Receiver<T>,
handler: impl FnMut(T) -> PinBoxFutureStatic<bool> + Send + 'static,
) -> bool {
self.deferred_result_processor
.add(receiver.into_stream(), handler)
self.background_operation_processor
.add_stream(receiver.into_stream(), handler)
}
}

View File

@ -1,3 +1,4 @@
mod inbound_watch;
/// RecordStore
/// Keeps an LRU cache of dht keys and their associated subkey valuedata.
/// Instances of this store are used for 'local' (persistent) and 'remote' (ephemeral) dht key storage.
@ -13,8 +14,9 @@ mod record;
mod record_data;
mod record_store_limits;
mod remote_record_detail;
mod watch;
pub(super) use inbound_watch::*;
pub use inbound_watch::{InboundWatchParameters, InboundWatchResult};
pub(super) use inspect_cache::*;
pub(super) use keys::*;
pub(super) use limited_size::*;
@ -23,8 +25,6 @@ pub(super) use opened_record::*;
pub(super) use record::*;
pub(super) use record_store_limits::*;
pub(super) use remote_record_detail::*;
pub(super) use watch::*;
pub use watch::{WatchParameters, WatchResult};
use super::*;
use record_data::*;
@ -75,7 +75,7 @@ where
/// The list of records that have changed since last flush to disk (optimization for batched writes)
changed_records: HashSet<RecordTableKey>,
/// The list of records being watched for changes
watched_records: HashMap<RecordTableKey, WatchList>,
watched_records: HashMap<RecordTableKey, InboundWatchList>,
/// The list of watched records that have changed values since last notification
changed_watched_values: HashSet<RecordTableKey>,
/// A mutex to ensure we handle this concurrently
@ -680,12 +680,12 @@ where
&mut self,
key: TypedKey,
subkey: ValueSubkey,
watch_update_mode: WatchUpdateMode,
watch_update_mode: InboundWatchUpdateMode,
) {
let (do_update, opt_ignore_target) = match watch_update_mode {
WatchUpdateMode::NoUpdate => (false, None),
WatchUpdateMode::UpdateAll => (true, None),
WatchUpdateMode::ExcludeTarget(target) => (true, Some(target)),
InboundWatchUpdateMode::NoUpdate => (false, None),
InboundWatchUpdateMode::UpdateAll => (true, None),
InboundWatchUpdateMode::ExcludeTarget(target) => (true, Some(target)),
};
if !do_update {
return;
@ -720,7 +720,7 @@ where
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
watch_update_mode: WatchUpdateMode,
watch_update_mode: InboundWatchUpdateMode,
) -> VeilidAPIResult<()> {
// Check size limit for data
if signed_value_data.value_data().data().len() > self.limits.max_subkey_size {
@ -902,9 +902,9 @@ where
pub async fn _change_existing_watch(
&mut self,
key: TypedKey,
params: WatchParameters,
params: InboundWatchParameters,
watch_id: u64,
) -> VeilidAPIResult<WatchResult> {
) -> VeilidAPIResult<InboundWatchResult> {
if params.count == 0 {
apibail_internal!("cancel watch should not have gotten here");
}
@ -915,7 +915,7 @@ where
let rtk = RecordTableKey { key };
let Some(watch_list) = self.watched_records.get_mut(&rtk) else {
// No watches, nothing to change
return Ok(WatchResult::Rejected);
return Ok(InboundWatchResult::Rejected);
};
// Check each watch to see if we have an exact match for the id to change
@ -925,23 +925,23 @@ where
if w.id == watch_id && w.params.watcher == params.watcher {
// Updating an existing watch
w.params = params;
return Ok(WatchResult::Changed {
return Ok(InboundWatchResult::Changed {
expiration: w.params.expiration,
});
}
}
// No existing watch found
Ok(WatchResult::Rejected)
Ok(InboundWatchResult::Rejected)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn _create_new_watch(
&mut self,
key: TypedKey,
params: WatchParameters,
params: InboundWatchParameters,
member_check: Box<dyn Fn(PublicKey) -> bool + Send>,
) -> VeilidAPIResult<WatchResult> {
) -> VeilidAPIResult<InboundWatchResult> {
// Generate a record-unique watch id > 0
let rtk = RecordTableKey { key };
let mut id = 0;
@ -1001,7 +1001,7 @@ where
// For anonymous, no more than one watch per target per record
if target_watch_count > 0 {
// Too many watches
return Ok(WatchResult::Rejected);
return Ok(InboundWatchResult::Rejected);
}
// Check watch table for limits
@ -1011,18 +1011,18 @@ where
self.limits.public_watch_limit
};
if watch_count >= watch_limit {
return Ok(WatchResult::Rejected);
return Ok(InboundWatchResult::Rejected);
}
// Ok this is an acceptable new watch, add it
let watch_list = self.watched_records.entry(rtk).or_default();
let expiration = params.expiration;
watch_list.watches.push(Watch {
watch_list.watches.push(InboundWatch {
params,
id,
changed: ValueSubkeyRangeSet::new(),
});
Ok(WatchResult::Created { id, expiration })
Ok(InboundWatchResult::Created { id, expiration })
}
/// Add or update an inbound record watch for changes
@ -1030,17 +1030,17 @@ where
pub async fn watch_record(
&mut self,
key: TypedKey,
mut params: WatchParameters,
mut params: InboundWatchParameters,
opt_watch_id: Option<u64>,
) -> VeilidAPIResult<WatchResult> {
) -> VeilidAPIResult<InboundWatchResult> {
// If count is zero then we're cancelling a watch completely
if params.count == 0 {
if let Some(watch_id) = opt_watch_id {
let cancelled = self.cancel_watch(key, watch_id, params.watcher).await?;
if cancelled {
return Ok(WatchResult::Cancelled);
return Ok(InboundWatchResult::Cancelled);
}
return Ok(WatchResult::Rejected);
return Ok(InboundWatchResult::Rejected);
}
apibail_internal!("shouldn't have let a None watch id get here");
}
@ -1058,10 +1058,10 @@ where
if let Some(watch_id) = opt_watch_id {
let cancelled = self.cancel_watch(key, watch_id, params.watcher).await?;
if cancelled {
return Ok(WatchResult::Cancelled);
return Ok(InboundWatchResult::Cancelled);
}
}
return Ok(WatchResult::Rejected);
return Ok(InboundWatchResult::Rejected);
}
// Make a closure to check for member vs anonymous
@ -1071,7 +1071,7 @@ where
Box::new(move |watcher| owner == watcher || schema.is_member(&watcher))
}) else {
// Record not found
return Ok(WatchResult::Rejected);
return Ok(InboundWatchResult::Rejected);
};
// Create or update depending on if a watch id is specified or not
@ -1128,8 +1128,8 @@ where
pub fn move_watches(
&mut self,
key: TypedKey,
in_watch: Option<(WatchList, bool)>,
) -> Option<(WatchList, bool)> {
in_watch: Option<(InboundWatchList, bool)>,
) -> Option<(InboundWatchList, bool)> {
let rtk = RecordTableKey { key };
let out = self.watched_records.remove(&rtk);
if let Some(in_watch) = in_watch {

View File

@ -1,56 +1,5 @@
use super::*;
#[derive(Clone, Debug)]
pub(in crate::storage_manager) struct PerNodeActiveWatch {
/// The watch id returned from the watch node
pub id: u64,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which node accepted the watch
pub watch_node: NodeRef,
/// How many value change notifications are left
pub count: u32,
}
/// Requested parameters for watch
#[derive(Clone, Debug)]
pub(in crate::storage_manager) struct ActiveWatchParameters {
/// Requested expiration timestamp
pub expiration_ts: Timestamp,
/// How many notifications the requestor asked for
pub count: u32,
/// Subkeys requested for this watch
pub subkeys: ValueSubkeyRangeSet,
}
#[derive(Clone, Debug)]
pub(in crate::storage_manager) struct ActiveWatch {
/// Requested parameters
pub params: ActiveWatchParameters,
/// Active watches per node
pub per_node: Vec<PerNodeActiveWatch>,
/// Minimum expiration time for all our nodes
pub min_expiration_ts: Timestamp,
/// How many value change updates remain
pub remaining_count: u32,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
}
impl ActiveWatch {
pub fn new(params: ActiveWatchParameters, opt_value_changed_route: Option<CryptoKey>) -> Self {
let remaining_count = params.count;
Self {
params,
per_node,
min_expiration_ts,
remaining_count,
opt_value_changed_route,
}
}
}
/// The state associated with a local record when it is opened
/// This is not serialized to storage as it is ephemeral for the lifetime of the opened record
#[derive(Clone, Debug, Default)]
@ -62,9 +11,6 @@ pub(in crate::storage_manager) struct OpenedRecord {
/// The safety selection in current use
safety_selection: SafetySelection,
/// Active watches we have on this record
active_watch: Option<ActiveWatch>,
}
impl OpenedRecord {
@ -72,7 +18,6 @@ impl OpenedRecord {
Self {
writer,
safety_selection,
active_watch: None,
}
}
@ -89,85 +34,4 @@ impl OpenedRecord {
pub fn set_safety_selection(&mut self, safety_selection: SafetySelection) {
self.safety_selection = safety_selection;
}
fn calculate_min_expiration_ts(per_node: &[PerNodeActiveWatch]) -> Option<Timestamp> {
per_node
.iter()
.map(|x| x.expiration_ts)
.reduce(|a, b| a.min(b))
}
pub fn new_active_watch(
&mut self,
params: ActiveWatchParameters,
opt_value_changed_route: Option<CryptoKey>,
per_node: Vec<PerNodeActiveWatch>,
) {
assert!(
self.active_watch.is_none(),
"should have cleared active watch first"
);
assert!(!per_node.is_empty(), "must have at least one watch node");
let min_expiration_ts = Self::calculate_min_expiration_ts(&per_node).unwrap();
let remaining_count = params.count;
self.active_watch = Some(ActiveWatch {
params,
per_node,
min_expiration_ts,
remaining_count,
opt_value_changed_route,
});
}
pub fn set_active_watch(&mut self, active_watch: ActiveWatch) {
assert!(
self.active_watch.is_none(),
"should have cleared watch first before setting a new one"
);
self.active_watch = Some(active_watch);
}
pub fn clear_active_watch(&mut self) {
self.active_watch = None;
}
pub fn active_watch(&self) -> Option<&ActiveWatch> {
self.active_watch.as_ref()
}
pub fn active_watch_mut(&mut self) -> Option<&mut ActiveWatch> {
self.active_watch.as_mut()
}
pub fn per_node_active_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeActiveWatch> {
self.active_watch
.as_ref()
.map(|x| x.per_node.iter().find(|x| x.id == watch_id))
.flatten()
}
pub fn per_node_active_watch_by_id_mut(
&mut self,
watch_id: u64,
) -> Option<&mut PerNodeActiveWatch> {
self.active_watch
.as_mut()
.map(|x| x.per_node.iter_mut().find(|x| x.id == watch_id))
.flatten()
}
pub fn remove_per_node_active_watch_by_id(&mut self, watch_id: u64) {
let Some(active_watch) = self.active_watch.as_mut() else {
return;
};
let Some(n) = active_watch.per_node.iter().position(|x| x.id == watch_id) else {
return;
};
active_watch.per_node.remove(n);
active_watch.min_expiration_ts =
Self::calculate_min_expiration_ts(&active_watch.per_node).unwrap();
}
}

View File

@ -1,66 +0,0 @@
use super::*;
/// Watch parameters used to configure a watch
#[derive(Debug, Clone)]
pub struct WatchParameters {
/// The range of subkeys being watched, empty meaning full
pub subkeys: ValueSubkeyRangeSet,
/// When this watch will expire
pub expiration: Timestamp,
/// How many updates are left before forced expiration
pub count: u32,
/// The watching schema member key, or an anonymous key
pub watcher: PublicKey,
/// The place where updates are sent
pub target: Target,
}
/// Watch result to return with answer
/// Default result is cancelled/expired/inactive/rejected
#[derive(Debug, Clone)]
pub enum WatchResult {
/// A new watch was created
Created {
/// The new id of the watch
id: u64,
/// The expiration timestamp of the watch. This should never be zero.
expiration: Timestamp,
},
/// An existing watch was modified
Changed {
/// The new expiration timestamp of the modified watch. This should never be zero.
expiration: Timestamp,
},
/// An existing watch was cancelled
Cancelled,
/// The request was rejected due to invalid parameters or a missing watch
Rejected,
}
/// An individual watch
#[derive(Debug, Clone)]
pub struct Watch {
/// The configuration of the watch
pub params: WatchParameters,
/// A unique id per record assigned at watch creation time. Used to disambiguate a client's version of a watch
pub id: u64,
/// What has changed since the last update
pub changed: ValueSubkeyRangeSet,
}
#[derive(Debug, Default, Clone)]
/// A record being watched for changes
pub struct WatchList {
/// The list of active watches
pub watches: Vec<Watch>,
}
/// How a watch gets updated when a value changes
pub enum WatchUpdateMode {
/// Update no watchers
NoUpdate,
/// Update all watchers
UpdateAll,
/// Update all watchers except ones that come from a specific target
ExcludeTarget(Target),
}

View File

@ -385,7 +385,7 @@ impl StorageManager {
key,
subkey,
result.signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
InboundWatchUpdateMode::UpdateAll,
)
.await?;
@ -486,7 +486,7 @@ impl StorageManager {
key,
subkey,
value,
WatchUpdateMode::ExcludeTarget(target),
InboundWatchUpdateMode::ExcludeTarget(target),
)
.await
} else {
@ -496,7 +496,7 @@ impl StorageManager {
subkey,
value,
actual_descriptor,
WatchUpdateMode::ExcludeTarget(target),
InboundWatchUpdateMode::ExcludeTarget(target),
)
.await
};

View File

@ -1,64 +0,0 @@
use super::*;
impl StorageManager {
// Check if client-side watches on opened records either have dead nodes or if the watch has expired
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_active_watches_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
{
let mut inner = self.inner.lock().await;
let routing_table = self.routing_table();
//let update_callback = self.update_callback();
let cur_ts = Timestamp::now();
for (k, v) in inner.opened_records.iter_mut() {
for active_watch in v.active_watches() {
// See if the active watch's node is dead
let mut is_dead = false;
if !active_watch.watch_node.state(cur_ts).is_alive() {
// Watched node is dead
is_dead = true;
}
// See if the private route we're using is dead
if !is_dead {
if let Some(value_changed_route) = active_watch.opt_value_changed_route {
if routing_table
.route_spec_store()
.get_route_id_for_key(&value_changed_route)
.is_none()
{
// Route we would receive value changes on is dead
is_dead = true;
}
}
}
// See if the watch is expired
if !is_dead && active_watch.expiration_ts <= cur_ts {
// Watch has expired
is_dead = true;
}
if is_dead {
v.remove_active_watch(active_watch.id);
// // Send valuechange with dead count and no subkeys
// update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
// key: *k,
// subkeys: ValueSubkeyRangeSet::new(),
// count: 0,
// value: None,
// })));
}
}
}
}
Ok(())
}
}

View File

@ -1,23 +0,0 @@
use super::*;
impl StorageManager {
// Check if server-side watches have expired
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_watched_records_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let mut inner = self.inner.lock().await;
if let Some(local_record_store) = &mut inner.local_record_store {
local_record_store.check_watched_records();
}
if let Some(remote_record_store) = &mut inner.remote_record_store {
remote_record_store.check_watched_records();
}
Ok(())
}
}

View File

@ -1,5 +1,5 @@
pub mod check_active_watches;
pub mod check_watched_records;
pub mod check_inbound_watches;
pub mod check_outbound_watches;
pub mod flush_record_stores;
pub mod offline_subkey_writes;
pub mod send_value_changes;
@ -40,8 +40,8 @@ impl StorageManager {
impl_setup_task!(
self,
Self,
check_active_watches_task,
check_active_watches_task_routine
check_outbound_watches_task,
check_outbound_watches_task_routine
);
// Set check watched records tick task
@ -49,8 +49,8 @@ impl StorageManager {
impl_setup_task!(
self,
Self,
check_watched_records_task,
check_watched_records_task_routine
check_inbound_watches_task,
check_inbound_watches_task_routine
);
}
@ -60,10 +60,10 @@ impl StorageManager {
self.flush_record_stores_task.tick().await?;
// Check active watches
self.check_active_watches_task.tick().await?;
self.check_outbound_watches_task.tick().await?;
// Check watched records
self.check_watched_records_task.tick().await?;
self.check_inbound_watches_task.tick().await?;
// Run online-only tasks
if self.dht_is_online() {
@ -81,11 +81,11 @@ impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) async fn cancel_tasks(&self) {
veilid_log!(self debug "stopping check watched records task");
if let Err(e) = self.check_watched_records_task.stop().await {
if let Err(e) = self.check_inbound_watches_task.stop().await {
veilid_log!(self warn "check_watched_records_task not stopped: {}", e);
}
veilid_log!(self debug "stopping check active watches task");
if let Err(e) = self.check_active_watches_task.stop().await {
if let Err(e) = self.check_outbound_watches_task.stop().await {
veilid_log!(self warn "check_active_watches_task not stopped: {}", e);
}
veilid_log!(self debug "stopping send value changes task");

View File

@ -90,7 +90,7 @@ impl StorageManager {
key,
subkey,
result.signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
InboundWatchUpdateMode::UpdateAll,
)
.await?;
}

View File

@ -1,4 +1,5 @@
use super::*;
use futures_util::StreamExt as _;
impl_veilid_log_facility!("stor");
@ -28,17 +29,66 @@ pub(super) struct OutboundWatchValueResult {
}
impl StorageManager {
/// Perform a 'watch value cancel' on a set of nodes without fanout
/// Returns the list of successfully cancelled ids and just logs failures
pub(super) async fn outbound_watch_value_cancel_set(
&self,
key: TypedKey,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
outbound_watch: &OutboundWatch,
) -> Vec<u64> {
let mut unord = FuturesUnordered::new();
for pn in &outbound_watch.per_node {
unord.push(async {
let cancelled = match self.outbound_watch_value_cancel(
key,
safety_selection,
opt_watcher,
pn.watch_node.clone(),
pn.id,
).await {
Ok(_) => {
// Either watch was cancelled, or it didn't exist, but it's not there now
true
}
Err(e) => {
veilid_log!(self debug "Outbound watch value (id={}) cancel to {} failed: {}", pn.id, pn.watch_node, e);
false
}
};
if cancelled {
Some(pn.id)
} else {
None
}
});
}
let mut cancelled = vec![];
while let Some(x) = unord.next().await {
match x {
Some(id) => {
cancelled.push(id);
}
None => {}
}
}
cancelled
}
/// Perform a 'watch value cancel' on the network without fanout
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_watch_value_cancel(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
active_watch: &ActiveWatch,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
watch_node: NodeRef,
watch_id: u64,
) -> VeilidAPIResult<bool> {
let routing_domain = RoutingDomain::PublicInternet;
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
@ -52,7 +102,7 @@ impl StorageManager {
Destination::direct(watch_node.routing_domain_filtered(routing_domain))
.with_safety(safety_selection),
key,
subkeys,
ValueSubkeyRangeSet::new(),
Timestamp::default(),
0,
watcher,
@ -62,16 +112,11 @@ impl StorageManager {
)?;
if wva.answer.accepted {
veilid_log!(self debug "WatchValue canceled: id={} expiration_ts={} ({})", wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), watch_node);
Ok(Some(OutboundWatchValueResult {
expiration_ts: wva.answer.expiration_ts,
watch_id: wva.answer.watch_id,
watch_node,
opt_value_changed_route: wva.reply_private_route,
}))
veilid_log!(self debug "Outbound watch canceled: id={} ({})", wva.answer.watch_id, watch_node);
Ok(true)
} else {
veilid_log!(self debug "WatchValue not canceled: id={} ({})", watch_id, watch_node);
Ok(None)
veilid_log!(self debug "Outbound watch id did not exist: id={} ({})", watch_id, watch_node);
Ok(false)
}
}
@ -86,7 +131,7 @@ impl StorageManager {
count: u32,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
active_watch: &ActiveWatch,
active_watch: &OutboundWatch,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
let routing_domain = RoutingDomain::PublicInternet;
@ -143,7 +188,7 @@ impl StorageManager {
count: u32,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
opt_active_watch: Option<&ActiveWatch>,
opt_active_watch: Option<&OutboundWatch>,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
// if the count is zero, we are cancelling
if count == 0 {
@ -357,9 +402,9 @@ impl StorageManager {
pub async fn inbound_watch_value(
&self,
key: TypedKey,
params: WatchParameters,
params: InboundWatchParameters,
watch_id: Option<u64>,
) -> VeilidAPIResult<NetworkResult<WatchResult>> {
) -> VeilidAPIResult<NetworkResult<InboundWatchResult>> {
let mut inner = self.inner.lock().await;
// Validate input
@ -390,7 +435,7 @@ impl StorageManager {
.map(NetworkResult::value);
}
// No record found
Ok(NetworkResult::value(WatchResult::Rejected))
Ok(NetworkResult::value(InboundWatchResult::Rejected))
}
/// Handle a received 'Value Changed' statement
@ -445,7 +490,7 @@ impl StorageManager {
active_watch.count
);
active_watch.count = count;
opened_record.set_active_watch(active_watch);
opened_record.set_outbound_watch(active_watch);
}
// Null out default value
@ -492,7 +537,7 @@ impl StorageManager {
key,
first_subkey,
value.clone(),
WatchUpdateMode::NoUpdate,
InboundWatchUpdateMode::NoUpdate,
)
.await?;
}

View File

@ -404,7 +404,7 @@ impl RoutingContext {
///
/// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future,
/// and the returned timestamp will be no later than the requested expiration, but -may- be before the requested expiration.
/// If the returned timestamp is zero it indicates that the watch creation or update has failed. In the case of a faild update, the watch is considered cancelled.
/// If the returned timestamp is zero it indicates that the watch creation or update has failed. In the case of a failed update, the watch is considered cancelled.
///
/// DHT watches are accepted with the following conditions:
/// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record.

View File

@ -395,7 +395,7 @@ impl ClientApi {
// Request receive processor future
// Receives from socket and enqueues RequestLines
// Completes when the connection is closed or there is a failure
unord.push(system_boxed(self.clone().receive_requests(
unord.push(Box::pin(self.clone().receive_requests(
reader,
requests_tx,
responses_tx,
@ -404,12 +404,10 @@ impl ClientApi {
// Response send processor
// Sends finished response strings out the socket
// Completes when the responses channel is closed
unord.push(system_boxed(
self.clone().send_responses(responses_rx, writer),
));
unord.push(Box::pin(self.clone().send_responses(responses_rx, writer)));
// Add future to process first request
unord.push(system_boxed(Self::next_request_line(requests_rx.clone())));
unord.push(Box::pin(Self::next_request_line(requests_rx.clone())));
// Send and receive until we're done or a stop is requested
while let Ok(Some(r)) = unord.next().timeout_at(stop_token.clone()).await {
@ -417,7 +415,7 @@ impl ClientApi {
let request_line = match r {
Ok(Some(request_line)) => {
// Add future to process next request
unord.push(system_boxed(Self::next_request_line(requests_rx.clone())));
unord.push(Box::pin(Self::next_request_line(requests_rx.clone())));
// Socket receive future returned something to process
request_line
@ -434,7 +432,7 @@ impl ClientApi {
};
// Enqueue unordered future to process request line in parallel
unord.push(system_boxed(
unord.push(Box::pin(
self.clone().process_request_line(jrp.clone(), request_line),
));
}

View File

@ -24,6 +24,10 @@ where
guard: Some(guard),
}
}
pub fn tag(&self) -> T {
self.tag.clone()
}
}
impl<T> Drop for AsyncTagLockGuard<T>

View File

@ -113,7 +113,10 @@ impl DeferredStreamProcessor {
/// * 'handler' is the callback to handle each item from the stream
///
/// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized.
pub fn add<T: Send + 'static, S: futures_util::Stream<Item = T> + Unpin + Send + 'static>(
pub fn add_stream<
T: Send + 'static,
S: futures_util::Stream<Item = T> + Unpin + Send + 'static,
>(
&self,
mut receiver: S,
mut handler: impl FnMut(T) -> PinBoxFutureStatic<bool> + Send + 'static,
@ -140,6 +143,24 @@ impl DeferredStreamProcessor {
}
true
}
/// Queue a single future to process in the background
pub fn add_future<F>(&self, fut: F) -> bool
where
F: Future<Output = ()> + Send + 'static,
{
let dsc_tx = {
let inner = self.inner.lock();
let Some(dsc_tx) = inner.opt_deferred_stream_channel.clone() else {
return false;
};
dsc_tx
};
if dsc_tx.send(Box::pin(fut)).is_err() {
return false;
}
true
}
}
impl Default for DeferredStreamProcessor {

View File

@ -119,14 +119,6 @@ macro_rules! asyncrwlock_try_write_arc {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn system_boxed<'a, Out>(
future: impl Future<Output = Out> + Send + 'a,
) -> PinBoxFuture<'a, Out> {
Box::pin(future)
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
cfg_if! {
if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
#[must_use]

View File

@ -589,7 +589,7 @@ impl RouterClient {
let framed_reader = FramedRead::new(reader, BytesCodec);
let framed_writer = FramedWrite::new(writer, BytesCodec);
let framed_writer_fut = system_boxed(async move {
let framed_writer_fut = Box::pin(async move {
if let Err(e) = receiver
.into_stream()
.map(|command| {
@ -603,7 +603,7 @@ impl RouterClient {
error!("{}", e);
}
});
let framed_reader_fut = system_boxed(async move {
let framed_reader_fut = Box::pin(async move {
let fut = framed_reader.try_for_each(|x| async {
let x = x;
let evt = from_bytes::<ServerProcessorEvent>(&x)
@ -631,7 +631,7 @@ impl RouterClient {
.into_stream()
.map(io::Result::<ServerProcessorEvent>::Ok);
let receiver_fut = system_boxed(async move {
let receiver_fut = Box::pin(async move {
let fut =
receiver.try_for_each(|evt| Self::process_event(evt, router_op_waiter.clone()));
if let Err(e) = fut.await {

View File

@ -117,7 +117,7 @@ impl RouterServer {
let stop_token = stop_source.token();
let this = self.clone();
let listener_fut = system_boxed(async move {
let listener_fut = Box::pin(async move {
loop {
// Wait for a new connection
match listener.accept().timeout_at(stop_token.clone()).await {
@ -125,7 +125,7 @@ impl RouterServer {
let conn = conn.compat();
// Register a connection processing inbound receiver
let this2 = this.clone();
let inbound_receiver_fut = system_boxed(async move {
let inbound_receiver_fut = Box::pin(async move {
let (reader, writer) = conn.split();
this2.process_connection(reader, writer).await
@ -178,7 +178,7 @@ impl RouterServer {
let stop_token = stop_source.token();
let this = self.clone();
let listener_fut = system_boxed(async move {
let listener_fut = Box::pin(async move {
loop {
// Wait for a new connection
match listener.accept().timeout_at(stop_token.clone()).await {
@ -188,7 +188,7 @@ impl RouterServer {
let ws = WsStream::new(s);
// Register a connection processing inbound receiver
let this2 = this.clone();
let inbound_receiver_fut = system_boxed(async move {
let inbound_receiver_fut = Box::pin(async move {
let (reader, writer) = ws.split();
this2.process_connection(reader, writer).await
});
@ -233,7 +233,7 @@ impl RouterServer {
let (local_outbound_sender, local_outbound_receiver) = flume::unbounded();
let this = self.clone();
let inbound_receiver_fut = system_boxed(async move {
let inbound_receiver_fut = Box::pin(async move {
local_inbound_receiver
.into_stream()
.for_each(|cmd| async {
@ -316,7 +316,7 @@ impl RouterServer {
let framed_writer = FramedWrite::new(writer, BytesCodec);
let (outbound_sender, outbound_receiver) = flume::unbounded();
let outbound_fut = system_boxed(
let outbound_fut = Box::pin(
outbound_receiver
.into_stream()
.map(|command| {
@ -327,7 +327,7 @@ impl RouterServer {
.forward(framed_writer),
);
let inbound_fut = system_boxed(framed_reader.try_for_each(|x| async {
let inbound_fut = Box::pin(framed_reader.try_for_each(|x| async {
let x = x;
let cmd = from_bytes::<ServerProcessorCommand>(&x).map_err(io::Error::other)?;