checkpoint

This commit is contained in:
Christien Rioux 2025-11-28 17:50:15 -05:00
parent 3d4c676d8a
commit 230dc8d4a3
5 changed files with 589 additions and 0 deletions

View file

@ -0,0 +1,62 @@
use super::*;
/// An individual watch
#[derive(Debug, Clone)]
pub(super) struct InboundWatch {
/// The configuration of the watch
params: InboundWatchParameters,
/// A unique id per record assigned at watch creation time. Used to disambiguate a client's version of a watch
id: InboundWatchId,
/// What has changed in the watched range since the last update.
/// May include non-watched ranges if they were changed as part of an overlapping transaction
changed_subkeys: ValueSubkeyRangeSet,
}
impl InboundWatch {
pub(super) fn new(id: InboundWatchId, params: InboundWatchParameters) -> Self {
Self {
id,
params,
changed_subkeys: Default::default(),
}
}
pub fn id(&self) -> InboundWatchId {
self.id
}
pub fn params(&self) -> &InboundWatchParameters {
&self.params
}
pub fn update_params(&mut self, params: InboundWatchParameters) {
self.params = params;
}
pub fn add_changed_subkey(&mut self, subkey: ValueSubkey) {
self.changed_subkeys.insert(subkey);
}
pub fn remove_changed_subkey(&mut self, subkey: ValueSubkey) {
self.changed_subkeys.remove(subkey);
}
pub fn has_changed_subkeys(&self) -> bool {
!self.changed_subkeys.is_empty()
}
pub fn changed_subkeys(&self) -> impl Iterator<Item = ValueSubkey> + use<'_> {
self.changed_subkeys.iter()
}
}
impl fmt::Display for InboundWatch {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"id={} exp={} cnt={} signer={} subkeys={} changed={} target={:?}",
self.id,
self.params.expiration,
self.params.count,
self.params.watcher_member_id,
self.params.subkeys,
self.changed_subkeys,
self.params.target
)
}
}

View file

@ -0,0 +1,28 @@
use super::*;
/// An individual watch id
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct InboundWatchId(u64);
impl InboundWatchId {
pub(super) fn new(raw_id: u64) -> VeilidAPIResult<Self> {
if raw_id == 0 {
apibail_internal!("invalid watch id");
}
Ok(Self(raw_id))
}
}
impl fmt::Display for InboundWatchId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<InboundWatchId> for u64 {
fn from(value: InboundWatchId) -> Self {
value.0
}
}

View file

@ -0,0 +1,48 @@
use super::*;
#[derive(Default, Debug)]
pub struct InboundWatchIdAllocator {
all_watch_ids: HashMap<InboundWatchId, RecordTableKey>,
}
impl InboundWatchIdAllocator {
pub fn lookup(&mut self, raw_id: u64) -> VeilidAPIResult<Option<InboundWatchId>> {
let id = InboundWatchId::new(raw_id)?;
Ok(self.all_watch_ids.contains_key(&id).then_some(id))
}
pub fn allocate(&mut self, rtk: RecordTableKey) -> VeilidAPIResult<InboundWatchId> {
// Generate a record-unique watch id > 0
let mut id = 0;
while id == 0 {
id = get_random_u64();
}
// Make sure it doesn't match any other id or zero (unlikely, but lets be certain)
let mut id = InboundWatchId::new(id)?;
let starting_id = id;
while self.all_watch_ids.contains_key(&id) {
let next_id = u64::from(id).overflowing_add(1);
id = InboundWatchId::new(next_id.0 + if next_id.1 { 1 } else { 0 })?;
if id == starting_id {
apibail_internal!("unable to allocate watch id");
}
}
if self.all_watch_ids.insert(id, rtk).is_some() {
apibail_internal!("allocated already existing inbound watch id");
}
Ok(id)
}
pub fn get_key(&self, id: InboundWatchId) -> Option<RecordTableKey> {
self.all_watch_ids.get(&id).cloned()
}
pub fn free(&mut self, id: InboundWatchId) -> VeilidAPIResult<()> {
if self.all_watch_ids.remove(&id).is_none() {
apibail_internal!("freeing non-existent inbound watch id");
}
Ok(())
}
}

View file

@ -0,0 +1,63 @@
use super::*;
#[derive(Debug, Default, Clone)]
/// A record being watched for changes
pub struct InboundWatchList {
/// The list of active watches
watches: Vec<InboundWatch>,
}
impl InboundWatchList {
pub(super) fn new_watch(&mut self, id: InboundWatchId, params: InboundWatchParameters) {
let inbound_watch = InboundWatch::new(id, params);
self.watches.push(inbound_watch);
}
pub fn get(&self, transaction_id: InboundWatchId) -> Option<&InboundWatch> {
self.watches.iter().find(|x| x.id() == transaction_id)
}
pub fn get_mut(&mut self, transaction_id: InboundWatchId) -> Option<&mut InboundWatch> {
self.watches.iter_mut().find(|x| x.id() == transaction_id)
}
pub fn watches(&self) -> impl Iterator<Item = &InboundWatch> {
self.watches.iter()
}
pub fn watches_mut(&mut self) -> impl Iterator<Item = &mut InboundWatch> {
self.watches.iter_mut()
}
pub(super) fn drop_watch(
&mut self,
watch_id: InboundWatchId,
allocator: &mut InboundWatchIdAllocator,
) -> VeilidAPIResult<bool> {
self.watches.retain(|t| t.id() != watch_id);
allocator.free(watch_id)?;
Ok(!self.watches.is_empty())
}
pub(super) fn drop_expired_watches<D: Fn(InboundWatchId), L: Fn(VeilidAPIError)>(
&mut self,
now: Timestamp,
allocator: &mut InboundWatchIdAllocator,
debug_logger: &D,
error_logger: &L,
) -> bool {
self.watches.retain(|t| {
let alive = t.params().expiration > now;
if !alive {
let id = t.id();
if let Err(e) = allocator.free(id) {
error_logger(e);
}
debug_logger(id);
}
alive
});
!self.watches.is_empty()
}
}

View file

@ -0,0 +1,388 @@
mod inbound_watch;
mod inbound_watch_id;
mod inbound_watch_id_allocator;
mod inbound_watch_list;
use super::*;
pub use inbound_watch::*;
pub use inbound_watch_id::*;
pub use inbound_watch_id_allocator::*;
pub use inbound_watch_list::*;
impl_veilid_log_facility!("stor");
#[derive(Debug, Default)]
pub struct InboundWatches {
/// The set of records being watched for changes
record_watches: HashMap<RecordTableKey, InboundWatchList>,
/// The list of watched records that have changed values since last notification
changed_watched_values: HashSet<RecordTableKey>,
/// The set of all allocated watch ids
watch_id_allocator: InboundWatchIdAllocator,
}
impl InboundWatches {
pub fn new() -> Self {
Default::default()
}
pub fn allocate(
&mut self,
opaque_record_key: &OpaqueRecordKey,
params: InboundWatchParameters,
) -> VeilidAPIResult<InboundWatchId> {
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
// Generate a record-unique watch id > 0
let id = self.watch_id_allocator.allocate(rtk.clone())?;
// Create a new watch
let inbound_watch_list = self.record_watches.entry(rtk).or_default();
inbound_watch_list.new_watch(
id,
params
);
Ok(id)
}
pub fn lookup_id(&mut self, raw_id: u64) -> VeilidAPIResult<Option<InboundWatchId>> {
self.watch_id_allocator.lookup(raw_id)
}
pub fn get(&self, rtk: &RecordTableKey) -> Option<&InboundWatchList> {
self.record_watches.get(rtk)
}
pub fn get_mut(&mut self, rtk: &RecordTableKey) -> Option<&mut InboundWatchList> {
self.record_watches.get_mut(rtk)
}
pub fn remove_record(&mut self, rtk: &RecordTableKey) -> VeilidAPIResult<bool> {
let Some(inbound_watch_list) = self.record_watches.remove(rtk) else {
return Ok(false);
};
let dead_ids = inbound_watch_list
.watches()
.map(|x| x.id())
.collect::<Vec<_>>();
for dead_id in dead_ids {
self.watch_id_allocator.free(dead_id)?;
}
self.changed_watched_values.remove(rtk);
Ok(true)
}
pub fn remove_watch(&mut self, id: InboundWatchId) -> VeilidAPIResult<()> {
let Some(rtk) = self.watch_id_allocator.get_key(id) else {
apibail_internal!("watch id does not exist");
};
let Some(inbound_watch_list) = self.record_watches.get_mut(&rtk) else {
apibail_internal!("record does not exist for watch id");
};
let alive =
inbound_watch_list.drop_watch(id, &mut self.watch_id_allocator)?;
if !alive {
self.record_watches.remove(&rtk);
self.changed_watched_values.remove(&rtk);
}
Ok(())
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ValueChangedInfo but without the subkey data that requires an async operation to get
#[derive(Debug)]
pub(super) struct EarlyValueChangedInfo {
pub target: Target,
pub key: OpaqueRecordKey,
pub subkeys: ValueSubkeyRangeSet,
pub count: u32,
pub watch_id: InboundWatchId,
}
impl<D> RecordStoreInner<D>
where
D: RecordDetail,
{
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn update_watched_value(
&mut self,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
watch_update_mode: &InboundWatchUpdateMode,
) {
let (do_update, opt_ignore_target) = match watch_update_mode {
InboundWatchUpdateMode::NoUpdate => (false, None),
InboundWatchUpdateMode::UpdateAll => (true, None),
InboundWatchUpdateMode::ExcludeTarget(target) => (true, Some(target)),
};
if !do_update {
return;
}
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let Some(inbound_watch_list) = self.inbound_watches.get_mut(&rtk) else {
return;
};
// Update all watchers
let mut changed_watched = false;
for w in &mut inbound_watch_list.watches_mut() {
// If this watcher is watching the changed subkey then add to the watcher's changed list
// Don't bother marking changes for value sets coming from the same watching node/target because they
// are already going to be aware of the changes in that case
if Some(&w.params().target) != opt_ignore_target && w.params().subkeys.contains(subkey)
{
w.add_changed_subkey(subkey);
changed_watched = true;
}
}
if changed_watched {
self.changed_watched_values.insert(rtk);
}
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn create_new_watch(
&mut self,
opaque_record_key: &OpaqueRecordKey,
params: InboundWatchParameters,
member_check: Box<dyn Fn(&MemberId) -> bool + Send>,
) -> VeilidAPIResult<InboundWatchValueResult> {
// Generate a record-unique watch id > 0
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
// Calculate watch limits
let mut watch_count = 0;
let mut target_watch_count = 0;
let mut existing_ids = BTreeSet::new();
let is_member = member_check(&params.watcher_member_id);
if let Some(inbound_watch_list) = self.inbound_watches.get_mut(&rtk) {
// Total up the number of watches for this key
for w in &mut inbound_watch_list.watches_mut() {
existing_ids.insert(w.id());
// See if this watch should be counted toward any limits
let count_watch = if is_member {
// If the watcher is a member of the schema, then consider the total per-watcher key
w.params().watcher_member_id == params.watcher_member_id
} else {
// If the watcher is not a member of the schema, the check if this watch is an anonymous watch and contributes to per-record key total
!member_check(&w.params().watcher_member_id)
};
// For any watch, if the target matches our also tally that separately
// If the watcher is a member of the schema, then consider the total per-target-per-watcher key
// If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-target-per-record key
if count_watch {
watch_count += 1;
if w.params().target == params.target {
target_watch_count += 1;
}
}
}
}
// For members, no more than one watch per target per watcher per record
// For anonymous, no more than one watch per target per record
if target_watch_count > 0 {
// Too many watches
return Ok(InboundWatchValueResult::Rejected);
}
// Check watch table for limits
let watch_limit = if is_member {
self.unlocked_inner.limits.member_watch_limit
} else {
self.unlocked_inner.limits.public_watch_limit
};
if watch_count >= watch_limit {
return Ok(InboundWatchValueResult::Rejected);
}
// Generate a record-unique watch id > 0
let id = self.watch_id_allocator.allocate()?;
// Ok this is an acceptable new watch, add it
let watch_list = self.inbound_watches.entry(rtk).or_default();
let expiration = params.expiration;
watch_list.new_watch(id, params);
Ok(InboundWatchValueResult::Created { id, expiration })
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn change_existing_watch(
&mut self,
opaque_record_key: &OpaqueRecordKey,
params: InboundWatchParameters,
watch_id: InboundWatchId,
) -> VeilidAPIResult<InboundWatchValueResult> {
if params.count == 0 {
apibail_internal!("cancel watch should not have gotten here");
}
if params.expiration.as_u64() == 0 {
apibail_internal!("zero expiration should have been resolved to max by now");
}
// Get the watch list for this record
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let Some(watch_list) = self.inbound_watches.get_mut(&rtk) else {
// No watches, nothing to change
return Ok(InboundWatchValueResult::Rejected);
};
// Check each watch to see if we have an exact match for the id to change
if let Some(w) = watch_list.get_mut(watch_id) {
// If the watch id doesn't match, then we're not updating
// Also do not allow the watcher key to change
if w.params().watcher_member_id == params.watcher_member_id {
// Updating an existing watch
w.update_params(params);
return Ok(InboundWatchValueResult::Changed {
expiration: w.params().expiration,
});
}
}
// No existing watch found
Ok(InboundWatchValueResult::Rejected)
}
/// Clear a specific watch for a record
/// returns true if the watch was found and cancelled
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn cancel_watch(
&mut self,
record_key: OpaqueRecordKey,
watch_id: InboundWatchId,
watcher_member_id: MemberId,
) -> VeilidAPIResult<bool> {
// See if we are cancelling an existing watch
let rtk = RecordTableKey { record_key };
let mut is_empty = false;
let mut ret = false;
if let Some(inbound_watch_list) = self.inbound_watches.get_mut(&rtk) {
let mut dead_watcher = None;
if inbound_watch_list.get(watch_id).is_some() {
inbound_watch_list.drop_watch(watch_id, &mut self.watch_id_allocator)?;
if inbound_watch_list
ret = true;
} else {
ret=false;
}
for (wn, w) in inbound_watch_list.watches.iter_mut().enumerate() {
// Must match the watch id and the watcher key to cancel
if w.id == watch_id && w.params.watcher_member_id == watcher_member_id {
// Canceling an existing watch
dead_watcher = Some(wn);
ret = true;
break;
}
}
if let Some(dw) = dead_watcher {
inbound_watch_list.watches.remove(dw);
if inbound_watch_list.watches.is_empty() {
is_empty = true;
}
}
}
if is_empty {
self.inbound_watches.remove(&rtk);
}
Ok(ret)
}
/// See if any watched records have expired and clear them out
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn check_watched_records(&mut self) {
let now = Timestamp::now_non_decreasing();
self.inbound_watches.retain(|key, watch_list| {
watch_list.watches.retain(|w| {
w.params.count != 0 && w.params.expiration > now && !w.params.subkeys.is_empty()
});
if watch_list.watches.is_empty() {
// If we're removing the watched record, drop any changed watch values too
self.changed_watched_values.remove(key);
false
} else {
true
}
});
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn take_value_changes(&mut self) -> Vec<EarlyValueChangedInfo> {
let mut evcis = vec![];
let mut empty_watched_records = vec![];
for rtk in self.changed_watched_values.drain() {
if let Some(watch) = self.inbound_watches.get_mut(&rtk) {
// Process watch notifications
let mut dead_watchers = vec![];
for (wn, w) in watch.watches.iter_mut().enumerate() {
// Get the subkeys that have changed
let subkeys = w.changed.clone();
// If no subkeys on this watcher have changed then skip it
if subkeys.is_empty() {
continue;
}
// Clear the change logs
w.changed.clear();
// Reduce the count of changes sent
// if count goes to zero mark this watcher dead
w.params.count -= 1;
let count = w.params.count;
if count == 0 {
dead_watchers.push(wn);
}
evcis.push(EarlyValueChangedInfo {
target: w.params.target.clone(),
key: rtk.record_key.clone(),
subkeys,
count,
watch_id: w.id,
});
}
// Remove in reverse so we don't have to offset the index to remove the right key
for dw in dead_watchers.iter().rev().copied() {
watch.watches.remove(dw);
if watch.watches.is_empty() {
empty_watched_records.push(rtk);
break;
}
}
}
}
for ewr in empty_watched_records {
self.inbound_watches.remove(&ewr);
}
evcis
}
}