From 1a53feea215d74dbfcbe473d7c079baeb2472378 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 23 Nov 2025 19:02:00 -0500 Subject: [PATCH] [ci skip] storage manager refactor continued --- .../src/storage_manager/close_record.rs | 4 +- .../src/storage_manager/create_record.rs | 4 +- veilid-core/src/storage_manager/debug.rs | 62 +- .../src/storage_manager/delete_record.rs | 5 +- veilid-core/src/storage_manager/get_value.rs | 41 +- .../src/storage_manager/inspect_value.rs | 4 +- .../local_record_store_interface.rs | 105 +-- veilid-core/src/storage_manager/mod.rs | 310 +++++---- .../storage_manager/offline_subkey_writes.rs | 30 +- .../src/storage_manager/open_record.rs | 90 +-- .../src/storage_manager/record_encryption.rs | 5 +- .../src/storage_manager/record_lock_table.rs | 128 +++- .../src/storage_manager/record_store/mod.rs | 628 ++++++++---------- .../{record_store_inner => }/record.rs | 0 .../record_store_inner/load_action.rs | 11 + .../record_store/record_store_inner/mod.rs | 82 ++- .../record_store_inner/record_index.rs | 106 ++- veilid-core/src/storage_manager/rehydrate.rs | 14 +- veilid-core/src/storage_manager/set_value.rs | 4 +- .../tasks/offline_subkey_writes.rs | 11 +- .../storage_manager/tasks/save_metadata.rs | 4 +- .../src/storage_manager/transaction.rs | 18 +- .../src/storage_manager/watch_value.rs | 9 +- 23 files changed, 908 insertions(+), 767 deletions(-) rename veilid-core/src/storage_manager/record_store/{record_store_inner => }/record.rs (100%) diff --git a/veilid-core/src/storage_manager/close_record.rs b/veilid-core/src/storage_manager/close_record.rs index fa97bf3d..f1904086 100644 --- a/veilid-core/src/storage_manager/close_record.rs +++ b/veilid-core/src/storage_manager/close_record.rs @@ -46,9 +46,7 @@ impl StorageManager { inner: &mut StorageManagerInner, record_key: RecordKey, ) -> VeilidAPIResult<()> { - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; + let local_record_store = self.get_local_record_store()?; let opaque_record_key = record_key.opaque(); if local_record_store diff --git a/veilid-core/src/storage_manager/create_record.rs b/veilid-core/src/storage_manager/create_record.rs index 44223144..dda9cfee 100644 --- a/veilid-core/src/storage_manager/create_record.rs +++ b/veilid-core/src/storage_manager/create_record.rs @@ -55,9 +55,7 @@ impl StorageManager { }; // Get local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; + let local_record_store = self.get_local_record_store()?; // Verify the dht schema does not contain the node id { diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs index 733f9a27..fd8d7545 100644 --- a/veilid-core/src/storage_manager/debug.rs +++ b/veilid-core/src/storage_manager/debug.rs @@ -1,22 +1,20 @@ use super::*; impl StorageManager { - pub async fn debug_local_records(&self) -> String { - let inner = self.inner.lock().await; - let Some(local_record_store) = &inner.local_record_store else { + pub fn debug_local_records(&self) -> String { + let Ok(local_record_store) = self.get_local_record_store() else { return "not initialized".to_owned(); }; local_record_store.debug_records() } - pub async fn debug_remote_records(&self) -> String { - let inner = self.inner.lock().await; - let Some(remote_record_store) = &inner.remote_record_store else { + pub fn debug_remote_records(&self) -> String { + let Ok(remote_record_store) = self.get_remote_record_store() else { return "not initialized".to_owned(); }; remote_record_store.debug_records() } - pub async fn debug_opened_records(&self) -> String { - let inner = self.inner.lock().await; + pub fn debug_opened_records(&self) -> String { + let inner = self.inner.lock(); let mut out = "[\n".to_owned(); for (k, v) in &inner.opened_records { let writer = if let Some(w) = v.writer() { @@ -33,18 +31,18 @@ impl StorageManager { } format!("{}]\n", out) } - pub async fn debug_watched_records(&self) -> String { - let inner = self.inner.lock().await; + pub fn debug_watched_records(&self) -> String { + let inner = self.inner.lock(); inner.outbound_watch_manager.to_string() } - pub async fn debug_transactions(&self) -> String { - let inner = self.inner.lock().await; + pub fn debug_transactions(&self) -> String { + let inner = self.inner.lock(); inner.outbound_transaction_manager.to_string() } - pub async fn debug_offline_records(&self) -> String { - let inner = self.inner.lock().await; - let Some(local_record_store) = &inner.local_record_store else { + pub fn debug_offline_records(&self) -> String { + let inner = self.inner.lock(); + let Some(local_record_store) = inner.local_record_store.clone() else { return "not initialized".to_owned(); }; @@ -59,14 +57,14 @@ impl StorageManager { format!("{}]\n", out) } - pub async fn purge_local_records(&self, reclaim: Option) -> String { - let mut inner = self.inner.lock().await; + pub fn purge_local_records(&self, reclaim: Option) -> String { + let mut inner = self.inner.lock(); + let Some(local_record_store) = inner.local_record_store.clone() else { + return "not initialized".to_owned(); + }; if !inner.opened_records.is_empty() { return "records still opened".to_owned(); } - let Some(local_record_store) = &mut inner.local_record_store else { - return "not initialized".to_owned(); - }; let (reclaimed, total) = local_record_store .reclaim_space(reclaim.unwrap_or(u64::MAX)) .await; @@ -77,13 +75,13 @@ impl StorageManager { ) } pub async fn purge_remote_records(&self, reclaim: Option) -> String { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock(); + let Some(remote_record_store) = inner.remote_record_store.clone() else { + return "not initialized".to_owned(); + }; if !inner.opened_records.is_empty() { return "records still opened".to_owned(); } - let Some(remote_record_store) = &mut inner.remote_record_store else { - return "not initialized".to_owned(); - }; let (reclaimed, total) = remote_record_store .reclaim_space(reclaim.unwrap_or(u64::MAX)) .await; @@ -98,8 +96,8 @@ impl StorageManager { record_key: RecordKey, subkey: ValueSubkey, ) -> String { - let inner = self.inner.lock().await; - let Some(local_record_store) = &inner.local_record_store else { + let inner = self.inner.lock(); + let Some(local_record_store) = inner.local_record_store.clone() else { return "not initialized".to_owned(); }; let opaque_record_key = record_key.opaque(); @@ -112,8 +110,8 @@ impl StorageManager { record_key: RecordKey, subkey: ValueSubkey, ) -> String { - let inner = self.inner.lock().await; - let Some(remote_record_store) = &inner.remote_record_store else { + let inner = self.inner.lock(); + let Some(remote_record_store) = inner.remote_record_store.clone() else { return "not initialized".to_owned(); }; let opaque_record_key = record_key.opaque(); @@ -122,8 +120,8 @@ impl StorageManager { .await } pub async fn debug_local_record_info(&self, record_key: RecordKey) -> String { - let inner = self.inner.lock().await; - let Some(local_record_store) = &inner.local_record_store else { + let inner = self.inner.lock(); + let Some(local_record_store) = inner.local_record_store.clone() else { return "not initialized".to_owned(); }; let opaque_record_key = record_key.opaque(); @@ -140,8 +138,8 @@ impl StorageManager { } pub async fn debug_remote_record_info(&self, record_key: RecordKey) -> String { - let inner = self.inner.lock().await; - let Some(remote_record_store) = &inner.remote_record_store else { + let inner = self.inner.lock(); + let Some(remote_record_store) = inner.remote_record_store.clone() else { return "not initialized".to_owned(); }; let opaque_record_key = record_key.opaque(); diff --git a/veilid-core/src/storage_manager/delete_record.rs b/veilid-core/src/storage_manager/delete_record.rs index 6f2ed46b..403b464a 100644 --- a/veilid-core/src/storage_manager/delete_record.rs +++ b/veilid-core/src/storage_manager/delete_record.rs @@ -13,10 +13,7 @@ impl StorageManager { Self::close_record_inner(&mut inner, record_key.clone())?; // Get record from the local store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - + let local_record_store = self.get_local_record_store()?; let opaque_record_key = record_key.opaque(); // Remove the record from the local store local_record_store.delete_record(opaque_record_key).await diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index a1a20fc2..445045ff 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -45,9 +45,13 @@ impl StorageManager { let opaque_record_key = record_key.opaque(); - let mut inner = self.inner.lock().await; + let subkey_lock = self + .record_lock_table + .lock_subkey(opaque_record_key, subkey) + .await; let safety_selection = { + let inner = self.inner.lock(); let Some(opened_record) = inner.opened_records.get(&opaque_record_key) else { apibail_generic!("record not open"); }; @@ -55,9 +59,7 @@ impl StorageManager { }; // See if the requested subkey is our local record store - let last_get_result = self - .handle_get_local_value_inner(&mut inner, &opaque_record_key, subkey, true) - .await?; + let last_get_result = self.handle_get_local_value(&subkey_lock, true).await?; // Return the existing value if we have one unless we are forcing a refresh if !force_refresh { @@ -81,9 +83,6 @@ impl StorageManager { apibail_try_again!("offline, try again later"); }; - // Drop the lock for network access - drop(inner); - // May have last descriptor / value // Use the safety selection we opened the record with let last_seq = last_get_result @@ -479,7 +478,10 @@ impl StorageManager { } }; let is_incomplete = result.fanout_result.kind.is_incomplete(); - let value_data = match this.process_outbound_get_value_result(&key.opaque(), subkey, last_seq, result).await { + + let subkey_lock = this.record_lock_table.lock_subkey(key.opaque(), subkey).await; + + let value_data = match this.process_outbound_get_value_result(&subkey_lock, last_seq, result).await { Ok(Some(v)) => v, Ok(None) => { return is_incomplete; @@ -524,8 +526,7 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all)] pub(super) async fn process_outbound_get_value_result( &self, - opaque_record_key: &OpaqueRecordKey, - subkey: ValueSubkey, + subkey_lock: &SubkeyLockGuard, last_seq: ValueSeqNum, result: get_value::OutboundGetValueResult, ) -> Result, VeilidAPIError> { @@ -536,12 +537,12 @@ impl StorageManager { }; // Keep the list of nodes that returned a value for later reference - let mut inner = self.inner.lock().await; - - Self::process_fanout_results_inner( - &mut inner, - opaque_record_key.clone(), - core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)), + self.process_fanout_results( + subkey_lock.record(), + core::iter::once(( + ValueSubkeyRangeSet::single(subkey_lock.subkey()), + result.fanout_result, + )), false, self.config().network.dht.consensus_width as usize, ); @@ -549,14 +550,10 @@ impl StorageManager { // If we got a new value back then write it to the opened record if get_result_value.value_data().seq() != last_seq { let subkey_transaction_changes = self - .handle_set_local_values_single_inner( - &mut inner, - opaque_record_key, - vec![(subkey, get_result_value.clone())], - ) + .handle_set_local_value_single(&subkey_lock, get_result_value.clone()) .await?; - self.handle_commit_local_values_inner(&mut inner, subkey_transaction_changes) + self.handle_commit_local_values(subkey_transaction_changes) .await?; } Ok(Some(get_result_value.value_data().clone())) diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index 85a80d9e..5ef39e92 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -148,7 +148,6 @@ impl StorageManager { .await?; // Keep the list of nodes that returned a value for later reference - let mut inner = self.inner.lock().await; let results_iter = result .inspect_result .subkeys() @@ -156,8 +155,7 @@ impl StorageManager { .map(ValueSubkeyRangeSet::single) .zip(result.subkey_fanout_results.into_iter()); - Self::process_fanout_results_inner( - &mut inner, + self.process_fanout_results( opaque_record_key.clone(), results_iter, false, diff --git a/veilid-core/src/storage_manager/local_record_store_interface.rs b/veilid-core/src/storage_manager/local_record_store_interface.rs index 258c15ef..69a1161a 100644 --- a/veilid-core/src/storage_manager/local_record_store_interface.rs +++ b/veilid-core/src/storage_manager/local_record_store_interface.rs @@ -2,30 +2,27 @@ use super::*; impl StorageManager { #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_get_local_value_inner( + pub(super) async fn handle_get_local_value( &self, - inner: &mut StorageManagerInner, - opaque_record_key: &OpaqueRecordKey, - subkey: ValueSubkey, + subkey_lock: &SubkeyLockGuard, want_descriptor: bool, ) -> VeilidAPIResult { + let opaque_record_key = subkey_lock.record(); + let subkey = subkey_lock.subkey(); + + let local_record_store = self.get_local_record_store()?; + // See if the value is in the offline subkey writes first, // since it may not have been committed yet to the local record store - if let Some(get_result) = self.get_offline_subkey_writes_subkey( - inner, - opaque_record_key, - subkey, - want_descriptor, - )? { + if let Some(get_result) = + self.get_offline_subkey_writes_subkey(&opaque_record_key, subkey, want_descriptor)? + { return Ok(get_result); } // See if it's in the local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; if let Some(get_result) = local_record_store - .get_subkey(opaque_record_key, subkey, want_descriptor) + .get_subkey(&opaque_record_key, subkey, want_descriptor) .await? { return Ok(get_result); @@ -35,60 +32,72 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_set_local_values_single_inner( + pub(super) async fn handle_set_local_value_single( &self, - inner: &mut StorageManagerInner, - opaque_record_key: &OpaqueRecordKey, + subkey_lock: &SubkeyLockGuard, + value: Arc, + ) -> VeilidAPIResult { + let opaque_record_key = subkey_lock.record(); + let subkey = subkey_lock.subkey(); + + // Write subkey to local store + let local_record_store = self.get_local_record_store()?; + local_record_store + .set_subkeys_single_record(&opaque_record_key, vec![(subkey, value)]) + .await + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_set_local_values_single( + &self, + records_lock: &RecordsLockGuard, subkeys: SubkeyValueList, ) -> VeilidAPIResult { - // See if it's in the local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; + let opaque_record_key = records_lock.single_record()?; // Write subkey to local store + let local_record_store = self.get_local_record_store()?; local_record_store - .set_subkeys_single_record(opaque_record_key, subkeys.clone()) + .set_subkeys_single_record(&opaque_record_key, subkeys) .await } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_set_local_values_multiple_inner( + pub(super) async fn handle_set_local_values_multiple( &self, - inner: &mut StorageManagerInner, + records_lock: &RecordsLockGuard, keys_and_subkeys: RecordSubkeyValueList, ) -> VeilidAPIResult { - // See if it's in the local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - + let records = records_lock.records().into_iter().collect::>(); + for x in keys_and_subkeys.iter() { + if !records.contains(&x.0) { + apibail_internal!("invalid records lock") + } + } // Write subkey to local store + let local_record_store = self.get_local_record_store()?; local_record_store - .set_subkeys_multiple_records(keys_and_subkeys.clone()) + .set_subkeys_multiple_records(keys_and_subkeys) .await } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_commit_local_values_inner( + pub(super) async fn handle_commit_local_values( &self, - inner: &mut StorageManagerInner, subkey_transaction_changes: SubkeyTransactionChanges, ) -> VeilidAPIResult<()> { // See if it's in the local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - + let local_record_store = self.get_local_record_store()?; let record_subkey_value_list = local_record_store .commit_subkeys_tx(subkey_transaction_changes, InboundWatchUpdateMode::NoUpdate) .await?; // See if this new data supercedes any offline subkey writes + let mut inner = self.inner.lock(); for (opaque_record_key, subkey_value_list) in record_subkey_value_list { for (subkey, signed_value_data) in subkey_value_list { self.remove_old_offline_subkey_writes_inner( - inner, + &mut inner, &opaque_record_key, subkey, signed_value_data, @@ -100,19 +109,19 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_inspect_local_value_inner( + pub(super) async fn handle_inspect_local_value( &self, - inner: &mut StorageManagerInner, - opaque_record_key: &OpaqueRecordKey, + records_lock: &RecordsLockGuard, subkeys: ValueSubkeyRangeSet, want_descriptor: bool, ) -> VeilidAPIResult { + let opaque_record_key = records_lock.single_record()?; + // See if it's in the local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; + let local_record_store = self.get_local_record_store()?; + if let Some(inspect_result) = local_record_store - .inspect_record(opaque_record_key, &subkeys, want_descriptor) + .inspect_record(&opaque_record_key, &subkeys, want_descriptor) .await? { return Ok(inspect_result); @@ -124,13 +133,13 @@ impl StorageManager { #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn get_value_nodes( &self, + records_lock: &RecordsLockGuard, opaque_record_key: &OpaqueRecordKey, ) -> VeilidAPIResult>> { - let inner = self.inner.lock().await; + // xxx is this the right records lock?? + // Get local record store - let Some(local_record_store) = inner.local_record_store.as_ref() else { - apibail_not_initialized!(); - }; + let local_record_store = self.get_local_record_store()?; // Get routing table to see if we still know about these nodes let routing_table = self.routing_table(); diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 5e22c604..a5c6ece6 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -102,8 +102,8 @@ const OFFLINE_SUBKEY_WRITES: &[u8] = b"offline_subkey_writes"; const OUTBOUND_WATCH_MANAGER: &[u8] = b"outbound_watch_manager"; /// Rehydration requests metadata key name for rehydration persistence const REHYDRATION_REQUESTS: &[u8] = b"rehydration_requests"; -/// SetValue descriptor cache metadata key name for persistence -const SET_VALUE_DESCRIPTOR_CACHE: &[u8] = b"set_value_descriptor_cace"; +/// Descriptor cache metadata key name for persistence +const DESCRIPTOR_CACHE: &[u8] = b"descriptor_cache"; #[derive(Debug, Clone)] /// A single 'value changed' message to send @@ -175,7 +175,7 @@ impl fmt::Debug for StorageManagerInner { pub(crate) struct StorageManager { registry: VeilidComponentRegistry, - inner: AsyncMutex, + inner: Mutex, startup_lock: Arc, // Background processes @@ -192,14 +192,9 @@ pub(crate) struct StorageManager { // Anonymous watch keys that will be used when watching or transacting on records or we opened without a writer anonymous_signing_keys: KeyPairGroup, - // Outbound watch operation lock - // Keeps changes to watches to one-at-a-time per record - outbound_watch_lock_table: RecordLockTable, - - // Outbound transaction record locks - // Allow begin/commit/rollback/inspect/sync to be exclusive, and set/get to be per subkey - // Prevents concurrent conflicting operations on the same record and/or subkey - outbound_transaction_lock_table: RecordLockTable, + // Record operation lock + // Keeps changes to records to one-at-a-time per record + record_lock_table: RecordLockTable, // Background operation processor // for offline subkey writes, watch changes, and any other @@ -218,27 +213,12 @@ impl fmt::Debug for StorageManager { f.debug_struct("StorageManager") .field("registry", &self.registry) .field("inner", &self.inner) - // .field("flush_record_stores_task", &self.flush_record_stores_task) - // .field( - // "offline_subkey_writes_task", - // &self.offline_subkey_writes_task, - // ) - // .field("send_value_changes_task", &self.send_value_changes_task) - // .field("check_active_watches_task", &self.check_active_watches_task) - // .field( - // "check_watched_records_task", - // &self.check_watched_records_task, - // ) - .field("outbound_watch_lock_table", &self.outbound_watch_lock_table) - .field( - "outbound_transaction_lock_table", - &self.outbound_transaction_lock_table, - ) + .field("record_lock_table", &self.record_lock_table) .field( "background_operation_processor", &self.background_operation_processor, ) - .field("anonymous_watch_keys", &self.anonymous_signing_keys) + .field("anonymous_signing_keys", &self.anonymous_signing_keys) .field("is_online", &self.is_online) .field("descriptor_cache", &self.descriptor_cache) .finish() @@ -248,10 +228,6 @@ impl fmt::Debug for StorageManager { impl_veilid_component!(StorageManager); impl StorageManager { - fn new_inner() -> StorageManagerInner { - StorageManagerInner::default() - } - pub fn new(registry: VeilidComponentRegistry) -> StorageManager { let crypto = registry.crypto(); @@ -263,10 +239,9 @@ impl StorageManager { anonymous_signing_keys.add(kp); } - let inner = Self::new_inner(); let this = StorageManager { registry, - inner: AsyncMutex::new(inner), + inner: Default::default(), startup_lock: Arc::new(StartupLock::new()), save_metadata_task: TickTask::new("save_metadata_task", SAVE_METADATA_INTERVAL_SECS), @@ -302,8 +277,7 @@ impl StorageManager { "rehydrate_records_task", REHYDRATE_RECORDS_INTERVAL_SECS, ), - outbound_watch_lock_table: RecordLockTable::new(), - outbound_transaction_lock_table: RecordLockTable::new(), + record_lock_table: RecordLockTable::new(), anonymous_signing_keys, background_operation_processor: DeferredStreamProcessor::new(), is_online: AtomicBool::new(false), @@ -377,13 +351,14 @@ impl StorageManager { RecordStore::try_new(&table_store, "remote", remote_limits).await?; { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock(); inner.metadata_db = Some(metadata_db); inner.local_record_store = Some(local_record_store); inner.remote_record_store = Some(remote_record_store); - self.load_metadata_inner(&mut inner).await?; } + self.load_metadata().await?; + // Start deferred results processors self.background_operation_processor.init(); @@ -399,7 +374,7 @@ impl StorageManager { impl_subscribe_event_bus_async!(self, Self, peer_info_change_event_handler); let tick_subscription = impl_subscribe_event_bus_async!(self, Self, tick_event_handler); - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock(); // Resolve outbound watch manager noderefs inner.outbound_watch_manager.prepare(&self.routing_table()); @@ -420,7 +395,7 @@ impl StorageManager { async fn pre_terminate_async(&self) { // Stop background operations { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock(); if let Some(sub) = inner.peer_info_change_subscription.take() { self.event_bus().unsubscribe(sub); } @@ -448,28 +423,30 @@ impl StorageManager { self.background_operation_processor.terminate().await; // Terminate and release the storage manager + let (opt_local_record_store, opt_remote_record_store) = { + let mut inner = self.inner.lock(); + let opt_local_record_store = inner.local_record_store.take(); + let opt_remote_record_store = inner.remote_record_store.take(); + (opt_local_record_store, opt_remote_record_store) + }; + + // Final flush on record stores + if let Some(local_record_store) = opt_local_record_store { + local_record_store.flush().await; + } + if let Some(remote_record_store) = opt_remote_record_store { + remote_record_store.flush().await; + } + + // Save metadata + if let Err(e) = self.save_metadata().await { + veilid_log!(self error "termination metadata save failed: {}", e); + } + + // Reset inner state { - let mut inner = self.inner.lock().await; - - // Final flush on record stores - if let Some(mut local_record_store) = inner.local_record_store.take() { - if let Err(e) = local_record_store.flush().await { - veilid_log!(self error "termination local record store tick failed: {}", e); - } - } - if let Some(mut remote_record_store) = inner.remote_record_store.take() { - if let Err(e) = remote_record_store.flush().await { - veilid_log!(self error "termination remote record store tick failed: {}", e); - } - } - - // Save metadata - if let Err(e) = self.save_metadata_inner(&mut inner).await { - veilid_log!(self error "termination metadata save failed: {}", e); - } - - // Reset inner state - *inner = Self::new_inner(); + let mut inner = self.inner.lock(); + *inner = Default::default(); } guard.success(); @@ -477,102 +454,153 @@ impl StorageManager { veilid_log!(self debug "finished storage manager shutdown"); } - async fn save_metadata_inner(&self, inner: &mut StorageManagerInner) -> EyreResult<()> { - if let Some(metadata_db) = &inner.metadata_db { - let tx = metadata_db.transact(); - let set_value_descriptor_cache = self + async fn save_metadata(&self) -> EyreResult<()> { + let ( + metadata_db, + offline_subkey_writes_json, + outbound_watch_manager_json, + rehydration_requests_json, + descriptor_cache_json, + ) = { + let descriptor_cache = self .descriptor_cache .lock() .iter() .map(|x| x.0.clone()) .collect::>(); - tx.store_json(0, OFFLINE_SUBKEY_WRITES, &inner.offline_subkey_writes) - .await?; - tx.store_json(0, OUTBOUND_WATCH_MANAGER, &inner.outbound_watch_manager) - .await?; - tx.store_json(0, REHYDRATION_REQUESTS, &inner.rehydration_requests) - .await?; - tx.store_json(0, SET_VALUE_DESCRIPTOR_CACHE, &set_value_descriptor_cache) - .await?; + let inner = self.inner.lock(); + let Some(metadata_db) = inner.metadata_db.clone() else { + return Ok(()); + }; + + let offline_subkey_writes_json = serde_json::to_vec(&inner.offline_subkey_writes) + .map_err(VeilidAPIError::internal)?; + let outbound_watch_manager_json = serde_json::to_vec(&inner.outbound_watch_manager) + .map_err(VeilidAPIError::internal)?; + let rehydration_requests_json = serde_json::to_vec(&inner.rehydration_requests) + .map_err(VeilidAPIError::internal)?; + let descriptor_cache_json = + serde_json::to_vec(&descriptor_cache).map_err(VeilidAPIError::internal)?; + + ( + metadata_db, + offline_subkey_writes_json, + outbound_watch_manager_json, + rehydration_requests_json, + descriptor_cache_json, + ) + }; + + let tx = metadata_db.transact(); + + tx.store(0, OFFLINE_SUBKEY_WRITES, &offline_subkey_writes_json) + .await?; + tx.store(0, OUTBOUND_WATCH_MANAGER, &outbound_watch_manager_json) + .await?; + tx.store(0, REHYDRATION_REQUESTS, &rehydration_requests_json) + .await?; + tx.store(0, DESCRIPTOR_CACHE, &descriptor_cache_json) + .await?; + + tx.commit().await.wrap_err("failed to commit")?; - tx.commit().await.wrap_err("failed to commit")? - } Ok(()) } - async fn load_metadata_inner(&self, inner: &mut StorageManagerInner) -> EyreResult<()> { - if let Some(metadata_db) = &inner.metadata_db { - inner.offline_subkey_writes = match metadata_db - .load_json(0, OFFLINE_SUBKEY_WRITES) - .await - { - Ok(v) => v.unwrap_or_default(), - Err(_) => { - if let Err(e) = metadata_db.delete(0, OFFLINE_SUBKEY_WRITES).await { - veilid_log!(self debug "offline_subkey_writes format changed, clearing: {}", e); - } - Default::default() - } - }; - inner.outbound_watch_manager = match metadata_db - .load_json(0, OUTBOUND_WATCH_MANAGER) - .await - { - Ok(v) => v.unwrap_or_default(), - Err(_) => { - if let Err(e) = metadata_db.delete(0, OUTBOUND_WATCH_MANAGER).await { - veilid_log!(self debug "outbound_watch_manager format changed, clearing: {}", e); - } - Default::default() - } - }; + async fn load_metadata(&self) -> EyreResult<()> { + let Some(metadata_db) = self.inner.lock().metadata_db.clone() else { + bail!("metadata db should exist"); + }; - inner.rehydration_requests = match metadata_db.load_json(0, REHYDRATION_REQUESTS).await - { - Ok(v) => v.unwrap_or_default(), - Err(_) => { - if let Err(e) = metadata_db.delete(0, REHYDRATION_REQUESTS).await { - veilid_log!(self debug "rehydration_requests format changed, clearing: {}", e); - } - Default::default() + let offline_subkey_writes = match metadata_db.load_json(0, OFFLINE_SUBKEY_WRITES).await { + Ok(v) => v.unwrap_or_default(), + Err(_) => { + if let Err(e) = metadata_db.delete(0, OFFLINE_SUBKEY_WRITES).await { + veilid_log!(self debug "offline_subkey_writes format changed, clearing: {}", e); } - }; - let set_value_descriptor_cache_keys = match metadata_db - .load_json::>(0, SET_VALUE_DESCRIPTOR_CACHE) - .await - { - Ok(v) => v.unwrap_or_default(), - Err(_) => { - if let Err(e) = metadata_db.delete(0, SET_VALUE_DESCRIPTOR_CACHE).await { - veilid_log!(self debug "set_value_descriptor_cache format changed, clearing: {}", e); - } - Default::default() + Default::default() + } + }; + let outbound_watch_manager = match metadata_db.load_json(0, OUTBOUND_WATCH_MANAGER).await { + Ok(v) => v.unwrap_or_default(), + Err(_) => { + if let Err(e) = metadata_db.delete(0, OUTBOUND_WATCH_MANAGER).await { + veilid_log!(self debug "outbound_watch_manager format changed, clearing: {}", e); } - }; - { - let mut set_value_descriptor_cache = self.descriptor_cache.lock(); - set_value_descriptor_cache.clear(); - for k in set_value_descriptor_cache_keys { - set_value_descriptor_cache.insert(k, ()); + Default::default() + } + }; + + let rehydration_requests = match metadata_db.load_json(0, REHYDRATION_REQUESTS).await { + Ok(v) => v.unwrap_or_default(), + Err(_) => { + if let Err(e) = metadata_db.delete(0, REHYDRATION_REQUESTS).await { + veilid_log!(self debug "rehydration_requests format changed, clearing: {}", e); } + Default::default() + } + }; + let descriptor_cache_keys = match metadata_db + .load_json::>(0, DESCRIPTOR_CACHE) + .await + { + Ok(v) => v.unwrap_or_default(), + Err(_) => { + if let Err(e) = metadata_db.delete(0, DESCRIPTOR_CACHE).await { + veilid_log!(self debug "descriptor_cache format changed, clearing: {}", e); + } + Default::default() + } + }; + + { + let mut inner = self.inner.lock(); + inner.offline_subkey_writes = offline_subkey_writes; + inner.outbound_watch_manager = outbound_watch_manager; + inner.rehydration_requests = rehydration_requests; + } + + { + let mut descriptor_cache = self.descriptor_cache.lock(); + descriptor_cache.clear(); + for k in descriptor_cache_keys { + descriptor_cache.insert(k, ()); } } Ok(()) } - async fn has_offline_subkey_writes(&self) -> bool { - !self.inner.lock().await.offline_subkey_writes.is_empty() + fn has_offline_subkey_writes(&self) -> bool { + !self.inner.lock().offline_subkey_writes.is_empty() } - async fn has_rehydration_requests(&self) -> bool { - !self.inner.lock().await.rehydration_requests.is_empty() + fn has_rehydration_requests(&self) -> bool { + !self.inner.lock().rehydration_requests.is_empty() } fn dht_is_online(&self) -> bool { self.is_online.load(Ordering::Acquire) } + fn get_local_record_store(&self) -> VeilidAPIResult> { + self.inner + .lock() + .local_record_store + .as_ref() + .cloned() + .ok_or_else(VeilidAPIError::not_initialized) + } + + fn get_remote_record_store(&self) -> VeilidAPIResult> { + self.inner + .lock() + .remote_record_store + .as_ref() + .cloned() + .ok_or_else(VeilidAPIError::not_initialized) + } + // Send a value change up through the callback #[instrument(level = "trace", target = "stor", skip(self, value))] fn update_callback_value_change( @@ -637,27 +665,23 @@ impl StorageManager { //////////////////////////////////////////////////////////////////////// #[instrument(level = "trace", target = "stor", skip_all)] - fn process_fanout_results_inner>( - inner: &mut StorageManagerInner, + fn process_fanout_results>( + &self, opaque_record_key: OpaqueRecordKey, subkey_results_iter: I, is_set: bool, consensus_width: usize, - ) { - // Get local record store - let local_record_store = inner.local_record_store.as_mut().unwrap(); - + ) -> VeilidAPIResult<()> { let cur_ts = Timestamp::now(); - local_record_store.with_record_mut(&opaque_record_key, |r| { - let d = r.detail_mut(); - + let local_record_store = self.get_local_record_store()?; + local_record_store.with_record_detail_mut(&opaque_record_key, |detail| { for (subkeys, fanout_result) in subkey_results_iter { for node_id in fanout_result .value_nodes .iter() .filter_map(|x| x.node_ids().get(opaque_record_key.kind())) { - let pnd = d.nodes.entry(node_id).or_default(); + let pnd = detail.nodes.entry(node_id).or_default(); if is_set || pnd.last_set == Timestamp::default() { pnd.last_set = cur_ts; } @@ -667,7 +691,7 @@ impl StorageManager { } // Purge nodes down to the N most recently seen, where N is the consensus width - let mut nodes_ts = d + let mut nodes_ts = detail .nodes .iter() .map(|kv| (kv.0.clone(), kv.1.last_seen)) @@ -690,9 +714,11 @@ impl StorageManager { }); for dead_node_key in nodes_ts.iter().skip(consensus_width) { - d.nodes.remove(&dead_node_key.0); + detail.nodes.remove(&dead_node_key.0); } }); + + Ok(()) } #[instrument(level = "trace", target = "stor", skip_all)] diff --git a/veilid-core/src/storage_manager/offline_subkey_writes.rs b/veilid-core/src/storage_manager/offline_subkey_writes.rs index 338b6355..089fd5d4 100644 --- a/veilid-core/src/storage_manager/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/offline_subkey_writes.rs @@ -48,19 +48,21 @@ impl StorageManager { pub(super) fn get_offline_subkey_writes_subkey( &self, - inner: &mut StorageManagerInner, opaque_record_key: &OpaqueRecordKey, subkey: ValueSubkey, want_descriptor: bool, ) -> VeilidAPIResult> { - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - let Some(osw) = inner.offline_subkey_writes.get(opaque_record_key) else { - return Ok(None); - }; - let Some(signed_value_data) = osw.subkey_value_data.get(&subkey).cloned() else { - return Ok(None); + let local_record_store = self.get_local_record_store()?; + + let signed_value_data = { + let inner = self.inner.lock(); + let Some(osw) = inner.offline_subkey_writes.get(opaque_record_key) else { + return Ok(None); + }; + let Some(signed_value_data) = osw.subkey_value_data.get(&subkey).cloned() else { + return Ok(None); + }; + signed_value_data }; let opt_descriptor = if want_descriptor { if let Some(descriptor) = local_record_store @@ -141,9 +143,8 @@ impl StorageManager { /// so we can try again later. If the data associated with the write is no longer necessary /// we can drop it. #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn finish_offline_subkey_writes_inner( + pub(super) fn finish_offline_subkey_writes( &self, - inner: &mut StorageManagerInner, opaque_record_key: &OpaqueRecordKey, subkeys_written: ValueSubkeyRangeSet, subkeys_still_offline: ValueSubkeyRangeSet, @@ -154,7 +155,12 @@ impl StorageManager { ); // Get the offline subkey write record - match inner.offline_subkey_writes.entry(opaque_record_key.clone()) { + match self + .inner + .lock() + .offline_subkey_writes + .entry(opaque_record_key.clone()) + { hashlink::linked_hash_map::Entry::Occupied(mut o) => { let finished = { let osw = o.get_mut(); diff --git a/veilid-core/src/storage_manager/open_record.rs b/veilid-core/src/storage_manager/open_record.rs index ab1efa42..06181cc1 100644 --- a/veilid-core/src/storage_manager/open_record.rs +++ b/veilid-core/src/storage_manager/open_record.rs @@ -12,22 +12,22 @@ impl StorageManager { let Ok(_guard) = self.startup_lock.enter() else { apibail_not_initialized!(); }; - - let mut inner = self.inner.lock().await; let opaque_record_key = record_key.opaque(); + let record_lock = self + .record_lock_table + .lock_record(opaque_record_key.clone()) + .await; // See if we have a local record already or not if let Some(res) = self - .open_existing_record_inner( - &mut inner, + .open_existing_record_locked( + &record_lock, record_key.clone(), writer.clone(), safety_selection.clone(), ) .await? { - drop(inner); - // We had an existing record, so check the network to see if we should // update it with what we have here let set_consensus = self.config().network.dht.set_value_count as usize; @@ -36,8 +36,7 @@ impl StorageManager { opaque_record_key, ValueSubkeyRangeSet::full(), set_consensus, - ) - .await; + ); return Ok(res); } @@ -47,9 +46,6 @@ impl StorageManager { apibail_try_again!("offline, try again later"); }; - // Drop the mutex so we dont block during network access - drop(inner); - // No last descriptor, no last value // Use the safety selection we opened the record with let result = self @@ -71,11 +67,9 @@ impl StorageManager { // Check again to see if we have a local record already or not // because waiting for the outbound_inspect_value action could result in the key being opened // via some parallel process - let mut inner = self.inner.lock().await; - if let Some(res) = self - .open_existing_record_inner( - &mut inner, + .open_existing_record_locked( + &record_lock, record_key.clone(), writer.clone(), safety_selection.clone(), @@ -89,8 +83,8 @@ impl StorageManager { } // Open the new record - self.open_new_record_inner( - &mut inner, + self.open_new_record_locked( + &record_lock, record_key, writer, result.inspect_result, @@ -102,35 +96,38 @@ impl StorageManager { //////////////////////////////////////////////////////////////////////// #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn open_existing_record_inner( + pub(super) async fn open_existing_record_locked( &self, - inner: &mut StorageManagerInner, + record_lock: &RecordsLockGuard, record_key: RecordKey, writer: Option, safety_selection: SafetySelection, ) -> VeilidAPIResult> { + let opaque_record_key = record_lock.single_record()?; + if record_key.opaque() != opaque_record_key { + apibail_internal!("wrong record lock"); + } + // Get local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; + let local_record_store = self.get_local_record_store()?; // See if we have a local record already or not - let cb = |r: &mut Record| { + let cb = |descriptor: Arc, r: &mut LocalRecordDetail| { // Process local record // Keep the safety selection we opened the record with - r.detail_mut().safety_selection = safety_selection.clone(); + r.safety_selection = safety_selection.clone(); // Return record details - (r.owner(), r.schema()) - }; - let opaque_record_key = record_key.opaque(); - let (owner, schema) = match local_record_store.with_record_mut(&opaque_record_key, cb) { - Some(v) => v, - None => { - return Ok(None); - } + (descriptor.owner(), descriptor.schema().unwrap()) }; + let (owner, schema) = + match local_record_store.with_record_detail_mut(&opaque_record_key, cb) { + Some(v) => v, + None => { + return Ok(None); + } + }; // Had local record // If the writer we chose is also the owner, we have the owner secret @@ -157,7 +154,8 @@ impl StorageManager { } // Write open record - inner + self.inner + .lock() .opened_records .entry(opaque_record_key) .and_modify(|e| { @@ -179,17 +177,28 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn open_new_record_inner( + pub(super) async fn open_new_record_locked( &self, - inner: &mut StorageManagerInner, + records_lock: &RecordsLockGuard, record_key: RecordKey, writer: Option, inspect_result: InspectResult, safety_selection: SafetySelection, ) -> VeilidAPIResult { + let opaque_record_key = records_lock.single_record()?; + if record_key.opaque() != opaque_record_key { + apibail_internal!("wrong record lock"); + } + + let local_record_store = self.get_local_record_store()?; + // Ensure the record is closed - let opaque_record_key = record_key.opaque(); - if inner.opened_records.contains_key(&opaque_record_key) { + if self + .inner + .lock() + .opened_records + .contains_key(&opaque_record_key) + { panic!("new record should never be opened at this point"); } @@ -214,11 +223,6 @@ impl StorageManager { }; let schema = signed_value_descriptor.schema()?; - // Get local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - // Make and store a new record for this descriptor let record = Record::::new( Timestamp::now(), @@ -233,7 +237,7 @@ impl StorageManager { let encryption_key = record_key.ref_value().encryption_key(); // Write open record - inner.opened_records.insert( + self.inner.lock().opened_records.insert( opaque_record_key, OpenedRecord::new(writer, safety_selection, encryption_key), ); diff --git a/veilid-core/src/storage_manager/record_encryption.rs b/veilid-core/src/storage_manager/record_encryption.rs index c5e34725..9cf5bd3c 100644 --- a/veilid-core/src/storage_manager/record_encryption.rs +++ b/veilid-core/src/storage_manager/record_encryption.rs @@ -3,12 +3,11 @@ use super::*; impl StorageManager { /// Get the encryption key for an opened OpaqueRecordKey /// Opaque record keys must have been opened with their full record key in order to be read - pub(super) async fn get_encryption_key_for_opaque_record_key( + pub(super) fn get_encryption_key_for_opaque_record_key( &self, opaque_record_key: &OpaqueRecordKey, ) -> VeilidAPIResult> { - let inner = self.inner.lock().await; - + let inner = self.inner.lock(); let Some(opened_record) = inner.opened_records.get(opaque_record_key) else { apibail_generic!("decrypt_value_data: opened_records does not contain an expected key"); }; diff --git a/veilid-core/src/storage_manager/record_lock_table.rs b/veilid-core/src/storage_manager/record_lock_table.rs index 40e3a289..abbe66bf 100644 --- a/veilid-core/src/storage_manager/record_lock_table.rs +++ b/veilid-core/src/storage_manager/record_lock_table.rs @@ -3,38 +3,58 @@ use weak_table::WeakValueHashMap; use super::*; pub struct RecordsLockGuard { - _record_locks: Vec>, + record_locks: Vec>, _whole_record_lock_guards: Vec>, } +impl RecordsLockGuard { + pub fn records(&self) -> Vec { + self.record_locks.iter().map(|x| x.record()).collect() + } + pub fn single_record(&self) -> VeilidAPIResult { + if self.record_locks.len() != 1 { + apibail_internal!("invalid record count"); + } + Ok(self.record_locks.first().cloned().unwrap()) + } +} + pub struct SubkeyLockGuard { record_lock: Arc, _whole_record_lock_guard: AsyncRwLockReadGuardArc<()>, + _subkey_lock_guard: AsyncMutexGuardArc<()>, subkey: ValueSubkey, } -impl Drop for SubkeyLockGuard { - fn drop(&mut self) { - self.record_lock - .subkey_lock_table - .lock() - .remove(self.subkey); +impl SubkeyLockGuard { + pub fn record(&self) -> OpaqueRecordKey { + self.record_lock.record() + } + + pub fn subkey(&self) -> ValueSubkey { + self.subkey } } #[derive(Debug)] struct RecordLock { - pub whole_record_lock: Arc>, - pub subkey_lock_table: Mutex, + whole_record_lock: Arc>, + subkey_lock_table: Mutex>>>, + record: OpaqueRecordKey, } impl RecordLock { - pub fn new() -> Self { + pub fn new(record: OpaqueRecordKey) -> Self { Self { whole_record_lock: Arc::new(AsyncRwLock::new(())), - subkey_lock_table: Mutex::new(ValueSubkeyRangeSet::new()), + subkey_lock_table: Mutex::new(WeakValueHashMap::new()), + record, } } + + pub fn record(&self) -> OpaqueRecordKey { + self.record.clone() + } } #[derive(Debug)] @@ -62,8 +82,8 @@ impl RecordLockTable { let mut inner = self.inner.lock(); let record_lock = inner .record_lock_table - .entry(record) - .or_insert_with(|| Arc::new(RecordLock::new())); + .entry(record.clone()) + .or_insert_with(|| Arc::new(RecordLock::new(record.clone()))); inner.record_lock_table.remove_expired(); record_lock }; @@ -72,7 +92,7 @@ impl RecordLockTable { let whole_record_lock_guard = record_lock.whole_record_lock.write_arc().await; RecordsLockGuard { - _record_locks: vec![record_lock], + record_locks: vec![record_lock], _whole_record_lock_guards: vec![whole_record_lock_guard], } } @@ -88,8 +108,8 @@ impl RecordLockTable { .map(|record| { inner .record_lock_table - .entry(record) - .or_insert_with(|| Arc::new(RecordLock::new())) + .entry(record.clone()) + .or_insert_with(|| Arc::new(RecordLock::new(record.clone()))) }) .collect::>(); inner.record_lock_table.remove_expired(); @@ -104,7 +124,7 @@ impl RecordLockTable { } RecordsLockGuard { - _record_locks: record_locks, + record_locks, _whole_record_lock_guards: whole_record_lock_guards, } } @@ -115,8 +135,8 @@ impl RecordLockTable { let mut inner = self.inner.lock(); let record_lock = inner .record_lock_table - .entry(record) - .or_insert_with(|| Arc::new(RecordLock::new())); + .entry(record.clone()) + .or_insert_with(|| Arc::new(RecordLock::new(record.clone()))); inner.record_lock_table.remove_expired(); record_lock }; @@ -124,10 +144,10 @@ impl RecordLockTable { // Wait on each lock to complete in order let whole_record_lock_guard = record_lock.whole_record_lock.try_write_arc()?; - RecordsLockGuard { - _record_locks: vec![record_lock], + Some(RecordsLockGuard { + record_locks: vec![record_lock], _whole_record_lock_guards: vec![whole_record_lock_guard], - } + }) } pub fn try_lock_records(&self, mut records: Vec) -> Option { @@ -141,8 +161,8 @@ impl RecordLockTable { .map(|record| { inner .record_lock_table - .entry(record) - .or_insert_with(|| Arc::new(RecordLock::new())) + .entry(record.clone()) + .or_insert_with(|| Arc::new(RecordLock::new(record.clone()))) }) .collect::>(); inner.record_lock_table.remove_expired(); @@ -157,11 +177,50 @@ impl RecordLockTable { } Some(RecordsLockGuard { - _record_locks: record_locks, + record_locks, _whole_record_lock_guards: whole_record_lock_guards, }) } + pub async fn lock_subkey( + &self, + record: OpaqueRecordKey, + subkey: ValueSubkey, + ) -> SubkeyLockGuard { + // Get record lock + let record_lock = { + let mut inner = self.inner.lock(); + let record_lock = inner + .record_lock_table + .entry(record.clone()) + .or_insert_with(|| Arc::new(RecordLock::new(record.clone()))); + inner.record_lock_table.remove_expired(); + record_lock + }; + + // Attempt shared lock + let _whole_record_lock_guard = record_lock.whole_record_lock.read_arc().await; + + // Get subkey lock + let subkey_lock = { + let mut subkey_lock_table = record_lock.subkey_lock_table.lock(); + let subkey_lock = subkey_lock_table + .entry(subkey) + .or_insert_with(|| Arc::new(AsyncMutex::new(()))); + subkey_lock_table.remove_expired(); + subkey_lock + }; + + let _subkey_lock_guard = asyncmutex_lock_arc!(subkey_lock); + + SubkeyLockGuard { + record_lock, + _whole_record_lock_guard, + _subkey_lock_guard, + subkey, + } + } + pub fn try_lock_subkey( &self, record: OpaqueRecordKey, @@ -172,8 +231,8 @@ impl RecordLockTable { let mut inner = self.inner.lock(); let record_lock = inner .record_lock_table - .entry(record) - .or_insert_with(|| Arc::new(RecordLock::new())); + .entry(record.clone()) + .or_insert_with(|| Arc::new(RecordLock::new(record.clone()))); inner.record_lock_table.remove_expired(); record_lock }; @@ -182,16 +241,21 @@ impl RecordLockTable { let _whole_record_lock_guard = record_lock.whole_record_lock.try_read_arc()?; // Get subkey lock - { + let subkey_lock = { let mut subkey_lock_table = record_lock.subkey_lock_table.lock(); - if !subkey_lock_table.insert(subkey) { - return None; - } - } + let subkey_lock = subkey_lock_table + .entry(subkey) + .or_insert_with(|| Arc::new(AsyncMutex::new(()))); + subkey_lock_table.remove_expired(); + subkey_lock + }; + + let _subkey_lock_guard = asyncmutex_try_lock_arc!(subkey_lock)?; Some(SubkeyLockGuard { record_lock, _whole_record_lock_guard, + _subkey_lock_guard, subkey, }) } diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index 1e0f928e..1f974077 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -8,6 +8,7 @@ mod inbound_transactions; mod inbound_watch; mod opened_record; +mod record; mod record_snapshot; mod record_store_inner; mod record_store_limits; @@ -17,6 +18,7 @@ mod subkey_transaction_changes; pub(super) use inbound_transactions::*; pub(super) use inbound_watch::*; pub(super) use opened_record::*; +pub(super) use record::*; pub(super) use record_snapshot::*; pub(super) use record_store_inner::*; pub(super) use record_store_limits::*; @@ -124,11 +126,7 @@ where #[instrument(level = "trace", target = "stor", skip_all)] pub async fn flush(&self) { - let opt_commit_action = { - let mut inner = self.inner.lock(); - inner.with_record_index_mut(|record_index| record_index.prepare_commit_action()) - }; - + let opt_commit_action = self.inner.lock().flush(); if let Some(commit_action) = opt_commit_action { self.process_commit_action(commit_action).await; }; @@ -145,16 +143,7 @@ where .lock_record(opaque_record_key.clone()) .await; - let opt_commit_action = { - let mut inner = self.inner.lock(); - inner.with_record_index_mut(|record_index| { - record_index.create(opaque_record_key.clone(), record)?; - VeilidAPIResult::>>::Ok( - record_index.maybe_prepare_commit_action(), - ) - })? - }; - + let opt_commit_action = self.inner.lock().new_record(opaque_record_key, record)?; if let Some(commit_action) = opt_commit_action { self.process_commit_action(commit_action).await; }; @@ -163,43 +152,138 @@ where } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn delete_record( - &mut self, - opaque_record_key: OpaqueRecordKey, - ) -> VeilidAPIResult<()> { + pub async fn delete_record(&self, opaque_record_key: OpaqueRecordKey) -> VeilidAPIResult<()> { let _record_lock = self .record_lock_table .lock_record(opaque_record_key.clone()) .await; - let opt_commit_action = { - let mut inner = self.inner.lock(); - inner.with_record_index_mut(|record_index| { - record_index.delete(opaque_record_key.clone())?; - VeilidAPIResult::>>::Ok( - record_index.maybe_prepare_commit_action(), - ) - })? - -xxx move operations to inner and cleanup function too - - inner. - // Remove all references to this record - self.cleanup_record_internal(rtk, record); - - }; - + let opt_commit_action = self.inner.lock().delete_record(opaque_record_key)?; if let Some(commit_action) = opt_commit_action { self.process_commit_action(commit_action).await; }; - - // Purge the record's space immediately along with any other dead records - self.purge_dead_records(false).await; - Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub async fn get_subkey( + &self, + opaque_record_key: &OpaqueRecordKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> VeilidAPIResult> { + let _record_lock = self + .record_lock_table + .lock_subkey(opaque_record_key.clone(), subkey) + .await; + + let load_action_result = { + let mut inner = self.inner.lock(); + inner.prepare_get_subkey(opaque_record_key, subkey) + }; + + match load_action_result { + LoadActionResult::NoRecord => Ok(None), + LoadActionResult::NoSubkey { descriptor } => Ok(Some(GetResult { + opt_value: None, + opt_descriptor: if want_descriptor { + Some(descriptor) + } else { + None + }, + })), + LoadActionResult::Subkey { + descriptor, + mut load_action, + } => { + let res = load_action.load().await; + { + let mut inner = self.inner.lock(); + inner.finish_get_subkey(load_action); + } + let opt_value = res?.map(|x| x.signed_value_data()); + + Ok(Some(GetResult { + opt_value, + opt_descriptor: if want_descriptor { + Some(descriptor) + } else { + None + }, + })) + } + } + } + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub async fn inspect_record( + &self, + opaque_record_key: &OpaqueRecordKey, + subkeys: &ValueSubkeyRangeSet, + want_descriptor: bool, + ) -> VeilidAPIResult> { + let res = self.with_record(opaque_record_key, |record| { + // Get number of subkeys from schema and ensure we are getting the + // right number of sequence numbers betwen that and what we asked for + let schema_subkeys = record + .schema() + .truncate_subkeys(subkeys, Some(DHTSchema::MAX_SUBKEY_COUNT)); + let opt_descriptor = if want_descriptor { + Some(record.descriptor().clone()) + } else { + None + }; + + // Check if we can return some subkeys + if schema_subkeys.is_empty() { + // No overlapping keys + return Ok(None); + } + + // Collect the requested subkey sequence numbers + let seqs = schema_subkeys + .iter() + .map(|subkey| record.subkey_seq(subkey)) + .collect(); + + Ok(Some(InspectResult::new( + self, + subkeys.clone(), + "inspect_record", + schema_subkeys, + seqs, + opt_descriptor, + )?)) + }); + + match res { + None => Ok(None), + Some(out) => out, + } + } + + #[instrument(level = "trace", target = "stor", skip_all)] + pub fn with_record(&self, opaque_record_key: &OpaqueRecordKey, func: F) -> Option + where + F: FnOnce(&Record) -> R, + { + let mut inner = self.inner.lock(); + inner.with_record(opaque_record_key, func) + } + + #[instrument(level = "trace", target = "stor", skip_all)] + pub fn with_record_detail_mut( + &self, + opaque_record_key: &OpaqueRecordKey, + func: F, + ) -> Option + where + F: FnOnce(Arc, &mut D) -> R, + { + let mut inner = self.inner.lock(); + inner.with_record_detail_mut(opaque_record_key, func) + } + ////////////////////////////////////////////////////////////////////////////////// async fn process_commit_action(&self, mut commit_action: CommitAction) { @@ -209,9 +293,7 @@ xxx move operations to inner and cleanup function too let res = { let mut inner = self.inner.lock(); - inner.with_record_index_mut(|record_index| { - record_index.finish_commit_action(commit_action) - }) + inner.finish_commit_action(commit_action) }; if let Err(e) = res { @@ -219,249 +301,164 @@ xxx move operations to inner and cleanup function too } } - fn cleanup_record_internal(&mut self, rtk: RecordTableKey, record: Record) -> u64 { - // Remove transactions - self.record_transactions.remove(&rtk); + // #[instrument(level = "trace", target = "stor", skip_all)] + // pub(super) fn contains_record(&self, opaque_record_key: &OpaqueRecordKey) -> bool { + // let rtk = RecordTableKey { + // record_key: opaque_record_key.clone(), + // }; + // self.record_index.contains_key(&rtk) + // } - // Remove watches - self.watched_records.remove(&rtk); + // #[instrument(level = "trace", target = "stor", skip_all)] + // pub(super) fn with_record( + // &mut self, + // opaque_record_key: &OpaqueRecordKey, + // f: F, + // ) -> Option + // where + // F: FnOnce(&Record) -> R, + // { + // // Get record from index + // let mut out = None; + // let rtk = RecordTableKey { + // record_key: opaque_record_key.clone(), + // }; + // if let Some(record) = self.record_index.get_mut(&rtk) { + // // Callback + // out = Some(f(record)); - // Remove watch changes - self.changed_watched_values.remove(&rtk); + // // Touch + // record.touch(); + // } + // if out.is_some() { + // // Marks as changed because the record was touched and we want to keep the + // // LRU ordering serialized + // self.changed_records.insert(rtk); + // } - } + // out + // } - #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn contains_record(&self, opaque_record_key: &OpaqueRecordKey) -> bool { - let rtk = RecordTableKey { - record_key: opaque_record_key.clone(), - }; - self.record_index.contains_key(&rtk) - } + // #[instrument(level = "trace", target = "stor", skip_all)] + // pub(super) fn peek_record(&self, opaque_record_key: &OpaqueRecordKey, f: F) -> Option + // where + // F: FnOnce(&Record) -> R, + // { + // // Get record from index + // let mut out = None; + // let rtk = RecordTableKey { + // record_key: opaque_record_key.clone(), + // }; + // if let Some(record) = self.record_index.peek(&rtk) { + // // Callback + // out = Some(f(record)); + // } + // out + // } - #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn with_record( - &mut self, - opaque_record_key: &OpaqueRecordKey, - f: F, - ) -> Option - where - F: FnOnce(&Record) -> R, - { - // Get record from index - let mut out = None; - let rtk = RecordTableKey { - record_key: opaque_record_key.clone(), - }; - if let Some(record) = self.record_index.get_mut(&rtk) { - // Callback - out = Some(f(record)); + // #[instrument(level = "trace", target = "stor", skip_all)] + // pub(super) fn with_record_mut( + // &mut self, + // opaque_record_key: &OpaqueRecordKey, + // f: F, + // ) -> Option + // where + // F: FnOnce(&mut Record) -> R, + // { + // // Get record from index + // let mut out = None; + // let rtk = RecordTableKey { + // record_key: opaque_record_key.clone(), + // }; + // if let Some(record) = self.record_index.get_mut(&rtk) { + // // Callback + // out = Some(f(record)); - // Touch - record.touch(); - } - if out.is_some() { - // Marks as changed because the record was touched and we want to keep the - // LRU ordering serialized - self.changed_records.insert(rtk); - } + // // Touch + // record.touch(); + // } + // if out.is_some() { + // // Marks as changed because the record was touched and we want to keep the + // // LRU ordering serialized + // self.changed_records.insert(rtk); + // } - out - } + // out + // } - #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn peek_record(&self, opaque_record_key: &OpaqueRecordKey, f: F) -> Option - where - F: FnOnce(&Record) -> R, - { - // Get record from index - let mut out = None; - let rtk = RecordTableKey { - record_key: opaque_record_key.clone(), - }; - if let Some(record) = self.record_index.peek(&rtk) { - // Callback - out = Some(f(record)); - } - out - } + // #[instrument(level = "trace", target = "stor", skip_all, err)] + // pub async fn peek_subkey( + // &self, + // opaque_record_key: &OpaqueRecordKey, + // subkey: ValueSubkey, + // want_descriptor: bool, + // ) -> VeilidAPIResult> { + // // record from index + // let Some((subkey_count, has_subkey, opt_descriptor)) = + // self.peek_record(opaque_record_key, |record| { + // ( + // record.subkey_count(), + // record.stored_subkeys().contains(subkey), + // if want_descriptor { + // Some(record.descriptor().clone()) + // } else { + // None + // }, + // ) + // }) + // else { + // // Record not available + // return Ok(None); + // }; - #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn with_record_mut( - &mut self, - opaque_record_key: &OpaqueRecordKey, - f: F, - ) -> Option - where - F: FnOnce(&mut Record) -> R, - { - // Get record from index - let mut out = None; - let rtk = RecordTableKey { - record_key: opaque_record_key.clone(), - }; - if let Some(record) = self.record_index.get_mut(&rtk) { - // Callback - out = Some(f(record)); + // // Check if the subkey is in range + // if subkey as usize >= subkey_count { + // apibail_invalid_argument!("subkey out of range", "subkey", subkey); + // } - // Touch - record.touch(); - } - if out.is_some() { - // Marks as changed because the record was touched and we want to keep the - // LRU ordering serialized - self.changed_records.insert(rtk); - } + // // See if we have this subkey stored + // if !has_subkey { + // // If not, return no value but maybe with descriptor + // return Ok(Some(GetResult { + // opt_value: None, + // opt_descriptor, + // })); + // } - out - } + // // If subkey exists in subkey cache, use that + // let stk = SubkeyTableKey { + // record_key: opaque_record_key.clone(), + // subkey, + // }; + // if let Some(record_data) = self.subkey_cache.peek(&stk) { + // let out = record_data.signed_value_data().clone(); - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn get_subkey( - &mut self, - opaque_record_key: &OpaqueRecordKey, - subkey: ValueSubkey, - want_descriptor: bool, - ) -> VeilidAPIResult> { - // Get record from index - let Some((subkey_count, has_subkey, opt_descriptor)) = - self.with_record(opaque_record_key, |record| { - ( - record.subkey_count(), - record.stored_subkeys().contains(subkey), - if want_descriptor { - Some(record.descriptor().clone()) - } else { - None - }, - ) - }) - else { - // Record not available - return Ok(None); - }; + // return Ok(Some(GetResult { + // opt_value: Some(out), + // opt_descriptor, + // })); + // } + // // If not in cache, try to pull from table store if it is in our stored subkey set + // let Some(record_data) = self + // .subkey_table + // .load_json::(0, &stk.bytes()) + // .await + // .map_err(VeilidAPIError::internal)? + // else { + // apibail_internal!("failed to peek subkey that was stored"); + // }; - // Check if the subkey is in range - if subkey as usize >= subkey_count { - apibail_invalid_argument!("subkey out of range", "subkey", subkey); - } + // let out = record_data.signed_value_data().clone(); - // See if we have this subkey stored - if !has_subkey { - // If not, return no value but maybe with descriptor - return Ok(Some(GetResult { - opt_value: None, - opt_descriptor, - })); - } - - // If subkey exists in subkey cache, use that - let stk = SubkeyTableKey { - record_key: opaque_record_key.clone(), - subkey, - }; - if let Some(record_data) = self.subkey_cache.get(&stk) { - let out = record_data.signed_value_data().clone(); - - return Ok(Some(GetResult { - opt_value: Some(out), - opt_descriptor, - })); - } - // If not in cache, try to pull from table store if it is in our stored subkey set - let Some(record_data) = self - .subkey_table - .load_json::(0, &stk.bytes()) - .await - .map_err(VeilidAPIError::internal)? - else { - apibail_internal!("failed to get subkey that was stored"); - }; - - let out = record_data.signed_value_data().clone(); - - // Add to cache, do nothing with lru out - self.add_to_subkey_cache(stk, record_data); - - Ok(Some(GetResult { - opt_value: Some(out), - opt_descriptor, - })) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn peek_subkey( - &self, - opaque_record_key: &OpaqueRecordKey, - subkey: ValueSubkey, - want_descriptor: bool, - ) -> VeilidAPIResult> { - // record from index - let Some((subkey_count, has_subkey, opt_descriptor)) = - self.peek_record(opaque_record_key, |record| { - ( - record.subkey_count(), - record.stored_subkeys().contains(subkey), - if want_descriptor { - Some(record.descriptor().clone()) - } else { - None - }, - ) - }) - else { - // Record not available - return Ok(None); - }; - - // Check if the subkey is in range - if subkey as usize >= subkey_count { - apibail_invalid_argument!("subkey out of range", "subkey", subkey); - } - - // See if we have this subkey stored - if !has_subkey { - // If not, return no value but maybe with descriptor - return Ok(Some(GetResult { - opt_value: None, - opt_descriptor, - })); - } - - // If subkey exists in subkey cache, use that - let stk = SubkeyTableKey { - record_key: opaque_record_key.clone(), - subkey, - }; - if let Some(record_data) = self.subkey_cache.peek(&stk) { - let out = record_data.signed_value_data().clone(); - - return Ok(Some(GetResult { - opt_value: Some(out), - opt_descriptor, - })); - } - // If not in cache, try to pull from table store if it is in our stored subkey set - let Some(record_data) = self - .subkey_table - .load_json::(0, &stk.bytes()) - .await - .map_err(VeilidAPIError::internal)? - else { - apibail_internal!("failed to peek subkey that was stored"); - }; - - let out = record_data.signed_value_data().clone(); - - Ok(Some(GetResult { - opt_value: Some(out), - opt_descriptor, - })) - } + // Ok(Some(GetResult { + // opt_value: Some(out), + // opt_descriptor, + // })) + // } #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn set_subkeys_single_record( - &mut self, + &self, opaque_record_key: &OpaqueRecordKey, subkeys: SubkeyValueList, ) -> VeilidAPIResult { @@ -483,7 +480,7 @@ xxx move operations to inner and cleanup function too #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn set_subkeys_multiple_records( - &mut self, + &self, keys_and_subkeys: RecordSubkeyValueList, ) -> VeilidAPIResult { // Start subkey table transaction @@ -506,7 +503,7 @@ xxx move operations to inner and cleanup function too #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn commit_subkeys_tx( - &mut self, + &self, subkey_transaction_changes: SubkeyTransactionChanges, watch_update_mode: InboundWatchUpdateMode, ) -> VeilidAPIResult { @@ -566,7 +563,7 @@ xxx move operations to inner and cleanup function too #[instrument(level = "trace", target = "stor", skip_all, err)] async fn set_subkeys_in_tx( - &mut self, + &self, subkey_table_tx: TableDBTransaction, opaque_record_key: &OpaqueRecordKey, subkeys: SubkeyValueList, @@ -683,93 +680,6 @@ xxx move operations to inner and cleanup function too res } - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn inspect_record( - &mut self, - opaque_record_key: &OpaqueRecordKey, - subkeys: &ValueSubkeyRangeSet, - want_descriptor: bool, - ) -> VeilidAPIResult> { - // Get record from index - let Some((schema_subkeys, opt_descriptor)) = - self.with_record(opaque_record_key, |record| { - // Get number of subkeys from schema and ensure we are getting the - // right number of sequence numbers betwen that and what we asked for - let schema_subkeys = record - .schema() - .truncate_subkeys(subkeys, Some(DHTSchema::MAX_SUBKEY_COUNT)); - ( - schema_subkeys, - if want_descriptor { - Some(record.descriptor().clone()) - } else { - None - }, - ) - }) - else { - // Record not available - return Ok(None); - }; - - // Check if we can return some subkeys - if schema_subkeys.is_empty() { - // No overlapping keys - return Ok(None); - } - - // See if we have this inspection cached - if let Some(icv) = self.inspect_cache.get(opaque_record_key, &schema_subkeys) { - return Ok(Some(InspectResult::new( - self, - subkeys.clone(), - "inspect_record", - schema_subkeys.clone(), - icv.seqs, - opt_descriptor, - )?)); - } - - // Build sequence number list to return - #[allow(clippy::unnecessary_cast)] - let mut seqs = Vec::with_capacity(schema_subkeys.len() as usize); - for subkey in schema_subkeys.iter() { - let stk = SubkeyTableKey { - record_key: opaque_record_key.clone(), - subkey, - }; - let seq = if let Some(record_data) = self.subkey_cache.peek(&stk) { - record_data.signed_value_data().value_data().seq() - } else { - // If not in cache, try to pull from table store if it is in our stored subkey set - // XXX: This would be better if it didn't have to pull the whole record data to get the seq. - self.subkey_table - .load_json::(0, &stk.bytes()) - .await - .map_err(VeilidAPIError::internal)? - .map(|record_data| record_data.signed_value_data().value_data().seq()) - .unwrap_or_default() - }; - seqs.push(seq) - } - - // Save seqs cache - self.inspect_cache.put( - opaque_record_key.clone(), - schema_subkeys.clone(), - InspectCacheL2Value { seqs: seqs.clone() }, - ); - - Ok(Some(InspectResult::new( - self, - subkeys.clone(), - "inspect_record", - schema_subkeys, - seqs, - opt_descriptor, - )?)) - } - /// LRU out some records until we reclaim the amount of space requested /// This will force a garbage collection of the space immediately /// If zero is passed in here, a garbage collection will be performed of dead records diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/record.rs b/veilid-core/src/storage_manager/record_store/record.rs similarity index 100% rename from veilid-core/src/storage_manager/record_store/record_store_inner/record.rs rename to veilid-core/src/storage_manager/record_store/record.rs diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/load_action.rs b/veilid-core/src/storage_manager/record_store/record_store_inner/load_action.rs index 9c3a7752..046e2ef3 100644 --- a/veilid-core/src/storage_manager/record_store/record_store_inner/load_action.rs +++ b/veilid-core/src/storage_manager/record_store/record_store_inner/load_action.rs @@ -39,3 +39,14 @@ impl LoadAction { (self.subkey_table_key, self.opt_cached_record_data) } } + +pub enum LoadActionResult { + NoRecord, + NoSubkey { + descriptor: Arc, + }, + Subkey { + descriptor: Arc, + load_action: LoadAction, + }, +} diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/mod.rs b/veilid-core/src/storage_manager/record_store/record_store_inner/mod.rs index fff190d2..56dce758 100644 --- a/veilid-core/src/storage_manager/record_store/record_store_inner/mod.rs +++ b/veilid-core/src/storage_manager/record_store/record_store_inner/mod.rs @@ -2,7 +2,6 @@ mod commit_action; mod keys; mod limited_size; mod load_action; -mod record; mod record_data; mod record_index; @@ -12,7 +11,6 @@ pub(super) use commit_action::*; pub(super) use keys::*; pub(super) use limited_size::*; pub(super) use load_action::*; -pub(super) use record::*; pub(super) use record_data::*; pub(super) use record_index::*; @@ -80,19 +78,79 @@ where }) } - pub fn with_record_index(&self, func: F) -> R - where - F: FnOnce(&RecordIndex) -> R, - { - func(&self.record_index) + pub fn new_record( + &mut self, + opaque_record_key: OpaqueRecordKey, + record: Record, + ) -> VeilidAPIResult>> { + self.record_index + .create(opaque_record_key.clone(), record)?; + Ok(self.record_index.maybe_prepare_commit_action()) } - pub fn with_record_index_mut(&mut self, func: F) -> R + pub fn delete_record( + &mut self, + opaque_record_key: OpaqueRecordKey, + ) -> VeilidAPIResult>> { + self.record_index.delete(opaque_record_key.clone())?; + self.cleanup_record(opaque_record_key); + Ok(self.record_index.maybe_prepare_commit_action()) + } + + pub fn flush(&mut self) -> Option> { + self.record_index.prepare_commit_action() + } + + pub fn finish_commit_action(&mut self, commit_action: CommitAction) -> VeilidAPIResult<()> { + self.record_index.finish_commit_action(commit_action) + } + + pub fn prepare_get_subkey( + &mut self, + opaque_record_key: &OpaqueRecordKey, + subkey: ValueSubkey, + ) -> LoadActionResult { + self.record_index + .prepare_load_action(opaque_record_key.clone(), subkey) + } + + pub fn finish_get_subkey(&mut self, load_action: LoadAction) { + self.record_index.finish_load_action(load_action); + } + + pub fn with_record(&mut self, opaque_record_key: &OpaqueRecordKey, func: F) -> Option where - F: FnOnce(&mut RecordIndex) -> R, + F: FnOnce(&Record) -> R, { - func(&mut self.record_index) + self.record_index.with_record(opaque_record_key, func) + } + + pub(super) fn with_record_detail_mut( + &mut self, + opaque_record_key: &OpaqueRecordKey, + func: F, + ) -> Option + where + F: FnOnce(Arc, &mut D) -> R, + { + self.record_index + .with_record_detail_mut(opaque_record_key, func) + } + + //////////////////////////////////////////////////////////// + + fn cleanup_record(&mut self, opaque_record_key: OpaqueRecordKey) { + let rtk = RecordTableKey { + record_key: opaque_record_key, + }; + + // Remove transactions + self.record_transactions.remove(&rtk); + + // Remove watches + self.watched_records.remove(&rtk); + + // Remove watch changes + self.changed_watched_values.remove(&rtk); } } - -impl RecordStore where D: RecordDetail {} diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs b/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs index 282eb5c3..adbb5f7d 100644 --- a/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs +++ b/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs @@ -166,8 +166,7 @@ where }; let Some(record) = self.record_cache.remove(&rtk) else { - veilid_log!(self error "RecordIndex({}): Record missing with key {}", self.unlocked_inner.name, key); - apibail_internal!("record missing"); + apibail_invalid_argument!("record missing", "key", key); }; self.purge_record_and_subkeys(rtk, record, false); @@ -179,14 +178,96 @@ where Ok(()) } + /// Access a record + /// + /// If the record exists, passes it to a function and marks the record as recently used + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn with_record( + &mut self, + opaque_record_key: &OpaqueRecordKey, + func: F, + ) -> Option + where + F: FnOnce(&Record) -> R, + { + // Get record from index + let mut out = None; + let rtk = RecordTableKey { + record_key: opaque_record_key.clone(), + }; + if let Some(record) = self.record_cache.get_mut(&rtk) { + let old_record = record.clone(); + + // LRU touch + record.touch(); + + // Callback + out = Some(func(record)); + + let new_record = record.clone(); + + self.add_uncommitted_record_update(rtk, new_record, old_record); + } + + out + } + + /// Modify a record's detail + /// + /// If the record exists, passes a mutable reference of its detail to a function and marks the record as recently used + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn with_record_detail_mut( + &mut self, + opaque_record_key: &OpaqueRecordKey, + func: F, + ) -> Option + where + F: FnOnce(Arc, &mut D) -> R, + { + // Get record from index + let mut out = None; + let rtk = RecordTableKey { + record_key: opaque_record_key.clone(), + }; + if let Some(record) = self.record_cache.get_mut(&rtk) { + let old_record = record.clone(); + + // LRU touch + record.touch(); + + // Callback + out = Some(func(record.descriptor(), record.detail_mut())); + + let new_record = record.clone(); + + self.add_uncommitted_record_update(rtk, new_record, old_record); + } + + out + } + /// Get a subkey value + /// /// Does not perform database operations if the subkey does not exist in the cache. - /// Returns a load action object that either has the subkey data or can retrieve it from the database. pub fn prepare_load_action( &mut self, key: OpaqueRecordKey, subkey: ValueSubkey, - ) -> Option { + ) -> LoadActionResult { + let rtk = RecordTableKey { + record_key: key.clone(), + }; + + let Some(record) = self.record_cache.get(&rtk) else { + return LoadActionResult::NoRecord; + }; + + if !record.stored_subkeys().contains(subkey) { + return LoadActionResult::NoSubkey { + descriptor: record.descriptor(), + }; + } + let stk = SubkeyTableKey { record_key: key.clone(), subkey, @@ -215,14 +296,18 @@ where .flatten() }); - Some(LoadAction::new( - self.unlocked_inner.subkey_table.clone(), - stk, - opt_cached_record_data, - )) + LoadActionResult::Subkey { + descriptor: record.descriptor(), + load_action: LoadAction::new( + self.unlocked_inner.subkey_table.clone(), + stk, + opt_cached_record_data, + ), + } } /// Finalize a load action + /// /// If the load action pulled a value from the database, it stores a subkey in /// the cache only if it isn't already there pub fn finish_load_action(&mut self, load_action: LoadAction) { @@ -254,8 +339,7 @@ where // Remove the old record from the cache let Some(old_record) = self.record_cache.remove(&rtk) else { - veilid_log!(self error "RecordIndex({}): Record missing with key {}", self.unlocked_inner.name, key); - apibail_internal!("record missing"); + apibail_invalid_argument!("record missing", "key", key); }; // Make a copy of the record to edit diff --git a/veilid-core/src/storage_manager/rehydrate.rs b/veilid-core/src/storage_manager/rehydrate.rs index bebf30b7..ad927a42 100644 --- a/veilid-core/src/storage_manager/rehydrate.rs +++ b/veilid-core/src/storage_manager/rehydrate.rs @@ -22,7 +22,7 @@ pub(super) struct RehydrationRequest { impl StorageManager { /// Add a background rehydration request #[instrument(level = "trace", target = "stor", skip_all)] - pub async fn add_rehydration_request( + pub fn add_rehydration_request( &self, opaque_record_key: OpaqueRecordKey, subkeys: ValueSubkeyRangeSet, @@ -33,7 +33,7 @@ impl StorageManager { consensus_count, }; veilid_log!(self debug "Adding rehydration request: {} {:?}", opaque_record_key, req); - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock(); inner .rehydration_requests .entry(opaque_record_key) @@ -59,6 +59,8 @@ impl StorageManager { subkeys: ValueSubkeyRangeSet, consensus_count: usize, ) -> VeilidAPIResult { + let local_record_store = self.get_local_record_store()?; + veilid_log!(self debug "Checking for record rehydration: {} {} @ consensus {}", opaque_record_key, subkeys, consensus_count); // Get subkey range for consideration let subkeys = if subkeys.is_empty() { @@ -84,9 +86,6 @@ impl StorageManager { opened_record.safety_selection() } else { // See if it's in the local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; let Some(safety_selection) = local_record_store .with_record(&opaque_record_key, |rec| { rec.detail().safety_selection.clone() @@ -247,8 +246,6 @@ impl StorageManager { local_inspect_result: InspectResult, outbound_inspect_result: OutboundInspectValueResult, ) -> VeilidAPIResult { - let mut inner = self.inner.lock().await; - // For each subkey, determine if we should rehydrate it let mut rehydrated = ValueSubkeyRangeSet::new(); for (n, subkey) in local_inspect_result.subkeys().iter().enumerate() { @@ -292,8 +289,7 @@ impl StorageManager { .map(ValueSubkeyRangeSet::single) .zip(outbound_inspect_result.subkey_fanout_results.into_iter()); - Self::process_fanout_results_inner( - &mut inner, + self.process_fanout_results( opaque_record_key.clone(), results_iter, false, diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 5bd43d51..f6cd7eb2 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -706,7 +706,6 @@ impl StorageManager { result: set_value::OutboundSetValueResult, ) -> Result, VeilidAPIError> { // Regain the lock after network access - let mut inner = self.inner.lock().await; let opaque_record_key = record_key.opaque(); // Report on fanout result offline @@ -724,8 +723,7 @@ impl StorageManager { } // Keep the list of nodes that returned a value for later reference - Self::process_fanout_results_inner( - &mut inner, + self.process_fanout_results( opaque_record_key.clone(), core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)), true, diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index 99389b1d..ac1ae4ad 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -178,23 +178,19 @@ impl StorageManager { async fn process_single_result(&self, result: WorkItemResult) { let consensus_width = self.config().network.dht.consensus_width as usize; - let mut inner = self.inner.lock().await; - // Debug print the result veilid_log!(self debug "Offline write result: {:?}", result); // Mark the offline subkey write as no longer in-flight let subkeys_still_offline = result.work_item.subkeys.difference(&result.written_subkeys); - self.finish_offline_subkey_writes_inner( - &mut inner, + self.finish_offline_subkey_writes( &result.work_item.opaque_record_key, result.written_subkeys, subkeys_still_offline, ); // Keep the list of nodes that returned a value for later reference - Self::process_fanout_results_inner( - &mut inner, + self.process_fanout_results( result.work_item.opaque_record_key, result.fanout_results.into_iter().map(|x| (x.0, x.1)), true, @@ -279,8 +275,7 @@ impl StorageManager { // Ensure nothing is left in-flight when returning even due to an error { - let mut inner = self.inner.lock().await; - inner.offline_subkey_writes.retain(|_, v| { + self.inner.lock().offline_subkey_writes.retain(|_, v| { v.subkeys = v.subkeys.union(&mem::take(&mut v.subkeys_in_flight)); !v.subkeys.is_empty() }); diff --git a/veilid-core/src/storage_manager/tasks/save_metadata.rs b/veilid-core/src/storage_manager/tasks/save_metadata.rs index ed852917..cb3f0550 100644 --- a/veilid-core/src/storage_manager/tasks/save_metadata.rs +++ b/veilid-core/src/storage_manager/tasks/save_metadata.rs @@ -9,8 +9,6 @@ impl StorageManager { _last_ts: Timestamp, _cur_ts: Timestamp, ) -> EyreResult<()> { - let mut inner = self.inner.lock().await; - self.save_metadata_inner(&mut inner).await?; - Ok(()) + self.save_metadata().await } } diff --git a/veilid-core/src/storage_manager/transaction.rs b/veilid-core/src/storage_manager/transaction.rs index 7ade755f..b37d79fc 100644 --- a/veilid-core/src/storage_manager/transaction.rs +++ b/veilid-core/src/storage_manager/transaction.rs @@ -176,16 +176,13 @@ impl StorageManager { // Store transaction results { - let mut inner = self.inner.lock().await; - let inner = &mut *inner; - // Snapshot local valuedata for transaction { - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; + let local_record_store = self.get_local_record_store()?; - let transaction_state = inner + let transaction_state = self + .inner + .lock() .outbound_transaction_manager .get_transaction_state_mut(&transaction_handle)?; @@ -214,8 +211,7 @@ impl StorageManager { } let max_subkey = result.descriptor.schema()?.max_subkey(); - Self::process_fanout_results_inner( - inner, + self.process_fanout_results( result.opaque_record_key.clone(), core::iter::once(( ValueSubkeyRangeSet::single_range(0, max_subkey), @@ -226,7 +222,9 @@ impl StorageManager { ); } - if let Err(e) = inner + if let Err(e) = self + .inner + .lock() .outbound_transaction_manager .record_transact_begin_results(transaction_handle.clone(), results) { diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 9635ca0b..fff0e87b 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -605,9 +605,7 @@ impl StorageManager { veilid_log!(self debug target:"dht", "WatchValue Fanout: {:#}", fanout_result); // Keep the list of nodes that responded for later reference - let mut inner = self.inner.lock().await; - Self::process_fanout_results_inner( - &mut inner, + self.process_fanout_results( record_key.opaque(), core::iter::once((ValueSubkeyRangeSet::new(), fanout_result)), false, @@ -625,8 +623,9 @@ impl StorageManager { ) { let opaque_record_key = watch_lock.tag(); - let mut inner = self.inner.lock().await; - let Some(outbound_watch) = inner + let Some(outbound_watch) = self + .inner + .lock() .outbound_watch_manager .outbound_watches .remove(&opaque_record_key)