[ci skip] stash

Christien Rioux 2025-03-30 15:42:04 -04:00
parent 5d31192134
commit d0a76652f3
4 changed files with 334 additions and 0 deletions

@@ -0,0 +1,131 @@
use super::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeOutboundWatch {
/// The watch id returned from the watch node
pub id: u64,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which node accepted the watch
pub watch_node_id: TypedKey,
/// Resolved watch node reference
#[serde(skip)]
pub watch_node_ref: Option<NodeRef>,
/// How many value change notifications are left
pub count: u32,
}
/// Requested parameters for watch
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchParameters {
/// Requested expiration timestamp
pub expiration_ts: Timestamp,
/// How many notifications the requestor asked for
pub count: u32,
/// Subkeys requested for this watch
pub subkeys: ValueSubkeyRangeSet,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchCurrent {
/// Requested parameters
pub params: OutboundWatchParameters,
/// Outbound watches per node
pub per_node: Vec<PerNodeOutboundWatch>,
/// Minimum expiration time across all per-node watches
pub min_expiration_ts: Timestamp,
/// How many value change updates remain
pub remaining_count: u32,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatch {
/// Current state
/// None means inactive/cancelled
pub current: Option<OutboundWatchCurrent>,
/// Desired parameters
/// None means cancelled
pub desired: Option<OutboundWatchParameters>,
}
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeKey {
/// Watched record key
pub record_key: TypedKey,
/// Watching node id
pub node_id: TypedKey,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeState {
/// Watch Id
pub watch_id: u64,
/// SafetySpec used to contact the node
pub safety_spec: SafetySpec,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// How many value change notifications are left
pub count: u32,
/// Resolved watch node reference
#[serde(skip)]
pub watch_node_ref: Option<NodeRef>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatchState {
/// The outbound watch for each record key
pub outbound_watches: HashMap<TypedKey, OutboundWatch>,
/// Last known active watch per node+record
pub per_node_state: HashMap<PerNodeKey, PerNodeState>,
}
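// Illustrative sketch (not part of this commit): how the two maps in
// OutboundWatchState relate. Each per-node entry of a record's current outbound
// watch can be joined with the record key to look up the persisted PerNodeState.
// The helper name is hypothetical and assumes the maps are kept in sync elsewhere.
impl OutboundWatchState {
    pub fn per_node_states_for_record(&self, record_key: &TypedKey) -> Vec<&PerNodeState> {
        // No watch for this record key
        let Some(outbound_watch) = self.outbound_watches.get(record_key) else {
            return vec![];
        };
        // Watch exists but is inactive/cancelled
        let Some(current) = &outbound_watch.current else {
            return vec![];
        };
        // Join each per-node watch with its persisted per-node state
        current
            .per_node
            .iter()
            .filter_map(|pnw| {
                self.per_node_state.get(&PerNodeKey {
                    record_key: record_key.clone(),
                    node_id: pnw.watch_node_id.clone(),
                })
            })
            .collect()
    }
}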
impl OutboundWatchCurrent {
pub fn new(
params: OutboundWatchParameters,
opt_value_changed_route: Option<PublicKey>,
) -> Self {
let remaining_count = params.count;
let min_expiration_ts = params.expiration_ts;
Self {
params,
per_node: vec![],
min_expiration_ts,
remaining_count,
opt_value_changed_route,
}
}
pub fn per_node_outbound_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeOutboundWatch> {
self.per_node.iter().find(|x| x.id == watch_id)
}
pub fn per_node_outbound_watch_by_id_mut(
&mut self,
watch_id: u64,
) -> Option<&mut PerNodeOutboundWatch> {
self.per_node.iter_mut().find(|x| x.id == watch_id)
}
pub fn remove_per_node_outbound_watch_by_id(&mut self, watch_id: u64) {
let Some(n) = self.per_node.iter().position(|x| x.id == watch_id) else {
return;
};
self.per_node.remove(n);
self.update_min_expiration_ts();
}
fn update_min_expiration_ts(&mut self) {
self.min_expiration_ts = self
.per_node
.iter()
.map(|x| x.expiration_ts)
.reduce(|a, b| a.min(b))
.unwrap_or(self.params.expiration_ts);
}
}
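// Illustrative sketch (not part of this commit): how a caller might react when a
// node reports its watch as cancelled. remove_per_node_outbound_watch_by_id()
// recomputes min_expiration_ts from the remaining per-node entries (falling back
// to params.expiration_ts when none remain). The function name is hypothetical.
fn handle_per_node_watch_cancelled(current: &mut OutboundWatchCurrent, watch_id: u64) -> bool {
    // Nothing to do if this watch id is not known
    if current.per_node_outbound_watch_by_id(watch_id).is_none() {
        return false;
    }
    // Drop the per-node watch; this also updates min_expiration_ts
    current.remove_per_node_outbound_watch_by_id(watch_id);
    // The outbound watch is still live only if some per-node watches remain
    !current.per_node.is_empty()
}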

@@ -0,0 +1,66 @@
use super::*;
/// Watch parameters used to configure a watch
#[derive(Debug, Clone)]
pub struct InboundWatchParameters {
/// The range of subkeys being watched; an empty set means all subkeys
pub subkeys: ValueSubkeyRangeSet,
/// When this watch will expire
pub expiration: Timestamp,
/// How many updates are left before forced expiration
pub count: u32,
/// The watching schema member key, or an anonymous key
pub watcher: PublicKey,
/// The place where updates are sent
pub target: Target,
}
/// Watch result to return with the answer
/// The default result is cancelled/expired/inactive/rejected
#[derive(Debug, Clone)]
pub enum InboundWatchResult {
/// A new watch was created
Created {
/// The new id of the watch
id: u64,
/// The expiration timestamp of the watch. This should never be zero.
expiration: Timestamp,
},
/// An existing watch was modified
Changed {
/// The new expiration timestamp of the modified watch. This should never be zero.
expiration: Timestamp,
},
/// An existing watch was cancelled
Cancelled,
/// The request was rejected due to invalid parameters or a missing watch
Rejected,
}
/// An individual watch
#[derive(Debug, Clone)]
pub struct InboundWatch {
/// The configuration of the watch
pub params: InboundWatchParameters,
/// A unique id per record assigned at watch creation time. Used to disambiguate a client's version of a watch
pub id: u64,
/// What has changed since the last update
pub changed: ValueSubkeyRangeSet,
}
/// A record being watched for changes
#[derive(Debug, Default, Clone)]
pub struct InboundWatchList {
/// The list of active watches
pub watches: Vec<InboundWatch>,
}
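// Illustrative sketch (not part of this commit): the kind of pruning a record
// store's check_watched_records() pass could perform over an InboundWatchList,
// dropping watches that are out of updates or past their expiration. The real
// method also reports which records changed; this helper's name and shape are
// assumptions.
fn prune_expired_inbound_watches(watch_list: &mut InboundWatchList, cur_ts: Timestamp) {
    watch_list
        .watches
        .retain(|w| w.params.count > 0 && w.params.expiration > cur_ts);
}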
/// How a watch gets updated when a value changes
pub enum InboundWatchUpdateMode {
/// Update no watchers
NoUpdate,
/// Update all watchers
UpdateAll,
/// Update all watchers except ones that come from a specific target
ExcludeTarget(Target),
}
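// Illustrative sketch (not part of this commit): how an update pass might use
// InboundWatchUpdateMode to decide whether a given watcher's target receives a
// ValueChanged notification. Assumes Target supports equality comparison; the
// helper name is hypothetical.
impl InboundWatchUpdateMode {
    pub fn should_update(&self, watch_target: &Target) -> bool {
        match self {
            InboundWatchUpdateMode::NoUpdate => false,
            InboundWatchUpdateMode::UpdateAll => true,
            // Update everyone except the target the change originated from
            InboundWatchUpdateMode::ExcludeTarget(excluded) => watch_target != excluded,
        }
    }
}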

@@ -0,0 +1,23 @@
use super::*;
impl StorageManager {
// Check if server-side watches have expired
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_inbound_watches_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let mut inner = self.inner.lock().await;
if let Some(local_record_store) = &mut inner.local_record_store {
local_record_store.check_watched_records();
}
if let Some(remote_record_store) = &mut inner.remote_record_store {
remote_record_store.check_watched_records();
}
Ok(())
}
}

@@ -0,0 +1,114 @@
use super::*;
impl StorageManager {
async fn background_outbound_watch_cancel(
self,
watch_locked_key: AsyncTagLockGuard<TypedKey>,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
mut outbound_watch: OutboundWatchCurrent,
) {
let key = watch_locked_key.tag();
// Last ditch cancellation of per-node watches that may not be fully exhausted
let cancelled_ids = self
.outbound_watch_value_cancel_set(key, safety_selection, opt_watcher, &outbound_watch)
.await;
// Remove any fully cancelled watch ids
for cancelled_id in cancelled_ids {
outbound_watch.remove_per_node_outbound_watch_by_id(cancelled_id);
}
// Ensure the watch is put into cancelled state
outbound_watch.remaining_count = 0;
// Update the opened record
let mut inner = self.inner.lock().await;
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
// Already closed
return;
};
opened_record.clear_outbound_watch();
}
// Check if client-side watches on opened records have dead nodes or have expired
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_outbound_watches_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let mut inner = self.inner.lock().await;
let routing_table = self.routing_table();
//let update_callback = self.update_callback();
let cur_ts = Timestamp::now();
for (k, v) in inner.opened_records.iter_mut() {
let Some(outbound_watch) = v.outbound_watch() else {
continue;
};
// See if the watch is expired or out of updates
if outbound_watch.min_expiration_ts <= cur_ts || outbound_watch.remaining_count == 0 {
// See if we can lock the outbound watch
let Some(watch_lock) = self.outbound_watch_lock_table.try_lock_tag(*k) else {
// Watch is busy, come back later
continue;
};
let outbound_watch = outbound_watch.clone();
let safety_selection = v.safety_selection();
let opt_watcher = v.writer().cloned();
self.background_operation_processor.add_future(
self.clone().background_outbound_watch_cancel(
watch_lock,
safety_selection,
opt_watcher,
outbound_watch,
),
);
// Clear the outbound watch on the opened record
v.clear_outbound_watch();
// // Send valuechange with dead count and no subkeys
// update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
// key: *k,
// subkeys: ValueSubkeyRangeSet::new(),
// count: 0,
// value: None,
// })));
}
// See if the private route we're using is dead
let mut is_dead = false;
if !is_dead {
if let Some(value_changed_route) = outbound_watch.opt_value_changed_route {
if routing_table
.route_spec_store()
.get_route_id_for_key(&value_changed_route)
.is_none()
{
// Route we would receive value changes on is dead
is_dead = true;
}
}
}
for per_node_watch in &outbound_watch.per_node {
    // See if the active watch's node is dead
    if let Some(watch_node_ref) = &per_node_watch.watch_node_ref {
        if !watch_node_ref.state(cur_ts).is_alive() {
            // Watched node is dead
            is_dead = true;
        }
    }
}
}
Ok(())
}
}
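// Illustrative sketch (not part of this commit): the cancellation predicate used
// in the task routine above, factored out. An outbound watch becomes a candidate
// for cancellation once it has expired or has no value-change notifications left.
// The function name is hypothetical.
fn outbound_watch_needs_cancel(current: &OutboundWatchCurrent, cur_ts: Timestamp) -> bool {
    current.min_expiration_ts <= cur_ts || current.remaining_count == 0
}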