[ci skip] checkpoint

This commit is contained in:
Christien Rioux 2025-11-29 15:11:15 -05:00
parent af6c92ada4
commit 550ac7df5f
34 changed files with 311 additions and 310 deletions

View file

@ -133,7 +133,7 @@ impl AttachmentManager {
let registry = self.registry();
veilid_log!(self debug "starting attachment maintainer task");
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
attachment_maintainer_task,

View file

@ -350,7 +350,6 @@ macro_rules! impl_setup_task {
Box::pin(async move {
let this = registry.lookup::<$this_type>().unwrap();
this.$task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}};
@ -358,6 +357,22 @@ macro_rules! impl_setup_task {
pub(crate) use impl_setup_task;
// Utility macro for wiring an async routine into a TickTask-style task member.
//
// Expands to:
//   1. Capture the component registry from `$this`.
//   2. Install a routine on `$this.$task_name` that, on each invocation,
//      clones the registry, looks up the concrete component `$this_type`
//      from it, and awaits `$task_routine` on that component.
//
// `s` is the stop token passed through unchanged; `l` and `t` are raw
// timestamp values wrapped in `Timestamp::new` — presumably the last-run
// and current tick times (TODO confirm against the task runner's contract).
//
// NOTE(review): the routine unwraps the registry lookup, so the component
// must outlive the task routine's registration.
macro_rules! impl_setup_task_async {
    ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
        let registry = $this.registry();
        $this.$task_name.set_routine(move |s, l, t| {
            let registry = registry.clone();
            Box::pin(async move {
                let this = registry.lookup::<$this_type>().unwrap();
                this.$task_routine(s, Timestamp::new(l), Timestamp::new(t))
                    .await
            })
        });
    }};
}
pub(crate) use impl_setup_task_async;
// Utility macro for setting up an event bus handler
// Should be called after init, during post-init or later
// Subscription should be unsubscribed before termination

View file

@ -71,21 +71,21 @@ pub trait CryptoSystem {
apibail_generic!("incorrect shared secret kind");
}
if secret.value().len() != self.shared_secret_length() {
apibail_generic!(format!(
apibail_generic!(
"invalid shared secret length: {} != {}",
secret.value().len(),
self.shared_secret_length()
));
);
}
Ok(())
}
fn check_nonce(&self, nonce: &Nonce) -> VeilidAPIResult<()> {
if nonce.len() != self.nonce_length() {
apibail_generic!(format!(
apibail_generic!(
"invalid nonce length: {} != {}",
nonce.len(),
self.nonce_length()
));
);
}
Ok(())
}
@ -94,11 +94,11 @@ pub trait CryptoSystem {
apibail_generic!("incorrect hash digest kind");
}
if hash.value().len() != self.hash_digest_length() {
apibail_generic!(format!(
apibail_generic!(
"invalid hash digest length: {} != {}",
hash.value().len(),
self.hash_digest_length()
));
);
}
Ok(())
}
@ -107,11 +107,11 @@ pub trait CryptoSystem {
apibail_generic!("incorrect public key kind");
}
if key.value().len() != self.public_key_length() {
apibail_generic!(format!(
apibail_generic!(
"invalid public key length: {} != {}",
key.value().len(),
self.public_key_length()
));
);
}
Ok(())
}
@ -120,11 +120,11 @@ pub trait CryptoSystem {
apibail_generic!("incorrect secret key kind");
}
if key.value().len() != self.secret_key_length() {
apibail_generic!(format!(
apibail_generic!(
"invalid secret key length: {} != {}",
key.value().len(),
self.secret_key_length()
));
);
}
Ok(())
}
@ -133,11 +133,11 @@ pub trait CryptoSystem {
apibail_generic!("incorrect signature kind");
}
if signature.value().len() != self.signature_length() {
apibail_generic!(format!(
apibail_generic!(
"invalid signature length: {} != {}",
signature.value().len(),
self.signature_length()
));
);
}
Ok(())
}

View file

@ -5,7 +5,7 @@ use crate::attachment_manager::TickEvent;
impl NetworkManager {
pub fn setup_tasks(&self) {
// Set rolling transfers tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
rolling_transfers_task,

View file

@ -508,10 +508,10 @@ impl RoutingTable {
if let (Some(public_key), Some(secret_key)) = (public_key, secret_key) {
// Validate node id
if !vcrypto.validate_keypair(&public_key, &secret_key).await? {
apibail_generic!(format!(
apibail_generic!(
"secret_key and public_key don't match:\npublic_key: {}\nsecret_key: {}",
public_key, secret_key
));
);
}
(public_key, secret_key)
} else {

View file

@ -17,10 +17,10 @@ impl_veilid_log_facility!("rtab");
impl RoutingTable {
pub fn setup_tasks(&self) {
// Set flush tick task
impl_setup_task!(self, Self, flush_task, flush_task_routine);
impl_setup_task_async!(self, Self, flush_task, flush_task_routine);
// Set rolling transfers tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
rolling_transfers_task,
@ -28,7 +28,7 @@ impl RoutingTable {
);
// Set update state stats tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
update_state_stats_task,
@ -36,7 +36,7 @@ impl RoutingTable {
);
// Set rolling answers tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
rolling_answers_task,
@ -44,13 +44,13 @@ impl RoutingTable {
);
// Set kick buckets tick task
impl_setup_task!(self, Self, kick_buckets_task, kick_buckets_task_routine);
impl_setup_task_async!(self, Self, kick_buckets_task, kick_buckets_task_routine);
// Set bootstrap tick task
impl_setup_task!(self, Self, bootstrap_task, bootstrap_task_routine);
impl_setup_task_async!(self, Self, bootstrap_task, bootstrap_task_routine);
// Set peer minimum refresh tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
peer_minimum_refresh_task,
@ -58,7 +58,7 @@ impl RoutingTable {
);
// Set closest peers refresh tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
closest_peers_refresh_task,
@ -66,7 +66,7 @@ impl RoutingTable {
);
// Set ping validator PublicInternet tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
ping_validator_public_internet_task,
@ -74,7 +74,7 @@ impl RoutingTable {
);
// Set ping validator LocalNetwork tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
ping_validator_local_network_task,
@ -82,7 +82,7 @@ impl RoutingTable {
);
// Set ping validator PublicInternet Relay tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
ping_validator_public_internet_relay_task,
@ -90,7 +90,7 @@ impl RoutingTable {
);
// Set ping validator Active Watch tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
ping_validator_active_watch_task,
@ -98,7 +98,7 @@ impl RoutingTable {
);
// Set relay management tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
relay_management_task,
@ -106,7 +106,7 @@ impl RoutingTable {
);
// Set private route management tick task
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
private_route_management_task,

View file

@ -53,22 +53,22 @@ impl PeerInfo {
// Ensure node ids are within limits
if public_keys.is_empty() {
apibail_internal!(format!(
apibail_internal!(
"no public keys for peer info ({:?})\n{:#?}",
routing_domain, node_info
));
);
} else if public_keys.len() > MAX_CRYPTO_KINDS {
apibail_internal!(format!(
apibail_internal!(
"too many public keys for peer info ({:?}): {:?}\n{:#?}",
routing_domain, public_keys, node_info
));
);
}
// Make sure secret keys and public keys match and make keypairs
let mut keypairs = KeyPairGroup::new();
for pk in public_keys.iter() {
let Some(sk) = secret_keys.get(pk.kind()) else {
apibail_internal!(format!("secret key not found for public key: {}", pk));
apibail_internal!("secret key not found for public key: {}", pk);
};
keypairs.add(KeyPair::new_from_parts(pk.clone(), sk.value()));
}
@ -128,15 +128,15 @@ impl PeerInfo {
// Ensure node ids are within limits
if public_keys.is_empty() {
apibail_internal!(format!(
apibail_internal!(
"no public keys for peer info ({:?})\n{:#?}",
origin_routing_domain, node_info
));
);
} else if public_keys.len() > MAX_CRYPTO_KINDS {
apibail_internal!(format!(
apibail_internal!(
"too many public keys for peer info ({:?}): {:?}\n{:#?}",
origin_routing_domain, public_keys, node_info
));
);
}
// Verify signatures
@ -189,15 +189,15 @@ impl PeerInfo {
// Ensure node ids are within limits
if public_keys.is_empty() {
apibail_internal!(format!(
apibail_internal!(
"no public keys for peer info ({:?})\n{:#?}",
routing_domain, node_info
));
);
} else if public_keys.len() > MAX_CRYPTO_KINDS {
apibail_internal!(format!(
apibail_internal!(
"too many public keys for peer info ({:?}): {:?}\n{:#?}",
routing_domain, public_keys, node_info
));
);
}
// Generate on-the-wire node info message

View file

@ -50,7 +50,7 @@ impl StorageManager {
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
) -> VeilidAPIResult<()> {
if let Some(opened_record) = inner.opened_records.remove(&opaque_record_key) {
if let Some(opened_record) = inner.opened_records.remove(opaque_record_key) {
let record_key =
RecordKey::from_opaque(opaque_record_key.clone(), opened_record.encryption_key());
@ -63,7 +63,7 @@ impl StorageManager {
// Drop any transaction associated with the record
if let Some(transaction_handle) = inner
.outbound_transaction_manager
.get_transaction_by_record(&opaque_record_key)
.get_transaction_by_record(opaque_record_key)
{
inner
.outbound_transaction_manager

View file

@ -133,7 +133,7 @@ impl StorageManager {
.debug_record_subkey_info(&opaque_record_key, subkey)
.await
}
pub async fn debug_local_record_info(&self, record_key: RecordKey) -> String {
pub fn debug_local_record_info(&self, record_key: RecordKey) -> String {
let opaque_record_key = record_key.opaque();
let (local_record_store, opened_debug) = {
@ -154,7 +154,7 @@ impl StorageManager {
format!("{}\n{}", local_debug, opened_debug)
}
pub async fn debug_remote_record_info(&self, record_key: RecordKey) -> String {
pub fn debug_remote_record_info(&self, record_key: RecordKey) -> String {
let remote_record_store = {
let inner = self.inner.lock();
let Some(remote_record_store) = inner.remote_record_store.clone() else {

View file

@ -539,7 +539,7 @@ impl StorageManager {
core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)),
false,
self.config().network.dht.consensus_width as usize,
);
)?;
// If we got a new value back then write it to the opened record
if get_result_value.value_data().seq() != last_seq {

View file

@ -166,7 +166,7 @@ impl StorageManager {
results_iter,
false,
self.config().network.dht.consensus_width as usize,
);
)?;
if result.inspect_result.subkeys().is_empty() {
DHTRecordReport::new(

View file

@ -95,10 +95,10 @@ impl OutboundTransactionManager {
for rp in &record_params {
let opaque_record_key = rp.record_key.opaque();
if self.handles_by_key.contains_key(&opaque_record_key) {
apibail_generic!(format!(
apibail_generic!(
"Record {} already has a transaction open",
opaque_record_key
));
);
}
opaque_record_keys.push(opaque_record_key);
}
@ -205,10 +205,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Begin
| OutboundTransactionStage::End
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Init",
outbound_transaction_state.stage(),
));
);
}
}
@ -238,10 +238,10 @@ impl OutboundTransactionManager {
outbound_transaction_state.stage(),
OutboundTransactionStage::Init
) {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Init",
outbound_transaction_state.stage(),
));
);
}
// Add all node transaction ids
@ -249,10 +249,10 @@ impl OutboundTransactionManager {
let Some(record_info) =
outbound_transaction_state.get_record_info_mut(&result.opaque_record_key)
else {
apibail_internal!(format!(
apibail_internal!(
"unexpected record in begin results: {}",
result.opaque_record_key
));
);
};
record_info.update_begin_network_seqs(result.seqs);
@ -270,11 +270,11 @@ impl OutboundTransactionManager {
|| record_info.get_node_transactions().len()
< record_info.required_strict_consensus_count()
{
apibail_try_again!(format!("did not get consensus of transaction ids (rec={}, stage={}, count={}, consensus={})",
apibail_try_again!("did not get consensus of transaction ids (rec={}, stage={}, count={}, consensus={})",
record_info.record_key().opaque(),
record_info.stage(),
record_info.get_node_transactions().len(),
record_info.required_strict_consensus_count()));
record_info.required_strict_consensus_count());
}
}
@ -298,10 +298,10 @@ impl OutboundTransactionManager {
OutboundTransactionStage::Init
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Begin, End, Inconsistent, or Failed",
outbound_transaction_state.stage(),
));
);
}
}
@ -357,10 +357,10 @@ impl OutboundTransactionManager {
OutboundTransactionStage::Init
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Begin, End, Inconsistent, or Failed",
outbound_transaction_state.stage(),
));
);
}
}
@ -371,16 +371,17 @@ impl OutboundTransactionManager {
let Some(record_info) =
outbound_transaction_state.get_record_info_mut(&opaque_record_key)
else {
apibail_internal!(format!("missing record in rollback: {}", opaque_record_key));
apibail_internal!("missing record in rollback: {}", opaque_record_key);
};
let mut missing_node_xids = record_info.get_node_xids::<HashSet<_>>();
for pnr in result.per_node_results {
if !missing_node_xids.remove(&pnr.node_transaction_id) {
apibail_internal!(format!(
apibail_internal!(
"node transaction rolled back multiple times: {} pnr={:?}",
opaque_record_key, pnr
));
opaque_record_key,
pnr
);
}
let node_transaction = record_info
@ -393,10 +394,10 @@ impl OutboundTransactionManager {
for missing_node_xid in &missing_node_xids {
let Some(node_transaction) = record_info.get_node_transaction_mut(missing_node_xid)
else {
apibail_internal!(format!(
apibail_internal!(
"missing node transaction in record info: {}",
missing_node_xid,
));
);
};
node_transaction.set_stage(OutboundTransactionStage::Failed, None);
}
@ -422,10 +423,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Failed
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Begin",
outbound_transaction_state.stage(),
));
);
}
}
@ -481,10 +482,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Failed
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Begin",
outbound_transaction_state.stage(),
));
);
}
}
@ -495,16 +496,17 @@ impl OutboundTransactionManager {
let Some(record_info) =
outbound_transaction_state.get_record_info_mut(&opaque_record_key)
else {
apibail_internal!(format!("missing record in end: {}", opaque_record_key));
apibail_internal!("missing record in end: {}", opaque_record_key);
};
let mut missing_node_xids = record_info.get_node_xids::<HashSet<_>>();
for pnr in result.per_node_results {
if !missing_node_xids.remove(&pnr.node_transaction_id) {
apibail_internal!(format!(
apibail_internal!(
"node transaction ended multiple times: {} pnr={:?}",
opaque_record_key, pnr
));
opaque_record_key,
pnr
);
}
let node_transaction = record_info
@ -525,10 +527,10 @@ impl OutboundTransactionManager {
for missing_node_xid in &missing_node_xids {
let Some(node_transaction) = record_info.get_node_transaction_mut(missing_node_xid)
else {
apibail_internal!(format!(
apibail_internal!(
"missing node transaction in record info: {}",
missing_node_xid,
));
);
};
node_transaction.set_stage(OutboundTransactionStage::Failed, None);
}
@ -554,10 +556,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Failed
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted End",
outbound_transaction_state.stage(),
));
);
}
}
@ -619,10 +621,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Failed
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted End",
outbound_transaction_state.stage(),
));
);
}
}
@ -633,16 +635,17 @@ impl OutboundTransactionManager {
let Some(record_info) =
outbound_transaction_state.get_record_info_mut(&opaque_record_key)
else {
apibail_internal!(format!("missing record in commit: {}", opaque_record_key));
apibail_internal!("missing record in commit: {}", opaque_record_key);
};
let mut missing_node_xids = record_info.get_node_xids::<HashSet<_>>();
for pnr in result.per_node_results {
if !missing_node_xids.remove(&pnr.node_transaction_id) {
apibail_internal!(format!(
apibail_internal!(
"node transaction committed multiple times: {} pnr={:?}",
opaque_record_key, pnr
));
opaque_record_key,
pnr
);
}
let node_transaction = record_info
@ -663,10 +666,10 @@ impl OutboundTransactionManager {
for missing_node_xid in &missing_node_xids {
let Some(node_transaction) = record_info.get_node_transaction_mut(missing_node_xid)
else {
apibail_internal!(format!(
apibail_internal!(
"missing node transaction in record info: {}",
missing_node_xid,
));
);
};
if node_transaction.commit_will_change_remote() {
@ -700,10 +703,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Failed
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Begin or Inconsistent",
outbound_transaction_state.stage(),
));
);
}
}
@ -769,10 +772,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Failed
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Begin or Inconsistent",
outbound_transaction_state.stage(),
));
);
}
}
@ -780,10 +783,7 @@ impl OutboundTransactionManager {
let Some(record_info) =
outbound_transaction_state.get_record_info_mut(&result.params.opaque_record_key)
else {
apibail_internal!(format!(
"missing record in set: {}",
result.params.opaque_record_key
));
apibail_internal!("missing record in set: {}", result.params.opaque_record_key);
};
let mut missing_node_xids = record_info.get_node_xids::<HashSet<_>>();
@ -798,10 +798,11 @@ impl OutboundTransactionManager {
let mut found_newer = false;
for pnr in result.per_node_results {
if !missing_node_xids.remove(&pnr.node_transaction_id) {
apibail_internal!(format!(
apibail_internal!(
"node transaction get multiple times: {} pnr={:?}",
result.params.opaque_record_key, pnr
));
result.params.opaque_record_key,
pnr
);
}
let node_transaction = record_info
@ -897,10 +898,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Failed
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Begin or Inconsistent",
outbound_transaction_state.stage(),
));
);
}
}
@ -966,10 +967,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Failed
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Begin or Inconsistent",
outbound_transaction_state.stage(),
));
);
}
}
@ -977,10 +978,7 @@ impl OutboundTransactionManager {
let Some(record_info) =
outbound_transaction_state.get_record_info_mut(&result.params.opaque_record_key)
else {
apibail_internal!(format!(
"missing record in get: {}",
result.params.opaque_record_key
));
apibail_internal!("missing record in get: {}", result.params.opaque_record_key);
};
let mut missing_node_xids = record_info.get_node_xids::<HashSet<_>>();
@ -990,10 +988,11 @@ impl OutboundTransactionManager {
let mut opt_get_subkey_consensus: Option<OutboundTransactionSubkeyConsensus> = None;
for pnr in result.per_node_results {
if !missing_node_xids.remove(&pnr.node_transaction_id) {
apibail_internal!(format!(
apibail_internal!(
"node transaction get multiple times: {} pnr={:?}",
result.params.opaque_record_key, pnr
));
result.params.opaque_record_key,
pnr
);
}
let node_transaction = record_info
@ -1046,10 +1045,10 @@ impl OutboundTransactionManager {
| OutboundTransactionStage::Failed
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Commit => {
apibail_generic!(format!(
apibail_generic!(
"stage was {:?}, wanted Begin or Inconsistent",
outbound_transaction_state.stage(),
));
);
}
}

View file

@ -40,19 +40,19 @@ impl StorageManager {
};
if record_key.value().key().len() != HASH_COORDINATE_LENGTH {
apibail_generic!(format!(
apibail_generic!(
"invalid record key length: {} != {}",
record_key.value().key().len(),
HASH_COORDINATE_LENGTH
));
);
}
if let Some(encryption_key) = record_key.value().encryption_key() {
if encryption_key.len() != vcrypto.shared_secret_length() {
apibail_generic!(format!(
apibail_generic!(
"invalid encryption key length: {} != {}",
encryption_key.len(),
vcrypto.shared_secret_length()
));
);
}
}

View file

@ -96,7 +96,6 @@ where
pub(super) type SubkeyValueList = Vec<(ValueSubkey, Arc<SignedValueData>)>;
pub(super) type RecordSubkeyValueList = Vec<(OpaqueRecordKey, SubkeyValueList)>;
type SubkeyRecordDataList = Vec<(ValueSubkey, RecordData)>;
impl<D> RecordStore<D>
where
@ -312,7 +311,7 @@ where
let opt_commit_action = self.inner.lock().set_subkeys_single_record(
opaque_record_key,
&subkey_values,
subkey_values,
&watch_update_mode,
)?;

View file

@ -98,24 +98,44 @@ where
}
}
pub fn record_stored_subkey(&mut self, subkey: ValueSubkey, data: &RecordData) {
pub fn record_stored_subkey(
&mut self,
subkey: ValueSubkey,
data: &RecordData,
max_record_data_size: usize,
) -> VeilidAPIResult<()> {
let seq = data.signed_value_data().value_data().seq();
let size = data.data_size() as u16;
let new_subkey_size = data.data_size() as u16;
let old_subkey_size = self.subkey_sizes[subkey as usize];
let new_record_data_size = if new_subkey_size > old_subkey_size {
self.record_data_size + (new_subkey_size - old_subkey_size) as usize
} else if new_subkey_size < old_subkey_size {
self.record_data_size - (old_subkey_size - new_subkey_size) as usize
} else {
self.record_data_size
};
if new_record_data_size > max_record_data_size {
apibail_internal!(
"record exceeds maximum data size: {} > {}",
new_record_data_size,
max_record_data_size
);
}
// No failures past this point
self.record_data_size = new_record_data_size;
self.stored_subkeys.insert(subkey);
self.subkey_seqs.resize(self.subkey_count, 0);
self.subkey_seqs[subkey as usize] = u32::from(seq);
self.subkey_sizes.resize(self.subkey_count, 0);
let old_size = self.subkey_sizes[subkey as usize];
self.subkey_sizes[subkey as usize] = size;
self.subkey_sizes[subkey as usize] = new_subkey_size;
if size > old_size {
self.record_data_size += (size - old_size) as usize;
} else if size < old_size {
self.record_data_size -= (old_size - size) as usize;
}
Ok(())
}
#[expect(dead_code)]
pub fn subkey_size(&self, subkey: ValueSubkey) -> u16 {
self.subkey_sizes[subkey as usize]
}

View file

@ -145,10 +145,8 @@ where
// Finish all load actions
{
let mut inner = self.inner.lock();
for opt_load_action in all_value_load_actions.into_iter() {
if let Some(load_action) = opt_load_action {
inner.finish_load_action(load_action);
}
for load_action in all_value_load_actions.into_iter().flatten() {
inner.finish_load_action(load_action);
}
}

View file

@ -30,11 +30,6 @@ where
#[derive(Debug)]
pub(super) enum UncommittedSubkeyChange {
Create {
/// The subkey data being created
new_data: RecordData,
},
Update {
/// The new subkey data
new_data: RecordData,
@ -111,9 +106,6 @@ where
}
for (stk, usc) in self.uncommitted_subkey_changes.iter() {
match usc {
UncommittedSubkeyChange::Create { new_data: data } => {
st_xact.store_json(0, &stk.bytes(), &data).await?;
}
UncommittedSubkeyChange::Update {
new_data,
opt_old_data: _,

View file

@ -93,7 +93,7 @@ impl InboundTransactionList {
pub fn lock(&mut self, transaction_id: InboundTransactionId) -> VeilidAPIResult<()> {
if let Some(existing_xid) = self.lock {
apibail_internal!(format!("request to lock inbound transaction list by xid {} when it was already locked by {}", transaction_id, existing_xid));
apibail_internal!("request to lock inbound transaction list by xid {} when it was already locked by {}", transaction_id, existing_xid);
}
self.lock = Some(transaction_id);

View file

@ -100,6 +100,8 @@ impl<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> LimitedSize<T> {
self.uncommitted_value = Some(new_value);
Ok(new_value)
}
#[expect(dead_code)]
pub fn saturating_sub(&mut self, mut v: T) -> T {
let current_value = self.current_value();
let max_v = current_value - T::min_value();
@ -123,10 +125,6 @@ impl<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> LimitedSize<T> {
true
}
pub fn limit(&self) -> Option<T> {
self.limit
}
pub fn commit(&mut self) -> Result<T, LimitError<T>> {
if let Some(uncommitted_value) = self.uncommitted_value.take() {
if let Some(limit) = self.limit {

View file

@ -119,6 +119,15 @@ where
record_key: key.clone(),
};
// Ensure this record is actually new
if !record.is_new() {
apibail_internal!(
"record was not new during create: key={}: {:?}",
key,
record
);
}
// If record already exists, fail early
if let Some(prev_record) = self.record_cache.get(&rtk) {
veilid_log!(self error "RecordIndex({}): Record already existed with key {}: {:?}", self.unlocked_inner.name, key, prev_record.clone());
@ -309,7 +318,6 @@ where
.and_then(|x| x.get(&stk))
})
.and_then(|v| match v {
UncommittedSubkeyChange::Create { new_data } => Some(new_data.clone()),
UncommittedSubkeyChange::Update {
new_data,
opt_old_data: _,
@ -369,7 +377,7 @@ where
};
// Make a RecordData for the value
let new_data = RecordData::new(value);
let new_data = self.make_record_data(value)?;
// Get the current record from the cache
let Some(old_record) = self.record_cache.get(&rtk).cloned() else {
@ -380,7 +388,11 @@ where
let mut new_record = old_record.clone();
// Change the record to reflect the new data
new_record.record_stored_subkey(subkey, &new_data);
new_record.record_stored_subkey(
subkey,
&new_data,
self.unlocked_inner.limits.max_record_data_size,
)?;
// Update the record's touch timestamp for LRU sorting
new_record.touch();
@ -443,8 +455,12 @@ where
let mut new_data_list = Vec::with_capacity(subkey_values.len());
for (subkey, value) in subkey_values.iter().cloned() {
// Change the record to reflect the new data
let new_data = RecordData::new(value);
new_record.record_stored_subkey(subkey, &new_data);
let new_data = self.make_record_data(value)?;
new_record.record_stored_subkey(
subkey,
&new_data,
self.unlocked_inner.limits.max_record_data_size,
)?;
// Keep the new data for later
new_data_list.push((subkey, new_data));
@ -523,8 +539,12 @@ where
let mut new_data_list = Vec::with_capacity(subkey_values.len());
for (subkey, value) in subkey_values.iter().cloned() {
// Change the record to reflect the new data
let new_data = RecordData::new(value);
new_record.record_stored_subkey(subkey, &new_data);
let new_data = self.make_record_data(value)?;
new_record.record_stored_subkey(
subkey,
&new_data,
self.unlocked_inner.limits.max_record_data_size,
)?;
// Keep the new data for later
new_data_list.push((subkey, new_data));
@ -707,6 +727,18 @@ where
//////////////////////////////////////////////////////////////////////////////////////////
/// Wrap a signed value in a [`RecordData`], enforcing this record index's
/// configured per-subkey size limit.
///
/// Returns an internal error when `value` exceeds
/// `unlocked_inner.limits.max_subkey_size`; otherwise returns the wrapped
/// record data.
fn make_record_data(&self, value: Arc<SignedValueData>) -> VeilidAPIResult<RecordData> {
    // Reject oversized subkey data before it is ever wrapped or stored
    let data_size = value.data_size();
    let max_subkey_size = self.unlocked_inner.limits.max_subkey_size;
    if data_size > max_subkey_size {
        apibail_internal!(
            "record data too large for record index {}: {} > {}",
            self.unlocked_inner.name,
            data_size,
            max_subkey_size,
        );
    }
    Ok(RecordData::new(value))
}
async fn load_db(&mut self) -> EyreResult<()> {
let start_ts = Timestamp::now();
veilid_log!(self info "Loading record index: {}", self.unlocked_inner.name);
@ -878,18 +910,6 @@ where
// Process creates and updates with removal first so we don't have to worry about LRU
for (stk, usc) in uncommitted_subkey_changes.iter().rev() {
match usc {
UncommittedSubkeyChange::Create { new_data: data } => {
let opt_prev_data = self.uncache_subkey(stk);
// Validate
if let Some(prev_data) = opt_prev_data {
if &prev_data != data {
veilid_log!(self error "UncommittedSubkeyChange::Create rollback: {} had unexpected previous data", stk);
}
} else {
veilid_log!(self error "UncommittedSubkeyChange::Create rollback: {} had missing previous value", stk);
}
}
UncommittedSubkeyChange::Update {
new_data,
opt_old_data,
@ -961,9 +981,6 @@ where
}
}
}
UncommittedSubkeyChange::Create { new_data: _ } => {
// Already did these
}
}
}
}
@ -977,7 +994,7 @@ where
.load_json::<Record<D>>(0, &rtk.bytes())
.await?
else {
apibail_internal!(format!("missing record: {}", rtk));
apibail_internal!("missing record: {}", rtk);
};
record.post_deserialize();
@ -999,10 +1016,7 @@ where
st_xact: &TableDBTransaction,
) -> VeilidAPIResult<()> {
if self.record_cache.contains_key(rtk) {
apibail_internal!(format!(
"should have removed record from cache already: {}",
rtk
));
apibail_internal!("should have removed record from cache already: {}", rtk);
}
let stored_subkeys = record.stored_subkeys();
@ -1012,10 +1026,7 @@ where
subkey: sk,
};
if self.subkey_cache.contains_key(&stk) {
apibail_internal!(format!(
"should have removed subkey from cache already: {}",
stk
));
apibail_internal!("should have removed subkey from cache already: {}", stk);
}
st_xact.delete(0, &stk.bytes()).await?;
@ -1382,10 +1393,6 @@ where
std::collections::btree_map::Entry::Occupied(mut o) => {
let usc = o.get_mut();
match usc {
UncommittedSubkeyChange::Create { new_data: _ } => {
// Create followed by delete is nothing
o.remove();
}
UncommittedSubkeyChange::Update {
new_data: _,
opt_old_data,
@ -1426,10 +1433,6 @@ where
std::collections::btree_map::Entry::Occupied(mut o) => {
let usc = o.get_mut();
match usc {
UncommittedSubkeyChange::Create { new_data: _ } => {
// If we created a subkey and then updated it, might as well have just created it with the new value
*usc = UncommittedSubkeyChange::Create { new_data };
}
UncommittedSubkeyChange::Update {
new_data: _,
opt_old_data,
@ -1452,42 +1455,6 @@ where
}
}
fn add_uncommitted_subkey_create(&mut self, stk: SubkeyTableKey, new_data: RecordData) {
let stk_log = stk.clone();
match self.uncommitted_subkey_changes.entry(stk) {
std::collections::btree_map::Entry::Vacant(v) => {
v.insert(UncommittedSubkeyChange::Create { new_data });
}
std::collections::btree_map::Entry::Occupied(mut o) => {
let usc = o.get_mut();
match usc {
UncommittedSubkeyChange::Create { new_data: _ } => {
// Should never happen. Can't create an already created subkey.
veilid_log!(self error "subkey was created twice in uncommitted log: {}", stk_log);
}
UncommittedSubkeyChange::Update {
new_data: _,
opt_old_data: _,
} => {
// Should never happen. Can't create an already created subkey.
veilid_log!(self error "record was created after updated in uncommitted log: {}", stk_log);
}
UncommittedSubkeyChange::Delete {
opt_old_data,
is_lru: _,
} => {
// A delete followed by a create is really an update
*usc = UncommittedSubkeyChange::Update {
new_data,
opt_old_data: opt_old_data.clone(),
};
}
}
}
}
}
pub fn debug(&self) -> String {
let mut out = String::new();
@ -1511,14 +1478,14 @@ where
"Uncommitted Record Changes: {}\n",
self.uncommitted_record_changes.len()
);
for (k, _v) in &self.uncommitted_record_changes {
for k in self.uncommitted_record_changes.keys() {
out += &format!(" {}\n", k);
}
out += &format!(
"Uncommitted Subkey Changes: {}\n",
self.uncommitted_subkey_changes.len()
);
for (k, _v) in &self.uncommitted_subkey_changes {
for k in self.uncommitted_subkey_changes.keys() {
out += &format!(" {}\n", k);
}

View file

@ -216,7 +216,7 @@ impl StorageManager {
let mut rehydrated = ValueSubkeyRangeSet::new();
for (n, subkey) in local_inspect_result.subkeys().iter().enumerate() {
if local_inspect_result.seqs()[n].is_none() {
apibail_internal!(format!("None sequence number found in local inspect results. Should have been stripped by strip_none_seqs(): {:?}", local_inspect_result));
apibail_internal!("None sequence number found in local inspect results. Should have been stripped by strip_none_seqs(): {:?}", local_inspect_result);
}
let sfr = outbound_inspect_result
@ -260,7 +260,7 @@ impl StorageManager {
results_iter,
false,
self.config().network.dht.consensus_width as usize,
);
)?;
Ok(RehydrateReport {
opaque_record_key,

View file

@ -245,10 +245,7 @@ impl StorageManager {
) {
veilid_log!(self debug "schema validation error: {}", e);
// Validation failed, ignore this value
apibail_generic!(format!(
"failed schema validation: {}:{}",
record_key, subkey
));
apibail_generic!("failed schema validation: {}:{}", record_key, subkey);
}
// Sign the new value data with the writer
@ -726,7 +723,7 @@ impl StorageManager {
core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)),
true,
self.config().network.dht.consensus_width as usize,
);
)?;
// Record the set value locally since it was successfully set online
let subkey_lock = self

View file

@ -3,7 +3,7 @@ use super::*;
impl StorageManager {
// Check if server-side transactions have expired
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_inbound_transactions_task_routine(
pub(super) fn check_inbound_transactions_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,

View file

@ -3,7 +3,7 @@ use super::*;
impl StorageManager {
// Check if server-side watches have expired
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_inbound_watches_task_routine(
pub(super) fn check_inbound_watches_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,

View file

@ -3,7 +3,7 @@ use super::*;
impl StorageManager {
// Check if client-side transactions on opened records have expired
//#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_outbound_transactions_task_routine(
pub(super) fn check_outbound_transactions_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,

View file

@ -3,7 +3,7 @@ use super::*;
impl StorageManager {
// Check if client-side watches on opened records either have dead nodes or if the watch has expired
//#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_outbound_watches_task_routine(
pub(super) fn check_outbound_watches_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,

View file

@ -14,7 +14,7 @@ impl StorageManager {
pub(super) fn setup_tasks(&self) {
// Set flush records tick task
veilid_log!(self debug "starting flush record stores task");
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
flush_record_stores_task,
@ -23,11 +23,11 @@ impl StorageManager {
// Set save metadata task
veilid_log!(self debug "starting save metadata task");
impl_setup_task!(self, Self, save_metadata_task, save_metadata_task_routine);
impl_setup_task_async!(self, Self, save_metadata_task, save_metadata_task_routine);
// Set offline subkey writes tick task
veilid_log!(self debug "starting offline subkey writes task");
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
offline_subkey_writes_task,
@ -36,7 +36,7 @@ impl StorageManager {
// Set send value changes tick task
veilid_log!(self debug "starting send value changes task");
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
send_value_changes_task,
@ -81,7 +81,7 @@ impl StorageManager {
// Set rehydrate records tick task
veilid_log!(self debug "starting rehydrate records task");
impl_setup_task!(
impl_setup_task_async!(
self,
Self,
rehydrate_records_task,

View file

@ -188,7 +188,8 @@ impl StorageManager {
result.fanout_results.into_iter().map(|x| (x.0, x.1)),
true,
consensus_width,
);
)
.unwrap_or_else(veilid_log_err!(self));
}
// Get the next available work item

View file

@ -86,14 +86,14 @@ impl StorageManager {
for record_key in record_keys {
let opaque_record_key = record_key.opaque();
let Some(opened_record) = inner.opened_records.get(&opaque_record_key) else {
apibail_generic!(format!("record key not open: {}", opaque_record_key));
apibail_generic!("record key not open: {}", opaque_record_key);
};
if record_key.encryption_key().map(|x| x.value()) != opened_record.encryption_key()
{
apibail_generic!(format!(
apibail_generic!(
"record encryption key does not match opened record encryption key: {}",
opaque_record_key
));
);
}
// Get signing keypair for this transaction
@ -213,11 +213,11 @@ impl StorageManager {
for result in &results {
let subkey_count = result.descriptor.schema()?.subkey_count();
if result.seqs.len() != subkey_count {
apibail_internal!(format!(
apibail_internal!(
"seqs returned does not match subkey count: {} != {}",
result.seqs.len(),
subkey_count
));
);
}
let max_subkey = result.descriptor.schema()?.max_subkey();
@ -229,7 +229,7 @@ impl StorageManager {
)),
false,
self.config().network.dht.consensus_width as usize,
);
)?;
}
if let Err(e) = self
@ -445,15 +445,10 @@ impl StorageManager {
if let Some(err) = opt_commit_error {
return Err(err);
}
// Perform storage manager operations
self.flush_committed_transaction_locked_inner(
&mut inner,
records_lock,
transaction_handle,
)
.await?;
}
// Perform storage manager operations
self.flush_committed_transaction_locked(records_lock, transaction_handle)
.await?;
Ok(())
},
@ -463,25 +458,30 @@ impl StorageManager {
/// Removes the transaction from the transaction manager
/// and flushes its contents to the storage manager
#[instrument(level = "trace", target = "dht", skip(self, inner, records_lock), err)]
pub(super) async fn flush_committed_transaction_locked_inner(
#[instrument(level = "trace", target = "dht", skip(self, records_lock), err)]
pub(super) async fn flush_committed_transaction_locked(
&self,
inner: &mut StorageManagerInner,
records_lock: &RecordsLockGuard,
transaction_handle: OutboundTransactionHandle,
) -> VeilidAPIResult<()> {
let transaction_state = inner
.outbound_transaction_manager
.drop_transaction(transaction_handle)
.ok_or_else(|| VeilidAPIError::internal("missing transaction in flush"))?;
let keys_and_subkeys = {
let mut inner = self.inner.lock();
let mut keys_and_subkeys = vec![];
for record_info in transaction_state.get_record_infos() {
let opaque_record_key = record_info.record_key().opaque();
let local_commit_results = record_info.local_commit_results()?;
let transaction_state = inner
.outbound_transaction_manager
.drop_transaction(transaction_handle)
.ok_or_else(|| VeilidAPIError::internal("missing transaction in flush"))?;
keys_and_subkeys.push((opaque_record_key, local_commit_results));
}
let mut keys_and_subkeys = vec![];
for record_info in transaction_state.get_record_infos() {
let opaque_record_key = record_info.record_key().opaque();
let local_commit_results = record_info.local_commit_results()?;
keys_and_subkeys.push((opaque_record_key, local_commit_results));
}
keys_and_subkeys
};
// Record the set values locally since they were successfully set online
self.handle_set_local_values_with_multiple_records_lock(records_lock, keys_and_subkeys)
@ -665,7 +665,7 @@ impl StorageManager {
let Some(record_info) =
outbound_transaction_state.get_record_info(&record_key.opaque())
else {
apibail_internal!(format!("missing record in get: {}", record_key.opaque()));
apibail_internal!("missing record in get: {}", record_key.opaque());
};
record_info.current_subkey_get_result(subkey)?
};
@ -793,30 +793,32 @@ impl StorageManager {
}
OutboundTransactionStage::Failed => {
// Unrecoverable failure, must rollback
apibail_generic!(format!(
"Transaction failed in set operation: transaction_handle={}, key={}, subkey={}",
transaction_handle, record_key, subkey
));
apibail_generic!(
"Transaction failed in set operation: transaction_handle={}, key={}, subkey={}",
transaction_handle, record_key, subkey
);
}
OutboundTransactionStage::Inconsistent => {
// Set failed at this time, try again is possible
apibail_try_again!(format!(
apibail_try_again!(
"Inconsistent set operation: transaction_handle={}, key={}, subkey={}",
transaction_handle, record_key, subkey
));
transaction_handle,
record_key,
subkey
);
}
OutboundTransactionStage::End
| OutboundTransactionStage::Rollback
| OutboundTransactionStage::Init
| OutboundTransactionStage::Commit => {
apibail_internal!(format!("Unexpected transaction state '{}' in set operation: transaction_handle={}, key={}, subkey={}",output_stage,transaction_handle,record_key,subkey));
apibail_internal!("Unexpected transaction state '{}' in set operation: transaction_handle={}, key={}, subkey={}",output_stage,transaction_handle,record_key,subkey);
}
}
let Some(record_info) =
outbound_transaction_state.get_record_info(&record_key.opaque())
else {
apibail_internal!(format!("missing record in set: {}", record_key.opaque()));
apibail_internal!("missing record in set: {}", record_key.opaque());
};
// If there is an updated value, it means the set succeeded
@ -829,11 +831,11 @@ impl StorageManager {
// If the set found a newer value it would be recorded in the current consensus
// unless an error condition was hit, in which case we should have had a failed or inconsistent state
let Some(current_subkey_consensus) = record_info.current_consensus().get(subkey) else {
apibail_internal!(format!(
apibail_internal!(
"record subkey {} should have a current consensus: {}",
subkey,
record_key.opaque()
));
);
};
// Return current subkey consensus value data
@ -841,11 +843,11 @@ impl StorageManager {
};
let Some(current_signed_value_data) = opt_current_signed_value_data else {
apibail_internal!(format!(
apibail_internal!(
"record subkey {} consensus value should not be missing: {}",
subkey,
record_key.opaque()
));
);
};
let current_value_data =
self.maybe_decrypt_value_data(&record_key, current_signed_value_data.value_data())?;

View file

@ -599,14 +599,14 @@ impl StorageManager {
core::iter::once((ValueSubkeyRangeSet::new(), fanout_result)),
false,
self.config().network.dht.consensus_width as usize,
);
)?;
let owvresult = context.lock().watch_value_result.clone();
Ok(owvresult)
}
/// Remove dead watches from the table
pub(super) async fn process_outbound_watch_dead(&self, watch_lock: RecordLockGuard) {
pub(super) fn process_outbound_watch_dead(&self, watch_lock: RecordLockGuard) {
let opaque_record_key = watch_lock.record();
let Some(outbound_watch) = self
@ -1105,7 +1105,6 @@ impl StorageManager {
registry
.storage_manager()
.process_outbound_watch_dead(watch_lock)
.await
}
};
return Some(pin_dyn_future!(fut));
@ -1513,7 +1512,7 @@ impl StorageManager {
}
// Get what subkeys are being watched
let Some(watched_subkeys) = self.get_watched_subkeys_inner(&inner, &record_key)? else {
let Some(watched_subkeys) = self.get_watched_subkeys_inner(inner, &record_key)? else {
// Nothing watched, nothing to report
return Ok(NetworkResult::value(()));
};

View file

@ -233,10 +233,10 @@ impl TableDB {
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn get_keys(&self, col: u32) -> VeilidAPIResult<Vec<Vec<u8>>> {
if col >= self.opened_column_count {
apibail_generic!(format!(
apibail_generic!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
);
}
let db = self.unlocked_inner.database.clone();
let mut out = Vec::new();
@ -259,10 +259,10 @@ impl TableDB {
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn get_key_count(&self, col: u32) -> VeilidAPIResult<u64> {
if col >= self.opened_column_count {
apibail_generic!(format!(
apibail_generic!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
);
}
let db = self.unlocked_inner.database.clone();
let key_count = db.num_keys(col).await.map_err(VeilidAPIError::from)?;
@ -279,10 +279,10 @@ impl TableDB {
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
if col >= self.opened_column_count {
apibail_generic!(format!(
apibail_generic!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
);
}
let db = self.unlocked_inner.database.clone();
let mut dbt = db.transaction();
@ -308,10 +308,10 @@ impl TableDB {
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
if col >= self.opened_column_count {
apibail_generic!(format!(
apibail_generic!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
);
}
let db = self.unlocked_inner.database.clone();
let key = self.maybe_encrypt(key, true).await;
@ -340,10 +340,10 @@ impl TableDB {
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
if col >= self.opened_column_count {
apibail_generic!(format!(
apibail_generic!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
);
}
let key = self.maybe_encrypt(key, true).await;
@ -433,7 +433,7 @@ impl TableDBTransaction {
/// Return the number of operations in the transaction
/// May be less than the number performed if duplicate keys were specified
pub fn len(&self) -> usize {
#[must_use] pub fn len(&self) -> usize {
self.inner
.lock()
.ops
@ -443,7 +443,7 @@ impl TableDBTransaction {
}
/// Returns true if the operation count to be performed is zero
pub fn is_empty(&self) -> bool {
#[must_use] pub fn is_empty(&self) -> bool {
self.inner
.lock()
.ops
@ -494,10 +494,10 @@ impl TableDBTransaction {
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
if col >= self.db.opened_column_count {
apibail_generic!(format!(
apibail_generic!(
"Column exceeds opened column count {} >= {}",
col, self.db.opened_column_count
));
);
}
let key = self.db.maybe_encrypt(key, true).await;
@ -524,10 +524,10 @@ impl TableDBTransaction {
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> {
if col >= self.db.opened_column_count {
apibail_generic!(format!(
apibail_generic!(
"Column exceeds opened column count {} >= {}",
col, self.db.opened_column_count
));
);
}
let key = self.db.maybe_encrypt(key, true).await;

View file

@ -1478,7 +1478,7 @@ impl VeilidAPI {
}
}
async fn debug_record_list(&self, args: Vec<String>) -> VeilidAPIResult<String> {
fn debug_record_list(&self, args: Vec<String>) -> VeilidAPIResult<String> {
// <local|remote>
let registry = self.core_context()?.registry();
let storage_manager = registry.storage_manager();
@ -1836,8 +1836,8 @@ impl VeilidAPI {
li, ri
)
} else {
let li = storage_manager.debug_local_record_info(key.clone()).await;
let ri = storage_manager.debug_remote_record_info(key.clone()).await;
let li = storage_manager.debug_local_record_info(key.clone());
let ri = storage_manager.debug_remote_record_info(key.clone());
format!("Local Info:\n{}\n\nRemote Info:\n{}\n", li, ri)
};
Ok(out)
@ -2014,7 +2014,7 @@ impl VeilidAPI {
Ok(format!("Success: report={:?}", report))
}
async fn debug_record_rehydrate(&self, args: Vec<String>) -> VeilidAPIResult<String> {
fn debug_record_rehydrate(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let registry = self.core_context()?.registry();
let storage_manager = registry.storage_manager();
@ -2072,7 +2072,7 @@ impl VeilidAPI {
let command = get_debug_argument_at(&args, 0, "debug_record", "command", get_string)?;
if command == "list" {
self.debug_record_list(args).await
self.debug_record_list(args)
} else if command == "purge" {
self.debug_record_purge(args).await
} else if command == "create" {
@ -2096,7 +2096,7 @@ impl VeilidAPI {
} else if command == "inspect" {
self.debug_record_inspect(args).await
} else if command == "rehydrate" {
self.debug_record_rehydrate(args).await
self.debug_record_rehydrate(args)
} else {
Ok(">>> Unknown command\n".to_owned())
}

View file

@ -22,6 +22,9 @@ macro_rules! apibail_try_again {
($x:expr) => {
return Err(VeilidAPIError::try_again($x))
};
($fmt:literal, $($args:tt)*) => {
return Err(VeilidAPIError::try_again( format!($fmt, $($args)*) ))
};
}
#[allow(unused_macros)]
@ -30,6 +33,9 @@ macro_rules! apibail_generic {
($x:expr) => {
return Err(VeilidAPIError::generic($x))
};
($fmt:literal, $($args:tt)*) => {
return Err(VeilidAPIError::generic( format!($fmt, $($args)*) ))
};
}
#[allow(unused_macros)]
@ -38,6 +44,9 @@ macro_rules! apibail_internal {
($x:expr) => {
return Err(VeilidAPIError::internal($x))
};
($fmt:literal, $($args:tt)*) => {
return Err(VeilidAPIError::internal( format!($fmt, $($args)*) ))
};
}
#[allow(unused_macros)]
@ -70,6 +79,10 @@ macro_rules! apibail_no_connection {
($x:expr) => {
return Err(VeilidAPIError::no_connection($x))
};
($fmt:literal, $($args: tt)* ) => {
return Err(VeilidAPIError::no_connection( format!($fmt, arg $($args)*) ))
};
}
#[allow(unused_macros)]

View file

@ -15,10 +15,11 @@ pub fn decompress_size_prepended(
block::uncompressed_size(input).map_err(VeilidAPIError::generic)?;
if let Some(max_size) = max_size {
if uncompressed_size > max_size {
apibail_generic!(format!(
apibail_generic!(
"decompression exceeded maximum size: {} > {}",
uncompressed_size, max_size
));
uncompressed_size,
max_size
);
}
}
block::decompress(input, uncompressed_size).map_err(VeilidAPIError::generic)