checkpoint

This commit is contained in:
Christien Rioux 2025-11-28 17:50:09 -05:00
parent 24e1c9ce38
commit 3d4c676d8a
16 changed files with 587 additions and 720 deletions

View file

@ -74,6 +74,10 @@ pub(crate) struct VeilidComponentRegistry {
}
impl VeilidComponentRegistry {
pub const fn log_facility(&self) -> &'static str {
"registry"
}
pub fn new(startup_options: VeilidStartupOptions) -> Self {
let namespace = startup_options.config().namespace.to_static_str();
let program_name = startup_options.config().program_name.to_static_str();
@ -294,12 +298,17 @@ impl VeilidComponentRegistryAccessor for VeilidComponentRegistry {
////////////////////////////////////////////////////////////////////
macro_rules! impl_veilid_component_registry_accessor {
($struct_name:ty) => {
($struct_name:ty, $log_facility:expr) => {
impl VeilidComponentRegistryAccessor for $struct_name {
fn registry(&self) -> VeilidComponentRegistry {
self.registry.clone()
}
}
impl $struct_name {
pub const fn log_facility(&self) -> &'static str {
$log_facility
}
}
};
}
@ -308,8 +317,8 @@ pub(crate) use impl_veilid_component_registry_accessor;
/////////////////////////////////////////////////////////////////////
macro_rules! impl_veilid_component {
($component_name:ty) => {
impl_veilid_component_registry_accessor!($component_name);
($component_name:ty, $log_facility:expr) => {
impl_veilid_component_registry_accessor!($component_name, $log_facility);
impl VeilidComponent for $component_name {
fn name(&self) -> &'static str {

View file

@ -9,8 +9,6 @@ use crate::veilid_api::*;
use crate::veilid_config::*;
use crate::*;
impl_veilid_log_facility!("corectx");
pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate) + Send + Sync>;
type InitKey = (String, String);
@ -21,7 +19,7 @@ pub(crate) struct VeilidCoreContext {
registry: VeilidComponentRegistry,
}
impl_veilid_component_registry_accessor!(VeilidCoreContext);
impl_veilid_component_registry_accessor!(VeilidCoreContext, "corectx");
impl VeilidCoreContext {
#[instrument(level = "trace", target = "core_context", err, skip_all)]

View file

@ -86,207 +86,161 @@ macro_rules! fn_string {
};
}
#[macro_export]
macro_rules! veilid_log_event {
// veilid_log_event!(self level: Level::XXX, "message")
($self_expr:ident prefix: $prefix:literal, level: $lvl:expr, $text:expr) => {event!(
target: $self_expr.log_facility(),
$lvl,
__VEILID_LOG_KEY = $self_expr.log_key(),
concat!($prefix,"{}"),
$text);
};
// veilid_log!(self level: Level::XXX, target: "facility", "message")
($self_expr:ident prefix: $prefix:literal, level: $lvl:expr, target: $target:expr, $text:expr) => {event!(
target: $target,
$lvl,
__VEILID_LOG_KEY = $self_expr.log_key(),
concat!($prefix,"{}"),
$text);
};
// veilid_log!(self level: Level::XXX, "data: {}", data)
($self_expr:ident prefix: $prefix:literal, level: $lvl:expr, $fmt:literal, $($arg:expr),+) => {event!(
target: $self_expr.log_facility(),
$lvl,
__VEILID_LOG_KEY = $self_expr.log_key(),
concat!($prefix,$fmt),
$($arg),+);
};
// veilid_log!(self level: Level::XXX, target: "facility", "data: {}", data)
($self_expr:ident prefix: $prefix:literal, level: $lvl:expr, target: $target:expr, $fmt:literal, $($arg:expr),+) => {event!(
target: $target,
$lvl,
__VEILID_LOG_KEY = $self_expr.log_key(),
concat!($prefix,$fmt),
$($arg),+);
};
// veilid_log!(self Level::XXX, field=value, ?other_field)
($self_expr:ident prefix: $prefix:literal, level: $lvl:expr, $($k:ident).+ = $($fields:tt)*) => {event!(
target: $self_expr.log_facility(),
$lvl,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*,
concat!($prefix,""));
};
// veilid_log!(self Level::XXX, target: "facility", field=value, ?other_field)
($self_expr:ident prefix: $prefix:literal, level: $lvl:expr, target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {event!(
target: $target,
$lvl,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*,
concat!($prefix,""));
};
}
#[macro_export]
#[cfg(debug_assertions)]
macro_rules! debugwarn_level {
() => {
$crate::Level::WARN
};
}
#[macro_export]
#[cfg(not(debug_assertions))]
macro_rules! debugwarn_level {
() => {
$crate::Level::DEBUG
};
}
#[macro_export]
macro_rules! veilid_log {
// ERROR //////////////////////////////////////////////////////////////////////////
// veilid_log!(self error "message")
($self_expr:ident error $text:expr) => {error!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident error $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::ERROR, $text)};
// veilid_log!(self error target: "facility", "message")
($self_expr:ident error target: $target:expr, $text:expr) => {error!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident error target: $target:expr, $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::ERROR, target: $target, $text)};
// veilid_log!(self error "data: {}", data)
($self_expr:ident error $fmt:literal, $($arg:expr),+) => {error!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident error $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::ERROR, $fmt, $($arg),+)};
// veilid_log!(self error target: "facility", "data: {}", data)
($self_expr:ident error target: $target:expr, $fmt:literal, $($arg:expr),+) => {error!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident error target: $target:expr, $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::ERROR, target: $target, $fmt, $($arg),+)};
// veilid_log!(self error field=value, ?other_field)
($self_expr:ident error $($k:ident).+ = $($fields:tt)*) => {error!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident error $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr level: $crate::Level::ERROR, $($k).+ = $($fields)*)};
// veilid_log!(self error target: "facility", field=value, ?other_field)
($self_expr:ident error target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {error!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident error target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::ERROR, target: $target, $($k).+ = $($fields)*)};
// WARN //////////////////////////////////////////////////////////////////////////
// veilid_log!(self warn "message")
($self_expr:ident warn $text:expr) => {warn!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident warn $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::WARN, $text)};
// veilid_log!(self warn target: "facility", "message")
($self_expr:ident warn target: $target:expr, $text:expr) => {warn!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident warn target: $target:expr, $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::WARN, target: $target, $text)};
// veilid_log!(self warn "data: {}", data)
($self_expr:ident warn $fmt:literal, $($arg:expr),+) => {warn!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident warn $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::WARN, $fmt, $($arg),+)};
// veilid_log!(self warn target: "facility", "data: {}", data)
($self_expr:ident warn target: $target:expr, $fmt:literal, $($arg:expr),+) => {warn!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident warn target: $target:expr, $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::WARN, target: $target, $fmt, $($arg),+)};
// veilid_log!(self warn field=value, ?other_field)
($self_expr:ident warn $($k:ident).+ = $($fields:tt)*) => {warn!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident warn $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::WARN, $($k).+ = $($fields)*)};
// veilid_log!(self warn target: "facility", field=value, ?other_field)
($self_expr:ident warn target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {warn!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident warn target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::WARN, target: $target, $($k).+ = $($fields)*)};
// INFO //////////////////////////////////////////////////////////////////////////
// veilid_log!(self info "message")
($self_expr:ident info $text:expr) => {info!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident info $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::INFO, $text)};
// veilid_log!(self info target: "facility", "message")
($self_expr:ident info target: $target:expr, $text:expr) => {info!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident info target: $target:expr, $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::INFO, target: $target, $text)};
// veilid_log!(self info "data: {}", data)
($self_expr:ident info $fmt:literal, $($arg:expr),+) => {info!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident info $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::INFO, $fmt, $($arg),+)};
// veilid_log!(self info target: "facility", "data: {}", data)
($self_expr:ident info target: $target:expr, $fmt:literal, $($arg:expr),+) => {info!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident info target: $target:expr, $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::INFO, target: $target, $fmt, $($arg),+)};
// veilid_log!(self info field=value, ?other_field)
($self_expr:ident info $($k:ident).+ = $($fields:tt)*) => {info!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident info $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::INFO, $($k).+ = $($fields)*)};
// veilid_log!(self info target: "facility", field=value, ?other_field)
($self_expr:ident info target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {info!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident info target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::INFO, target: $target, $($k).+ = $($fields)*)};
// DEBUG //////////////////////////////////////////////////////////////////////////
// veilid_log!(self debug "message")
($self_expr:ident debug $text:expr) => {debug!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident debug $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::DEBUG, $text)};
// veilid_log!(self debug target: "facility", "message")
($self_expr:ident debug target: $target:expr, $text:expr) => {debug!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident debug target: $target:expr, $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::DEBUG, target: $target, $text)};
// veilid_log!(self debug "data: {}", data)
($self_expr:ident debug $fmt:literal, $($arg:expr),+) => {debug!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident debug $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::DEBUG, $fmt, $($arg),+)};
// veilid_log!(self debug target: "facility", "data: {}", data)
($self_expr:ident debug target: $target:expr, $fmt:literal, $($arg:expr),+) => {debug!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident debug target: $target:expr, $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::DEBUG, target: $target, $fmt, $($arg),+)};
// veilid_log!(self debug field=value, ?other_field)
($self_expr:ident debug $($k:ident).+ = $($fields:tt)*) => {debug!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident debug $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::DEBUG, $($k).+ = $($fields)*)};
// veilid_log!(self debug target: "facility", field=value, ?other_field)
($self_expr:ident debug target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {debug!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident debug target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::DEBUG, target: $target, $($k).+ = $($fields)*)};
// TRACE //////////////////////////////////////////////////////////////////////////
// veilid_log!(self trace "message")
($self_expr:ident trace $text:expr) => {trace!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident trace $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::TRACE, $text)};
// veilid_log!(self trace target: "facility", "message")
($self_expr:ident trace target: $target:expr, $text:expr) => {trace!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
"{}",
$text,
)};
($self_expr:ident trace target: $target:expr, $text:expr) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::TRACE, target: $target, $text)};
// veilid_log!(self trace "data: {}", data)
($self_expr:ident trace $fmt:literal, $($arg:expr),+) => {trace!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident trace $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::TRACE, $fmt, $($arg),+)};
// veilid_log!(self trace target: "facility", "data: {}", data)
($self_expr:ident trace target: $target:expr, $fmt:literal, $($arg:expr),+) => {trace!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$fmt, $($arg),+);
};
($self_expr:ident trace target: $target:expr, $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::TRACE, target: $target, $fmt, $($arg),+)};
// veilid_log!(self trace field=value, ?other_field)
($self_expr:ident trace $($k:ident).+ = $($fields:tt)*) => {trace!(
target: self::__VEILID_LOG_FACILITY,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident trace $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::TRACE, $($k).+ = $($fields)*)};
// veilid_log!(self trace target: "facility", field=value, ?other_field)
($self_expr:ident trace target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {trace!(
target: $target,
__VEILID_LOG_KEY = $self_expr.log_key(),
$($k).+ = $($fields)*
)};
($self_expr:ident trace target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "", level: $crate::Level::TRACE, target: $target, $($k).+ = $($fields)*)};
// DEBUGWARN //////////////////////////////////////////////////////////////////////////
// veilid_log!(self debugwarn "message")
($self_expr:ident debugwarn $text:expr) => {veilid_log_event!($self_expr prefix: "DEBUGWARN: ", level: debugwarn_level!(), $text)};
// veilid_log!(self debugwarn target: "facility", "message")
($self_expr:ident debugwarn target: $target:expr, $text:expr) => {veilid_log_event!($self_expr prefix: "DEBUGWARN: ", level: debugwarn_level!(), target: $target, $text)};
// veilid_log!(self debugwarn "data: {}", data)
($self_expr:ident debugwarn $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "DEBUGWARN: ", level: debugwarn_level!(), $fmt, $($arg),+)};
// veilid_log!(self debugwarn target: "facility", "data: {}", data)
($self_expr:ident debugwarn target: $target:expr, $fmt:literal, $($arg:expr),+) => {veilid_log_event!($self_expr prefix: "DEBUGWARN: ", level: debugwarn_level!(), target: $target, $fmt, $($arg),+)};
// veilid_log!(self debugwarn field=value, ?other_field)
($self_expr:ident debugwarn $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "DEBUGWARN", level: debugwarn_level!(), $($k).+ = $($fields)*)};
// veilid_log!(self debugwarn target: "facility", field=value, ?other_field)
($self_expr:ident debugwarn target: $target:expr, $($k:ident).+ = $($fields:tt)*) => {veilid_log_event!($self_expr prefix: "DEBUGWARN", level: debugwarn_level!(), target: $target, $($k).+ = $($fields)*)};
}
#[macro_export]

View file

@ -112,7 +112,7 @@ struct ValueChangedInfo {
record_key: OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
watch_id: u64,
watch_id: InboundWatchId,
value: Option<Arc<SignedValueData>>,
}

View file

@ -131,7 +131,15 @@ impl OutboundTransactionManager {
&mut self,
transaction_handle: OutboundTransactionHandle,
) -> Option<OutboundTransactionState> {
let outbound_transaction_state = self.transactions.remove(&transaction_handle)?;
let outbound_transaction_state = match self.transactions.remove(&transaction_handle) {
Some(x) => x,
None => {
veilid_log!(self debugwarn "Dropping non-existent transaction: {:?}", transaction_handle);
return None;
}
};
veilid_log!(self debug target: "network_result", "Dropping transaction: {:?}", transaction_handle);
for record_info in outbound_transaction_state.get_record_infos() {
let opaque_record_key = record_info.record_key().opaque();

View file

@ -14,15 +14,16 @@ mod record_store_inner;
mod record_store_limits;
mod results;
use super::*;
pub(super) use inbound_watch::*;
pub(super) use opened_record::*;
pub(super) use record::*;
pub(super) use record_snapshot::*;
pub(super) use record_store_inner::{InboundTransactionId, InboundWatchId};
pub(super) use record_store_limits::*;
pub(super) use results::*;
use super::*;
use record_store_inner::*;
use hashlink::LruCache;
@ -431,16 +432,35 @@ where
want_descriptor: bool,
signing_member_id: MemberId,
) -> VeilidAPIResult<InboundTransactBeginResult> {
let _record_lock = self
let record_lock = self
.record_store_lock_table
.lock_record(opaque_record_key.clone())
.await;
// prepare
let begin_res = self.inner.lock().prepare_begin_inbound_transaction(
opaque_record_key,
opt_descriptor,
want_descriptor,
signing_member_id,
)?;
// snapshot
let begin_context = match begin_res {
PrepareBeginInboundTransactionResult::Done(inbound_transact_begin_result) => {
return Ok(inbound_transact_begin_result);
}
PrepareBeginInboundTransactionResult::Continue(
prepare_begin_inbound_transaction_context,
) => prepare_begin_inbound_transaction_context,
};
// Transaction can be added, so let's get a snapshot if the record exists already
let opt_snapshot = self.snapshot_record_locked(&record_lock).await?;
// finish
self.inner
.lock()
.finish_begin_inbound_transaction(begin_context, opt_snapshot)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
@ -449,15 +469,33 @@ where
opaque_record_key: &OpaqueRecordKey,
transaction_id: InboundTransactionId,
) -> VeilidAPIResult<InboundTransactCommandResult> {
let _record_lock = self
let record_lock = self
.record_store_lock_table
.lock_record(opaque_record_key.clone())
.await;
// prepare
let end_res = self
.inner
.lock()
.prepare_end_inbound_transaction(opaque_record_key, transaction_id)?;
let end_context = match end_res {
PrepareEndInboundTransactionResult::Done(inbound_transact_end_result) => {
return Ok(inbound_transact_end_result);
}
PrepareEndInboundTransactionResult::Continue(
prepare_end_inbound_transaction_context,
) => prepare_end_inbound_transaction_context,
};
// snapshot
let res_opt_end_snapshot = self.snapshot_record_locked(&record_lock).await;
// finish
self.inner
.lock()
.finish_end_inbound_transaction(end_context, res_opt_end_snapshot)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
@ -471,11 +509,32 @@ where
.record_store_lock_table
.lock_record(opaque_record_key.clone())
.await;
// prepare
let commit_res = self.inner.lock().prepare_commit_inbound_transaction(
opaque_record_key,
transaction_id,
make_record_detail,
)?;
let mut commit_context = match commit_res {
PrepareCommitInboundTransactionResult::Done(inbound_transact_commit_result) => {
return Ok(inbound_transact_commit_result);
}
PrepareCommitInboundTransactionResult::Continue(
prepare_commit_inbound_transaction_context,
) => prepare_commit_inbound_transaction_context,
};
// commit action
if let Some(commit_action) = commit_context.opt_commit_action.take() {
self.process_commit_action(commit_action).await;
};
// finish
self.inner
.lock()
.finish_commit_inbound_transaction(commit_context)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]

View file

@ -51,14 +51,11 @@ where
D: RecordDetail,
{
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn snapshot_record(
pub async fn snapshot_record_locked(
&self,
opaque_record_key: &OpaqueRecordKey,
record_lock: &RecordLockGuard,
) -> VeilidAPIResult<Option<Arc<RecordSnapshot>>> {
let _record_lock = self
.record_store_lock_table
.lock_record(opaque_record_key.clone())
.await;
let opaque_record_key = record_lock.record();
// Get all load actions for the snapshot
let mut all_value_load_actions = {
@ -66,7 +63,7 @@ where
let mut inner = self.inner.lock();
let Some((max_subkey, stored_subkeys)) = inner
.with_record(opaque_record_key, |record| {
.with_record(&opaque_record_key, |record| {
(record.max_subkey(), record.stored_subkeys().clone())
})
else {
@ -85,7 +82,7 @@ where
continue;
}
let load_action_result = inner.prepare_get_load_action(opaque_record_key, subkey);
let load_action_result = inner.prepare_get_load_action(&opaque_record_key, subkey);
match load_action_result {
LoadActionResult::NoRecord => {

View file

@ -9,19 +9,22 @@ where
out += &self.record_index.debug();
out += &format!("Inbound Transactions: {}\n", self.record_transactions.len());
let mut record_transactions_keys = self.record_transactions.keys().collect::<Vec<_>>();
out += &format!(
"Inbound Transactions: {}\n",
self.inbound_transactions.len()
);
let mut record_transactions_keys = self.inbound_transactions.keys().collect::<Vec<_>>();
record_transactions_keys.sort();
for atk in record_transactions_keys {
let atx = self.record_transactions.get(atk).unwrap();
let atx = self.inbound_transactions.get(atk).unwrap();
out += &format!(" {}: {:?}\n", atk, atx);
}
out += &format!("Inbound Watches: {}\n", self.record_transactions.len());
let mut watches_keys = self.watched_records.keys().collect::<Vec<_>>();
out += &format!("Inbound Watches: {}\n", self.inbound_transactions.len());
let mut watches_keys = self.inbound_watches.keys().collect::<Vec<_>>();
watches_keys.sort();
for rtk in watches_keys {
let inbound_watch_list = self.watched_records.get(rtk).unwrap();
let inbound_watch_list = self.inbound_watches.get(rtk).unwrap();
out += &format!(" {}: {:?}\n", rtk, inbound_watch_list.watches);
}
@ -32,7 +35,7 @@ where
let record_info = self
.peek_record(opaque_record_key, |r| format!("{:#?}", r))
.unwrap_or("Not found".to_owned());
let watched_record = match self.watched_records.get(&RecordTableKey {
let watched_record = match self.inbound_watches.get(&RecordTableKey {
record_key: opaque_record_key.clone(),
}) {
Some(w) => {

View file

@ -2,16 +2,16 @@ use super::*;
#[derive(Default, Debug)]
pub struct InboundTransactionIdAllocator {
all_transaction_ids: HashSet<InboundTransactionId>,
all_transaction_ids: HashMap<InboundTransactionId, RecordTableKey>,
}
impl InboundTransactionIdAllocator {
pub fn lookup(&mut self, raw_id: u64) -> VeilidAPIResult<Option<InboundTransactionId>> {
let id = InboundTransactionId::new(raw_id)?;
Ok(self.all_transaction_ids.contains(&id).then_some(id))
Ok(self.all_transaction_ids.contains_key(&id).then_some(id))
}
pub fn allocate(&mut self) -> VeilidAPIResult<InboundTransactionId> {
pub fn allocate(&mut self, rtk: RecordTableKey) -> VeilidAPIResult<InboundTransactionId> {
// Generate a record-unique transaction id > 0
let mut id = 0;
while id == 0 {
@ -20,7 +20,7 @@ impl InboundTransactionIdAllocator {
// Make sure it doesn't match any other id or zero (unlikely, but lets be certain)
let mut id = InboundTransactionId::new(id)?;
let starting_id = id;
while self.all_transaction_ids.contains(&id) {
while self.all_transaction_ids.contains_key(&id) {
let next_id = u64::from(id).overflowing_add(1);
id = InboundTransactionId::new(next_id.0 + if next_id.1 { 1 } else { 0 })?;
if id == starting_id {
@ -28,15 +28,19 @@ impl InboundTransactionIdAllocator {
}
}
if !self.all_transaction_ids.insert(id) {
if self.all_transaction_ids.insert(id, rtk).is_some() {
apibail_internal!("allocated already existing inbound transaction id");
}
Ok(id)
}
pub fn get_key(&self, id: InboundTransactionId) -> Option<RecordTableKey> {
self.all_transaction_ids.get(&id).cloned()
}
pub fn free(&mut self, id: InboundTransactionId) -> VeilidAPIResult<()> {
if !self.all_transaction_ids.remove(&id) {
if self.all_transaction_ids.remove(&id).is_none() {
apibail_internal!("freeing non-existent inbound transaction id");
}
Ok(())

View file

@ -13,7 +13,7 @@ pub struct InboundTransactionList {
}
impl InboundTransactionList {
pub fn new_transaction(
pub(super) fn new_transaction(
&mut self,
id: InboundTransactionId,
expiration: Timestamp,
@ -49,19 +49,21 @@ impl InboundTransactionList {
self.transactions.iter()
}
pub fn drop_transaction(
pub(super) fn drop_transaction(
&mut self,
transaction_id: InboundTransactionId,
allocator: &mut InboundTransactionIdAllocator,
) -> VeilidAPIResult<()> {
) -> VeilidAPIResult<bool> {
self.transactions.retain(|t| t.id() != transaction_id);
if self.lock == Some(transaction_id) {
self.lock = None;
}
allocator.free(transaction_id)
allocator.free(transaction_id)?;
Ok(!self.transactions.is_empty())
}
pub fn drop_expired_transactions<D: Fn(InboundTransactionId), L: Fn(VeilidAPIError)>(
pub(super) fn drop_expired_transactions<D: Fn(InboundTransactionId), L: Fn(VeilidAPIError)>(
&mut self,
now: Timestamp,
allocator: &mut InboundTransactionIdAllocator,
@ -85,10 +87,6 @@ impl InboundTransactionList {
!self.transactions.is_empty()
}
// pub fn is_locked(&self) -> bool {
// self.record_lock.is_some()
// }
pub fn is_locked_by(&self, transaction_id: InboundTransactionId) -> bool {
self.lock == Some(transaction_id)
}
@ -102,21 +100,4 @@ impl InboundTransactionList {
Ok(())
}
// pub fn unlock(&mut self, transaction_id: InboundTransactionId) -> VeilidAPIResult<()> {
// if let Some(existing_xid) = self.record_lock {
// if existing_xid != transaction_id {
// apibail_internal!(format!("request to unlock inbound transaction list by xid {} when it was previously locked by {}", transaction_id, existing_xid));
// }
// } else {
// apibail_internal!(format!(
// "request to unlock inbound transaction list by xid {} when it was already unlocked",
// transaction_id
// ));
// }
// self.record_lock = Some(transaction_id);
// Ok(())
// }
}

View file

@ -12,60 +12,125 @@ pub use inbound_transaction_list::*;
impl_veilid_log_facility!("stor");
// Convenience macro to simplify cleanly finishing a successful transaction
macro_rules! return_finished_transaction_command {
($this: ident, $active_transaction_list:expr, $transaction_id: expr) => {
$active_transaction_list.drop_transaction($transaction_id, &mut $this.transaction_id_allocator)?;
return Ok(InboundTransactCommandResult::Success(
TransactCommandSuccess {
expiration: Default::default(),
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
},
));
};
#[derive(Debug, Default)]
pub struct InboundTransactions {
/// The set of records for which transactions are active. The records may not exist in the store until committed.
record_transactions: HashMap<RecordTableKey, InboundTransactionList>,
($this: ident, key $rtk:expr, $transaction_id: expr) => {
let Some(active_transaction_list) = $this.record_transactions.get_mut(&$rtk) else {
veilid_log!($this debug "missing transaction list");
return Ok(InboundTransactCommandResult::InvalidTransaction);
/// The set of all allocated transaction ids
transaction_id_allocator: InboundTransactionIdAllocator,
}
impl InboundTransactions {
pub fn new() -> Self {
Default::default()
}
pub fn allocate(
&mut self,
opaque_record_key: &OpaqueRecordKey,
expiration: Timestamp,
signing_member_id: MemberId,
descriptor: Arc<SignedValueDescriptor>,
opt_snapshot: Option<Arc<RecordSnapshot>>,
) -> VeilidAPIResult<InboundTransactionId> {
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
return_finished_transaction_command!($this, active_transaction_list, $transaction_id);
// Generate a record-unique transaction id > 0
let id = self.transaction_id_allocator.allocate(rtk.clone())?;
// Create a new transaction
let inbound_transaction_list = self.record_transactions.entry(rtk).or_default();
inbound_transaction_list.new_transaction(
id,
expiration,
signing_member_id,
descriptor.clone(),
opt_snapshot,
);
Ok(id)
}
pub fn lookup_id(&mut self, raw_id: u64) -> VeilidAPIResult<Option<InboundTransactionId>> {
self.transaction_id_allocator.lookup(raw_id)
}
pub fn get(&self, rtk: &RecordTableKey) -> Option<&InboundTransactionList> {
self.record_transactions.get(rtk)
}
pub fn get_mut(&mut self, rtk: &RecordTableKey) -> Option<&mut InboundTransactionList> {
self.record_transactions.get_mut(rtk)
}
pub fn remove_record(&mut self, rtk: &RecordTableKey) -> VeilidAPIResult<bool> {
let Some(inbound_transaction_list) = self.record_transactions.remove(rtk) else {
return Ok(false);
};
let dead_ids = inbound_transaction_list
.transactions()
.map(|x| x.id())
.collect::<Vec<_>>();
for dead_id in dead_ids {
self.transaction_id_allocator.free(dead_id)?;
}
Ok(true)
}
pub fn remove_transaction(&mut self, id: InboundTransactionId) -> VeilidAPIResult<()> {
let Some(rtk) = self.transaction_id_allocator.get_key(id) else {
apibail_internal!("transaction id does not exist");
};
let Some(inbound_transaction_list) = self.record_transactions.get_mut(&rtk) else {
apibail_internal!("record does not exist for transaction id");
};
let alive =
inbound_transaction_list.drop_transaction(id, &mut self.transaction_id_allocator)?;
if !alive {
self.record_transactions.remove(&rtk);
}
Ok(())
}
}
// Convenience macro to simplify returning and cleaning up invalid transactions
macro_rules! return_invalid_transaction {
($this: ident, $active_transaction_list:expr, $transaction_id: expr, $msg:expr) => {
$active_transaction_list
.drop_transaction($transaction_id, &mut $this.transaction_id_allocator)?;
veilid_log!($this debug "{}", $msg);
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
($this: ident, $active_transaction_list:expr, $transaction_id: expr, $msg:expr, $err:expr) => {
$active_transaction_list
.drop_transaction($transaction_id, &mut $this.transaction_id_allocator)?;
veilid_log!($this debug "{}: {}", $msg, $err);
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
struct PrepareBeginInboundTransactionContext {
pub opaque_record_key: OpaqueRecordKey,
pub descriptor: Arc<SignedValueDescriptor>,
pub want_descriptor: bool,
pub signing_member_id: MemberId,
pub subkey_count: usize,
}
($this: ident, key $rtk:expr, $transaction_id: expr, $msg:expr) => {
let Some(active_transaction_list) = $this.record_transactions.get_mut(&$rtk) else {
veilid_log!($this debug "missing transaction list");
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
return_invalid_transaction!($this, active_transaction_list, $transaction_id, $msg);
};
pub(in super::super) enum PrepareBeginInboundTransactionResult {
Done(InboundTransactBeginResult),
Continue(PrepareBeginInboundTransactionContext),
}
($this: ident, key $rtk:expr, $transaction_id: expr, $msg:expr, $err:expr) => {
let Some(active_transaction_list) = $this.record_transactions.get_mut(&$rtk) else {
veilid_log!($this debug "missing transaction list");
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
return_invalid_transaction!($this, active_transaction_list, $transaction_id, $msg, $err);
};
struct PrepareEndInboundTransactionContext {
pub opaque_record_key: OpaqueRecordKey,
pub transaction_id: InboundTransactionId,
pub opt_begin_snapshot: Option<Arc<RecordSnapshot>>,
}
pub(in super::super) enum PrepareEndInboundTransactionResult {
Done(InboundTransactCommandResult),
Continue(PrepareEndInboundTransactionContext),
}
struct PrepareCommitInboundTransactionContext<D: RecordDetail> {
pub opaque_record_key: OpaqueRecordKey,
pub transaction_id: InboundTransactionId,
pub opt_commit_action: Option<CommitAction<D>>,
}
pub(in super::super) enum PrepareCommitInboundTransactionResult<D: RecordDetail> {
Done(InboundTransactCommandResult),
Continue(PrepareCommitInboundTransactionContext<D>),
}
impl<D> RecordStoreInner<D>
@ -77,21 +142,17 @@ where
&mut self,
raw_id: u64,
) -> VeilidAPIResult<Option<InboundTransactionId>> {
self.transaction_id_allocator.lookup(raw_id)
self.inbound_transactions.lookup_id(raw_id)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn begin_inbound_transaction(
pub fn prepare_begin_inbound_transaction(
&mut self,
opaque_record_key: &OpaqueRecordKey,
opt_descriptor: Option<SignedValueDescriptor>,
want_descriptor: bool,
signing_member_id: MemberId,
) -> VeilidAPIResult<InboundTransactBeginResult> {
split this into prepare/finish blocks and put the rest of the logic including the snapshot in the record store outer
) -> VeilidAPIResult<PrepareBeginInboundTransactionResult> {
// Get descriptor
let opt_existing_descriptor =
self.with_record(opaque_record_key, |record| record.descriptor());
@ -100,7 +161,9 @@ split this into prepare/finish blocks and put the rest of the logic including th
None => {
// Needs descriptor
let Some(descriptor) = opt_descriptor.map(Arc::new) else {
return Ok(InboundTransactBeginResult::NeedDescriptor);
return Ok(PrepareBeginInboundTransactionResult::Done(
InboundTransactBeginResult::NeedDescriptor,
));
};
descriptor
@ -126,7 +189,7 @@ split this into prepare/finish blocks and put the rest of the logic including th
let mut transaction_count = 0;
let is_member = member_check(&signing_member_id);
if let Some(active_transaction_list) = self.record_transactions.get_mut(&rtk) {
if let Some(active_transaction_list) = self.inbound_transactions.get_mut(&rtk) {
// Total up the number of transactions for this key
for t in active_transaction_list.transactions() {
// See if this transactions should be counted toward any limits
@ -152,14 +215,35 @@ split this into prepare/finish blocks and put the rest of the logic including th
self.unlocked_inner.limits.public_transaction_limit
};
if transaction_count >= transaction_limit {
return Ok(InboundTransactBeginResult::TransactionUnavailable);
return Ok(PrepareBeginInboundTransactionResult::Done(
InboundTransactBeginResult::TransactionUnavailable,
));
}
// Transaction can be added, so let's get a snapshot if the record exists already
let opt_snapshot = self.snapshot_record(opaque_record_key).await?;
Ok(PrepareBeginInboundTransactionResult::Continue(
PrepareBeginInboundTransactionContext {
opaque_record_key: opaque_record_key.clone(),
descriptor,
want_descriptor,
signing_member_id,
subkey_count,
},
))
}
// Generate a record-unique transaction id > 0
let id = self.transaction_id_allocator.allocate()?;
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn finish_begin_inbound_transaction(
&mut self,
begin_context: PrepareBeginInboundTransactionContext,
opt_snapshot: Option<Arc<RecordSnapshot>>,
) -> VeilidAPIResult<InboundTransactBeginResult> {
let PrepareBeginInboundTransactionContext {
opaque_record_key,
descriptor,
want_descriptor,
signing_member_id,
subkey_count,
} = begin_context;
// Make transaction expiration timestamp
let expiration = Timestamp::now().later(self.unlocked_inner.limits.transaction_timeout);
@ -172,60 +256,74 @@ split this into prepare/finish blocks and put the rest of the logic including th
};
// Create a new transaction
let inbound_transaction_list = self.record_transactions.entry(rtk).or_default();
inbound_transaction_list.new_transaction(
id,
let transaction_id = self.inbound_transactions.allocate(
&opaque_record_key,
expiration,
signing_member_id,
descriptor.clone(),
opt_snapshot,
);
)?;
// Return the result
Ok(InboundTransactBeginResult::Success(TransactBeginSuccess {
transaction_id: id,
transaction_id,
expiration,
opt_descriptor: want_descriptor.then_some(descriptor),
seqs,
}))
}
pub fn end_inbound_transaction(
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn prepare_end_inbound_transaction(
&mut self,
opaque_record_key: &OpaqueRecordKey,
transaction_id: InboundTransactionId,
) -> VeilidAPIResult<InboundTransactCommandResult> {
) -> VeilidAPIResult<PrepareEndInboundTransactionResult> {
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let begin_snapshot = {
let Some(active_transaction_list) = self.record_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
let opt_begin_snapshot = {
let Some(active_transaction_list) = self.inbound_transactions.get_mut(&rtk) else {
return Ok(PrepareEndInboundTransactionResult::Done(
InboundTransactCommandResult::InvalidTransaction,
));
};
// If the lock id is ours, then someone tried to 'end' twice, so invalidate the transaction.
// We don't lock here yet to allow for an unchanged transaction to finish regardless of an
// existing lock held by another transaction
if active_transaction_list.is_locked_by(transaction_id) {
return_invalid_transaction!(
self,
active_transaction_list,
transaction_id,
"ended twice"
);
self.inbound_transactions.remove_transaction(id)
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
veilid_log!(self debug "{}","ended twice");
return Ok(PrepareEndInboundTransactionResult::Done(
InboundTransactCommandResult::InvalidTransaction,
));
}
// Get the inbound transaction if it is still valid
let Some(inbound_transaction) = active_transaction_list.get(transaction_id) else {
// Nothing to drop
return Ok(InboundTransactCommandResult::InvalidTransaction);
return Ok(PrepareEndInboundTransactionResult::Done(
InboundTransactCommandResult::InvalidTransaction,
));
};
// If there's no changes, we can just quit early with a zero expiration to indicate no commit or rollback is necessary
// No lock check is required here because
if !inbound_transaction.has_changed_subkeys() {
return_finished_transaction_command!(self, active_transaction_list, transaction_id);
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
return Ok(PrepareEndInboundTransactionResult::Done(
InboundTransactCommandResult::Success(TransactCommandSuccess {
expiration: Default::default(),
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
}),
));
}
// Get begin snapshot for comparison
@ -234,43 +332,66 @@ split this into prepare/finish blocks and put the rest of the logic including th
// Try to obtain the lock now
// If there is any other lock, we can't lock this ourselves
if let Err(e) = active_transaction_list.lock(transaction_id) {
return_invalid_transaction!(
self,
active_transaction_list,
transaction_id,
"end failed",
e
);
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
veilid_log!(self debug "{}: {}","end failed",e);
return Ok(PrepareEndInboundTransactionResult::Done(
InboundTransactCommandResult::InvalidTransaction,
));
}
begin_snapshot
};
let end_snapshot = match self.snapshot_record(opaque_record_key).await {
Ok(PrepareEndInboundTransactionResult::Continue(
PrepareEndInboundTransactionContext {
opaque_record_key: opaque_record_key.clone(),
transaction_id,
opt_begin_snapshot,
},
))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn finish_end_inbound_transaction(
&mut self,
end_context: PrepareEndInboundTransactionContext,
res_opt_end_snapshot: VeilidAPIResult<Option<Arc<RecordSnapshot>>>,
) -> VeilidAPIResult<InboundTransactCommandResult> {
let PrepareEndInboundTransactionContext {
opaque_record_key,
transaction_id,
opt_begin_snapshot,
} = end_context;
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let opt_end_snapshot = match res_opt_end_snapshot {
Ok(v) => v,
Err(e) => {
return_invalid_transaction!(
self,
key rtk,
transaction_id,
"end snapshot failed",
e
);
let Some(active_transaction_list) = self.inbound_transactions.get_mut(&rtk) else {
veilid_log!(self debug "missing transaction list");
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
veilid_log!(self debug "{}: {}","end snapshot failed",e);
return Ok(InboundTransactCommandResult::InvalidTransaction);
}
};
// If the snapshot doesn't validate then the transaction is not valid
let Some(active_transaction_list) = self.record_transactions.get_mut(&rtk) else {
let Some(active_transaction_list) = self.inbound_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
if begin_snapshot.as_ref().map(|s| s.seqs()) != end_snapshot.map(|s| s.seqs()) {
return_invalid_transaction!(
self,
active_transaction_list,
transaction_id,
"end snapshot mismatch"
);
if opt_begin_snapshot.as_ref().map(|s| s.seqs()) != opt_end_snapshot.map(|s| s.seqs()) {
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
veilid_log!(self debug "{}","end snapshot mismatch");
return Ok(InboundTransactCommandResult::InvalidTransaction);
}
// Everything is valid, we can end the transaction successfully as long as it still exists
@ -294,19 +415,22 @@ split this into prepare/finish blocks and put the rest of the logic including th
))
}
pub async fn commit_inbound_transaction<C: FnOnce() -> D>(
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn prepare_commit_inbound_transaction<C: FnOnce() -> D>(
&mut self,
opaque_record_key: &OpaqueRecordKey,
transaction_id: InboundTransactionId,
make_record_detail: C,
) -> VeilidAPIResult<InboundTransactCommandResult> {
) -> VeilidAPIResult<PrepareCommitInboundTransactionResult<D>> {
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let transaction = {
let Some(active_transaction_list) = self.record_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
let Some(active_transaction_list) = self.inbound_transactions.get_mut(&rtk) else {
return Ok(PrepareCommitInboundTransactionResult::Done(
InboundTransactCommandResult::InvalidTransaction,
));
};
// If there is a record lock then it better be ours
@ -314,12 +438,12 @@ split this into prepare/finish blocks and put the rest of the logic including th
// * If the lock id is ours, then we can commit
// * If the lock id is not ours, then this commit is out of order and this transaction should be dropped
if !active_transaction_list.is_locked_by(transaction_id) {
return_invalid_transaction!(
self,
active_transaction_list,
transaction_id,
"bad lock state"
);
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
veilid_log!(self debug "{}","bad lock state");
return Ok(PrepareCommitInboundTransactionResult::Done(
InboundTransactCommandResult::InvalidTransaction,
));
}
// Get the inbound transaction if it is still valid
@ -352,8 +476,45 @@ split this into prepare/finish blocks and put the rest of the logic including th
veilid_log!(self error "set_subkeys_single_record failed in commit: {}", e);
})?;
Ok(PrepareCommitInboundTransactionResult::Continue(
PrepareCommitInboundTransactionContext {
opaque_record_key: opaque_record_key.clone(),
transaction_id,
opt_commit_action,
},
))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn finish_commit_inbound_transaction(
&mut self,
commit_context: PrepareCommitInboundTransactionContext<D>,
) -> VeilidAPIResult<InboundTransactCommandResult> {
let PrepareCommitInboundTransactionContext {
opaque_record_key,
transaction_id,
opt_commit_action: _,
} = commit_context;
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
// Drop transaction and lock now that we're done
return_finished_transaction_command!(self, key rtk, transaction_id);
let Some(active_transaction_list) = self.inbound_transactions.get_mut(&rtk) else {
veilid_log!(self debug "missing transaction list");
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
return Ok(InboundTransactCommandResult::Success(
TransactCommandSuccess {
expiration: Default::default(),
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
},
));
}
pub fn rollback_inbound_transaction(
@ -366,7 +527,20 @@ split this into prepare/finish blocks and put the rest of the logic including th
};
// Rollback just needs to drop the transaction wherever it is
return_finished_transaction_command!(self, key rtk, transaction_id);
let Some(active_transaction_list) = self.inbound_transactions.get_mut(&rtk) else {
veilid_log!(self debug "missing transaction list");
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
return Ok(InboundTransactCommandResult::Success(
TransactCommandSuccess {
expiration: Default::default(),
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
},
));
}
pub fn inbound_transaction_get(
@ -381,17 +555,15 @@ split this into prepare/finish blocks and put the rest of the logic including th
};
// If the transaction is still active and not ended/locked
let Some(active_transaction_list) = self.record_transactions.get_mut(&rtk) else {
let Some(active_transaction_list) = self.inbound_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
if active_transaction_list.is_locked_by(transaction_id) {
return_invalid_transaction!(
self,
active_transaction_list,
transaction_id,
"get after end"
);
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
veilid_log!(self debug "{}","get after end");
return Ok(InboundTransactCommandResult::InvalidTransaction);
}
// Get the inbound transaction if it is still valid
@ -443,17 +615,15 @@ split this into prepare/finish blocks and put the rest of the logic including th
};
// If the transaction is still active and not ended/locked
let Some(active_transaction_list) = self.record_transactions.get_mut(&rtk) else {
let Some(active_transaction_list) = self.inbound_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
if active_transaction_list.is_locked_by(transaction_id) {
return_invalid_transaction!(
self,
active_transaction_list,
transaction_id,
"set after end"
);
active_transaction_list
.drop_transaction(transaction_id, &mut self.transaction_id_allocator)?;
veilid_log!(self debug "{}","set after end");
return Ok(InboundTransactCommandResult::InvalidTransaction);
}
// Get the inbound transaction if it is still valid
@ -522,7 +692,7 @@ split this into prepare/finish blocks and put the rest of the logic including th
let error_logger = |e| {
veilid_log!(registry error "error in drop_expired_transactions: {}", e);
};
self.record_transactions
self.inbound_transactions
.retain(|_key, inbound_transaction_list| {
inbound_transaction_list.drop_expired_transactions(
now,

View file

@ -1,310 +0,0 @@
use super::*;
/// An individual watch
#[derive(Debug, Clone)]
pub(super) struct InboundWatch {
/// The configuration of the watch
pub params: InboundWatchParameters,
/// A unique id per record assigned at watch creation time. Used to disambiguate a client's version of a watch
pub id: u64,
/// What has changed in the watched range since the last update.
/// May include non-watched ranges if they were changed as part of an overlapping transaction
pub changed: ValueSubkeyRangeSet,
}
#[derive(Debug, Default, Clone)]
/// A record being watched for changes
pub(super) struct InboundWatchList {
/// The list of active watches
pub watches: Vec<InboundWatch>,
}
// ValueChangedInfo but without the subkey data that requires an async operation to get
#[derive(Debug)]
pub(super) struct EarlyValueChangedInfo {
pub target: Target,
pub key: OpaqueRecordKey,
pub subkeys: ValueSubkeyRangeSet,
pub count: u32,
pub watch_id: u64,
}
impl<D> RecordStoreInner<D>
where
D: RecordDetail,
{
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn update_watched_value(
&mut self,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
watch_update_mode: &InboundWatchUpdateMode,
) {
let (do_update, opt_ignore_target) = match watch_update_mode {
InboundWatchUpdateMode::NoUpdate => (false, None),
InboundWatchUpdateMode::UpdateAll => (true, None),
InboundWatchUpdateMode::ExcludeTarget(target) => (true, Some(target)),
};
if !do_update {
return;
}
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let Some(wr) = self.watched_records.get_mut(&rtk) else {
return;
};
// Update all watchers
let mut changed_watched = false;
for w in &mut wr.watches {
// If this watcher is watching the changed subkey then add to the watcher's changed list
// Don't bother marking changes for value sets coming from the same watching node/target because they
// are already going to be aware of the changes in that case
if Some(&w.params.target) != opt_ignore_target && w.params.subkeys.contains(subkey) {
w.changed.insert(subkey);
changed_watched = true;
}
}
if changed_watched {
self.changed_watched_values.insert(rtk);
}
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn create_new_watch(
&mut self,
opaque_record_key: &OpaqueRecordKey,
params: InboundWatchParameters,
member_check: Box<dyn Fn(&MemberId) -> bool + Send>,
) -> VeilidAPIResult<InboundWatchValueResult> {
// Generate a record-unique watch id > 0
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
// Calculate watch limits
let mut watch_count = 0;
let mut target_watch_count = 0;
let mut existing_ids = BTreeSet::new();
let is_member = member_check(&params.watcher_member_id);
if let Some(watched_record) = self.watched_records.get_mut(&rtk) {
// Total up the number of watches for this key
for w in &mut watched_record.watches {
existing_ids.insert(w.id);
// See if this watch should be counted toward any limits
let count_watch = if is_member {
// If the watcher is a member of the schema, then consider the total per-watcher key
w.params.watcher_member_id == params.watcher_member_id
} else {
// If the watcher is not a member of the schema, the check if this watch is an anonymous watch and contributes to per-record key total
!member_check(&w.params.watcher_member_id)
};
// For any watch, if the target matches our also tally that separately
// If the watcher is a member of the schema, then consider the total per-target-per-watcher key
// If the watcher is not a member of the schema, then it is an anonymous watch and the total is per-target-per-record key
if count_watch {
watch_count += 1;
if w.params.target == params.target {
target_watch_count += 1;
}
}
}
}
// For members, no more than one watch per target per watcher per record
// For anonymous, no more than one watch per target per record
if target_watch_count > 0 {
// Too many watches
return Ok(InboundWatchValueResult::Rejected);
}
// Check watch table for limits
let watch_limit = if is_member {
self.unlocked_inner.limits.member_watch_limit
} else {
self.unlocked_inner.limits.public_watch_limit
};
if watch_count >= watch_limit {
return Ok(InboundWatchValueResult::Rejected);
}
// Generate a record-unique watch id > 0
let mut id = 0;
while id == 0 {
id = get_random_u64();
}
// Make sure it doesn't match any other id (unlikely, but lets be certain)
while existing_ids.contains(&id) {
let next_id = id.overflowing_add(1);
id = next_id.0 + if next_id.1 { 1 } else { 0 };
}
// Ok this is an acceptable new watch, add it
let watch_list = self.watched_records.entry(rtk).or_default();
let expiration = params.expiration;
watch_list.watches.push(InboundWatch {
params,
id,
changed: Default::default(),
});
Ok(InboundWatchValueResult::Created { id, expiration })
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn change_existing_watch(
&mut self,
opaque_record_key: &OpaqueRecordKey,
params: InboundWatchParameters,
watch_id: u64,
) -> VeilidAPIResult<InboundWatchValueResult> {
if params.count == 0 {
apibail_internal!("cancel watch should not have gotten here");
}
if params.expiration.as_u64() == 0 {
apibail_internal!("zero expiration should have been resolved to max by now");
}
// Get the watch list for this record
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let Some(watch_list) = self.watched_records.get_mut(&rtk) else {
// No watches, nothing to change
return Ok(InboundWatchValueResult::Rejected);
};
// Check each watch to see if we have an exact match for the id to change
for w in &mut watch_list.watches {
// If the watch id doesn't match, then we're not updating
// Also do not allow the watcher key to change
if w.id == watch_id && w.params.watcher_member_id == params.watcher_member_id {
// Updating an existing watch
w.params = params;
return Ok(InboundWatchValueResult::Changed {
expiration: w.params.expiration,
});
}
}
// No existing watch found
Ok(InboundWatchValueResult::Rejected)
}
/// Clear a specific watch for a record
/// returns true if the watch was found and cancelled
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn cancel_watch(
&mut self,
record_key: OpaqueRecordKey,
watch_id: u64,
watcher_member_id: MemberId,
) -> VeilidAPIResult<bool> {
if watch_id == 0 {
apibail_internal!("should not have let a zero watch id get here");
}
// See if we are cancelling an existing watch
let rtk = RecordTableKey { record_key };
let mut is_empty = false;
let mut ret = false;
if let Some(watch_list) = self.watched_records.get_mut(&rtk) {
let mut dead_watcher = None;
for (wn, w) in watch_list.watches.iter_mut().enumerate() {
// Must match the watch id and the watcher key to cancel
if w.id == watch_id && w.params.watcher_member_id == watcher_member_id {
// Canceling an existing watch
dead_watcher = Some(wn);
ret = true;
break;
}
}
if let Some(dw) = dead_watcher {
watch_list.watches.remove(dw);
if watch_list.watches.is_empty() {
is_empty = true;
}
}
}
if is_empty {
self.watched_records.remove(&rtk);
}
Ok(ret)
}
/// See if any watched records have expired and clear them out
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn check_watched_records(&mut self) {
let now = Timestamp::now_non_decreasing();
self.watched_records.retain(|key, watch_list| {
watch_list.watches.retain(|w| {
w.params.count != 0 && w.params.expiration > now && !w.params.subkeys.is_empty()
});
if watch_list.watches.is_empty() {
// If we're removing the watched record, drop any changed watch values too
self.changed_watched_values.remove(key);
false
} else {
true
}
});
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn take_value_changes(&mut self) -> Vec<EarlyValueChangedInfo> {
let mut evcis = vec![];
let mut empty_watched_records = vec![];
for rtk in self.changed_watched_values.drain() {
if let Some(watch) = self.watched_records.get_mut(&rtk) {
// Process watch notifications
let mut dead_watchers = vec![];
for (wn, w) in watch.watches.iter_mut().enumerate() {
// Get the subkeys that have changed
let subkeys = w.changed.clone();
// If no subkeys on this watcher have changed then skip it
if subkeys.is_empty() {
continue;
}
// Clear the change logs
w.changed.clear();
// Reduce the count of changes sent
// if count goes to zero mark this watcher dead
w.params.count -= 1;
let count = w.params.count;
if count == 0 {
dead_watchers.push(wn);
}
evcis.push(EarlyValueChangedInfo {
target: w.params.target.clone(),
key: rtk.record_key.clone(),
subkeys,
count,
watch_id: w.id,
});
}
// Remove in reverse so we don't have to offset the index to remove the right key
for dw in dead_watchers.iter().rev().copied() {
watch.watches.remove(dw);
if watch.watches.is_empty() {
empty_watched_records.push(rtk);
break;
}
}
}
}
for ewr in empty_watched_records {
self.watched_records.remove(&ewr);
}
evcis
}
}

View file

@ -1,7 +1,7 @@
mod commit_action;
mod debug;
mod inbound_transactions;
mod inbound_watch;
mod inbound_watches;
mod keys;
mod limited_size;
mod load_action;
@ -9,10 +9,10 @@ mod record_data;
mod record_index;
use super::*;
use inbound_watch::*;
pub(super) use commit_action::*;
pub(super) use inbound_transactions::*;
pub(in super::super) use inbound_transactions::*;
pub(in super::super) use inbound_watches::*;
pub(super) use keys::*;
pub(super) use limited_size::*;
pub(super) use load_action::*;
@ -29,17 +29,11 @@ where
/// In-memory record index and cache
record_index: RecordIndex<D>,
/// The set of records being watched for changes
watched_records: HashMap<RecordTableKey, InboundWatchList>,
/// The watches per record
inbound_watches: InboundWatches,
/// The list of watched records that have changed values since last notification
changed_watched_values: HashSet<RecordTableKey>,
/// The set of records for which transactions are active. The records may not exist in the store until committed.
record_transactions: HashMap<RecordTableKey, InboundTransactionList>,
/// The set of all allocated transaction ids
transaction_id_allocator: InboundTransactionIdAllocator,
/// The transactions per record
inbound_transactions: InboundTransactions,
}
impl<D> VeilidComponentRegistryAccessor for RecordStoreInner<D>
@ -58,10 +52,8 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RecordStoreInner")
.field("record_index", &self.record_index)
.field("watched_records", &self.watched_records)
.field("changed_watched_values", &self.changed_watched_values)
.field("record_transactions", &self.record_transactions)
.field("transaction_id_allocator", &self.transaction_id_allocator)
.field("inbound_watches", &self.inbound_watches)
.field("inbound_transactions", &self.inbound_transactions)
.finish()
}
}
@ -75,10 +67,8 @@ where
Ok(Self {
record_index,
watched_records: Default::default(),
changed_watched_values: Default::default(),
record_transactions: Default::default(),
transaction_id_allocator: Default::default(),
inbound_watches: Default::default(),
inbound_transactions: Default::default(),
unlocked_inner,
})
}
@ -245,10 +235,10 @@ where
};
// Remove transactions
self.record_transactions.remove(&rtk);
self.inbound_transactions.remove(&rtk);
// Remove watches
self.watched_records.remove(&rtk);
self.inbound_watches.remove(&rtk);
// Remove watch changes
self.changed_watched_values.remove(&rtk);

View file

@ -77,7 +77,7 @@ impl StorageManager {
.map_err(VeilidAPIError::from)?;
network_result_value_or_log!(self rpc_processor
.rpc_call_value_changed(dest, vc.record_key.clone(), vc.subkeys.clone(), vc.count, vc.watch_id, vc.value.map(|v| (*v).clone()) )
.rpc_call_value_changed(dest, vc.record_key.clone(), vc.subkeys.clone(), vc.count, vc.watch_id.into(), vc.value.map(|v| (*v).clone()) )
.await
.map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {});

View file

@ -34,7 +34,7 @@ impl fmt::Display for OutboundTransactionHandle {
}
impl StorageManager {
/// Create a new transaction over a set of records
/// Create a new outbound transaction over a set of records
/// If an existing transaction exists over these records
/// or a transaction can not be performed at this time, this will fail.
/// Returns a transaction handle if the transaction was created
@ -474,11 +474,7 @@ impl StorageManager {
}
// Record the set values locally since they were successfully set online
let subkey_transaction_changes = self
.handle_set_local_values_with_multiple_records_lock(records_lock, keys_and_subkeys)
.await?;
self.handle_commit_local_subkey_transaction_changes(subkey_transaction_changes)
self.handle_set_local_values_with_multiple_records_lock(records_lock, keys_and_subkeys)
.await?;
Ok(())
@ -498,7 +494,7 @@ impl StorageManager {
apibail_not_initialized!();
};
let records_lock = self
.outbound_transaction_lock_table
.record_lock_table
.try_lock_records(transaction_handle.keys().to_vec())
.ok_or_else(|| VeilidAPIError::try_again("record busy"))?;
@ -506,7 +502,6 @@ impl StorageManager {
if !self
.inner
.lock()
.await
.outbound_transaction_manager
.transaction_exists(&transaction_handle)
{
@ -523,7 +518,10 @@ impl StorageManager {
.await?;
// Transaction is done successfully, drop it
self.drop_transaction_locked(&records_lock, transaction_handle)
self.inner
.lock()
.outbound_transaction_manager
.drop_transaction(transaction_handle)
.await;
Ok(())
@ -536,7 +534,7 @@ impl StorageManager {
transaction_handle: OutboundTransactionHandle,
) -> VeilidAPIResult<()> {
let command_params_list = {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
// Obtain the outbound transaction manager
let otm = &mut inner.outbound_transaction_manager;
@ -576,7 +574,7 @@ impl StorageManager {
// Store rollback results
{
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
let otm = &mut inner.outbound_transaction_manager;
if let Err(e) =
otm.record_transact_rollback_results(transaction_handle.clone(), results)
@ -852,7 +850,7 @@ impl StorageManager {
apibail_not_initialized!();
};
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
inner.outbound_transaction_manager.get_record_report(
transaction_handle,
&record_key.opaque(),
@ -861,25 +859,6 @@ impl StorageManager {
)
}
/// Drop a transaction. This eliminates the transaction locally and does not
/// perform any actions on the network. The remote transaction will time out on its own.
#[instrument(level = "trace", target = "dht", skip(self, _records_lock))]
pub(super) async fn drop_transaction_locked(
&self,
_records_lock: &RecordsLockGuard,
transaction_handle: OutboundTransactionHandle,
) {
veilid_log!(self debug target: "network_result", "Dropping transaction: {:?}", transaction_handle);
let mut inner = self.inner.lock().await;
// Obtain the outbound transaction manager
let otm = &mut inner.outbound_transaction_manager;
// Drop the transaction
otm.drop_transaction(transaction_handle.clone());
}
/// Guard function used to ensure that errors on whole-transaction operations cause rollback attempts
pub(super) async fn rollback_guard_locked<V, E, F: Future<Output = Result<V, E>>>(
&self,
@ -893,7 +872,7 @@ impl StorageManager {
Ok(v) => Ok(v),
Err(e) => {
{
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
let state = match inner
.outbound_transaction_manager
.get_transaction_state(&transaction_handle)
@ -937,8 +916,10 @@ impl StorageManager {
veilid_log!(self debug "Error in roll back transaction: {}", rbe);
}
if enable_drop {
self.drop_transaction_locked(records_lock, transaction_handle)
.await;
self.inner
.lock()
.outbound_transaction_manager
.drop_transaction(transaction_handle);
}
Err(e)
@ -953,14 +934,16 @@ impl StorageManager {
self.background_operation_processor.add_future(async move {
let storage_manager = registry.storage_manager();
let records_lock = storage_manager
.outbound_transaction_lock_table
let _records_lock = storage_manager
.record_lock_table
.lock_records(transaction_handle.keys().to_vec())
.await;
storage_manager
.drop_transaction_locked(&records_lock, transaction_handle)
.await;
.inner
.lock()
.outbound_transaction_manager
.drop_transaction(transaction_handle);
});
}
}

View file

@ -240,6 +240,27 @@ impl VeilidAPIError {
pub type VeilidAPIResult<T> = Result<T, VeilidAPIError>;
pub trait LogVeilidAPIResult<T> {
fn log_error<R: VeilidComponentRegistryAccessor, S: AsRef<str>>(
self,
registry_accessor: R,
message: S,
) -> T;
}
impl<T> LogVeilidAPIResult<T> for VeilidAPIResult<T> {
fn log_error<R: VeilidComponentRegistryAccessor, S: AsRef<str>>(
self,
registry_accessor: R,
message: S,
) -> T {
if let Err(e) = &self {
veilid_log!(registry_accessor error "{}: {}", message.as_ref(), e);
}
self
}
}
pub trait OkVeilidAPIResult<T> {
fn ok_try_again(self) -> VeilidAPIResult<Option<T>>;
fn ok_try_again_timeout(self) -> VeilidAPIResult<Option<T>>;