From 550ac7df5fe18b3f499fde7340d42b8eca2987f2 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 29 Nov 2025 15:11:15 -0500 Subject: [PATCH] [ci skip] checkpoint --- veilid-core/src/attachment_manager.rs | 2 +- veilid-core/src/component.rs | 17 ++- veilid-core/src/crypto/crypto_system/mod.rs | 24 ++-- veilid-core/src/network_manager/tasks/mod.rs | 2 +- veilid-core/src/routing_table/mod.rs | 4 +- veilid-core/src/routing_table/tasks/mod.rs | 28 ++-- .../src/routing_table/types/peer_info.rs | 26 ++-- .../src/storage_manager/close_record.rs | 4 +- veilid-core/src/storage_manager/debug.rs | 4 +- veilid-core/src/storage_manager/get_value.rs | 2 +- .../src/storage_manager/inspect_value.rs | 2 +- .../outbound_transaction_manager/mod.rs | 127 +++++++++--------- veilid-core/src/storage_manager/record_key.rs | 8 +- .../src/storage_manager/record_store/mod.rs | 3 +- .../storage_manager/record_store/record.rs | 38 ++++-- .../record_store/record_snapshot.rs | 6 +- .../record_store_inner/commit_action.rs | 8 -- .../inbound_transaction_list.rs | 2 +- .../record_store_inner/limited_size.rs | 6 +- .../record_store_inner/record_index.rs | 121 ++++++----------- veilid-core/src/storage_manager/rehydrate.rs | 4 +- veilid-core/src/storage_manager/set_value.rs | 7 +- .../tasks/check_inbound_transactions.rs | 2 +- .../tasks/check_inbound_watches.rs | 2 +- .../tasks/check_outbound_transactions.rs | 2 +- .../tasks/check_outbound_watches.rs | 2 +- veilid-core/src/storage_manager/tasks/mod.rs | 10 +- .../tasks/offline_subkey_writes.rs | 3 +- .../src/storage_manager/transaction.rs | 84 ++++++------ .../src/storage_manager/watch_value.rs | 7 +- veilid-core/src/table_store/table_db.rs | 32 ++--- veilid-core/src/veilid_api/debug.rs | 12 +- veilid-core/src/veilid_api/error.rs | 13 ++ .../serialize_helpers/compression.rs | 7 +- 34 files changed, 311 insertions(+), 310 deletions(-) diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs index 6fec20ad..d3d2f6a7 100644 --- a/veilid-core/src/attachment_manager.rs +++ b/veilid-core/src/attachment_manager.rs @@ -133,7 +133,7 @@ impl AttachmentManager { let registry = self.registry(); veilid_log!(self debug "starting attachment maintainer task"); - impl_setup_task!( + impl_setup_task_async!( self, Self, attachment_maintainer_task, diff --git a/veilid-core/src/component.rs b/veilid-core/src/component.rs index 52ba7402..6939ed22 100644 --- a/veilid-core/src/component.rs +++ b/veilid-core/src/component.rs @@ -350,7 +350,6 @@ macro_rules! impl_setup_task { Box::pin(async move { let this = registry.lookup::<$this_type>().unwrap(); this.$task_routine(s, Timestamp::new(l), Timestamp::new(t)) - .await }) }); }}; @@ -358,6 +357,22 @@ macro_rules! impl_setup_task { pub(crate) use impl_setup_task; +macro_rules! impl_setup_task_async { + ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{ + let registry = $this.registry(); + $this.$task_name.set_routine(move |s, l, t| { + let registry = registry.clone(); + Box::pin(async move { + let this = registry.lookup::<$this_type>().unwrap(); + this.$task_routine(s, Timestamp::new(l), Timestamp::new(t)) + .await + }) + }); + }}; +} + +pub(crate) use impl_setup_task_async; + // Utility macro for setting up an event bus handler // Should be called after init, during post-init or later // Subscription should be unsubscribed before termination diff --git a/veilid-core/src/crypto/crypto_system/mod.rs b/veilid-core/src/crypto/crypto_system/mod.rs index cba19c4d..e95cad87 100644 --- a/veilid-core/src/crypto/crypto_system/mod.rs +++ b/veilid-core/src/crypto/crypto_system/mod.rs @@ -71,21 +71,21 @@ pub trait CryptoSystem { apibail_generic!("incorrect shared secret kind"); } if secret.value().len() != self.shared_secret_length() { - apibail_generic!(format!( + apibail_generic!( "invalid shared secret length: {} != {}", secret.value().len(), self.shared_secret_length() - )); + ); } Ok(()) } fn check_nonce(&self, nonce: &Nonce) -> VeilidAPIResult<()> { if nonce.len() != self.nonce_length() { - apibail_generic!(format!( + apibail_generic!( "invalid nonce length: {} != {}", nonce.len(), self.nonce_length() - )); + ); } Ok(()) } @@ -94,11 +94,11 @@ pub trait CryptoSystem { apibail_generic!("incorrect hash digest kind"); } if hash.value().len() != self.hash_digest_length() { - apibail_generic!(format!( + apibail_generic!( "invalid hash digest length: {} != {}", hash.value().len(), self.hash_digest_length() - )); + ); } Ok(()) } @@ -107,11 +107,11 @@ pub trait CryptoSystem { apibail_generic!("incorrect public key kind"); } if key.value().len() != self.public_key_length() { - apibail_generic!(format!( + apibail_generic!( "invalid public key length: {} != {}", key.value().len(), self.public_key_length() - )); + ); } Ok(()) } @@ -120,11 +120,11 @@ pub trait CryptoSystem { apibail_generic!("incorrect secret key kind"); } if key.value().len() != self.secret_key_length() { - apibail_generic!(format!( + apibail_generic!( "invalid secret key length: {} != {}", key.value().len(), self.secret_key_length() - )); + ); } Ok(()) } @@ -133,11 +133,11 @@ pub trait CryptoSystem { apibail_generic!("incorrect signature kind"); } if signature.value().len() != self.signature_length() { - apibail_generic!(format!( + apibail_generic!( "invalid signature length: {} != {}", signature.value().len(), self.signature_length() - )); + ); } Ok(()) } diff --git a/veilid-core/src/network_manager/tasks/mod.rs b/veilid-core/src/network_manager/tasks/mod.rs index 19210506..48e5ba8c 100644 --- a/veilid-core/src/network_manager/tasks/mod.rs +++ b/veilid-core/src/network_manager/tasks/mod.rs @@ -5,7 +5,7 @@ use crate::attachment_manager::TickEvent; impl NetworkManager { pub fn setup_tasks(&self) { // Set rolling transfers tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, rolling_transfers_task, diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 0d15fd69..14c817f0 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -508,10 +508,10 @@ impl RoutingTable { if let (Some(public_key), Some(secret_key)) = (public_key, secret_key) { // Validate node id if !vcrypto.validate_keypair(&public_key, &secret_key).await? { - apibail_generic!(format!( + apibail_generic!( "secret_key and public_key don't match:\npublic_key: {}\nsecret_key: {}", public_key, secret_key - )); + ); } (public_key, secret_key) } else { diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs index ef817453..59699ffc 100644 --- a/veilid-core/src/routing_table/tasks/mod.rs +++ b/veilid-core/src/routing_table/tasks/mod.rs @@ -17,10 +17,10 @@ impl_veilid_log_facility!("rtab"); impl RoutingTable { pub fn setup_tasks(&self) { // Set flush tick task - impl_setup_task!(self, Self, flush_task, flush_task_routine); + impl_setup_task_async!(self, Self, flush_task, flush_task_routine); // Set rolling transfers tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, rolling_transfers_task, @@ -28,7 +28,7 @@ impl RoutingTable { ); // Set update state stats tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, update_state_stats_task, @@ -36,7 +36,7 @@ impl RoutingTable { ); // Set rolling answers tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, rolling_answers_task, @@ -44,13 +44,13 @@ impl RoutingTable { ); // Set kick buckets tick task - impl_setup_task!(self, Self, kick_buckets_task, kick_buckets_task_routine); + impl_setup_task_async!(self, Self, kick_buckets_task, kick_buckets_task_routine); // Set bootstrap tick task - impl_setup_task!(self, Self, bootstrap_task, bootstrap_task_routine); + impl_setup_task_async!(self, Self, bootstrap_task, bootstrap_task_routine); // Set peer minimum refresh tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, peer_minimum_refresh_task, @@ -58,7 +58,7 @@ impl RoutingTable { ); // Set closest peers refresh tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, closest_peers_refresh_task, @@ -66,7 +66,7 @@ impl RoutingTable { ); // Set ping validator PublicInternet tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, ping_validator_public_internet_task, @@ -74,7 +74,7 @@ impl RoutingTable { ); // Set ping validator LocalNetwork tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, ping_validator_local_network_task, @@ -82,7 +82,7 @@ impl RoutingTable { ); // Set ping validator PublicInternet Relay tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, ping_validator_public_internet_relay_task, @@ -90,7 +90,7 @@ impl RoutingTable { ); // Set ping validator Active Watch tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, ping_validator_active_watch_task, @@ -98,7 +98,7 @@ impl RoutingTable { ); // Set relay management tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, relay_management_task, @@ -106,7 +106,7 @@ impl RoutingTable { ); // Set private route management tick task - impl_setup_task!( + impl_setup_task_async!( self, Self, private_route_management_task, diff --git a/veilid-core/src/routing_table/types/peer_info.rs b/veilid-core/src/routing_table/types/peer_info.rs index e3ff8b18..19fb8a8a 100644 --- a/veilid-core/src/routing_table/types/peer_info.rs +++ b/veilid-core/src/routing_table/types/peer_info.rs @@ -53,22 +53,22 @@ impl PeerInfo { // Ensure node ids are within limits if public_keys.is_empty() { - apibail_internal!(format!( + apibail_internal!( "no public keys for peer info ({:?})\n{:#?}", routing_domain, node_info - )); + ); } else if public_keys.len() > MAX_CRYPTO_KINDS { - apibail_internal!(format!( + apibail_internal!( "too many public keys for peer info ({:?}): {:?}\n{:#?}", routing_domain, public_keys, node_info - )); + ); } // Make sure secret keys and public keys match and make keypairs let mut keypairs = KeyPairGroup::new(); for pk in public_keys.iter() { let Some(sk) = secret_keys.get(pk.kind()) else { - apibail_internal!(format!("secret key not found for public key: {}", pk)); + apibail_internal!("secret key not found for public key: {}", pk); }; keypairs.add(KeyPair::new_from_parts(pk.clone(), sk.value())); } @@ -128,15 +128,15 @@ impl PeerInfo { // Ensure node ids are within limits if public_keys.is_empty() { - apibail_internal!(format!( + apibail_internal!( "no public keys for peer info ({:?})\n{:#?}", origin_routing_domain, node_info - )); + ); } else if public_keys.len() > MAX_CRYPTO_KINDS { - apibail_internal!(format!( + apibail_internal!( "too many public keys for peer info ({:?}): {:?}\n{:#?}", origin_routing_domain, public_keys, node_info - )); + ); } // Verify signatures @@ -189,15 +189,15 @@ impl PeerInfo { // Ensure node ids are within limits if public_keys.is_empty() { - apibail_internal!(format!( + apibail_internal!( "no public keys for peer info ({:?})\n{:#?}", routing_domain, node_info - )); + ); } else if public_keys.len() > MAX_CRYPTO_KINDS { - apibail_internal!(format!( + apibail_internal!( "too many public keys for peer info ({:?}): {:?}\n{:#?}", routing_domain, public_keys, node_info - )); + ); } // Generate on-the-wire node info message diff --git a/veilid-core/src/storage_manager/close_record.rs b/veilid-core/src/storage_manager/close_record.rs index d36fa535..f6389fc6 100644 --- a/veilid-core/src/storage_manager/close_record.rs +++ b/veilid-core/src/storage_manager/close_record.rs @@ -50,7 +50,7 @@ impl StorageManager { inner: &mut StorageManagerInner, opaque_record_key: &OpaqueRecordKey, ) -> VeilidAPIResult<()> { - if let Some(opened_record) = inner.opened_records.remove(&opaque_record_key) { + if let Some(opened_record) = inner.opened_records.remove(opaque_record_key) { let record_key = RecordKey::from_opaque(opaque_record_key.clone(), opened_record.encryption_key()); @@ -63,7 +63,7 @@ impl StorageManager { // Drop any transaction associated with the record if let Some(transaction_handle) = inner .outbound_transaction_manager - .get_transaction_by_record(&opaque_record_key) + .get_transaction_by_record(opaque_record_key) { inner .outbound_transaction_manager diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs index 417fdd52..eab8a557 100644 --- a/veilid-core/src/storage_manager/debug.rs +++ b/veilid-core/src/storage_manager/debug.rs @@ -133,7 +133,7 @@ impl StorageManager { .debug_record_subkey_info(&opaque_record_key, subkey) .await } - pub async fn debug_local_record_info(&self, record_key: RecordKey) -> String { + pub fn debug_local_record_info(&self, record_key: RecordKey) -> String { let opaque_record_key = record_key.opaque(); let (local_record_store, opened_debug) = { @@ -154,7 +154,7 @@ impl StorageManager { format!("{}\n{}", local_debug, opened_debug) } - pub async fn debug_remote_record_info(&self, record_key: RecordKey) -> String { + pub fn debug_remote_record_info(&self, record_key: RecordKey) -> String { let remote_record_store = { let inner = self.inner.lock(); let Some(remote_record_store) = inner.remote_record_store.clone() else { diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index e599492e..c53a59a1 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -539,7 +539,7 @@ impl StorageManager { core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)), false, self.config().network.dht.consensus_width as usize, - ); + )?; // If we got a new value back then write it to the opened record if get_result_value.value_data().seq() != last_seq { diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index 8de79990..991d0dde 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -166,7 +166,7 @@ impl StorageManager { results_iter, false, self.config().network.dht.consensus_width as usize, - ); + )?; if result.inspect_result.subkeys().is_empty() { DHTRecordReport::new( diff --git a/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs b/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs index eebe5ecb..96f24a9f 100644 --- a/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs +++ b/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs @@ -95,10 +95,10 @@ impl OutboundTransactionManager { for rp in &record_params { let opaque_record_key = rp.record_key.opaque(); if self.handles_by_key.contains_key(&opaque_record_key) { - apibail_generic!(format!( + apibail_generic!( "Record {} already has a a transaction open", opaque_record_key - )); + ); } opaque_record_keys.push(opaque_record_key); } @@ -205,10 +205,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Begin | OutboundTransactionStage::End | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Init", outbound_transaction_state.stage(), - )); + ); } } @@ -238,10 +238,10 @@ impl OutboundTransactionManager { outbound_transaction_state.stage(), OutboundTransactionStage::Init ) { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Init", outbound_transaction_state.stage(), - )); + ); } // Add all node transaction ids @@ -249,10 +249,10 @@ impl OutboundTransactionManager { let Some(record_info) = outbound_transaction_state.get_record_info_mut(&result.opaque_record_key) else { - apibail_internal!(format!( + apibail_internal!( "unexpected record in begin results: {}", result.opaque_record_key - )); + ); }; record_info.update_begin_network_seqs(result.seqs); @@ -270,11 +270,11 @@ impl OutboundTransactionManager { || record_info.get_node_transactions().len() < record_info.required_strict_consensus_count() { - apibail_try_again!(format!("did not get consensus of transaction ids (rec={}, stage={}, count={}, consensus={})", + apibail_try_again!("did not get consensus of transaction ids (rec={}, stage={}, count={}, consensus={})", record_info.record_key().opaque(), record_info.stage(), record_info.get_node_transactions().len(), - record_info.required_strict_consensus_count())); + record_info.required_strict_consensus_count()); } } @@ -298,10 +298,10 @@ impl OutboundTransactionManager { OutboundTransactionStage::Init | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Begin, End, Inconsistent, or Failed", outbound_transaction_state.stage(), - )); + ); } } @@ -357,10 +357,10 @@ impl OutboundTransactionManager { OutboundTransactionStage::Init | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Begin, End, Inconsistent, or Failed", outbound_transaction_state.stage(), - )); + ); } } @@ -371,16 +371,17 @@ impl OutboundTransactionManager { let Some(record_info) = outbound_transaction_state.get_record_info_mut(&opaque_record_key) else { - apibail_internal!(format!("missing record in rollback: {}", opaque_record_key)); + apibail_internal!("missing record in rollback: {}", opaque_record_key); }; let mut missing_node_xids = record_info.get_node_xids::>(); for pnr in result.per_node_results { if !missing_node_xids.remove(&pnr.node_transaction_id) { - apibail_internal!(format!( + apibail_internal!( "node transaction rolled back multiple times: {} pnr={:?}", - opaque_record_key, pnr - )); + opaque_record_key, + pnr + ); } let node_transaction = record_info @@ -393,10 +394,10 @@ impl OutboundTransactionManager { for missing_node_xid in &missing_node_xids { let Some(node_transaction) = record_info.get_node_transaction_mut(missing_node_xid) else { - apibail_internal!(format!( + apibail_internal!( "missing node transaction in record info: {}", missing_node_xid, - )); + ); }; node_transaction.set_stage(OutboundTransactionStage::Failed, None); } @@ -422,10 +423,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Failed | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Begin", outbound_transaction_state.stage(), - )); + ); } } @@ -481,10 +482,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Failed | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Begin", outbound_transaction_state.stage(), - )); + ); } } @@ -495,16 +496,17 @@ impl OutboundTransactionManager { let Some(record_info) = outbound_transaction_state.get_record_info_mut(&opaque_record_key) else { - apibail_internal!(format!("missing record in end: {}", opaque_record_key)); + apibail_internal!("missing record in end: {}", opaque_record_key); }; let mut missing_node_xids = record_info.get_node_xids::>(); for pnr in result.per_node_results { if !missing_node_xids.remove(&pnr.node_transaction_id) { - apibail_internal!(format!( + apibail_internal!( "node transaction ended multiple times: {} pnr={:?}", - opaque_record_key, pnr - )); + opaque_record_key, + pnr + ); } let node_transaction = record_info @@ -525,10 +527,10 @@ impl OutboundTransactionManager { for missing_node_xid in &missing_node_xids { let Some(node_transaction) = record_info.get_node_transaction_mut(missing_node_xid) else { - apibail_internal!(format!( + apibail_internal!( "missing node transaction in record info: {}", missing_node_xid, - )); + ); }; node_transaction.set_stage(OutboundTransactionStage::Failed, None); } @@ -554,10 +556,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Failed | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted End", outbound_transaction_state.stage(), - )); + ); } } @@ -619,10 +621,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Failed | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted End", outbound_transaction_state.stage(), - )); + ); } } @@ -633,16 +635,17 @@ impl OutboundTransactionManager { let Some(record_info) = outbound_transaction_state.get_record_info_mut(&opaque_record_key) else { - apibail_internal!(format!("missing record in commit: {}", opaque_record_key)); + apibail_internal!("missing record in commit: {}", opaque_record_key); }; let mut missing_node_xids = record_info.get_node_xids::>(); for pnr in result.per_node_results { if !missing_node_xids.remove(&pnr.node_transaction_id) { - apibail_internal!(format!( + apibail_internal!( "node transaction committed multiple times: {} pnr={:?}", - opaque_record_key, pnr - )); + opaque_record_key, + pnr + ); } let node_transaction = record_info @@ -663,10 +666,10 @@ impl OutboundTransactionManager { for missing_node_xid in &missing_node_xids { let Some(node_transaction) = record_info.get_node_transaction_mut(missing_node_xid) else { - apibail_internal!(format!( + apibail_internal!( "missing node transaction in record info: {}", missing_node_xid, - )); + ); }; if node_transaction.commit_will_change_remote() { @@ -700,10 +703,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Failed | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Begin or Inconsistent", outbound_transaction_state.stage(), - )); + ); } } @@ -769,10 +772,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Failed | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Begin or Inconsistent", outbound_transaction_state.stage(), - )); + ); } } @@ -780,10 +783,7 @@ impl OutboundTransactionManager { let Some(record_info) = outbound_transaction_state.get_record_info_mut(&result.params.opaque_record_key) else { - apibail_internal!(format!( - "missing record in set: {}", - result.params.opaque_record_key - )); + apibail_internal!("missing record in set: {}", result.params.opaque_record_key); }; let mut missing_node_xids = record_info.get_node_xids::>(); @@ -798,10 +798,11 @@ impl OutboundTransactionManager { let mut found_newer = false; for pnr in result.per_node_results { if !missing_node_xids.remove(&pnr.node_transaction_id) { - apibail_internal!(format!( + apibail_internal!( "node transaction get multiple times: {} pnr={:?}", - result.params.opaque_record_key, pnr - )); + result.params.opaque_record_key, + pnr + ); } let node_transaction = record_info @@ -897,10 +898,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Failed | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Begin or Inconsistent", outbound_transaction_state.stage(), - )); + ); } } @@ -966,10 +967,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Failed | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Begin or Inconsistent", outbound_transaction_state.stage(), - )); + ); } } @@ -977,10 +978,7 @@ impl OutboundTransactionManager { let Some(record_info) = outbound_transaction_state.get_record_info_mut(&result.params.opaque_record_key) else { - apibail_internal!(format!( - "missing record in get: {}", - result.params.opaque_record_key - )); + apibail_internal!("missing record in get: {}", result.params.opaque_record_key); }; let mut missing_node_xids = record_info.get_node_xids::>(); @@ -990,10 +988,11 @@ impl OutboundTransactionManager { let mut opt_get_subkey_consensus: Option = None; for pnr in result.per_node_results { if !missing_node_xids.remove(&pnr.node_transaction_id) { - apibail_internal!(format!( + apibail_internal!( "node transaction get multiple times: {} pnr={:?}", - result.params.opaque_record_key, pnr - )); + result.params.opaque_record_key, + pnr + ); } let node_transaction = record_info @@ -1046,10 +1045,10 @@ impl OutboundTransactionManager { | OutboundTransactionStage::Failed | OutboundTransactionStage::Rollback | OutboundTransactionStage::Commit => { - apibail_generic!(format!( + apibail_generic!( "stage was {:?}, wanted Begin or Inconsistent", outbound_transaction_state.stage(), - )); + ); } } diff --git a/veilid-core/src/storage_manager/record_key.rs b/veilid-core/src/storage_manager/record_key.rs index 8f4bd476..0546bdc4 100644 --- a/veilid-core/src/storage_manager/record_key.rs +++ b/veilid-core/src/storage_manager/record_key.rs @@ -40,19 +40,19 @@ impl StorageManager { }; if record_key.value().key().len() != HASH_COORDINATE_LENGTH { - apibail_generic!(format!( + apibail_generic!( "invalid record key length: {} != {}", record_key.value().key().len(), HASH_COORDINATE_LENGTH - )); + ); } if let Some(encryption_key) = record_key.value().encryption_key() { if encryption_key.len() != vcrypto.shared_secret_length() { - apibail_generic!(format!( + apibail_generic!( "invalid encryption key length: {} != {}", encryption_key.len(), vcrypto.shared_secret_length() - )); + ); } } diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index b7d12471..5a918011 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -96,7 +96,6 @@ where pub(super) type SubkeyValueList = Vec<(ValueSubkey, Arc)>; pub(super) type RecordSubkeyValueList = Vec<(OpaqueRecordKey, SubkeyValueList)>; -type SubkeyRecordDataList = Vec<(ValueSubkey, RecordData)>; impl RecordStore where @@ -312,7 +311,7 @@ where let opt_commit_action = self.inner.lock().set_subkeys_single_record( opaque_record_key, - &subkey_values, + subkey_values, &watch_update_mode, )?; diff --git a/veilid-core/src/storage_manager/record_store/record.rs b/veilid-core/src/storage_manager/record_store/record.rs index 5bec76e6..ce4371c1 100644 --- a/veilid-core/src/storage_manager/record_store/record.rs +++ b/veilid-core/src/storage_manager/record_store/record.rs @@ -98,24 +98,44 @@ where } } - pub fn record_stored_subkey(&mut self, subkey: ValueSubkey, data: &RecordData) { + pub fn record_stored_subkey( + &mut self, + subkey: ValueSubkey, + data: &RecordData, + max_record_data_size: usize, + ) -> VeilidAPIResult<()> { let seq = data.signed_value_data().value_data().seq(); - let size = data.data_size() as u16; + let new_subkey_size = data.data_size() as u16; + let old_subkey_size = self.subkey_sizes[subkey as usize]; + let new_record_data_size = if new_subkey_size > old_subkey_size { + self.record_data_size + (new_subkey_size - old_subkey_size) as usize + } else if new_subkey_size < old_subkey_size { + self.record_data_size - (old_subkey_size - new_subkey_size) as usize + } else { + self.record_data_size + }; + + if new_record_data_size > max_record_data_size { + apibail_internal!( + "record exceeds maximum data size: {} > {}", + new_record_data_size, + max_record_data_size + ); + } + + // No failures past this point + self.record_data_size = new_record_data_size; self.stored_subkeys.insert(subkey); self.subkey_seqs.resize(self.subkey_count, 0); self.subkey_seqs[subkey as usize] = u32::from(seq); self.subkey_sizes.resize(self.subkey_count, 0); - let old_size = self.subkey_sizes[subkey as usize]; - self.subkey_sizes[subkey as usize] = size; + self.subkey_sizes[subkey as usize] = new_subkey_size; - if size > old_size { - self.record_data_size += (size - old_size) as usize; - } else if size < old_size { - self.record_data_size -= (old_size - size) as usize; - } + Ok(()) } + #[expect(dead_code)] pub fn subkey_size(&self, subkey: ValueSubkey) -> u16 { self.subkey_sizes[subkey as usize] } diff --git a/veilid-core/src/storage_manager/record_store/record_snapshot.rs b/veilid-core/src/storage_manager/record_store/record_snapshot.rs index 102977d7..bb8f1ff1 100644 --- a/veilid-core/src/storage_manager/record_store/record_snapshot.rs +++ b/veilid-core/src/storage_manager/record_store/record_snapshot.rs @@ -145,10 +145,8 @@ where // Finish all load actions { let mut inner = self.inner.lock(); - for opt_load_action in all_value_load_actions.into_iter() { - if let Some(load_action) = opt_load_action { - inner.finish_load_action(load_action); - } + for load_action in all_value_load_actions.into_iter().flatten() { + inner.finish_load_action(load_action); } } diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/commit_action.rs b/veilid-core/src/storage_manager/record_store/record_store_inner/commit_action.rs index bed95f02..5302f9e3 100644 --- a/veilid-core/src/storage_manager/record_store/record_store_inner/commit_action.rs +++ b/veilid-core/src/storage_manager/record_store/record_store_inner/commit_action.rs @@ -30,11 +30,6 @@ where #[derive(Debug)] pub(super) enum UncommittedSubkeyChange { - Create { - /// The subkey data being created - new_data: RecordData, - }, - Update { /// The new subkey data new_data: RecordData, @@ -111,9 +106,6 @@ where } for (stk, usc) in self.uncommitted_subkey_changes.iter() { match usc { - UncommittedSubkeyChange::Create { new_data: data } => { - st_xact.store_json(0, &stk.bytes(), &data).await?; - } UncommittedSubkeyChange::Update { new_data, opt_old_data: _, diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/inbound_transactions/inbound_transaction_list.rs b/veilid-core/src/storage_manager/record_store/record_store_inner/inbound_transactions/inbound_transaction_list.rs index 1ba50c8d..87575fea 100644 --- a/veilid-core/src/storage_manager/record_store/record_store_inner/inbound_transactions/inbound_transaction_list.rs +++ b/veilid-core/src/storage_manager/record_store/record_store_inner/inbound_transactions/inbound_transaction_list.rs @@ -93,7 +93,7 @@ impl InboundTransactionList { pub fn lock(&mut self, transaction_id: InboundTransactionId) -> VeilidAPIResult<()> { if let Some(existing_xid) = self.lock { - apibail_internal!(format!("request to lock inbound transaction list by xid {} when it was already locked by {}", transaction_id, existing_xid)); + apibail_internal!("request to lock inbound transaction list by xid {} when it was already locked by {}", transaction_id, existing_xid); } self.lock = Some(transaction_id); diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/limited_size.rs b/veilid-core/src/storage_manager/record_store/record_store_inner/limited_size.rs index 806429e1..be407287 100644 --- a/veilid-core/src/storage_manager/record_store/record_store_inner/limited_size.rs +++ b/veilid-core/src/storage_manager/record_store/record_store_inner/limited_size.rs @@ -100,6 +100,8 @@ impl LimitedSize { self.uncommitted_value = Some(new_value); Ok(new_value) } + + #[expect(dead_code)] pub fn saturating_sub(&mut self, mut v: T) -> T { let current_value = self.current_value(); let max_v = current_value - T::min_value(); @@ -123,10 +125,6 @@ impl LimitedSize { true } - pub fn limit(&self) -> Option { - self.limit - } - pub fn commit(&mut self) -> Result> { if let Some(uncommitted_value) = self.uncommitted_value.take() { if let Some(limit) = self.limit { diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs b/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs index 60922d09..f1c22986 100644 --- a/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs +++ b/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs @@ -119,6 +119,15 @@ where record_key: key.clone(), }; + // Ensure this record is actually new + if !record.is_new() { + apibail_internal!( + "record was not new during create: key={}: {:?}", + key, + record + ); + } + // If record already exists, fail early if let Some(prev_record) = self.record_cache.get(&rtk) { veilid_log!(self error "RecordIndex({}): Record already existed with key {}: {:?}", self.unlocked_inner.name, key, prev_record.clone()); @@ -309,7 +318,6 @@ where .and_then(|x| x.get(&stk)) }) .and_then(|v| match v { - UncommittedSubkeyChange::Create { new_data } => Some(new_data.clone()), UncommittedSubkeyChange::Update { new_data, opt_old_data: _, @@ -369,7 +377,7 @@ where }; // Make a RecordData for the value - let new_data = RecordData::new(value); + let new_data = self.make_record_data(value)?; // Get the current record from the cache let Some(old_record) = self.record_cache.get(&rtk).cloned() else { @@ -380,7 +388,11 @@ where let mut new_record = old_record.clone(); // Change the record to reflect the new data - new_record.record_stored_subkey(subkey, &new_data); + new_record.record_stored_subkey( + subkey, + &new_data, + self.unlocked_inner.limits.max_record_data_size, + )?; // Update the record's touch timestamp for LRU sorting new_record.touch(); @@ -443,8 +455,12 @@ where let mut new_data_list = Vec::with_capacity(subkey_values.len()); for (subkey, value) in subkey_values.iter().cloned() { // Change the record to reflect the new data - let new_data = RecordData::new(value); - new_record.record_stored_subkey(subkey, &new_data); + let new_data = self.make_record_data(value)?; + new_record.record_stored_subkey( + subkey, + &new_data, + self.unlocked_inner.limits.max_record_data_size, + )?; // Keep the new data for later new_data_list.push((subkey, new_data)); @@ -523,8 +539,12 @@ where let mut new_data_list = Vec::with_capacity(subkey_values.len()); for (subkey, value) in subkey_values.iter().cloned() { // Change the record to reflect the new data - let new_data = RecordData::new(value); - new_record.record_stored_subkey(subkey, &new_data); + let new_data = self.make_record_data(value)?; + new_record.record_stored_subkey( + subkey, + &new_data, + self.unlocked_inner.limits.max_record_data_size, + )?; // Keep the new data for later new_data_list.push((subkey, new_data)); @@ -707,6 +727,18 @@ where ////////////////////////////////////////////////////////////////////////////////////////// + fn make_record_data(&self, value: Arc) -> VeilidAPIResult { + if value.data_size() > self.unlocked_inner.limits.max_subkey_size { + apibail_internal!( + "record data too large for record index {}: {} > {}", + self.unlocked_inner.name, + value.data_size(), + self.unlocked_inner.limits.max_subkey_size, + ); + } + Ok(RecordData::new(value)) + } + async fn load_db(&mut self) -> EyreResult<()> { let start_ts = Timestamp::now(); veilid_log!(self info "Loading record index: {}", self.unlocked_inner.name); @@ -878,18 +910,6 @@ where // Process creates and updates with removal first so we don't have to worry about LRU for (stk, usc) in uncommitted_subkey_changes.iter().rev() { match usc { - UncommittedSubkeyChange::Create { new_data: data } => { - let opt_prev_data = self.uncache_subkey(stk); - - // Validate - if let Some(prev_data) = opt_prev_data { - if &prev_data != data { - veilid_log!(self error "UncommittedSubkeyChange::Create rollback: {} had unexpected previous data", stk); - } - } else { - veilid_log!(self error "UncommittedSubkeyChange::Create rollback: {} had missing previous value", stk); - } - } UncommittedSubkeyChange::Update { new_data, opt_old_data, @@ -961,9 +981,6 @@ where } } } - UncommittedSubkeyChange::Create { new_data: _ } => { - // Already did these - } } } } @@ -977,7 +994,7 @@ where .load_json::>(0, &rtk.bytes()) .await? else { - apibail_internal!(format!("missing record: {}", rtk)); + apibail_internal!("missing record: {}", rtk); }; record.post_deserialize(); @@ -999,10 +1016,7 @@ where st_xact: &TableDBTransaction, ) -> VeilidAPIResult<()> { if self.record_cache.contains_key(rtk) { - apibail_internal!(format!( - "should have removed record from cache already: {}", - rtk - )); + apibail_internal!("should have removed record from cache already: {}", rtk); } let stored_subkeys = record.stored_subkeys(); @@ -1012,10 +1026,7 @@ where subkey: sk, }; if self.subkey_cache.contains_key(&stk) { - apibail_internal!(format!( - "should have removed subkey from cache already: {}", - stk - )); + apibail_internal!("should have removed subkey from cache already: {}", stk); } st_xact.delete(0, &stk.bytes()).await?; @@ -1382,10 +1393,6 @@ where std::collections::btree_map::Entry::Occupied(mut o) => { let usc = o.get_mut(); match usc { - UncommittedSubkeyChange::Create { new_data: _ } => { - // Create followed by delete is nothing - o.remove(); - } UncommittedSubkeyChange::Update { new_data: _, opt_old_data, @@ -1426,10 +1433,6 @@ where std::collections::btree_map::Entry::Occupied(mut o) => { let usc = o.get_mut(); match usc { - UncommittedSubkeyChange::Create { new_data: _ } => { - // If we created a subkey and then updated it, might as well have just created it with the new value - *usc = UncommittedSubkeyChange::Create { new_data }; - } UncommittedSubkeyChange::Update { new_data: _, opt_old_data, @@ -1452,42 +1455,6 @@ where } } - fn add_uncommitted_subkey_create(&mut self, stk: SubkeyTableKey, new_data: RecordData) { - let stk_log = stk.clone(); - - match self.uncommitted_subkey_changes.entry(stk) { - std::collections::btree_map::Entry::Vacant(v) => { - v.insert(UncommittedSubkeyChange::Create { new_data }); - } - std::collections::btree_map::Entry::Occupied(mut o) => { - let usc = o.get_mut(); - match usc { - UncommittedSubkeyChange::Create { new_data: _ } => { - // Should never happen. Can't create an already created subkey. - veilid_log!(self error "subkey was created twice in uncommitted log: {}", stk_log); - } - UncommittedSubkeyChange::Update { - new_data: _, - opt_old_data: _, - } => { - // Should never happen. Can't create an already created subkey. - veilid_log!(self error "record was created after updated in uncommitted log: {}", stk_log); - } - UncommittedSubkeyChange::Delete { - opt_old_data, - is_lru: _, - } => { - // A delete followed by a create is really an update - *usc = UncommittedSubkeyChange::Update { - new_data, - opt_old_data: opt_old_data.clone(), - }; - } - } - } - } - } - pub fn debug(&self) -> String { let mut out = String::new(); @@ -1511,14 +1478,14 @@ where "Uncommitted Record Changes: {}\n", self.uncommitted_record_changes.len() ); - for (k, _v) in &self.uncommitted_record_changes { + for k in self.uncommitted_record_changes.keys() { out += &format!(" {}\n", k); } out += &format!( "Uncommitted Subkey Changes: {}\n", self.uncommitted_subkey_changes.len() ); - for (k, _v) in &self.uncommitted_subkey_changes { + for k in self.uncommitted_subkey_changes.keys() { out += &format!(" {}\n", k); } diff --git a/veilid-core/src/storage_manager/rehydrate.rs b/veilid-core/src/storage_manager/rehydrate.rs index 453b2f78..e67d70b3 100644 --- a/veilid-core/src/storage_manager/rehydrate.rs +++ b/veilid-core/src/storage_manager/rehydrate.rs @@ -216,7 +216,7 @@ impl StorageManager { let mut rehydrated = ValueSubkeyRangeSet::new(); for (n, subkey) in local_inspect_result.subkeys().iter().enumerate() { if local_inspect_result.seqs()[n].is_none() { - apibail_internal!(format!("None sequence number found in local inspect results. Should have been stripped by strip_none_seqs(): {:?}", local_inspect_result)); + apibail_internal!("None sequence number found in local inspect results. Should have been stripped by strip_none_seqs(): {:?}", local_inspect_result); } let sfr = outbound_inspect_result @@ -260,7 +260,7 @@ impl StorageManager { results_iter, false, self.config().network.dht.consensus_width as usize, - ); + )?; Ok(RehydrateReport { opaque_record_key, diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index b761cce0..89e24bf2 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -245,10 +245,7 @@ impl StorageManager { ) { veilid_log!(self debug "schema validation error: {}", e); // Validation failed, ignore this value - apibail_generic!(format!( - "failed schema validation: {}:{}", - record_key, subkey - )); + apibail_generic!("failed schema validation: {}:{}", record_key, subkey); } // Sign the new value data with the writer @@ -726,7 +723,7 @@ impl StorageManager { core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)), true, self.config().network.dht.consensus_width as usize, - ); + )?; // Record the set value locally since it was successfully set online let subkey_lock = self diff --git a/veilid-core/src/storage_manager/tasks/check_inbound_transactions.rs b/veilid-core/src/storage_manager/tasks/check_inbound_transactions.rs index f95a058c..82b91aed 100644 --- a/veilid-core/src/storage_manager/tasks/check_inbound_transactions.rs +++ b/veilid-core/src/storage_manager/tasks/check_inbound_transactions.rs @@ -3,7 +3,7 @@ use super::*; impl StorageManager { // Check if server-side transactions have expired #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn check_inbound_transactions_task_routine( + pub(super) fn check_inbound_transactions_task_routine( &self, _stop_token: StopToken, _last_ts: Timestamp, diff --git a/veilid-core/src/storage_manager/tasks/check_inbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_inbound_watches.rs index e827dbcc..e56f48e4 100644 --- a/veilid-core/src/storage_manager/tasks/check_inbound_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_inbound_watches.rs @@ -3,7 +3,7 @@ use super::*; impl StorageManager { // Check if server-side watches have expired #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn check_inbound_watches_task_routine( + pub(super) fn check_inbound_watches_task_routine( &self, _stop_token: StopToken, _last_ts: Timestamp, diff --git a/veilid-core/src/storage_manager/tasks/check_outbound_transactions.rs b/veilid-core/src/storage_manager/tasks/check_outbound_transactions.rs index fc482d9a..0bc502f0 100644 --- a/veilid-core/src/storage_manager/tasks/check_outbound_transactions.rs +++ b/veilid-core/src/storage_manager/tasks/check_outbound_transactions.rs @@ -3,7 +3,7 @@ use super::*; impl StorageManager { // Check if client-side transactions on opened records have expired //#[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn check_outbound_transactions_task_routine( + pub(super) fn check_outbound_transactions_task_routine( &self, _stop_token: StopToken, _last_ts: Timestamp, diff --git a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs index cfb112e3..a63fbefc 100644 --- a/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_outbound_watches.rs @@ -3,7 +3,7 @@ use super::*; impl StorageManager { // Check if client-side watches on opened records either have dead nodes or if the watch has expired //#[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn check_outbound_watches_task_routine( + pub(super) fn check_outbound_watches_task_routine( &self, _stop_token: StopToken, _last_ts: Timestamp, diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs index 9f2c9b35..4140d25e 100644 --- a/veilid-core/src/storage_manager/tasks/mod.rs +++ b/veilid-core/src/storage_manager/tasks/mod.rs @@ -14,7 +14,7 @@ impl StorageManager { pub(super) fn setup_tasks(&self) { // Set flush records tick task veilid_log!(self debug "starting flush record stores task"); - impl_setup_task!( + impl_setup_task_async!( self, Self, flush_record_stores_task, @@ -23,11 +23,11 @@ impl StorageManager { // Set save metadata task veilid_log!(self debug "starting save metadata task"); - impl_setup_task!(self, Self, save_metadata_task, save_metadata_task_routine); + impl_setup_task_async!(self, Self, save_metadata_task, save_metadata_task_routine); // Set offline subkey writes tick task veilid_log!(self debug "starting offline subkey writes task"); - impl_setup_task!( + impl_setup_task_async!( self, Self, offline_subkey_writes_task, @@ -36,7 +36,7 @@ impl StorageManager { // Set send value changes tick task veilid_log!(self debug "starting send value changes task"); - impl_setup_task!( + impl_setup_task_async!( self, Self, send_value_changes_task, @@ -81,7 +81,7 @@ impl StorageManager { // Set rehydrate records tick task veilid_log!(self debug "starting rehydrate records task"); - impl_setup_task!( + impl_setup_task_async!( self, Self, rehydrate_records_task, diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index 83d08aad..d0b07c3a 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -188,7 +188,8 @@ impl StorageManager { result.fanout_results.into_iter().map(|x| (x.0, x.1)), true, consensus_width, - ); + ) + .unwrap_or_else(veilid_log_err!(self)); } // Get the next available work item diff --git a/veilid-core/src/storage_manager/transaction.rs b/veilid-core/src/storage_manager/transaction.rs index 3337bd1c..1e198f59 100644 --- a/veilid-core/src/storage_manager/transaction.rs +++ b/veilid-core/src/storage_manager/transaction.rs @@ -86,14 +86,14 @@ impl StorageManager { for record_key in record_keys { let opaque_record_key = record_key.opaque(); let Some(opened_record) = inner.opened_records.get(&opaque_record_key) else { - apibail_generic!(format!("record key not open: {}", opaque_record_key)); + apibail_generic!("record key not open: {}", opaque_record_key); }; if record_key.encryption_key().map(|x| x.value()) != opened_record.encryption_key() { - apibail_generic!(format!( + apibail_generic!( "record encryption key does not match opened record encryption key: {}", opaque_record_key - )); + ); } // Get signing keypair for this transaction @@ -213,11 +213,11 @@ impl StorageManager { for result in &results { let subkey_count = result.descriptor.schema()?.subkey_count(); if result.seqs.len() != subkey_count { - apibail_internal!(format!( + apibail_internal!( "seqs returned does not match subkey count: {} != {}", result.seqs.len(), subkey_count - )); + ); } let max_subkey = result.descriptor.schema()?.max_subkey(); @@ -229,7 +229,7 @@ impl StorageManager { )), false, self.config().network.dht.consensus_width as usize, - ); + )?; } if let Err(e) = self @@ -445,15 +445,10 @@ impl StorageManager { if let Some(err) = opt_commit_error { return Err(err); } - - // Perform storage manager operations - self.flush_committed_transaction_locked_inner( - &mut inner, - records_lock, - transaction_handle, - ) - .await?; } + // Perform storage manager operations + self.flush_committed_transaction_locked(records_lock, transaction_handle) + .await?; Ok(()) }, @@ -463,25 +458,30 @@ impl StorageManager { /// Removes the transaction from the transaction manager /// and flushes its contents to the storage manager - #[instrument(level = "trace", target = "dht", skip(self, inner, records_lock), err)] - pub(super) async fn flush_committed_transaction_locked_inner( + #[instrument(level = "trace", target = "dht", skip(self, records_lock), err)] + pub(super) async fn flush_committed_transaction_locked( &self, - inner: &mut StorageManagerInner, records_lock: &RecordsLockGuard, transaction_handle: OutboundTransactionHandle, ) -> VeilidAPIResult<()> { - let transaction_state = inner - .outbound_transaction_manager - .drop_transaction(transaction_handle) - .ok_or_else(|| VeilidAPIError::internal("missing transaction in flush"))?; + let keys_and_subkeys = { + let mut inner = self.inner.lock(); - let mut keys_and_subkeys = vec![]; - for record_info in transaction_state.get_record_infos() { - let opaque_record_key = record_info.record_key().opaque(); - let local_commit_results = record_info.local_commit_results()?; + let transaction_state = inner + .outbound_transaction_manager + .drop_transaction(transaction_handle) + .ok_or_else(|| VeilidAPIError::internal("missing transaction in flush"))?; - keys_and_subkeys.push((opaque_record_key, local_commit_results)); - } + let mut keys_and_subkeys = vec![]; + for record_info in transaction_state.get_record_infos() { + let opaque_record_key = record_info.record_key().opaque(); + let local_commit_results = record_info.local_commit_results()?; + + keys_and_subkeys.push((opaque_record_key, local_commit_results)); + } + + keys_and_subkeys + }; // Record the set values locally since they were successfully set online self.handle_set_local_values_with_multiple_records_lock(records_lock, keys_and_subkeys) @@ -665,7 +665,7 @@ impl StorageManager { let Some(record_info) = outbound_transaction_state.get_record_info(&record_key.opaque()) else { - apibail_internal!(format!("missing record in get: {}", record_key.opaque())); + apibail_internal!("missing record in get: {}", record_key.opaque()); }; record_info.current_subkey_get_result(subkey)? }; @@ -793,30 +793,32 @@ impl StorageManager { } OutboundTransactionStage::Failed => { // Unrecoverable failure, must rollback - apibail_generic!(format!( - "Transaction failed in set operation: transaction_handle={}, key={}, subkey={}", - transaction_handle, record_key, subkey - )); + apibail_generic!( + "Transaction failed in set operation: transaction_handle={}, key={}, subkey={}", + transaction_handle, record_key, subkey + ); } OutboundTransactionStage::Inconsistent => { // Set failed at this time, try again is possible - apibail_try_again!(format!( + apibail_try_again!( "Inconsistent set operation: transaction_handle={}, key={}, subkey={}", - transaction_handle, record_key, subkey - )); + transaction_handle, + record_key, + subkey + ); } OutboundTransactionStage::End | OutboundTransactionStage::Rollback | OutboundTransactionStage::Init | OutboundTransactionStage::Commit => { - apibail_internal!(format!("Unexpected transaction state '{}' in set operation: transaction_handle={}, key={}, subkey={}",output_stage,transaction_handle,record_key,subkey)); + apibail_internal!("Unexpected transaction state '{}' in set operation: transaction_handle={}, key={}, subkey={}",output_stage,transaction_handle,record_key,subkey); } } let Some(record_info) = outbound_transaction_state.get_record_info(&record_key.opaque()) else { - apibail_internal!(format!("missing record in set: {}", record_key.opaque())); + apibail_internal!("missing record in set: {}", record_key.opaque()); }; // If there is an updated value, it means the set succeeded @@ -829,11 +831,11 @@ impl StorageManager { // If the set found a newer value it would be recorded in the current consensus // unless an error condition was hit, in which case we should have had a failed or inconsistent state let Some(current_subkey_consensus) = record_info.current_consensus().get(subkey) else { - apibail_internal!(format!( + apibail_internal!( "record subkey {} should have a current consensus: {}", subkey, record_key.opaque() - )); + ); }; // Return current subkey consensus value data @@ -841,11 +843,11 @@ impl StorageManager { }; let Some(current_signed_value_data) = opt_current_signed_value_data else { - apibail_internal!(format!( + apibail_internal!( "record subkey {} consensus value should not be missing: {}", subkey, record_key.opaque() - )); + ); }; let current_value_data = self.maybe_decrypt_value_data(&record_key, current_signed_value_data.value_data())?; diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 15938faf..17822829 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -599,14 +599,14 @@ impl StorageManager { core::iter::once((ValueSubkeyRangeSet::new(), fanout_result)), false, self.config().network.dht.consensus_width as usize, - ); + )?; let owvresult = context.lock().watch_value_result.clone(); Ok(owvresult) } /// Remove dead watches from the table - pub(super) async fn process_outbound_watch_dead(&self, watch_lock: RecordLockGuard) { + pub(super) fn process_outbound_watch_dead(&self, watch_lock: RecordLockGuard) { let opaque_record_key = watch_lock.record(); let Some(outbound_watch) = self @@ -1105,7 +1105,6 @@ impl StorageManager { registry .storage_manager() .process_outbound_watch_dead(watch_lock) - .await } }; return Some(pin_dyn_future!(fut)); @@ -1513,7 +1512,7 @@ impl StorageManager { } // Get what subkeys are being watched - let Some(watched_subkeys) = self.get_watched_subkeys_inner(&inner, &record_key)? else { + let Some(watched_subkeys) = self.get_watched_subkeys_inner(inner, &record_key)? else { // Nothing watched, nothing to report return Ok(NetworkResult::value(())); }; diff --git a/veilid-core/src/table_store/table_db.rs b/veilid-core/src/table_store/table_db.rs index da1e7212..485db40e 100644 --- a/veilid-core/src/table_store/table_db.rs +++ b/veilid-core/src/table_store/table_db.rs @@ -233,10 +233,10 @@ impl TableDB { #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn get_keys(&self, col: u32) -> VeilidAPIResult>> { if col >= self.opened_column_count { - apibail_generic!(format!( + apibail_generic!( "Column exceeds opened column count {} >= {}", col, self.opened_column_count - )); + ); } let db = self.unlocked_inner.database.clone(); let mut out = Vec::new(); @@ -259,10 +259,10 @@ impl TableDB { #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn get_key_count(&self, col: u32) -> VeilidAPIResult { if col >= self.opened_column_count { - apibail_generic!(format!( + apibail_generic!( "Column exceeds opened column count {} >= {}", col, self.opened_column_count - )); + ); } let db = self.unlocked_inner.database.clone(); let key_count = db.num_keys(col).await.map_err(VeilidAPIError::from)?; @@ -279,10 +279,10 @@ impl TableDB { #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> { if col >= self.opened_column_count { - apibail_generic!(format!( + apibail_generic!( "Column exceeds opened column count {} >= {}", col, self.opened_column_count - )); + ); } let db = self.unlocked_inner.database.clone(); let mut dbt = db.transaction(); @@ -308,10 +308,10 @@ impl TableDB { #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult>> { if col >= self.opened_column_count { - apibail_generic!(format!( + apibail_generic!( "Column exceeds opened column count {} >= {}", col, self.opened_column_count - )); + ); } let db = self.unlocked_inner.database.clone(); let key = self.maybe_encrypt(key, true).await; @@ -340,10 +340,10 @@ impl TableDB { #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult>> { if col >= self.opened_column_count { - apibail_generic!(format!( + apibail_generic!( "Column exceeds opened column count {} >= {}", col, self.opened_column_count - )); + ); } let key = self.maybe_encrypt(key, true).await; @@ -433,7 +433,7 @@ impl TableDBTransaction { /// Return the number of operations in the transaction /// May be less than the number performed if duplicate keys were specified - pub fn len(&self) -> usize { + #[must_use] pub fn len(&self) -> usize { self.inner .lock() .ops @@ -443,7 +443,7 @@ impl TableDBTransaction { } /// Returns true if the operation count to be performed is zero - pub fn is_empty(&self) -> bool { + #[must_use] pub fn is_empty(&self) -> bool { self.inner .lock() .ops @@ -494,10 +494,10 @@ impl TableDBTransaction { #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> { if col >= self.db.opened_column_count { - apibail_generic!(format!( + apibail_generic!( "Column exceeds opened column count {} >= {}", col, self.db.opened_column_count - )); + ); } let key = self.db.maybe_encrypt(key, true).await; @@ -524,10 +524,10 @@ impl TableDBTransaction { #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> { if col >= self.db.opened_column_count { - apibail_generic!(format!( + apibail_generic!( "Column exceeds opened column count {} >= {}", col, self.db.opened_column_count - )); + ); } let key = self.db.maybe_encrypt(key, true).await; diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 2e78dfd3..47d3f5e8 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -1478,7 +1478,7 @@ impl VeilidAPI { } } - async fn debug_record_list(&self, args: Vec) -> VeilidAPIResult { + fn debug_record_list(&self, args: Vec) -> VeilidAPIResult { // let registry = self.core_context()?.registry(); let storage_manager = registry.storage_manager(); @@ -1836,8 +1836,8 @@ impl VeilidAPI { li, ri ) } else { - let li = storage_manager.debug_local_record_info(key.clone()).await; - let ri = storage_manager.debug_remote_record_info(key.clone()).await; + let li = storage_manager.debug_local_record_info(key.clone()); + let ri = storage_manager.debug_remote_record_info(key.clone()); format!("Local Info:\n{}\n\nRemote Info:\n{}\n", li, ri) }; Ok(out) @@ -2014,7 +2014,7 @@ impl VeilidAPI { Ok(format!("Success: report={:?}", report)) } - async fn debug_record_rehydrate(&self, args: Vec) -> VeilidAPIResult { + fn debug_record_rehydrate(&self, args: Vec) -> VeilidAPIResult { let registry = self.core_context()?.registry(); let storage_manager = registry.storage_manager(); @@ -2072,7 +2072,7 @@ impl VeilidAPI { let command = get_debug_argument_at(&args, 0, "debug_record", "command", get_string)?; if command == "list" { - self.debug_record_list(args).await + self.debug_record_list(args) } else if command == "purge" { self.debug_record_purge(args).await } else if command == "create" { @@ -2096,7 +2096,7 @@ impl VeilidAPI { } else if command == "inspect" { self.debug_record_inspect(args).await } else if command == "rehydrate" { - self.debug_record_rehydrate(args).await + self.debug_record_rehydrate(args) } else { Ok(">>> Unknown command\n".to_owned()) } diff --git a/veilid-core/src/veilid_api/error.rs b/veilid-core/src/veilid_api/error.rs index e5fc6457..4f973fc1 100644 --- a/veilid-core/src/veilid_api/error.rs +++ b/veilid-core/src/veilid_api/error.rs @@ -22,6 +22,9 @@ macro_rules! apibail_try_again { ($x:expr) => { return Err(VeilidAPIError::try_again($x)) }; + ($fmt:literal, $($args:tt)*) => { + return Err(VeilidAPIError::try_again( format!($fmt, $($args)*) )) + }; } #[allow(unused_macros)] @@ -30,6 +33,9 @@ macro_rules! apibail_generic { ($x:expr) => { return Err(VeilidAPIError::generic($x)) }; + ($fmt:literal, $($args:tt)*) => { + return Err(VeilidAPIError::generic( format!($fmt, $($args)*) )) + }; } #[allow(unused_macros)] @@ -38,6 +44,9 @@ macro_rules! apibail_internal { ($x:expr) => { return Err(VeilidAPIError::internal($x)) }; + ($fmt:literal, $($args:tt)*) => { + return Err(VeilidAPIError::internal( format!($fmt, $($args)*) )) + }; } #[allow(unused_macros)] @@ -70,6 +79,10 @@ macro_rules! apibail_no_connection { ($x:expr) => { return Err(VeilidAPIError::no_connection($x)) }; + ($fmt:literal, $($args: tt)* ) => { + return Err(VeilidAPIError::no_connection( format!($fmt, arg $($args)*) )) + }; + } #[allow(unused_macros)] diff --git a/veilid-core/src/veilid_api/serialize_helpers/compression.rs b/veilid-core/src/veilid_api/serialize_helpers/compression.rs index 3127ccf2..9c6b2d8f 100644 --- a/veilid-core/src/veilid_api/serialize_helpers/compression.rs +++ b/veilid-core/src/veilid_api/serialize_helpers/compression.rs @@ -15,10 +15,11 @@ pub fn decompress_size_prepended( block::uncompressed_size(input).map_err(VeilidAPIError::generic)?; if let Some(max_size) = max_size { if uncompressed_size > max_size { - apibail_generic!(format!( + apibail_generic!( "decompression exceeded maximum size: {} > {}", - uncompressed_size, max_size - )); + uncompressed_size, + max_size + ); } } block::decompress(input, uncompressed_size).map_err(VeilidAPIError::generic)