From 56668ba7a6ac1d6e326e4d95aa183951cfe478eb Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 27 Oct 2025 20:08:40 -0400 Subject: [PATCH] refactor, and inbound transactions --- veilid-core/src/rpc_processor/coders/mod.rs | 1 - .../operations/operation_transact_command.rs | 1 + .../rpc_processor/coders/signed_value_data.rs | 4 +- .../src/rpc_processor/message_header.rs | 5 + veilid-core/src/rpc_processor/mod.rs | 9 +- .../src/rpc_processor/rpc_get_value.rs | 2 +- .../src/rpc_processor/rpc_inspect_value.rs | 2 +- .../src/rpc_processor/rpc_set_value.rs | 2 +- .../src/rpc_processor/rpc_transact_command.rs | 22 +- veilid-core/src/storage_manager/debug.rs | 8 +- veilid-core/src/storage_manager/get_value.rs | 39 ++- .../src/storage_manager/inspect_value.rs | 30 +- .../local_record_store_interface.rs | 120 +++++++ veilid-core/src/storage_manager/mod.rs | 8 +- .../storage_manager/offline_subkey_writes.rs | 4 +- .../src/storage_manager/open_record.rs | 7 +- .../outbound_transaction_manager/mod.rs | 20 ++ .../outbound_transaction_record.rs | 4 + .../src/storage_manager/record_encryption.rs | 4 +- .../record_store/inbound_transaction.rs | 329 +++++++++++++----- .../record_store/inbound_watch.rs | 94 +++-- .../src/storage_manager/record_store/mod.rs | 148 ++++---- .../record_store/record_snapshot.rs | 63 +++- .../storage_manager/record_store_interface.rs | 305 ---------------- veilid-core/src/storage_manager/rehydrate.rs | 19 +- veilid-core/src/storage_manager/set_value.rs | 92 +++-- .../tasks/offline_subkey_writes.rs | 16 +- .../tasks/send_value_changes.rs | 5 - .../src/storage_manager/tests/test_types.rs | 2 +- .../src/storage_manager/transaction.rs | 7 +- .../src/storage_manager/transaction_begin.rs | 23 +- .../storage_manager/transaction_command.rs | 55 +-- .../types/encrypted_value_data.rs | 20 +- .../src/storage_manager/watch_value.rs | 24 +- .../types/dht/value_subkey_range_set.rs | 13 +- 35 files changed, 812 insertions(+), 695 deletions(-) create mode 100644 veilid-core/src/storage_manager/local_record_store_interface.rs delete mode 100644 veilid-core/src/storage_manager/record_store_interface.rs diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index 2aa9f399..dce7fc6a 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -64,7 +64,6 @@ pub enum QuestionContext { pub struct RPCValidateContext<'a> { pub registry: VeilidComponentRegistry, pub question_context: Option<&'a QuestionContext>, - pub message_header: &'a MessageHeader, } impl_veilid_component_registry_accessor!(RPCValidateContext<'_>); diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_transact_command.rs b/veilid-core/src/rpc_processor/coders/operations/operation_transact_command.rs index 33218491..b3438322 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_transact_command.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_transact_command.rs @@ -159,6 +159,7 @@ impl RPCOperationTransactCommandQ { Ok(()) } + #[expect(clippy::type_complexity)] pub fn destructure( self, ) -> ( diff --git a/veilid-core/src/rpc_processor/coders/signed_value_data.rs b/veilid-core/src/rpc_processor/coders/signed_value_data.rs index 798e3532..32b8a5d2 100644 --- a/veilid-core/src/rpc_processor/coders/signed_value_data.rs +++ b/veilid-core/src/rpc_processor/coders/signed_value_data.rs @@ -24,7 +24,7 @@ fn decode_value_data( None }; - EncryptedValueData::new_with_seq(seq, data, writer, nonce).map_err(RPCError::protocol) + EncryptedValueData::new(seq, data, writer, nonce).map_err(RPCError::protocol) } pub fn decode_signed_value_data( @@ -79,7 +79,7 @@ mod tests { let mut builder = message_builder.init_root::(); let signed_value_data = SignedValueData::new( - EncryptedValueData::new_with_seq( + EncryptedValueData::new( 10.into(), vec![1, 2, 3, 4, 5, 6], keypair.key(), diff --git a/veilid-core/src/rpc_processor/message_header.rs b/veilid-core/src/rpc_processor/message_header.rs index ee6f46de..3ab92de2 100644 --- a/veilid-core/src/rpc_processor/message_header.rs +++ b/veilid-core/src/rpc_processor/message_header.rs @@ -86,6 +86,7 @@ impl MessageHeader { // // XXX: or an actual safety route. If your code depends on this idea, you need to rethink it. // } + #[expect(dead_code)] pub fn is_direct(&self) -> bool { match &self.detail { RPCMessageHeaderDetail::Direct(_) => true, @@ -101,6 +102,8 @@ impl MessageHeader { RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_sender_id(), } } + + #[expect(dead_code)] pub fn direct_sender_public_key(&self) -> PublicKey { match &self.detail { RPCMessageHeaderDetail::Direct(d) => d @@ -128,6 +131,8 @@ impl MessageHeader { RPCMessageHeaderDetail::PrivateRouted(p) => Some(p.private_route.clone()), } } + + #[expect(dead_code)] pub fn get_safety_route_public_key(&self) -> Option { match &self.detail { RPCMessageHeaderDetail::Direct(_) => None, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index ddf995cc..3aa9f9bd 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1425,7 +1425,7 @@ impl RPCProcessor { let operation = signed_operation.decode_operation(&decode_context)?; // Validate the operation - self.validate_rpc_operation(&operation, encoded_msg)?; + self.validate_rpc_operation(&operation)?; Ok((operation, opt_signer)) } @@ -1467,11 +1467,7 @@ impl RPCProcessor { /// complex operations that require stateful validation and a more robust context than /// 'signatures', the caller must still perform whatever validation is necessary #[instrument(level = "trace", target = "rpc", skip_all)] - fn validate_rpc_operation( - &self, - operation: &RPCOperation, - encoded_msg: &MessageEncoded, - ) -> Result<(), RPCError> { + fn validate_rpc_operation(&self, operation: &RPCOperation) -> Result<(), RPCError> { // If this is an answer, get the question context for this answer // If we received an answer for a question we did not ask, this will return an error let question_context = if let RPCOperationKind::Answer(_) = operation.kind() { @@ -1485,7 +1481,6 @@ impl RPCProcessor { let validate_context = RPCValidateContext { registry: self.registry(), question_context: question_context.as_ref().map(|x| x.as_ref()), - message_header: &encoded_msg.header, }; operation.validate(&validate_context)?; diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 11ceb174..4fdf6f96 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -240,7 +240,7 @@ impl RPCProcessor { // See if we have this record ourselves let storage_manager = self.storage_manager(); let inbound_get_value_result = network_result_try!(storage_manager - .inbound_get_value(opaque_record_key.clone(), subkey, want_descriptor) + .inbound_get_value(&opaque_record_key, subkey, want_descriptor) .await .map_err(RPCError::internal)?); diff --git a/veilid-core/src/rpc_processor/rpc_inspect_value.rs b/veilid-core/src/rpc_processor/rpc_inspect_value.rs index 73c3eaee..3d5dd0ed 100644 --- a/veilid-core/src/rpc_processor/rpc_inspect_value.rs +++ b/veilid-core/src/rpc_processor/rpc_inspect_value.rs @@ -234,7 +234,7 @@ impl RPCProcessor { // See if we have this record ourselves let storage_manager = self.storage_manager(); let inbound_inspect_value_result = network_result_try!(storage_manager - .inbound_inspect_value(opaque_record_key.clone(), subkeys, want_descriptor) + .inbound_inspect_value(&opaque_record_key, subkeys, want_descriptor) .await .map_err(RPCError::internal)?); diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 36fd42e4..964d06d2 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -268,7 +268,7 @@ impl RPCProcessor { let storage_manager = self.storage_manager(); let result = network_result_try!(storage_manager .inbound_set_value( - opaque_record_key.clone(), + &opaque_record_key, subkey, Arc::new(value), descriptor.map(Arc::new), diff --git a/veilid-core/src/rpc_processor/rpc_transact_command.rs b/veilid-core/src/rpc_processor/rpc_transact_command.rs index 176de8fd..cb1f3f42 100644 --- a/veilid-core/src/rpc_processor/rpc_transact_command.rs +++ b/veilid-core/src/rpc_processor/rpc_transact_command.rs @@ -251,7 +251,7 @@ impl RPCProcessor { let storage_manager = self.storage_manager(); let inbound_transact_value_result = network_result_try!(storage_manager .inbound_transact_command( - opaque_record_key, + &opaque_record_key, transaction_id, command, opt_seqs, @@ -262,17 +262,12 @@ impl RPCProcessor { .map_err(RPCError::internal)?); match inbound_transact_value_result { - InboundTransactCommandResult::Success { - expiration, - opt_seqs, - opt_subkey, - opt_value, - } => ( + InboundTransactCommandResult::Success(res) => ( true, - Some(expiration), - opt_seqs, - opt_subkey, - opt_value.as_ref().map(|x| x.as_ref().clone()), + Some(res.expiration), + res.opt_seqs, + res.opt_subkey, + res.opt_value.as_ref().map(|x| x.as_ref().clone()), ), InboundTransactCommandResult::InvalidTransaction => ( false, @@ -281,6 +276,11 @@ impl RPCProcessor { Default::default(), Default::default(), ), + InboundTransactCommandResult::InvalidArguments => { + return Ok(NetworkResult::invalid_message( + "not processing transact command request with invalid arguments", + )) + } } }; diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs index d921cb21..aaf6de78 100644 --- a/veilid-core/src/storage_manager/debug.rs +++ b/veilid-core/src/storage_manager/debug.rs @@ -98,7 +98,7 @@ impl StorageManager { }; let opaque_record_key = record_key.opaque(); local_record_store - .debug_record_subkey_info(opaque_record_key, subkey) + .debug_record_subkey_info(&opaque_record_key, subkey) .await } pub async fn debug_remote_record_subkey_info( @@ -112,7 +112,7 @@ impl StorageManager { }; let opaque_record_key = record_key.opaque(); remote_record_store - .debug_record_subkey_info(opaque_record_key, subkey) + .debug_record_subkey_info(&opaque_record_key, subkey) .await } pub async fn debug_local_record_info(&self, record_key: RecordKey) -> String { @@ -121,7 +121,7 @@ impl StorageManager { return "not initialized".to_owned(); }; let opaque_record_key = record_key.opaque(); - let local_debug = local_record_store.debug_record_info(opaque_record_key); + let local_debug = local_record_store.debug_record_info(&opaque_record_key); let opaque_record_key = record_key.opaque(); let opened_debug = if let Some(o) = inner.opened_records.get(&opaque_record_key) { @@ -139,6 +139,6 @@ impl StorageManager { return "not initialized".to_owned(); }; let opaque_record_key = record_key.opaque(); - remote_record_store.debug_record_info(opaque_record_key) + remote_record_store.debug_record_info(&opaque_record_key) } } diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index b95c2708..36d2135e 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -60,7 +60,7 @@ impl StorageManager { // See if the requested subkey is our local record store let last_get_result = self - .handle_get_local_value_inner(&mut inner, opaque_record_key.clone(), subkey, true) + .handle_get_local_value_inner(&mut inner, &opaque_record_key, subkey, true) .await?; // Return the existing value if we have one unless we are forcing a refresh @@ -97,7 +97,7 @@ impl StorageManager { .unwrap_or_default(); let res_rx = self .outbound_get_value( - opaque_record_key.clone(), + &opaque_record_key, subkey, safety_selection, last_get_result, @@ -113,7 +113,7 @@ impl StorageManager { // Process the returned result let out_encrypted = self - .process_outbound_get_value_result(opaque_record_key.clone(), subkey, last_seq, result) + .process_outbound_get_value_result(&opaque_record_key, subkey, last_seq, result) .await?; let out = if let Some(vd) = out_encrypted { Some(self.maybe_decrypt_value_data(&record_key, &vd)?) @@ -141,20 +141,22 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all)] pub async fn inbound_get_value( &self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, want_descriptor: bool, ) -> VeilidAPIResult> { let mut inner = self.inner.lock().await; - // See if the subkey we are getting has a last known remote value - let last_get_result = Self::handle_get_remote_value_inner( - &mut inner, - opaque_record_key, - subkey, - want_descriptor, - ) - .await?; + // See if it's in the remote record store + let last_get_result = { + let Some(remote_record_store) = inner.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + remote_record_store + .get_subkey(opaque_record_key, subkey, want_descriptor) + .await? + .unwrap_or_default() + }; Ok(NetworkResult::value(InboundGetValueResult::Success( last_get_result, @@ -167,7 +169,7 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_get_value( &self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, safety_selection: SafetySelection, last_get_result: GetResult, @@ -183,7 +185,7 @@ impl StorageManager { // Get the nodes we know are caching this value to seed the fanout let init_fanout_queue = { - self.get_value_nodes(opaque_record_key.clone()) + self.get_value_nodes(opaque_record_key) .await? .unwrap_or_default() .into_iter() @@ -398,6 +400,7 @@ impl StorageManager { // Call the fanout in a spawned task let registry = self.registry(); + let fanout_hash_coordinate = opaque_record_key.to_hash_coordinate(); spawn( "outbound_get_value fanout", Box::pin( @@ -405,7 +408,7 @@ impl StorageManager { let routing_table = registry.routing_table(); let fanout_call = FanoutCall::new( &routing_table, - opaque_record_key.to_hash_coordinate(), + fanout_hash_coordinate, key_count, fanout, consensus_count, @@ -481,7 +484,7 @@ impl StorageManager { } }; let is_incomplete = result.fanout_result.kind.is_incomplete(); - let value_data = match this.process_outbound_get_value_result(key.opaque(), subkey, last_seq, result).await { + let value_data = match this.process_outbound_get_value_result(&key.opaque(), subkey, last_seq, result).await { Ok(Some(v)) => v, Ok(None) => { return is_incomplete; @@ -526,7 +529,7 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all)] pub(super) async fn process_outbound_get_value_result( &self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, last_seq: ValueSeqNum, result: get_value::OutboundGetValueResult, @@ -554,8 +557,8 @@ impl StorageManager { &mut inner, opaque_record_key, subkey, + None, get_result_value.clone(), - InboundWatchUpdateMode::UpdateAll, ) .await?; } diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index d6ca7ff7..fefb3c12 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -95,12 +95,7 @@ impl StorageManager { // See if the requested record is our local record store let mut local_inspect_result = self - .handle_inspect_local_value_inner( - &mut inner, - opaque_record_key.clone(), - subkeys.clone(), - true, - ) + .handle_inspect_local_value_inner(&mut inner, &opaque_record_key, subkeys.clone(), true) .await?; // Get the offline subkeys for this record still only returning the ones we're inspecting @@ -145,7 +140,7 @@ impl StorageManager { // Get the inspect record report from the network let result = self .outbound_inspect_value( - opaque_record_key.clone(), + &opaque_record_key, subkeys, safety_selection, if matches!(scope, DHTReportScope::SyncGet | DHTReportScope::SyncSet) { @@ -198,7 +193,7 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_inspect_value( &self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkeys: ValueSubkeyRangeSet, safety_selection: SafetySelection, local_inspect_result: InspectResult, @@ -229,7 +224,7 @@ impl StorageManager { // Get the nodes we know are caching this value to seed the fanout let init_fanout_queue = { - self.get_value_nodes(opaque_record_key.clone()) + self.get_value_nodes(opaque_record_key) .await? .unwrap_or_default() .into_iter() @@ -510,7 +505,7 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all)] pub async fn inbound_inspect_value( &self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkeys: ValueSubkeyRangeSet, want_descriptor: bool, ) -> VeilidAPIResult> { @@ -522,14 +517,13 @@ impl StorageManager { }; // See if the subkey we are getting has a last known remote value - let inspect_result = self - .handle_inspect_remote_value_inner( - &mut inner, - opaque_record_key, - subkeys, - want_descriptor, - ) - .await?; + let Some(remote_record_store) = inner.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + let inspect_result = remote_record_store + .inspect_record(opaque_record_key, &subkeys, want_descriptor) + .await? + .unwrap_or_default(); Ok(NetworkResult::value(InboundInspectValueResult::Success( inspect_result, diff --git a/veilid-core/src/storage_manager/local_record_store_interface.rs b/veilid-core/src/storage_manager/local_record_store_interface.rs new file mode 100644 index 00000000..fa3a073a --- /dev/null +++ b/veilid-core/src/storage_manager/local_record_store_interface.rs @@ -0,0 +1,120 @@ +use super::*; + +impl StorageManager { + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_get_local_value_inner( + &self, + inner: &mut StorageManagerInner, + opaque_record_key: &OpaqueRecordKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> VeilidAPIResult { + // See if the value is in the offline subkey writes first, + // since it may not have been committed yet to the local record store + if let Some(get_result) = self.get_offline_subkey_writes_subkey( + inner, + opaque_record_key, + subkey, + want_descriptor, + )? { + return Ok(get_result); + } + + // See if it's in the local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if let Some(get_result) = local_record_store + .get_subkey(opaque_record_key, subkey, want_descriptor) + .await? + { + return Ok(get_result); + } + + Ok(GetResult::default()) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_set_local_value_inner( + &self, + inner: &mut StorageManagerInner, + opaque_record_key: &OpaqueRecordKey, + subkey: ValueSubkey, + opt_transaction_range: Option<&ValueSubkeyRangeSet>, + signed_value_data: Arc, + ) -> VeilidAPIResult<()> { + // See if this new data supercedes any offline subkey writes + self.remove_old_offline_subkey_writes_inner( + inner, + opaque_record_key, + subkey, + signed_value_data.clone(), + ); + + // See if it's in the local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Write subkey to local store + local_record_store + .set_subkey( + opaque_record_key, + subkey, + opt_transaction_range, + signed_value_data, + InboundWatchUpdateMode::NoUpdate, + ) + .await?; + + Ok(()) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_inspect_local_value_inner( + &self, + inner: &mut StorageManagerInner, + opaque_record_key: &OpaqueRecordKey, + subkeys: ValueSubkeyRangeSet, + want_descriptor: bool, + ) -> VeilidAPIResult { + // See if it's in the local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if let Some(inspect_result) = local_record_store + .inspect_record(opaque_record_key, &subkeys, want_descriptor) + .await? + { + return Ok(inspect_result); + } + + Ok(InspectResult::default()) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn get_value_nodes( + &self, + opaque_record_key: &OpaqueRecordKey, + ) -> VeilidAPIResult>> { + let inner = self.inner.lock().await; + // Get local record store + let Some(local_record_store) = inner.local_record_store.as_ref() else { + apibail_not_initialized!(); + }; + + // Get routing table to see if we still know about these nodes + let routing_table = self.routing_table(); + + let opt_value_nodes = local_record_store.peek_record(opaque_record_key, |r| { + let d = r.detail(); + d.nodes + .keys() + .cloned() + .filter_map(|nr| routing_table.lookup_node_ref(nr).ok().flatten()) + .collect() + }); + + Ok(opt_value_nodes) + } +} diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index c75409b5..d0287c4e 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -4,6 +4,7 @@ mod debug; mod delete_record; mod get_value; mod inspect_value; +mod local_record_store_interface; mod offline_subkey_writes; mod open_record; mod outbound_transaction_manager; @@ -12,7 +13,6 @@ mod record_encryption; mod record_key; mod record_lock_table; mod record_store; -mod record_store_interface; mod rehydrate; mod schema; mod set_value; @@ -44,8 +44,8 @@ pub(crate) use get_value::InboundGetValueResult; pub(crate) use inspect_value::InboundInspectValueResult; pub(crate) use set_value::InboundSetValueResult; pub(crate) use transaction::OutboundTransactionHandle; -pub(crate) use transaction_begin::InboundTransactBeginResult; -pub(crate) use transaction_command::InboundTransactCommandResult; +pub(crate) use transaction_begin::{InboundTransactBeginResult, TransactBeginSuccess}; +pub(crate) use transaction_command::{InboundTransactCommandResult, TransactCommandSuccess}; pub(crate) use watch_value::{InboundWatchParameters, InboundWatchValueResult}; pub use types::*; @@ -592,7 +592,7 @@ impl StorageManager { #[instrument(level = "trace", target = "stor", skip_all)] fn check_fanout_set_offline( &self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, fanout_result: &FanoutResult, ) -> bool { diff --git a/veilid-core/src/storage_manager/offline_subkey_writes.rs b/veilid-core/src/storage_manager/offline_subkey_writes.rs index 27c1eb02..338b6355 100644 --- a/veilid-core/src/storage_manager/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/offline_subkey_writes.rs @@ -88,7 +88,7 @@ impl StorageManager { pub(super) fn remove_old_offline_subkey_writes_inner( &self, inner: &mut StorageManagerInner, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, signed_value_data: Arc, ) { @@ -144,7 +144,7 @@ impl StorageManager { pub(super) fn finish_offline_subkey_writes_inner( &self, inner: &mut StorageManagerInner, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkeys_written: ValueSubkeyRangeSet, subkeys_still_offline: ValueSubkeyRangeSet, ) { diff --git a/veilid-core/src/storage_manager/open_record.rs b/veilid-core/src/storage_manager/open_record.rs index 8072b9c0..ab1efa42 100644 --- a/veilid-core/src/storage_manager/open_record.rs +++ b/veilid-core/src/storage_manager/open_record.rs @@ -14,6 +14,7 @@ impl StorageManager { }; let mut inner = self.inner.lock().await; + let opaque_record_key = record_key.opaque(); // See if we have a local record already or not if let Some(res) = self @@ -32,7 +33,7 @@ impl StorageManager { let set_consensus = self.config().network.dht.set_value_count as usize; self.add_rehydration_request( - record_key.opaque(), + opaque_record_key, ValueSubkeyRangeSet::full(), set_consensus, ) @@ -53,7 +54,7 @@ impl StorageManager { // Use the safety selection we opened the record with let result = self .outbound_inspect_value( - record_key.opaque(), + &opaque_record_key, ValueSubkeyRangeSet::single(0), safety_selection.clone(), InspectResult::default(), @@ -64,7 +65,7 @@ impl StorageManager { // If we got nothing back, the key wasn't found if result.inspect_result.opt_descriptor().is_none() { // No result - apibail_key_not_found!(record_key.opaque()); + apibail_key_not_found!(opaque_record_key); }; // Check again to see if we have a local record already or not diff --git a/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs b/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs index 7aa0f82c..4a5dd399 100644 --- a/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs +++ b/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs @@ -624,6 +624,16 @@ impl OutboundTransactionManager { )); }; + // Check if the subkey is in range + if subkey + > record_info + .schema() + .ok_or_else(|| VeilidAPIError::internal("missing descriptor"))? + .max_subkey() + { + apibail_invalid_argument!("subkey out of range", "subkey", subkey); + } + let safety_selection = record_info.safety_selection().clone(); let node_transactions = record_info .get_node_transactions() @@ -790,6 +800,16 @@ impl OutboundTransactionManager { )); }; + // Check if the subkey is in range + if subkey + > record_info + .schema() + .ok_or_else(|| VeilidAPIError::internal("missing descriptor"))? + .max_subkey() + { + apibail_invalid_argument!("subkey out of range", "subkey", subkey); + } + let safety_selection = record_info.safety_selection().clone(); let node_transactions = record_info .get_node_transactions() diff --git a/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_record.rs b/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_record.rs index a85a1406..60719b29 100644 --- a/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_record.rs +++ b/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_record.rs @@ -123,6 +123,10 @@ impl OutboundTransactionRecord { if self.local_snapshot.is_none() { stage = OutboundTransactionStage::Failed; } + // Descriptor was never found + if self.descriptor.is_none() { + stage = OutboundTransactionStage::Failed; + } } } diff --git a/veilid-core/src/storage_manager/record_encryption.rs b/veilid-core/src/storage_manager/record_encryption.rs index ff3daa48..c5e34725 100644 --- a/veilid-core/src/storage_manager/record_encryption.rs +++ b/veilid-core/src/storage_manager/record_encryption.rs @@ -35,14 +35,14 @@ impl StorageManager { let encryption_key = SharedSecret::new(record_key.kind(), encryption_key.clone()); vcrypto.crypt_in_place_no_auth(&mut data, &nonce, &encryption_key)?; - Ok(EncryptedValueData::new_with_seq( + Ok(EncryptedValueData::new( value_data.seq(), data, value_data.writer(), Some(nonce), )?) } else { - Ok(EncryptedValueData::new_with_seq( + Ok(EncryptedValueData::new( value_data.seq(), value_data.data().to_vec(), value_data.writer(), diff --git a/veilid-core/src/storage_manager/record_store/inbound_transaction.rs b/veilid-core/src/storage_manager/record_store/inbound_transaction.rs index 46296481..ef8796c2 100644 --- a/veilid-core/src/storage_manager/record_store/inbound_transaction.rs +++ b/veilid-core/src/storage_manager/record_store/inbound_transaction.rs @@ -14,7 +14,7 @@ pub(crate) struct InboundTransaction { /// Snapshot of record contents if record exists pub opt_snapshot: Option>, /// What has changed since snapshot - pub changed_subkeys: BTreeMap>>, + pub changed_subkeys: BTreeMap>, } #[derive(Debug, Default, Clone)] @@ -41,70 +41,16 @@ where D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, { #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn snapshot_record( + pub async fn begin_inbound_transaction( &mut self, - record_key: &OpaqueRecordKey, - ) -> VeilidAPIResult>> { - let Some((subkey_count, stored_subkeys)) = self.with_record(record_key, |record| { - (record.subkey_count(), record.stored_subkeys().clone()) - }) else { - // Record not available - return Ok(None); - }; - - // Snapshot all subkeys - let mut all_value_data = vec![Option::>::None; subkey_count]; - - for subkey in stored_subkeys.iter() { - // If subkey exists in subkey cache, use that - let stk = SubkeyTableKey { - record_key: record_key.clone(), - subkey, - }; - let svd = match self.subkey_cache.get(&stk) { - Some(record_data) => record_data.signed_value_data(), - None => - // If not in cache, try to pull from table store if it is in our stored subkey set - { - match self - .subkey_table - .load_json::(0, &stk.bytes()) - .await - .map_err(VeilidAPIError::internal)? - { - Some(record_data) => { - let out = record_data.signed_value_data().clone(); - // Add to cache, do nothing with lru out - self.add_to_subkey_cache(stk, record_data); - out - } - None => { - apibail_internal!("failed to snapshot subkey that was stored"); - } - } - } - }; - - let subkey = usize::try_from(subkey).map_err(VeilidAPIError::internal)?; - all_value_data[subkey] = Some(svd); - } - - let out = Arc::new(RecordSnapshot::new(all_value_data)); - - Ok(Some(out)) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn new_inbound_transaction( - &mut self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, opt_descriptor: Option, want_descriptor: bool, signing_member_id: MemberId, ) -> VeilidAPIResult { // Get descriptor let opt_existing_descriptor = - self.with_record(&opaque_record_key, |record| record.descriptor()); + self.with_record(opaque_record_key, |record| record.descriptor()); let descriptor = match opt_existing_descriptor { Some(x) => x, None => { @@ -138,6 +84,11 @@ where let is_member = member_check(&signing_member_id); if let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) { + // If there is a record lock then this transaction can not be started right now + if active_transaction_list.record_lock.is_some() { + return Ok(InboundTransactBeginResult::TransactionUnavailable); + } + // Total up the number of transactions for this key for t in &mut active_transaction_list.transactions { existing_ids.insert(t.id); @@ -169,7 +120,7 @@ where } // Transaction can be added, so let's get a snapshot if the record exists already - let opt_snapshot = self.snapshot_record(&opaque_record_key).await?; + let opt_snapshot = self.snapshot_record(opaque_record_key).await?; // Generate a record-unique transaction id > 0 let mut id = 0; @@ -179,7 +130,7 @@ where // Make sure it doesn't match any other id or zero (unlikely, but lets be certain) while existing_ids.contains(&id) { let next_id = id.overflowing_add(1); - id = next_id.0 + next_id.1.then_some(1).unwrap_or_default(); + id = next_id.0 + if next_id.1 { 1 } else { 0 }; } // Make transaction expiration timestamp @@ -209,7 +160,7 @@ where .push(inbound_transaction); // Return the result - Ok(InboundTransactBeginResult::Success(TransactBeginResult { + Ok(InboundTransactBeginResult::Success(TransactBeginSuccess { transaction_id: id, expiration, opt_descriptor: want_descriptor.then_some(descriptor), @@ -219,7 +170,7 @@ where pub async fn end_inbound_transaction( &mut self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, transaction_id: u64, ) -> VeilidAPIResult { // See if this transaction is still valid @@ -253,19 +204,21 @@ where // If there's no changes, we can just quit early with a zero expiration to indicate no commit or rollback is necessary if inbound_transaction.changed_subkeys.is_empty() { - return Ok(InboundTransactCommandResult::Success { - expiration: Default::default(), - opt_seqs: Default::default(), - opt_subkey: Default::default(), - opt_value: Default::default(), - }); + return Ok(InboundTransactCommandResult::Success( + TransactCommandSuccess { + expiration: Default::default(), + opt_seqs: Default::default(), + opt_subkey: Default::default(), + opt_value: Default::default(), + }, + )); } inbound_transaction.opt_snapshot.clone() }; - let end_snapshot = self.snapshot_record(&opaque_record_key).await?; + let end_snapshot = self.snapshot_record(opaque_record_key).await?; - // If our snapshot still validates, then the changes can be applied + // If the snapshot doesn't validate then the transaction is not valid if begin_snapshot.as_ref().map(|s| s.seqs()) != end_snapshot.map(|s| s.seqs()) { let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else { return Ok(InboundTransactCommandResult::InvalidTransaction); @@ -276,6 +229,8 @@ where return Ok(InboundTransactCommandResult::InvalidTransaction); } + // Everything is valid, we can end the transaction successfully + // Lock the record let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else { return Ok(InboundTransactCommandResult::InvalidTransaction); @@ -285,42 +240,250 @@ where // Give the user another timeout bump to allow for commit let expiration = Timestamp::now().later(self.limits.transaction_timeout); - Ok(InboundTransactCommandResult::Success { - expiration, - opt_seqs: Default::default(), - opt_subkey: Default::default(), - opt_value: Default::default(), - }) + Ok(InboundTransactCommandResult::Success( + TransactCommandSuccess { + expiration, + opt_seqs: Default::default(), + opt_subkey: Default::default(), + opt_value: Default::default(), + }, + )) } pub async fn commit_inbound_transaction( &mut self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, transaction_id: u64, ) -> VeilidAPIResult { + // See if this transaction is still valid + let rtk = RecordTableKey { + record_key: opaque_record_key.clone(), + }; + + let transaction = { + let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else { + return Ok(InboundTransactCommandResult::InvalidTransaction); + }; + + // If there is a record lock then it better be ours + // * If there is no record lock, then this commit is out of order and this transaction should be dropped + // * If the lock id is ours, then we can commit + // * If the lock id is not ours, then this commit is out of order and this transaction should be dropped + if active_transaction_list.record_lock != Some(transaction_id) { + // Drop inbound transaction + active_transaction_list.drop_transaction(transaction_id); + return Ok(InboundTransactCommandResult::InvalidTransaction); + } + + // Get the inbound transaction if it is still valid + let Some(inbound_transaction) = active_transaction_list + .transactions + .iter_mut() + .find(|x| x.id == transaction_id) + else { + apibail_internal!("inbound transaction missing even though it is locked"); + }; + + // If there's no changes, the transaction should have been dropped by 'end' and not locked and we shouldnt get here + if inbound_transaction.changed_subkeys.is_empty() { + apibail_internal!("no changes in locked transaction"); + } + + inbound_transaction.clone() + }; + + // Apply all changes + let transaction_range = + ValueSubkeyRangeSet::from_iter(transaction.changed_subkeys.keys().copied()); + for (subkey, changed_signed_value_data) in transaction.changed_subkeys.iter() { + if let Err(e) = self + .set_subkey( + opaque_record_key, + *subkey, + Some(&transaction_range), + changed_signed_value_data.clone(), + InboundWatchUpdateMode::UpdateAll, + ) + .await + { + veilid_log!(self error "set_subkey failed in transaction: {}", e); + } + } + + // Drop transaction and lock now that we're done + { + let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else { + return Ok(InboundTransactCommandResult::InvalidTransaction); + }; + + active_transaction_list.drop_transaction(transaction_id); + } + + Ok(InboundTransactCommandResult::Success( + TransactCommandSuccess { + expiration: Default::default(), + opt_seqs: Default::default(), + opt_subkey: Default::default(), + opt_value: Default::default(), + }, + )) } - pub async fn rollback_inbound_transaction( + pub fn rollback_inbound_transaction( &mut self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, transaction_id: u64, ) -> VeilidAPIResult { + // See if this transaction is still valid + let rtk = RecordTableKey { + record_key: opaque_record_key.clone(), + }; + + // Rollback just needs to drop the transaction wherever it is + { + let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else { + return Ok(InboundTransactCommandResult::InvalidTransaction); + }; + active_transaction_list.drop_transaction(transaction_id); + } + + Ok(InboundTransactCommandResult::Success( + TransactCommandSuccess { + expiration: Default::default(), + opt_seqs: Default::default(), + opt_subkey: Default::default(), + opt_value: Default::default(), + }, + )) } - pub async fn inbound_transaction_get( + pub fn inbound_transaction_get( &mut self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, transaction_id: u64, subkey: ValueSubkey, ) -> VeilidAPIResult { + // See if this transaction is still valid + let rtk = RecordTableKey { + record_key: opaque_record_key.clone(), + }; + + // If the transaction is still active and not ended/locked + let opt_value = { + let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else { + return Ok(InboundTransactCommandResult::InvalidTransaction); + }; + + if active_transaction_list.record_lock == Some(transaction_id) { + active_transaction_list.drop_transaction(transaction_id); + return Ok(InboundTransactCommandResult::InvalidTransaction); + } + + // Get the inbound transaction if it is still valid + let Some(inbound_transaction) = active_transaction_list + .transactions + .iter_mut() + .find(|x| x.id == transaction_id) + else { + // Nothing to drop + return Ok(InboundTransactCommandResult::InvalidTransaction); + }; + + // Ensure subkey is within bounds + if subkey > inbound_transaction.descriptor.schema()?.max_subkey() { + return Ok(InboundTransactCommandResult::InvalidArguments); + } + + // Get value to return + if let Some(snapshot) = &inbound_transaction.opt_snapshot { + snapshot.subkey_value_data(subkey)? + } else { + None + } + }; + + // Give the user another timeout bump to allow for more commands + let expiration = Timestamp::now().later(self.limits.transaction_timeout); + + Ok(InboundTransactCommandResult::Success( + TransactCommandSuccess { + expiration, + opt_seqs: Default::default(), + opt_subkey: Some(subkey), + opt_value, + }, + )) } - pub async fn inbound_transaction_set( + pub fn inbound_transaction_set( &mut self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, transaction_id: u64, subkey: ValueSubkey, value: Arc, ) -> VeilidAPIResult { + // See if this transaction is still valid + let rtk = RecordTableKey { + record_key: opaque_record_key.clone(), + }; + + // If the transaction is still active and not ended/locked + let opt_value = { + let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else { + return Ok(InboundTransactCommandResult::InvalidTransaction); + }; + + if active_transaction_list.record_lock == Some(transaction_id) { + active_transaction_list.drop_transaction(transaction_id); + return Ok(InboundTransactCommandResult::InvalidTransaction); + } + + // Get the inbound transaction if it is still valid + let Some(inbound_transaction) = active_transaction_list + .transactions + .iter_mut() + .find(|x| x.id == transaction_id) + else { + // Nothing to drop + return Ok(InboundTransactCommandResult::InvalidTransaction); + }; + + // Ensure subkey is within bounds + if subkey > inbound_transaction.descriptor.schema()?.max_subkey() { + return Ok(InboundTransactCommandResult::InvalidArguments); + } + + // Get value to compare against + let opt_existing_value = if let Some(snapshot) = &inbound_transaction.opt_snapshot { + snapshot.subkey_value_data(subkey)? + } else { + None + }; + + // If the proposed sequence number is newer, then return no value + if value.value_data().seq() + > opt_existing_value + .as_ref() + .map(|x| x.value_data().seq()) + .unwrap_or_default() + { + None + } else { + // Otherwise return the existing value + opt_existing_value + } + }; + + // Give the user another timeout bump to allow for more commands + let expiration = Timestamp::now().later(self.limits.transaction_timeout); + + Ok(InboundTransactCommandResult::Success( + TransactCommandSuccess { + expiration, + opt_seqs: Default::default(), + opt_subkey: Some(subkey), + opt_value, + }, + )) } } diff --git a/veilid-core/src/storage_manager/record_store/inbound_watch.rs b/veilid-core/src/storage_manager/record_store/inbound_watch.rs index 100fafc6..1a1ac250 100644 --- a/veilid-core/src/storage_manager/record_store/inbound_watch.rs +++ b/veilid-core/src/storage_manager/record_store/inbound_watch.rs @@ -7,7 +7,8 @@ pub(crate) struct InboundWatch { pub params: InboundWatchParameters, /// A unique id per record assigned at watch creation time. Used to disambiguate a client's version of a watch pub id: u64, - /// What has changed since the last update + /// What has changed in the watched range since the last update. + /// May include non-watched ranges if they were changed as part of an overlapping transaction pub changed: ValueSubkeyRangeSet, } @@ -35,8 +36,9 @@ where #[instrument(level = "trace", target = "stor", skip_all)] pub(super) async fn update_watched_value( &mut self, - record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, + opt_transaction_range: Option<&ValueSubkeyRangeSet>, watch_update_mode: InboundWatchUpdateMode, ) { let (do_update, opt_ignore_target) = match watch_update_mode { @@ -48,25 +50,31 @@ where return; } - let rtk = RecordTableKey { record_key }; + let rtk = RecordTableKey { + record_key: opaque_record_key.clone(), + }; let Some(wr) = self.watched_records.get_mut(&rtk) else { return; }; // Update all watchers - let mut changed = false; + let mut changed_watched = false; for w in &mut wr.watches { // If this watcher is watching the changed subkey then add to the watcher's changed list // Don't bother marking changes for value sets coming from the same watching node/target because they // are already going to be aware of the changes in that case if Some(&w.params.target) != opt_ignore_target.as_ref() && w.params.subkeys.contains(subkey) - && w.changed.insert(subkey) { - changed = true; + if let Some(transaction_range) = opt_transaction_range { + w.changed = w.changed.union(transaction_range); + } else { + w.changed.insert(subkey); + } + changed_watched = true; } } - if changed { + if changed_watched { self.changed_watched_values.insert(rtk); } } @@ -74,13 +82,13 @@ where #[instrument(level = "trace", target = "stor", skip_all, err)] async fn create_new_watch( &mut self, - record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, params: InboundWatchParameters, member_check: Box bool + Send>, ) -> VeilidAPIResult { // Generate a record-unique watch id > 0 let rtk = RecordTableKey { - record_key: record_key.clone(), + record_key: opaque_record_key.clone(), }; // Calculate watch limits @@ -141,7 +149,7 @@ where // Make sure it doesn't match any other id (unlikely, but lets be certain) while existing_ids.contains(&id) { let next_id = id.overflowing_add(1); - id = next_id.0 + next_id.1.then_some(1).unwrap_or_default(); + id = next_id.0 + if next_id.1 { 1 } else { 0 }; } // Ok this is an acceptable new watch, add it @@ -150,7 +158,7 @@ where watch_list.watches.push(InboundWatch { params, id, - changed: ValueSubkeyRangeSet::new(), + changed: Default::default(), }); Ok(InboundWatchValueResult::Created { id, expiration }) } @@ -158,7 +166,7 @@ where #[instrument(level = "trace", target = "stor", skip_all, err)] async fn change_existing_watch( &mut self, - record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, params: InboundWatchParameters, watch_id: u64, ) -> VeilidAPIResult { @@ -169,7 +177,9 @@ where apibail_internal!("zero expiration should have been resolved to max by now"); } // Get the watch list for this record - let rtk = RecordTableKey { record_key }; + let rtk = RecordTableKey { + record_key: opaque_record_key.clone(), + }; let Some(watch_list) = self.watched_records.get_mut(&rtk) else { // No watches, nothing to change return Ok(InboundWatchValueResult::Rejected); @@ -196,7 +206,7 @@ where #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn watch_record( &mut self, - record_key: OpaqueRecordKey, + opaque_record_key: OpaqueRecordKey, mut params: InboundWatchParameters, opt_watch_id: Option, ) -> VeilidAPIResult { @@ -204,7 +214,11 @@ where if params.count == 0 { if let Some(watch_id) = opt_watch_id { let cancelled = self - .cancel_watch(record_key.clone(), watch_id, params.watcher_member_id) + .cancel_watch( + opaque_record_key.clone(), + watch_id, + params.watcher_member_id, + ) .await?; if cancelled { return Ok(InboundWatchValueResult::Cancelled); @@ -226,7 +240,7 @@ where // Don't add watches with too low of an expiration time if let Some(watch_id) = opt_watch_id { let cancelled = self - .cancel_watch(record_key, watch_id, params.watcher_member_id) + .cancel_watch(opaque_record_key, watch_id, params.watcher_member_id) .await?; if cancelled { return Ok(InboundWatchValueResult::Cancelled); @@ -236,7 +250,7 @@ where } // Make a closure to check for member vs anonymous - let Some((schema, owner)) = self.with_record(&record_key, |record| { + let Some((schema, owner)) = self.with_record(&opaque_record_key, |record| { let schema = record.schema(); let owner = record.owner(); (schema, owner) @@ -251,10 +265,10 @@ where // Create or update depending on if a watch id is specified or not if let Some(watch_id) = opt_watch_id { - self.change_existing_watch(record_key, params, watch_id) + self.change_existing_watch(&opaque_record_key, params, watch_id) .await } else { - self.create_new_watch(record_key, params, member_check) + self.create_new_watch(&opaque_record_key, params, member_check) .await } } @@ -344,6 +358,7 @@ where continue; } + // Clear the change logs w.changed.clear(); // Reduce the count of changes sent @@ -378,25 +393,30 @@ where } for evci in evcis { - // Get the first subkey data - let Some(first_subkey) = evci.subkeys.first() else { - veilid_log!(self error "first subkey should exist for value change notification"); - continue; - }; - let get_result = match self.get_subkey(evci.key.clone(), first_subkey, false).await { - Ok(Some(skr)) => skr, - Ok(None) => { - veilid_log!(self error "subkey should have data for value change notification"); + // Get a single subkey data if we can send it + let value = if evci.subkeys.len() == 1 { + let Some(first_subkey) = evci.subkeys.first() else { + veilid_log!(self error "first subkey should exist for value change notification"); continue; - } - Err(e) => { - veilid_log!(self error "error getting subkey data for value change notification: {}", e); + }; + let get_result = match self.get_subkey(&evci.key, first_subkey, false).await { + Ok(Some(skr)) => skr, + Ok(None) => { + veilid_log!(self error "subkey should have data for value change notification"); + continue; + } + Err(e) => { + veilid_log!(self error "error getting subkey data for value change notification: {}", e); + continue; + } + }; + let Some(value) = get_result.opt_value else { + veilid_log!(self error "first subkey should have had value for value change notification"); continue; - } - }; - let Some(value) = get_result.opt_value else { - veilid_log!(self error "first subkey should have had value for value change notification"); - continue; + }; + Some(value) + } else { + None }; changes.push(ValueChangedInfo { @@ -405,7 +425,7 @@ where subkeys: evci.subkeys, count: evci.count, watch_id: evci.watch_id, - value: Some(value), + value, }); } } diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index 3ad3736c..a8e7e8e5 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -141,19 +141,6 @@ pub struct InspectResult { opt_descriptor: Option>, } -/// The result of a single successful transaction begin -#[derive(Default, Debug, Clone)] -pub struct TransactBeginResult { - /// Transaction id - pub transaction_id: u64, - /// Expiration timestamp - pub expiration: Timestamp, - /// Descriptor - pub opt_descriptor: Option>, - /// Sequence numbers for record - pub seqs: Vec, -} - impl InspectResult { pub fn new( registry_accessor: &impl VeilidComponentRegistryAccessor, @@ -174,7 +161,7 @@ impl InspectResult { veilid_log!(registry_accessor error "{}: more subkeys returned than requested: {} not a subset of {}", log_context, subkeys, requested_subkeys); apibail_internal!("invalid subkeys returned"); } - Ok(InspectResult { + Ok(Self { subkeys, seqs, opt_descriptor, @@ -492,10 +479,12 @@ where #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn new_record( &mut self, - record_key: OpaqueRecordKey, + opaque_record_key: OpaqueRecordKey, record: Record, ) -> VeilidAPIResult<()> { - let rtk = RecordTableKey { record_key }; + let rtk = RecordTableKey { + record_key: opaque_record_key, + }; if self.record_index.contains_key(&rtk) { apibail_internal!("record already exists"); } @@ -538,9 +527,14 @@ where } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn delete_record(&mut self, record_key: OpaqueRecordKey) -> VeilidAPIResult<()> { + pub async fn delete_record( + &mut self, + opaque_record_key: OpaqueRecordKey, + ) -> VeilidAPIResult<()> { // Get the record table key - let rtk = RecordTableKey { record_key }; + let rtk = RecordTableKey { + record_key: opaque_record_key, + }; // Remove record from the index let Some(record) = self.record_index.remove(&rtk) else { @@ -564,22 +558,26 @@ where } #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn contains_record(&mut self, record_key: &OpaqueRecordKey) -> bool { + pub(super) fn contains_record(&mut self, opaque_record_key: &OpaqueRecordKey) -> bool { let rtk = RecordTableKey { - record_key: record_key.clone(), + record_key: opaque_record_key.clone(), }; self.record_index.contains_key(&rtk) } #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn with_record(&mut self, record_key: &OpaqueRecordKey, f: F) -> Option + pub(super) fn with_record( + &mut self, + opaque_record_key: &OpaqueRecordKey, + f: F, + ) -> Option where F: FnOnce(&Record) -> R, { // Get record from index let mut out = None; let rtk = RecordTableKey { - record_key: record_key.clone(), + record_key: opaque_record_key.clone(), }; if let Some(record) = self.record_index.get_mut(&rtk) { // Callback @@ -598,14 +596,14 @@ where } #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn peek_record(&self, record_key: &OpaqueRecordKey, f: F) -> Option + pub(super) fn peek_record(&self, opaque_record_key: &OpaqueRecordKey, f: F) -> Option where F: FnOnce(&Record) -> R, { // Get record from index let mut out = None; let rtk = RecordTableKey { - record_key: record_key.clone(), + record_key: opaque_record_key.clone(), }; if let Some(record) = self.record_index.peek(&rtk) { // Callback @@ -615,14 +613,18 @@ where } #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn with_record_mut(&mut self, record_key: &OpaqueRecordKey, f: F) -> Option + pub(super) fn with_record_mut( + &mut self, + opaque_record_key: &OpaqueRecordKey, + f: F, + ) -> Option where F: FnOnce(&mut Record) -> R, { // Get record from index let mut out = None; let rtk = RecordTableKey { - record_key: record_key.clone(), + record_key: opaque_record_key.clone(), }; if let Some(record) = self.record_index.get_mut(&rtk) { // Callback @@ -643,13 +645,13 @@ where #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn get_subkey( &mut self, - record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, want_descriptor: bool, ) -> VeilidAPIResult> { // Get record from index let Some((subkey_count, has_subkey, opt_descriptor)) = - self.with_record(&record_key, |record| { + self.with_record(opaque_record_key, |record| { ( record.subkey_count(), record.stored_subkeys().contains(subkey), @@ -680,7 +682,10 @@ where } // If subkey exists in subkey cache, use that - let stk = SubkeyTableKey { record_key, subkey }; + let stk = SubkeyTableKey { + record_key: opaque_record_key.clone(), + subkey, + }; if let Some(record_data) = self.subkey_cache.get(&stk) { let out = record_data.signed_value_data().clone(); @@ -713,13 +718,13 @@ where #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn peek_subkey( &self, - record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, want_descriptor: bool, ) -> VeilidAPIResult> { // record from index let Some((subkey_count, has_subkey, opt_descriptor)) = - self.peek_record(&record_key, |record| { + self.peek_record(opaque_record_key, |record| { ( record.subkey_count(), record.stored_subkeys().contains(subkey), @@ -750,7 +755,10 @@ where } // If subkey exists in subkey cache, use that - let stk = SubkeyTableKey { record_key, subkey }; + let stk = SubkeyTableKey { + record_key: opaque_record_key.clone(), + subkey, + }; if let Some(record_data) = self.subkey_cache.peek(&stk) { let out = record_data.signed_value_data().clone(); @@ -780,8 +788,9 @@ where #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn set_subkey( &mut self, - record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, + opt_transaction_range: Option<&ValueSubkeyRangeSet>, signed_value_data: Arc, watch_update_mode: InboundWatchUpdateMode, ) -> VeilidAPIResult<()> { @@ -796,11 +805,11 @@ where // Get record subkey count and total size of all record subkey data exclusive of structures let Some((subkey_count, prior_record_data_size)) = self - .with_record(&record_key, |record| { + .with_record(opaque_record_key, |record| { (record.subkey_count(), record.record_data_size()) }) else { - apibail_invalid_argument!("no record at this key", "key", record_key); + apibail_invalid_argument!("no record at this key", "key", opaque_record_key); }; // Check if the subkey is in range @@ -813,7 +822,7 @@ where // If subkey exists in subkey cache, use that let stk = SubkeyTableKey { - record_key: record_key.clone(), + record_key: opaque_record_key.clone(), subkey, }; let stk_bytes = stk.bytes(); @@ -861,7 +870,7 @@ where // Write to inspect cache self.inspect_cache.replace_subkey_seq( - &stk.record_key, + opaque_record_key, subkey, subkey_record_data.signed_value_data().value_data().seq(), ); @@ -870,7 +879,7 @@ where self.add_to_subkey_cache(stk, subkey_record_data); // Update record - self.with_record_mut(&record_key, |record| { + self.with_record_mut(opaque_record_key, |record| { record.store_subkey(subkey); record.set_record_data_size(new_record_data_size); }) @@ -879,9 +888,14 @@ where // Update storage space self.total_storage_space.commit().unwrap(); - // Send updates to - self.update_watched_value(record_key, subkey, watch_update_mode) - .await; + // Register change with inbound watches + self.update_watched_value( + opaque_record_key, + subkey, + opt_transaction_range, + watch_update_mode, + ) + .await; Ok(()) } @@ -889,26 +903,28 @@ where #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn inspect_record( &mut self, - record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkeys: &ValueSubkeyRangeSet, want_descriptor: bool, ) -> VeilidAPIResult> { // Get record from index - let Some((schema_subkeys, opt_descriptor)) = self.with_record(&record_key, |record| { - // Get number of subkeys from schema and ensure we are getting the - // right number of sequence numbers betwen that and what we asked for - let schema_subkeys = record - .schema() - .truncate_subkeys(subkeys, Some(DHTSchema::MAX_SUBKEY_COUNT)); - ( - schema_subkeys, - if want_descriptor { - Some(record.descriptor().clone()) - } else { - None - }, - ) - }) else { + let Some((schema_subkeys, opt_descriptor)) = + self.with_record(opaque_record_key, |record| { + // Get number of subkeys from schema and ensure we are getting the + // right number of sequence numbers betwen that and what we asked for + let schema_subkeys = record + .schema() + .truncate_subkeys(subkeys, Some(DHTSchema::MAX_SUBKEY_COUNT)); + ( + schema_subkeys, + if want_descriptor { + Some(record.descriptor().clone()) + } else { + None + }, + ) + }) + else { // Record not available return Ok(None); }; @@ -920,7 +936,7 @@ where } // See if we have this inspection cached - if let Some(icv) = self.inspect_cache.get(&record_key, &schema_subkeys) { + if let Some(icv) = self.inspect_cache.get(opaque_record_key, &schema_subkeys) { return Ok(Some(InspectResult::new( self, subkeys.clone(), @@ -936,7 +952,7 @@ where let mut seqs = Vec::with_capacity(schema_subkeys.len() as usize); for subkey in schema_subkeys.iter() { let stk = SubkeyTableKey { - record_key: record_key.clone(), + record_key: opaque_record_key.clone(), subkey, }; let seq = if let Some(record_data) = self.subkey_cache.peek(&stk) { @@ -956,7 +972,7 @@ where // Save seqs cache self.inspect_cache.put( - record_key, + opaque_record_key.clone(), schema_subkeys.clone(), InspectCacheL2Value { seqs: seqs.clone() }, ); @@ -1023,11 +1039,13 @@ where out } - pub fn debug_record_info(&self, record_key: OpaqueRecordKey) -> String { + pub fn debug_record_info(&self, opaque_record_key: &OpaqueRecordKey) -> String { let record_info = self - .peek_record(&record_key, |r| format!("{:#?}", r)) + .peek_record(opaque_record_key, |r| format!("{:#?}", r)) .unwrap_or("Not found".to_owned()); - let watched_record = match self.watched_records.get(&RecordTableKey { record_key }) { + let watched_record = match self.watched_records.get(&RecordTableKey { + record_key: opaque_record_key.clone(), + }) { Some(w) => { format!("Remote Watches: {:#?}", w) } @@ -1038,10 +1056,10 @@ where pub async fn debug_record_subkey_info( &self, - record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, ) -> String { - match self.peek_subkey(record_key, subkey, true).await { + match self.peek_subkey(opaque_record_key, subkey, true).await { Ok(Some(v)) => { format!("{:#?}", v) } diff --git a/veilid-core/src/storage_manager/record_store/record_snapshot.rs b/veilid-core/src/storage_manager/record_store/record_snapshot.rs index f48b8e67..34e4ab49 100644 --- a/veilid-core/src/storage_manager/record_store/record_snapshot.rs +++ b/veilid-core/src/storage_manager/record_store/record_snapshot.rs @@ -9,10 +9,6 @@ impl RecordSnapshot { pub fn new(all_value_data: Vec>>) -> Self { Self { all_value_data } } - pub fn all_value_data(&self) -> &[Option>] { - &self.all_value_data - } - pub fn subkey_value_data( &self, subkey: ValueSubkey, @@ -47,3 +43,62 @@ impl RecordSnapshot { .collect() } } + +impl RecordStore +where + D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, +{ + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub async fn snapshot_record( + &mut self, + record_key: &OpaqueRecordKey, + ) -> VeilidAPIResult>> { + let Some((subkey_count, stored_subkeys)) = self.with_record(record_key, |record| { + (record.subkey_count(), record.stored_subkeys().clone()) + }) else { + // Record not available + return Ok(None); + }; + + // Snapshot all subkeys + let mut all_value_data = vec![Option::>::None; subkey_count]; + + for subkey in stored_subkeys.iter() { + // If subkey exists in subkey cache, use that + let stk = SubkeyTableKey { + record_key: record_key.clone(), + subkey, + }; + let svd = match self.subkey_cache.get(&stk) { + Some(record_data) => record_data.signed_value_data(), + None => + // If not in cache, try to pull from table store if it is in our stored subkey set + { + match self + .subkey_table + .load_json::(0, &stk.bytes()) + .await + .map_err(VeilidAPIError::internal)? + { + Some(record_data) => { + let out = record_data.signed_value_data().clone(); + // Add to cache, do nothing with lru out + self.add_to_subkey_cache(stk, record_data); + out + } + None => { + apibail_internal!("failed to snapshot subkey that was stored"); + } + } + } + }; + + let subkey = usize::try_from(subkey).map_err(VeilidAPIError::internal)?; + all_value_data[subkey] = Some(svd); + } + + let out = Arc::new(RecordSnapshot::new(all_value_data)); + + Ok(Some(out)) + } +} diff --git a/veilid-core/src/storage_manager/record_store_interface.rs b/veilid-core/src/storage_manager/record_store_interface.rs deleted file mode 100644 index a3495759..00000000 --- a/veilid-core/src/storage_manager/record_store_interface.rs +++ /dev/null @@ -1,305 +0,0 @@ -use super::*; - -impl StorageManager { - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_get_local_value_inner( - &self, - inner: &mut StorageManagerInner, - opaque_record_key: OpaqueRecordKey, - subkey: ValueSubkey, - want_descriptor: bool, - ) -> VeilidAPIResult { - // See if the value is in the offline subkey writes first, - // since it may not have been committed yet to the local record store - if let Some(get_result) = self.get_offline_subkey_writes_subkey( - inner, - &opaque_record_key, - subkey, - want_descriptor, - )? { - return Ok(get_result); - } - - // See if it's in the local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - if let Some(get_result) = local_record_store - .get_subkey(opaque_record_key, subkey, want_descriptor) - .await? - { - return Ok(get_result); - } - - Ok(GetResult { - opt_value: None, - opt_descriptor: None, - }) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_set_local_value_inner( - &self, - inner: &mut StorageManagerInner, - opaque_record_key: OpaqueRecordKey, - subkey: ValueSubkey, - signed_value_data: Arc, - watch_update_mode: InboundWatchUpdateMode, - ) -> VeilidAPIResult<()> { - // See if this new data supercedes any offline subkey writes - self.remove_old_offline_subkey_writes_inner( - inner, - opaque_record_key.clone(), - subkey, - signed_value_data.clone(), - ); - - // See if it's in the local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // Write subkey to local store - local_record_store - .set_subkey( - opaque_record_key, - subkey, - signed_value_data, - watch_update_mode, - ) - .await?; - - Ok(()) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_inspect_local_value_inner( - &self, - inner: &mut StorageManagerInner, - opaque_record_key: OpaqueRecordKey, - subkeys: ValueSubkeyRangeSet, - want_descriptor: bool, - ) -> VeilidAPIResult { - // See if it's in the local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - if let Some(inspect_result) = local_record_store - .inspect_record(opaque_record_key, &subkeys, want_descriptor) - .await? - { - return Ok(inspect_result); - } - - InspectResult::new( - self, - subkeys, - "handle_inspect_local_value_inner", - ValueSubkeyRangeSet::new(), - vec![], - None, - ) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_get_remote_value_inner( - inner: &mut StorageManagerInner, - opaque_record_key: OpaqueRecordKey, - subkey: ValueSubkey, - want_descriptor: bool, - ) -> VeilidAPIResult { - // See if it's in the remote record store - let Some(remote_record_store) = inner.remote_record_store.as_mut() else { - apibail_not_initialized!(); - }; - if let Some(get_result) = remote_record_store - .get_subkey(opaque_record_key, subkey, want_descriptor) - .await? - { - return Ok(get_result); - } - - Ok(GetResult { - opt_value: None, - opt_descriptor: None, - }) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_set_remote_value_inner( - inner: &mut StorageManagerInner, - opaque_record_key: OpaqueRecordKey, - subkey: ValueSubkey, - signed_value_data: Arc, - signed_value_descriptor: Arc, - watch_update_mode: InboundWatchUpdateMode, - ) -> VeilidAPIResult<()> { - // See if it's in the remote record store - let Some(remote_record_store) = inner.remote_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // See if we have a remote record already or not - if remote_record_store - .with_record(&opaque_record_key, |_| {}) - .is_none() - { - // record didn't exist, make it - let cur_ts = Timestamp::now(); - let remote_record_detail = RemoteRecordDetail {}; - let record = Record::::new( - cur_ts, - signed_value_descriptor, - remote_record_detail, - )?; - remote_record_store - .new_record(opaque_record_key.clone(), record) - .await? - }; - - // Write subkey to remote store - remote_record_store - .set_subkey( - opaque_record_key, - subkey, - signed_value_data, - watch_update_mode, - ) - .await?; - - Ok(()) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_inspect_remote_value_inner( - &self, - inner: &mut StorageManagerInner, - opaque_record_key: OpaqueRecordKey, - subkeys: ValueSubkeyRangeSet, - want_descriptor: bool, - ) -> VeilidAPIResult { - // See if it's in the local record store - let Some(remote_record_store) = inner.remote_record_store.as_mut() else { - apibail_not_initialized!(); - }; - if let Some(inspect_result) = remote_record_store - .inspect_record(opaque_record_key, &subkeys, want_descriptor) - .await? - { - return Ok(inspect_result); - } - - InspectResult::new( - self, - subkeys, - "handle_inspect_remote_value_inner", - ValueSubkeyRangeSet::new(), - vec![], - None, - ) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn get_value_nodes( - &self, - opaque_record_key: OpaqueRecordKey, - ) -> VeilidAPIResult>> { - let inner = self.inner.lock().await; - // Get local record store - let Some(local_record_store) = inner.local_record_store.as_ref() else { - apibail_not_initialized!(); - }; - - // Get routing table to see if we still know about these nodes - let routing_table = self.routing_table(); - - let opt_value_nodes = local_record_store.peek_record(&opaque_record_key, |r| { - let d = r.detail(); - d.nodes - .keys() - .cloned() - .filter_map(|nr| routing_table.lookup_node_ref(nr).ok().flatten()) - .collect() - }); - - Ok(opt_value_nodes) - } - - // #[instrument(level = "trace", target = "stor", skip_all, err)] - // async fn move_remote_record_to_local_inner( - // &self, - // inner: &mut StorageManagerInner, - // record_key: RecordKey, - // safety_selection: SafetySelection, - // ) -> VeilidAPIResult> { - // // Get local record store - // let Some(local_record_store) = inner.local_record_store.as_mut() else { - // apibail_not_initialized!(); - // }; - - // // Get remote record store - // let Some(remote_record_store) = inner.remote_record_store.as_mut() else { - // apibail_not_initialized!(); - // }; - - // let rcb = |r: &Record| { - // // Return record details - // r.clone() - // }; - // let opaque_record_key = record_key.opaque(); - // let Some(remote_record) = remote_record_store.with_record(&opaque_record_key, rcb) else { - // // No local or remote record found, return None - // return Ok(None); - // }; - - // // Make local record - // let cur_ts = Timestamp::now(); - // let local_record = Record::new( - // cur_ts, - // remote_record.descriptor().clone(), - // LocalRecordDetail::new(safety_selection), - // )?; - // local_record_store - // .new_record(opaque_record_key.clone(), local_record) - // .await?; - - // // Move copy subkey data from remote to local store - // for subkey in remote_record.stored_subkeys().iter() { - // let Some(get_result) = remote_record_store - // .get_subkey(opaque_record_key.clone(), subkey, false) - // .await? - // else { - // // Subkey was missing - // veilid_log!(self warn "Subkey was missing: {} #{}", record_key, subkey); - // continue; - // }; - // let Some(subkey_data) = get_result.opt_value else { - // // Subkey was missing - // veilid_log!(self warn "Subkey data was missing: {} #{}", record_key, subkey); - // continue; - // }; - // local_record_store - // .set_subkey( - // opaque_record_key.clone(), - // subkey, - // subkey_data, - // InboundWatchUpdateMode::NoUpdate, - // ) - // .await?; - // } - - // // Move watches - // local_record_store.move_watches( - // opaque_record_key.clone(), - // remote_record_store.move_watches(opaque_record_key.clone(), None), - // ); - - // // Delete remote record from store - // remote_record_store - // .delete_record(opaque_record_key.clone()) - // .await?; - - // // Return record information as transferred to local record - // Ok(Some((remote_record.owner(), remote_record.schema()))) - // } -} diff --git a/veilid-core/src/storage_manager/rehydrate.rs b/veilid-core/src/storage_manager/rehydrate.rs index 746c8a69..9e9cfd3b 100644 --- a/veilid-core/src/storage_manager/rehydrate.rs +++ b/veilid-core/src/storage_manager/rehydrate.rs @@ -90,12 +90,7 @@ impl StorageManager { // See if the requested record is our local record store let local_inspect_result = self - .handle_inspect_local_value_inner( - &mut inner, - opaque_record_key.clone(), - subkeys.clone(), - true, - ) + .handle_inspect_local_value_inner(&mut inner, &opaque_record_key, subkeys.clone(), true) .await?; // Get rpc processor and drop mutex so we don't block while getting the value from the network @@ -112,7 +107,7 @@ impl StorageManager { // Get the inspect record report from the network let result = self .outbound_inspect_value( - opaque_record_key.clone(), + &opaque_record_key, local_inspect_result.subkeys().clone(), safety_selection.clone(), InspectResult::default(), @@ -150,13 +145,13 @@ impl StorageManager { async fn rehydrate_single_subkey_inner( &self, inner: &mut StorageManagerInner, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, safety_selection: SafetySelection, ) -> bool { // Get value to rehydrate with let get_result = match self - .handle_get_local_value_inner(inner, opaque_record_key.clone(), subkey, false) + .handle_get_local_value_inner(inner, opaque_record_key, subkey, false) .await { Ok(v) => v, @@ -178,7 +173,7 @@ impl StorageManager { veilid_log!(self debug "Rehydrating: record={} subkey={}", opaque_record_key, subkey); self.add_offline_subkey_write_inner( inner, - opaque_record_key, + opaque_record_key.clone(), subkey, safety_selection, data, @@ -207,7 +202,7 @@ impl StorageManager { if self .rehydrate_single_subkey_inner( &mut inner, - opaque_record_key.clone(), + &opaque_record_key, subkey, safety_selection.clone(), ) @@ -262,7 +257,7 @@ impl StorageManager { if self .rehydrate_single_subkey_inner( &mut inner, - opaque_record_key.clone(), + &opaque_record_key, subkey, safety_selection.clone(), ) diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index c2160298..c8e43702 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -82,7 +82,7 @@ impl StorageManager { // Make signed value data (encrypted) and value data (unencrypted) and get descriptor for this value let last_get_result = self - .handle_get_local_value_inner(&mut inner, opaque_record_key.clone(), subkey, true) + .handle_get_local_value_inner(&mut inner, &opaque_record_key, subkey, true) .await?; let (signed_value_data, value_data, descriptor) = @@ -114,7 +114,7 @@ impl StorageManager { // Use the safety selection we opened the record with let res_rx = match self .outbound_set_value( - opaque_record_key.clone(), + &opaque_record_key, subkey, safety_selection.clone(), signed_value_data.clone(), @@ -343,7 +343,7 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_set_value( &self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, safety_selection: SafetySelection, value: Arc, @@ -360,7 +360,7 @@ impl StorageManager { // Get the nodes we know are caching this value to seed the fanout let init_fanout_queue = { - self.get_value_nodes(opaque_record_key.clone()) + self.get_value_nodes(opaque_record_key) .await? .unwrap_or_default() .into_iter() @@ -567,6 +567,7 @@ impl StorageManager { // Call the fanout in a spawned task let registry = self.registry(); + let fanout_hash_coordinate = opaque_record_key.to_hash_coordinate(); spawn( "outbound_set_value fanout", Box::pin( @@ -574,7 +575,7 @@ impl StorageManager { let routing_table = registry.routing_table(); let fanout_call = FanoutCall::new( &routing_table, - opaque_record_key.to_hash_coordinate(), + fanout_hash_coordinate, key_count, fanout, consensus_count, @@ -704,15 +705,16 @@ impl StorageManager { ) -> Result, VeilidAPIError> { // Regain the lock after network access let mut inner = self.inner.lock().await; + let opaque_record_key = record_key.opaque(); // Report on fanout result offline let was_offline = - self.check_fanout_set_offline(record_key.opaque(), subkey, &result.fanout_result); + self.check_fanout_set_offline(&opaque_record_key, subkey, &result.fanout_result); if was_offline { // Failed to write, try again later self.add_offline_subkey_write_inner( &mut inner, - record_key.opaque(), + opaque_record_key.clone(), subkey, safety_selection, result.signed_value_data.clone(), @@ -722,7 +724,7 @@ impl StorageManager { // Keep the list of nodes that returned a value for later reference Self::process_fanout_results_inner( &mut inner, - record_key.opaque(), + opaque_record_key.clone(), core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)), true, self.config().network.dht.consensus_width as usize, @@ -731,10 +733,10 @@ impl StorageManager { // Record the set value locally since it was successfully set online self.handle_set_local_value_inner( &mut inner, - record_key.opaque(), + &opaque_record_key, subkey, + None, result.signed_value_data.clone(), - InboundWatchUpdateMode::UpdateAll, ) .await?; @@ -756,7 +758,7 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all)] pub async fn inbound_set_value( &self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, value: Arc, descriptor: Option>, @@ -765,13 +767,15 @@ impl StorageManager { let mut inner = self.inner.lock().await; // See if the subkey we are modifying has a last known remote value - let last_get_result = Self::handle_get_remote_value_inner( - &mut inner, - opaque_record_key.clone(), - subkey, - true, - ) - .await?; + let last_get_result = { + let Some(remote_record_store) = inner.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + remote_record_store + .get_subkey(opaque_record_key, subkey, true) + .await? + .unwrap_or_default() + }; // Make sure this value would actually be newer if let Some(last_value) = &last_get_result.opt_value { @@ -837,24 +841,42 @@ impl StorageManager { } // Do the set and return no new value - let res = Self::handle_set_remote_value_inner( - &mut inner, - opaque_record_key.clone(), - subkey, - value, - actual_descriptor, - InboundWatchUpdateMode::ExcludeTarget(target), - ) - .await; + + // See if it's in the remote record store + let Some(remote_record_store) = inner.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // See if we have a remote record already or not + if remote_record_store + .with_record(opaque_record_key, |_| {}) + .is_none() + { + // record didn't exist, make it + let cur_ts = Timestamp::now(); + let remote_record_detail = RemoteRecordDetail {}; + let record = + Record::::new(cur_ts, actual_descriptor, remote_record_detail)?; + remote_record_store + .new_record(opaque_record_key.clone(), record) + .await? + }; + + // Write subkey to remote store + let res = remote_record_store + .set_subkey( + opaque_record_key, + subkey, + None, + value, + InboundWatchUpdateMode::ExcludeTarget(target), + ) + .await; + match res { - Ok(()) => {} - Err(VeilidAPIError::Internal { message }) => { - apibail_internal!(message); - } - Err(e) => { - return Ok(NetworkResult::invalid_message(e)); - } + Ok(()) => Ok(NetworkResult::value(InboundSetValueResult::Success)), + Err(VeilidAPIError::Internal { message }) => Err(VeilidAPIError::Internal { message }), + Err(e) => Ok(NetworkResult::invalid_message(e)), } - Ok(NetworkResult::value(InboundSetValueResult::Success)) } } diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index db6cf214..e64d0f19 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -31,7 +31,7 @@ impl StorageManager { async fn write_single_offline_subkey( &self, stop_token: StopToken, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, safety_selection: SafetySelection, ) -> EyreResult { @@ -47,7 +47,7 @@ impl StorageManager { let get_result = { let mut inner = self.inner.lock().await; - self.handle_get_local_value_inner(&mut inner, opaque_record_key.clone(), subkey, true) + self.handle_get_local_value_inner(&mut inner, opaque_record_key, subkey, true) .await }; let Ok(get_result) = get_result else { @@ -67,7 +67,7 @@ impl StorageManager { veilid_log!(self debug "Offline subkey write: {}:{} len={}", opaque_record_key, subkey, value.value_data().data().len()); let osvres = self .outbound_set_value( - opaque_record_key.clone(), + opaque_record_key, subkey, safety_selection, value.clone(), @@ -92,10 +92,10 @@ impl StorageManager { self.handle_set_local_value_inner( &mut inner, - opaque_record_key.clone(), + opaque_record_key, subkey, + None, result.signed_value_data.clone(), - InboundWatchUpdateMode::UpdateAll, ) .await?; } @@ -136,7 +136,7 @@ impl StorageManager { let result = match self .write_single_offline_subkey( stop_token.clone(), - work_item.opaque_record_key.clone(), + &work_item.opaque_record_key, subkey, work_item.safety_selection.clone(), ) @@ -156,7 +156,7 @@ impl StorageManager { // Process non-partial setvalue result let was_offline = self.check_fanout_set_offline( - work_item.opaque_record_key.clone(), + &work_item.opaque_record_key, subkey, &result.fanout_result, ); @@ -187,7 +187,7 @@ impl StorageManager { let subkeys_still_offline = result.work_item.subkeys.difference(&result.written_subkeys); self.finish_offline_subkey_writes_inner( &mut inner, - result.work_item.opaque_record_key.clone(), + &result.work_item.opaque_record_key, result.written_subkeys, subkeys_still_offline, ); diff --git a/veilid-core/src/storage_manager/tasks/send_value_changes.rs b/veilid-core/src/storage_manager/tasks/send_value_changes.rs index da2ae212..efb6149b 100644 --- a/veilid-core/src/storage_manager/tasks/send_value_changes.rs +++ b/veilid-core/src/storage_manager/tasks/send_value_changes.rs @@ -17,11 +17,6 @@ impl StorageManager { { let mut inner = self.inner.lock().await; - if let Some(local_record_store) = &mut inner.local_record_store { - local_record_store - .take_value_changes(&mut value_changes) - .await; - } if let Some(remote_record_store) = &mut inner.remote_record_store { remote_record_store .take_value_changes(&mut value_changes) diff --git a/veilid-core/src/storage_manager/tests/test_types.rs b/veilid-core/src/storage_manager/tests/test_types.rs index ec2c8d51..020c9b73 100644 --- a/veilid-core/src/storage_manager/tests/test_types.rs +++ b/veilid-core/src/storage_manager/tests/test_types.rs @@ -5,7 +5,7 @@ use super::*; // encrypted_value_data pub fn test_encrypted_value_data() { - let orig = EncryptedValueData::new_with_seq( + let orig = EncryptedValueData::new( 42.into(), b"Brent Spiner".to_vec(), fix_fake_public_key(), diff --git a/veilid-core/src/storage_manager/transaction.rs b/veilid-core/src/storage_manager/transaction.rs index 700bd2dc..cbdc56ed 100644 --- a/veilid-core/src/storage_manager/transaction.rs +++ b/veilid-core/src/storage_manager/transaction.rs @@ -391,15 +391,18 @@ impl StorageManager { for record_info in transaction_state.get_record_infos() { let opaque_record_key = record_info.record_key().opaque(); + let transaction_range = ValueSubkeyRangeSet::from_iter( + record_info.commit_subkey_states().iter().map(|x| *x.0), + ); for (subkey, subkey_state) in record_info.commit_subkey_states().iter() { // Record the set value locally since it was successfully set online if let Some(signed_value_data) = subkey_state.opt_value.clone() { self.handle_set_local_value_inner( inner, - opaque_record_key.clone(), + &opaque_record_key, *subkey, + Some(&transaction_range), signed_value_data, - InboundWatchUpdateMode::UpdateAll, ) .await?; } diff --git a/veilid-core/src/storage_manager/transaction_begin.rs b/veilid-core/src/storage_manager/transaction_begin.rs index 9679deb1..1f5f45c3 100644 --- a/veilid-core/src/storage_manager/transaction_begin.rs +++ b/veilid-core/src/storage_manager/transaction_begin.rs @@ -53,13 +53,26 @@ pub(super) struct OutboundTransactBeginResult { #[derive(Clone, Debug)] pub(crate) enum InboundTransactBeginResult { /// Value transacted successfully - Success(TransactBeginResult), + Success(TransactBeginSuccess), /// Transaction unavailable due to limits TransactionUnavailable, /// Descriptor required but not provided, NeedDescriptor, } +/// The result of a single successful transaction begin +#[derive(Default, Debug, Clone)] +pub(crate) struct TransactBeginSuccess { + /// Transaction id + pub transaction_id: u64, + /// Expiration timestamp + pub expiration: Timestamp, + /// Descriptor + pub opt_descriptor: Option>, + /// Sequence numbers for record + pub seqs: Vec, +} + impl StorageManager { //////////////////////////////////////////////////////////////////////// @@ -89,7 +102,7 @@ impl StorageManager { // Get the nodes we know are caching this value to seed the fanout let init_fanout_queue = { - self.get_value_nodes(opaque_record_key.clone()) + self.get_value_nodes(&opaque_record_key) .await? .unwrap_or_default() .into_iter() @@ -107,7 +120,7 @@ impl StorageManager { let local_inspect_result = self .handle_inspect_local_value_inner( &mut inner, - opaque_record_key.clone(), + &opaque_record_key, ValueSubkeyRangeSet::full(), true, ) @@ -394,8 +407,8 @@ impl StorageManager { }; return remote_record_store - .new_inbound_transaction( - opaque_record_key, + .begin_inbound_transaction( + &opaque_record_key, opt_descriptor, want_descriptor, signing_member_id, diff --git a/veilid-core/src/storage_manager/transaction_command.rs b/veilid-core/src/storage_manager/transaction_command.rs index 480f7d3d..9529664e 100644 --- a/veilid-core/src/storage_manager/transaction_command.rs +++ b/veilid-core/src/storage_manager/transaction_command.rs @@ -31,6 +31,7 @@ pub(super) struct OutboundTransactCommandPerNodeResult { /// True if the transaction is still valid pub transaction_valid: bool, /// Return from the command (sequence numbers) + #[expect(dead_code)] pub opt_seqs: Option>, /// Return from the command (subkey number) pub opt_subkey: Option, @@ -51,18 +52,24 @@ pub(super) struct OutboundTransactCommandResult { #[derive(Clone, Debug)] pub(crate) enum InboundTransactCommandResult { /// Value transacted successfully - Success { - /// Expiration timestamp - expiration: Timestamp, - /// Sequence numbers - opt_seqs: Option>, - /// Subkey - opt_subkey: Option, - /// Value - opt_value: Option>, - }, + Success(TransactCommandSuccess), /// Transaction not valid InvalidTransaction, + /// Invalid arguments + InvalidArguments, +} + +/// The result of a single successful transaction command +#[derive(Default, Debug, Clone)] +pub(crate) struct TransactCommandSuccess { + /// Expiration timestamp + pub expiration: Timestamp, + /// Sequence numbers + pub opt_seqs: Option>, + /// Subkey + pub opt_subkey: Option, + /// Value + pub opt_value: Option>, } impl StorageManager { @@ -92,7 +99,7 @@ impl StorageManager { let local_inspect_result = self .handle_inspect_local_value_inner( &mut inner, - opaque_record_key.clone(), + &opaque_record_key, ValueSubkeyRangeSet::full(), true, ) @@ -197,7 +204,7 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all)] pub async fn inbound_transact_command( &self, - opaque_record_key: OpaqueRecordKey, + opaque_record_key: &OpaqueRecordKey, transaction_id: u64, command: TransactCommand, _opt_seqs: Option>, @@ -227,18 +234,17 @@ impl StorageManager { .commit_inbound_transaction(opaque_record_key, transaction_id) .await? } - TransactCommand::Rollback => { - remote_record_store - .rollback_inbound_transaction(opaque_record_key, transaction_id) - .await? - } + TransactCommand::Rollback => remote_record_store + .rollback_inbound_transaction(opaque_record_key, transaction_id)?, TransactCommand::Get => { let Some(subkey) = opt_subkey else { return Ok(NetworkResult::invalid_message("missing subkey")); }; - remote_record_store - .inbound_transaction_get(opaque_record_key, transaction_id, subkey) - .await? + remote_record_store.inbound_transaction_get( + opaque_record_key, + transaction_id, + subkey, + )? } TransactCommand::Set => { let Some(subkey) = opt_subkey else { @@ -247,9 +253,12 @@ impl StorageManager { let Some(value) = opt_value else { return Ok(NetworkResult::invalid_message("missing value")); }; - remote_record_store - .inbound_transaction_set(opaque_record_key, transaction_id, subkey, value) - .await? + remote_record_store.inbound_transaction_set( + opaque_record_key, + transaction_id, + subkey, + value, + )? } }; diff --git a/veilid-core/src/storage_manager/types/encrypted_value_data.rs b/veilid-core/src/storage_manager/types/encrypted_value_data.rs index cbf3f82a..3ea4efbd 100644 --- a/veilid-core/src/storage_manager/types/encrypted_value_data.rs +++ b/veilid-core/src/storage_manager/types/encrypted_value_data.rs @@ -11,11 +11,7 @@ pub struct EncryptedValueData { impl EncryptedValueData { pub const MAX_LEN: usize = 32768; - pub fn new(data: Vec, writer: PublicKey, nonce: Option) -> VeilidAPIResult { - Self::new_with_seq(ValueSeqNum::ZERO, data, writer, nonce) - } - - pub fn new_with_seq( + pub fn new( seq: ValueSeqNum, data: Vec, writer: PublicKey, @@ -234,7 +230,7 @@ impl<'de> serde::Deserialize<'de> for EncryptedValueData { Ok(EncryptedValueData { blob }) } Helper::Legacy(legacy) => { - EncryptedValueData::new_with_seq(legacy.seq, legacy.data, legacy.writer, None) + EncryptedValueData::new(legacy.seq, legacy.data, legacy.writer, None) .map_err(serde::de::Error::custom) } } @@ -249,12 +245,6 @@ mod tests { #[test] fn value_data_ok() { assert!(EncryptedValueData::new( - vec![0; EncryptedValueData::MAX_LEN], - fix_fake_public_key(), - None, - ) - .is_ok()); - assert!(EncryptedValueData::new_with_seq( ValueSeqNum::ZERO, vec![0; EncryptedValueData::MAX_LEN], fix_fake_public_key(), @@ -266,12 +256,6 @@ mod tests { #[test] fn value_data_too_long() { assert!(EncryptedValueData::new( - vec![0; EncryptedValueData::MAX_LEN + 1], - fix_fake_public_key(), - None, - ) - .is_err()); - assert!(EncryptedValueData::new_with_seq( ValueSeqNum::ZERO, vec![0; EncryptedValueData::MAX_LEN + 1], fix_fake_public_key(), diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 2af41f5a..f8e23745 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -438,7 +438,7 @@ impl StorageManager { // Get the nodes we know are caching this value to seed the fanout let init_fanout_queue = { - self.get_value_nodes(opaque_record_key) + self.get_value_nodes(&opaque_record_key) .await? .unwrap_or_default() .into_iter() @@ -1437,12 +1437,7 @@ impl StorageManager { let mut report_value_change = false; if let Some(value) = &value { let last_get_result = self - .handle_get_local_value_inner( - inner, - opaque_record_key.clone(), - first_subkey, - true, - ) + .handle_get_local_value_inner(inner, &opaque_record_key, first_subkey, true) .await?; let descriptor = last_get_result.opt_descriptor.unwrap(); @@ -1488,10 +1483,10 @@ impl StorageManager { if report_value_change { self.handle_set_local_value_inner( inner, - opaque_record_key.clone(), + &opaque_record_key, first_subkey, + None, value.clone(), - InboundWatchUpdateMode::NoUpdate, ) .await?; } @@ -1546,9 +1541,16 @@ impl StorageManager { // Announce ValueChanged VeilidUpdate // Cancellations (count=0) are sent by process_outbound_watch_dead(), not here if report_value_change { - let value = self.maybe_decrypt_value_data(&record_key, value.unwrap().value_data())?; + let value = self.maybe_decrypt_value_data( + &record_key, + value + .ok_or_else(|| { + VeilidAPIError::internal("value must be present to report value change") + })? + .value_data(), + )?; - // We have a value with a newer sequence number to report + // We have a single value with a newer sequence number to report self.update_callback_value_change( record_key, reportable_subkeys, diff --git a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs index 70db0583..42a2e261 100644 --- a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs +++ b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs @@ -112,12 +112,6 @@ impl ValueSubkeyRangeSet { } } -impl FromIterator for ValueSubkeyRangeSet { - fn from_iter>(iter: T) -> Self { - Self::new_with_data(RangeSetBlaze::from_iter(iter)) - } -} - // impl TryFrom]>> for ValueSubkeyRangeSet { // type Error = VeilidAPIError; @@ -182,6 +176,13 @@ impl FromStr for ValueSubkeyRangeSet { } } +impl FromIterator for ValueSubkeyRangeSet { + fn from_iter>(iter: T) -> Self { + let data = RangeSetBlaze::::from_iter(iter); + Self::new_with_data(data) + } +} + impl Deref for ValueSubkeyRangeSet { type Target = RangeSetBlaze;