[ci skip] storage manager refactor continued

This commit is contained in:
Christien Rioux 2025-11-23 19:02:00 -05:00
parent f03b6c2dfa
commit 1a53feea21
23 changed files with 908 additions and 767 deletions

View file

@ -46,9 +46,7 @@ impl StorageManager {
inner: &mut StorageManagerInner,
record_key: RecordKey,
) -> VeilidAPIResult<()> {
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let local_record_store = self.get_local_record_store()?;
let opaque_record_key = record_key.opaque();
if local_record_store

View file

@ -55,9 +55,7 @@ impl StorageManager {
};
// Get local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let local_record_store = self.get_local_record_store()?;
// Verify the dht schema does not contain the node id
{

View file

@ -1,22 +1,20 @@
use super::*;
impl StorageManager {
pub async fn debug_local_records(&self) -> String {
let inner = self.inner.lock().await;
let Some(local_record_store) = &inner.local_record_store else {
pub fn debug_local_records(&self) -> String {
let Ok(local_record_store) = self.get_local_record_store() else {
return "not initialized".to_owned();
};
local_record_store.debug_records()
}
pub async fn debug_remote_records(&self) -> String {
let inner = self.inner.lock().await;
let Some(remote_record_store) = &inner.remote_record_store else {
pub fn debug_remote_records(&self) -> String {
let Ok(remote_record_store) = self.get_remote_record_store() else {
return "not initialized".to_owned();
};
remote_record_store.debug_records()
}
pub async fn debug_opened_records(&self) -> String {
let inner = self.inner.lock().await;
pub fn debug_opened_records(&self) -> String {
let inner = self.inner.lock();
let mut out = "[\n".to_owned();
for (k, v) in &inner.opened_records {
let writer = if let Some(w) = v.writer() {
@ -33,18 +31,18 @@ impl StorageManager {
}
format!("{}]\n", out)
}
pub async fn debug_watched_records(&self) -> String {
let inner = self.inner.lock().await;
pub fn debug_watched_records(&self) -> String {
let inner = self.inner.lock();
inner.outbound_watch_manager.to_string()
}
pub async fn debug_transactions(&self) -> String {
let inner = self.inner.lock().await;
pub fn debug_transactions(&self) -> String {
let inner = self.inner.lock();
inner.outbound_transaction_manager.to_string()
}
pub async fn debug_offline_records(&self) -> String {
let inner = self.inner.lock().await;
let Some(local_record_store) = &inner.local_record_store else {
pub fn debug_offline_records(&self) -> String {
let inner = self.inner.lock();
let Some(local_record_store) = inner.local_record_store.clone() else {
return "not initialized".to_owned();
};
@ -59,14 +57,14 @@ impl StorageManager {
format!("{}]\n", out)
}
pub async fn purge_local_records(&self, reclaim: Option<u64>) -> String {
let mut inner = self.inner.lock().await;
pub fn purge_local_records(&self, reclaim: Option<u64>) -> String {
let mut inner = self.inner.lock();
let Some(local_record_store) = inner.local_record_store.clone() else {
return "not initialized".to_owned();
};
if !inner.opened_records.is_empty() {
return "records still opened".to_owned();
}
let Some(local_record_store) = &mut inner.local_record_store else {
return "not initialized".to_owned();
};
let (reclaimed, total) = local_record_store
.reclaim_space(reclaim.unwrap_or(u64::MAX))
.await;
@ -77,13 +75,13 @@ impl StorageManager {
)
}
pub async fn purge_remote_records(&self, reclaim: Option<u64>) -> String {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
let Some(remote_record_store) = inner.remote_record_store.clone() else {
return "not initialized".to_owned();
};
if !inner.opened_records.is_empty() {
return "records still opened".to_owned();
}
let Some(remote_record_store) = &mut inner.remote_record_store else {
return "not initialized".to_owned();
};
let (reclaimed, total) = remote_record_store
.reclaim_space(reclaim.unwrap_or(u64::MAX))
.await;
@ -98,8 +96,8 @@ impl StorageManager {
record_key: RecordKey,
subkey: ValueSubkey,
) -> String {
let inner = self.inner.lock().await;
let Some(local_record_store) = &inner.local_record_store else {
let inner = self.inner.lock();
let Some(local_record_store) = inner.local_record_store.clone() else {
return "not initialized".to_owned();
};
let opaque_record_key = record_key.opaque();
@ -112,8 +110,8 @@ impl StorageManager {
record_key: RecordKey,
subkey: ValueSubkey,
) -> String {
let inner = self.inner.lock().await;
let Some(remote_record_store) = &inner.remote_record_store else {
let inner = self.inner.lock();
let Some(remote_record_store) = inner.remote_record_store.clone() else {
return "not initialized".to_owned();
};
let opaque_record_key = record_key.opaque();
@ -122,8 +120,8 @@ impl StorageManager {
.await
}
pub async fn debug_local_record_info(&self, record_key: RecordKey) -> String {
let inner = self.inner.lock().await;
let Some(local_record_store) = &inner.local_record_store else {
let inner = self.inner.lock();
let Some(local_record_store) = inner.local_record_store.clone() else {
return "not initialized".to_owned();
};
let opaque_record_key = record_key.opaque();
@ -140,8 +138,8 @@ impl StorageManager {
}
pub async fn debug_remote_record_info(&self, record_key: RecordKey) -> String {
let inner = self.inner.lock().await;
let Some(remote_record_store) = &inner.remote_record_store else {
let inner = self.inner.lock();
let Some(remote_record_store) = inner.remote_record_store.clone() else {
return "not initialized".to_owned();
};
let opaque_record_key = record_key.opaque();

View file

@ -13,10 +13,7 @@ impl StorageManager {
Self::close_record_inner(&mut inner, record_key.clone())?;
// Get record from the local store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let local_record_store = self.get_local_record_store()?;
let opaque_record_key = record_key.opaque();
// Remove the record from the local store
local_record_store.delete_record(opaque_record_key).await

View file

@ -45,9 +45,13 @@ impl StorageManager {
let opaque_record_key = record_key.opaque();
let mut inner = self.inner.lock().await;
let subkey_lock = self
.record_lock_table
.lock_subkey(opaque_record_key, subkey)
.await;
let safety_selection = {
let inner = self.inner.lock();
let Some(opened_record) = inner.opened_records.get(&opaque_record_key) else {
apibail_generic!("record not open");
};
@ -55,9 +59,7 @@ impl StorageManager {
};
// See if the requested subkey is our local record store
let last_get_result = self
.handle_get_local_value_inner(&mut inner, &opaque_record_key, subkey, true)
.await?;
let last_get_result = self.handle_get_local_value(&subkey_lock, true).await?;
// Return the existing value if we have one unless we are forcing a refresh
if !force_refresh {
@ -81,9 +83,6 @@ impl StorageManager {
apibail_try_again!("offline, try again later");
};
// Drop the lock for network access
drop(inner);
// May have last descriptor / value
// Use the safety selection we opened the record with
let last_seq = last_get_result
@ -479,7 +478,10 @@ impl StorageManager {
}
};
let is_incomplete = result.fanout_result.kind.is_incomplete();
let value_data = match this.process_outbound_get_value_result(&key.opaque(), subkey, last_seq, result).await {
let subkey_lock = this.record_lock_table.lock_subkey(key.opaque(), subkey).await;
let value_data = match this.process_outbound_get_value_result(&subkey_lock, last_seq, result).await {
Ok(Some(v)) => v,
Ok(None) => {
return is_incomplete;
@ -524,8 +526,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all)]
pub(super) async fn process_outbound_get_value_result(
&self,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
subkey_lock: &SubkeyLockGuard,
last_seq: ValueSeqNum,
result: get_value::OutboundGetValueResult,
) -> Result<Option<EncryptedValueData>, VeilidAPIError> {
@ -536,12 +537,12 @@ impl StorageManager {
};
// Keep the list of nodes that returned a value for later reference
let mut inner = self.inner.lock().await;
Self::process_fanout_results_inner(
&mut inner,
opaque_record_key.clone(),
core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)),
self.process_fanout_results(
subkey_lock.record(),
core::iter::once((
ValueSubkeyRangeSet::single(subkey_lock.subkey()),
result.fanout_result,
)),
false,
self.config().network.dht.consensus_width as usize,
);
@ -549,14 +550,10 @@ impl StorageManager {
// If we got a new value back then write it to the opened record
if get_result_value.value_data().seq() != last_seq {
let subkey_transaction_changes = self
.handle_set_local_values_single_inner(
&mut inner,
opaque_record_key,
vec![(subkey, get_result_value.clone())],
)
.handle_set_local_value_single(&subkey_lock, get_result_value.clone())
.await?;
self.handle_commit_local_values_inner(&mut inner, subkey_transaction_changes)
self.handle_commit_local_values(subkey_transaction_changes)
.await?;
}
Ok(Some(get_result_value.value_data().clone()))

View file

@ -148,7 +148,6 @@ impl StorageManager {
.await?;
// Keep the list of nodes that returned a value for later reference
let mut inner = self.inner.lock().await;
let results_iter = result
.inspect_result
.subkeys()
@ -156,8 +155,7 @@ impl StorageManager {
.map(ValueSubkeyRangeSet::single)
.zip(result.subkey_fanout_results.into_iter());
Self::process_fanout_results_inner(
&mut inner,
self.process_fanout_results(
opaque_record_key.clone(),
results_iter,
false,

View file

@ -2,30 +2,27 @@ use super::*;
impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_get_local_value_inner(
pub(super) async fn handle_get_local_value(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
subkey_lock: &SubkeyLockGuard,
want_descriptor: bool,
) -> VeilidAPIResult<GetResult> {
let opaque_record_key = subkey_lock.record();
let subkey = subkey_lock.subkey();
let local_record_store = self.get_local_record_store()?;
// See if the value is in the offline subkey writes first,
// since it may not have been committed yet to the local record store
if let Some(get_result) = self.get_offline_subkey_writes_subkey(
inner,
opaque_record_key,
subkey,
want_descriptor,
)? {
if let Some(get_result) =
self.get_offline_subkey_writes_subkey(&opaque_record_key, subkey, want_descriptor)?
{
return Ok(get_result);
}
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(get_result) = local_record_store
.get_subkey(opaque_record_key, subkey, want_descriptor)
.get_subkey(&opaque_record_key, subkey, want_descriptor)
.await?
{
return Ok(get_result);
@ -35,60 +32,72 @@ impl StorageManager {
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_local_values_single_inner(
pub(super) async fn handle_set_local_value_single(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
subkey_lock: &SubkeyLockGuard,
value: Arc<SignedValueData>,
) -> VeilidAPIResult<SubkeyTransactionChanges> {
let opaque_record_key = subkey_lock.record();
let subkey = subkey_lock.subkey();
// Write subkey to local store
let local_record_store = self.get_local_record_store()?;
local_record_store
.set_subkeys_single_record(&opaque_record_key, vec![(subkey, value)])
.await
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_local_values_single(
&self,
records_lock: &RecordsLockGuard,
subkeys: SubkeyValueList,
) -> VeilidAPIResult<SubkeyTransactionChanges> {
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let opaque_record_key = records_lock.single_record()?;
// Write subkey to local store
let local_record_store = self.get_local_record_store()?;
local_record_store
.set_subkeys_single_record(opaque_record_key, subkeys.clone())
.set_subkeys_single_record(&opaque_record_key, subkeys)
.await
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_local_values_multiple_inner(
pub(super) async fn handle_set_local_values_multiple(
&self,
inner: &mut StorageManagerInner,
records_lock: &RecordsLockGuard,
keys_and_subkeys: RecordSubkeyValueList,
) -> VeilidAPIResult<SubkeyTransactionChanges> {
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let records = records_lock.records().into_iter().collect::<BTreeSet<_>>();
for x in keys_and_subkeys.iter() {
if !records.contains(&x.0) {
apibail_internal!("invalid records lock")
}
}
// Write subkey to local store
let local_record_store = self.get_local_record_store()?;
local_record_store
.set_subkeys_multiple_records(keys_and_subkeys.clone())
.set_subkeys_multiple_records(keys_and_subkeys)
.await
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_commit_local_values_inner(
pub(super) async fn handle_commit_local_values(
&self,
inner: &mut StorageManagerInner,
subkey_transaction_changes: SubkeyTransactionChanges,
) -> VeilidAPIResult<()> {
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let local_record_store = self.get_local_record_store()?;
let record_subkey_value_list = local_record_store
.commit_subkeys_tx(subkey_transaction_changes, InboundWatchUpdateMode::NoUpdate)
.await?;
// See if this new data supersedes any offline subkey writes
let mut inner = self.inner.lock();
for (opaque_record_key, subkey_value_list) in record_subkey_value_list {
for (subkey, signed_value_data) in subkey_value_list {
self.remove_old_offline_subkey_writes_inner(
inner,
&mut inner,
&opaque_record_key,
subkey,
signed_value_data,
@ -100,19 +109,19 @@ impl StorageManager {
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_inspect_local_value_inner(
pub(super) async fn handle_inspect_local_value(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
records_lock: &RecordsLockGuard,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<InspectResult> {
let opaque_record_key = records_lock.single_record()?;
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let local_record_store = self.get_local_record_store()?;
if let Some(inspect_result) = local_record_store
.inspect_record(opaque_record_key, &subkeys, want_descriptor)
.inspect_record(&opaque_record_key, &subkeys, want_descriptor)
.await?
{
return Ok(inspect_result);
@ -124,13 +133,13 @@ impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn get_value_nodes(
&self,
records_lock: &RecordsLockGuard,
opaque_record_key: &OpaqueRecordKey,
) -> VeilidAPIResult<Option<Vec<NodeRef>>> {
let inner = self.inner.lock().await;
// xxx is this the right records lock??
// Get local record store
let Some(local_record_store) = inner.local_record_store.as_ref() else {
apibail_not_initialized!();
};
let local_record_store = self.get_local_record_store()?;
// Get routing table to see if we still know about these nodes
let routing_table = self.routing_table();

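A small usage sketch (editorial, not part of the diff) of the two-phase local write introduced above: handle_set_local_value_single stages a subkey write under its lock guard and returns transaction changes, and handle_commit_local_values commits them. The wrapper name example_write_subkey_locally is hypothetical; the two handlers and SubkeyLockGuard are the ones defined in this refactor.

use super::*;

impl StorageManager {
    /// Hypothetical wrapper showing the stage-then-commit flow.
    async fn example_write_subkey_locally(
        &self,
        subkey_lock: &SubkeyLockGuard,
        value: Arc<SignedValueData>,
    ) -> VeilidAPIResult<()> {
        // Stage the subkey write against the local record store as transaction changes...
        let changes = self
            .handle_set_local_value_single(subkey_lock, value)
            .await?;
        // ...then commit, which also removes any offline subkey writes this data supersedes.
        self.handle_commit_local_values(changes).await
    }
}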
View file

@ -102,8 +102,8 @@ const OFFLINE_SUBKEY_WRITES: &[u8] = b"offline_subkey_writes";
const OUTBOUND_WATCH_MANAGER: &[u8] = b"outbound_watch_manager";
/// Rehydration requests metadata key name for rehydration persistence
const REHYDRATION_REQUESTS: &[u8] = b"rehydration_requests";
/// SetValue descriptor cache metadata key name for persistence
const SET_VALUE_DESCRIPTOR_CACHE: &[u8] = b"set_value_descriptor_cace";
/// Descriptor cache metadata key name for persistence
const DESCRIPTOR_CACHE: &[u8] = b"descriptor_cache";
#[derive(Debug, Clone)]
/// A single 'value changed' message to send
@ -175,7 +175,7 @@ impl fmt::Debug for StorageManagerInner {
pub(crate) struct StorageManager {
registry: VeilidComponentRegistry,
inner: AsyncMutex<StorageManagerInner>,
inner: Mutex<StorageManagerInner>,
startup_lock: Arc<StartupLock>,
// Background processes
@ -192,14 +192,9 @@ pub(crate) struct StorageManager {
// Anonymous signing keys that will be used when watching or transacting on records we opened without a writer
anonymous_signing_keys: KeyPairGroup,
// Outbound watch operation lock
// Keeps changes to watches to one-at-a-time per record
outbound_watch_lock_table: RecordLockTable,
// Outbound transaction record locks
// Allow begin/commit/rollback/inspect/sync to be exclusive, and set/get to be per subkey
// Prevents concurrent conflicting operations on the same record and/or subkey
outbound_transaction_lock_table: RecordLockTable,
// Record operation lock
// Keeps changes to records to one-at-a-time per record
record_lock_table: RecordLockTable,
// Background operation processor
// for offline subkey writes, watch changes, and any other
@ -218,27 +213,12 @@ impl fmt::Debug for StorageManager {
f.debug_struct("StorageManager")
.field("registry", &self.registry)
.field("inner", &self.inner)
// .field("flush_record_stores_task", &self.flush_record_stores_task)
// .field(
// "offline_subkey_writes_task",
// &self.offline_subkey_writes_task,
// )
// .field("send_value_changes_task", &self.send_value_changes_task)
// .field("check_active_watches_task", &self.check_active_watches_task)
// .field(
// "check_watched_records_task",
// &self.check_watched_records_task,
// )
.field("outbound_watch_lock_table", &self.outbound_watch_lock_table)
.field(
"outbound_transaction_lock_table",
&self.outbound_transaction_lock_table,
)
.field("record_lock_table", &self.record_lock_table)
.field(
"background_operation_processor",
&self.background_operation_processor,
)
.field("anonymous_watch_keys", &self.anonymous_signing_keys)
.field("anonymous_signing_keys", &self.anonymous_signing_keys)
.field("is_online", &self.is_online)
.field("descriptor_cache", &self.descriptor_cache)
.finish()
@ -248,10 +228,6 @@ impl fmt::Debug for StorageManager {
impl_veilid_component!(StorageManager);
impl StorageManager {
fn new_inner() -> StorageManagerInner {
StorageManagerInner::default()
}
pub fn new(registry: VeilidComponentRegistry) -> StorageManager {
let crypto = registry.crypto();
@ -263,10 +239,9 @@ impl StorageManager {
anonymous_signing_keys.add(kp);
}
let inner = Self::new_inner();
let this = StorageManager {
registry,
inner: AsyncMutex::new(inner),
inner: Default::default(),
startup_lock: Arc::new(StartupLock::new()),
save_metadata_task: TickTask::new("save_metadata_task", SAVE_METADATA_INTERVAL_SECS),
@ -302,8 +277,7 @@ impl StorageManager {
"rehydrate_records_task",
REHYDRATE_RECORDS_INTERVAL_SECS,
),
outbound_watch_lock_table: RecordLockTable::new(),
outbound_transaction_lock_table: RecordLockTable::new(),
record_lock_table: RecordLockTable::new(),
anonymous_signing_keys,
background_operation_processor: DeferredStreamProcessor::new(),
is_online: AtomicBool::new(false),
@ -377,13 +351,14 @@ impl StorageManager {
RecordStore::try_new(&table_store, "remote", remote_limits).await?;
{
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
inner.metadata_db = Some(metadata_db);
inner.local_record_store = Some(local_record_store);
inner.remote_record_store = Some(remote_record_store);
self.load_metadata_inner(&mut inner).await?;
}
self.load_metadata().await?;
// Start deferred results processors
self.background_operation_processor.init();
@ -399,7 +374,7 @@ impl StorageManager {
impl_subscribe_event_bus_async!(self, Self, peer_info_change_event_handler);
let tick_subscription = impl_subscribe_event_bus_async!(self, Self, tick_event_handler);
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
// Resolve outbound watch manager noderefs
inner.outbound_watch_manager.prepare(&self.routing_table());
@ -420,7 +395,7 @@ impl StorageManager {
async fn pre_terminate_async(&self) {
// Stop background operations
{
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
if let Some(sub) = inner.peer_info_change_subscription.take() {
self.event_bus().unsubscribe(sub);
}
@ -448,28 +423,30 @@ impl StorageManager {
self.background_operation_processor.terminate().await;
// Terminate and release the storage manager
let (opt_local_record_store, opt_remote_record_store) = {
let mut inner = self.inner.lock();
let opt_local_record_store = inner.local_record_store.take();
let opt_remote_record_store = inner.remote_record_store.take();
(opt_local_record_store, opt_remote_record_store)
};
// Final flush on record stores
if let Some(local_record_store) = opt_local_record_store {
local_record_store.flush().await;
}
if let Some(remote_record_store) = opt_remote_record_store {
remote_record_store.flush().await;
}
// Save metadata
if let Err(e) = self.save_metadata().await {
veilid_log!(self error "termination metadata save failed: {}", e);
}
// Reset inner state
{
let mut inner = self.inner.lock().await;
// Final flush on record stores
if let Some(mut local_record_store) = inner.local_record_store.take() {
if let Err(e) = local_record_store.flush().await {
veilid_log!(self error "termination local record store tick failed: {}", e);
}
}
if let Some(mut remote_record_store) = inner.remote_record_store.take() {
if let Err(e) = remote_record_store.flush().await {
veilid_log!(self error "termination remote record store tick failed: {}", e);
}
}
// Save metadata
if let Err(e) = self.save_metadata_inner(&mut inner).await {
veilid_log!(self error "termination metadata save failed: {}", e);
}
// Reset inner state
*inner = Self::new_inner();
let mut inner = self.inner.lock();
*inner = Default::default();
}
guard.success();
@ -477,102 +454,153 @@ impl StorageManager {
veilid_log!(self debug "finished storage manager shutdown");
}
async fn save_metadata_inner(&self, inner: &mut StorageManagerInner) -> EyreResult<()> {
if let Some(metadata_db) = &inner.metadata_db {
let tx = metadata_db.transact();
let set_value_descriptor_cache = self
async fn save_metadata(&self) -> EyreResult<()> {
let (
metadata_db,
offline_subkey_writes_json,
outbound_watch_manager_json,
rehydration_requests_json,
descriptor_cache_json,
) = {
let descriptor_cache = self
.descriptor_cache
.lock()
.iter()
.map(|x| x.0.clone())
.collect::<Vec<DescriptorCacheKey>>();
tx.store_json(0, OFFLINE_SUBKEY_WRITES, &inner.offline_subkey_writes)
.await?;
tx.store_json(0, OUTBOUND_WATCH_MANAGER, &inner.outbound_watch_manager)
.await?;
tx.store_json(0, REHYDRATION_REQUESTS, &inner.rehydration_requests)
.await?;
tx.store_json(0, SET_VALUE_DESCRIPTOR_CACHE, &set_value_descriptor_cache)
.await?;
let inner = self.inner.lock();
let Some(metadata_db) = inner.metadata_db.clone() else {
return Ok(());
};
let offline_subkey_writes_json = serde_json::to_vec(&inner.offline_subkey_writes)
.map_err(VeilidAPIError::internal)?;
let outbound_watch_manager_json = serde_json::to_vec(&inner.outbound_watch_manager)
.map_err(VeilidAPIError::internal)?;
let rehydration_requests_json = serde_json::to_vec(&inner.rehydration_requests)
.map_err(VeilidAPIError::internal)?;
let descriptor_cache_json =
serde_json::to_vec(&descriptor_cache).map_err(VeilidAPIError::internal)?;
(
metadata_db,
offline_subkey_writes_json,
outbound_watch_manager_json,
rehydration_requests_json,
descriptor_cache_json,
)
};
let tx = metadata_db.transact();
tx.store(0, OFFLINE_SUBKEY_WRITES, &offline_subkey_writes_json)
.await?;
tx.store(0, OUTBOUND_WATCH_MANAGER, &outbound_watch_manager_json)
.await?;
tx.store(0, REHYDRATION_REQUESTS, &rehydration_requests_json)
.await?;
tx.store(0, DESCRIPTOR_CACHE, &descriptor_cache_json)
.await?;
tx.commit().await.wrap_err("failed to commit")?;
tx.commit().await.wrap_err("failed to commit")?
}
Ok(())
}
async fn load_metadata_inner(&self, inner: &mut StorageManagerInner) -> EyreResult<()> {
if let Some(metadata_db) = &inner.metadata_db {
inner.offline_subkey_writes = match metadata_db
.load_json(0, OFFLINE_SUBKEY_WRITES)
.await
{
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, OFFLINE_SUBKEY_WRITES).await {
veilid_log!(self debug "offline_subkey_writes format changed, clearing: {}", e);
}
Default::default()
}
};
inner.outbound_watch_manager = match metadata_db
.load_json(0, OUTBOUND_WATCH_MANAGER)
.await
{
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, OUTBOUND_WATCH_MANAGER).await {
veilid_log!(self debug "outbound_watch_manager format changed, clearing: {}", e);
}
Default::default()
}
};
async fn load_metadata(&self) -> EyreResult<()> {
let Some(metadata_db) = self.inner.lock().metadata_db.clone() else {
bail!("metadata db should exist");
};
inner.rehydration_requests = match metadata_db.load_json(0, REHYDRATION_REQUESTS).await
{
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, REHYDRATION_REQUESTS).await {
veilid_log!(self debug "rehydration_requests format changed, clearing: {}", e);
}
Default::default()
let offline_subkey_writes = match metadata_db.load_json(0, OFFLINE_SUBKEY_WRITES).await {
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, OFFLINE_SUBKEY_WRITES).await {
veilid_log!(self debug "offline_subkey_writes format changed, clearing: {}", e);
}
};
let set_value_descriptor_cache_keys = match metadata_db
.load_json::<Vec<DescriptorCacheKey>>(0, SET_VALUE_DESCRIPTOR_CACHE)
.await
{
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, SET_VALUE_DESCRIPTOR_CACHE).await {
veilid_log!(self debug "set_value_descriptor_cache format changed, clearing: {}", e);
}
Default::default()
Default::default()
}
};
let outbound_watch_manager = match metadata_db.load_json(0, OUTBOUND_WATCH_MANAGER).await {
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, OUTBOUND_WATCH_MANAGER).await {
veilid_log!(self debug "outbound_watch_manager format changed, clearing: {}", e);
}
};
{
let mut set_value_descriptor_cache = self.descriptor_cache.lock();
set_value_descriptor_cache.clear();
for k in set_value_descriptor_cache_keys {
set_value_descriptor_cache.insert(k, ());
Default::default()
}
};
let rehydration_requests = match metadata_db.load_json(0, REHYDRATION_REQUESTS).await {
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, REHYDRATION_REQUESTS).await {
veilid_log!(self debug "rehydration_requests format changed, clearing: {}", e);
}
Default::default()
}
};
let descriptor_cache_keys = match metadata_db
.load_json::<Vec<DescriptorCacheKey>>(0, DESCRIPTOR_CACHE)
.await
{
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, DESCRIPTOR_CACHE).await {
veilid_log!(self debug "descriptor_cache format changed, clearing: {}", e);
}
Default::default()
}
};
{
let mut inner = self.inner.lock();
inner.offline_subkey_writes = offline_subkey_writes;
inner.outbound_watch_manager = outbound_watch_manager;
inner.rehydration_requests = rehydration_requests;
}
{
let mut descriptor_cache = self.descriptor_cache.lock();
descriptor_cache.clear();
for k in descriptor_cache_keys {
descriptor_cache.insert(k, ());
}
}
Ok(())
}
async fn has_offline_subkey_writes(&self) -> bool {
!self.inner.lock().await.offline_subkey_writes.is_empty()
fn has_offline_subkey_writes(&self) -> bool {
!self.inner.lock().offline_subkey_writes.is_empty()
}
async fn has_rehydration_requests(&self) -> bool {
!self.inner.lock().await.rehydration_requests.is_empty()
fn has_rehydration_requests(&self) -> bool {
!self.inner.lock().rehydration_requests.is_empty()
}
fn dht_is_online(&self) -> bool {
self.is_online.load(Ordering::Acquire)
}
fn get_local_record_store(&self) -> VeilidAPIResult<RecordStore<LocalRecordDetail>> {
self.inner
.lock()
.local_record_store
.as_ref()
.cloned()
.ok_or_else(VeilidAPIError::not_initialized)
}
fn get_remote_record_store(&self) -> VeilidAPIResult<RecordStore<RemoteRecordDetail>> {
self.inner
.lock()
.remote_record_store
.as_ref()
.cloned()
.ok_or_else(VeilidAPIError::not_initialized)
}
// Send a value change up through the callback
#[instrument(level = "trace", target = "stor", skip(self, value))]
fn update_callback_value_change(
@ -637,27 +665,23 @@ impl StorageManager {
////////////////////////////////////////////////////////////////////////
#[instrument(level = "trace", target = "stor", skip_all)]
fn process_fanout_results_inner<I: IntoIterator<Item = (ValueSubkeyRangeSet, FanoutResult)>>(
inner: &mut StorageManagerInner,
fn process_fanout_results<I: IntoIterator<Item = (ValueSubkeyRangeSet, FanoutResult)>>(
&self,
opaque_record_key: OpaqueRecordKey,
subkey_results_iter: I,
is_set: bool,
consensus_width: usize,
) {
// Get local record store
let local_record_store = inner.local_record_store.as_mut().unwrap();
) -> VeilidAPIResult<()> {
let cur_ts = Timestamp::now();
local_record_store.with_record_mut(&opaque_record_key, |r| {
let d = r.detail_mut();
let local_record_store = self.get_local_record_store()?;
local_record_store.with_record_detail_mut(&opaque_record_key, |detail| {
for (subkeys, fanout_result) in subkey_results_iter {
for node_id in fanout_result
.value_nodes
.iter()
.filter_map(|x| x.node_ids().get(opaque_record_key.kind()))
{
let pnd = d.nodes.entry(node_id).or_default();
let pnd = detail.nodes.entry(node_id).or_default();
if is_set || pnd.last_set == Timestamp::default() {
pnd.last_set = cur_ts;
}
@ -667,7 +691,7 @@ impl StorageManager {
}
// Purge nodes down to the N most recently seen, where N is the consensus width
let mut nodes_ts = d
let mut nodes_ts = detail
.nodes
.iter()
.map(|kv| (kv.0.clone(), kv.1.last_seen))
@ -690,9 +714,11 @@ impl StorageManager {
});
for dead_node_key in nodes_ts.iter().skip(consensus_width) {
d.nodes.remove(&dead_node_key.0);
detail.nodes.remove(&dead_node_key.0);
}
});
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all)]

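A minimal sketch (editorial, not part of the diff) of the new accessor pattern introduced above: get_local_record_store() clones a RecordStore handle out from behind the now-synchronous Mutex, so the inner lock is never held across an await point. The helper name example_peek_local_subkey is hypothetical; get_local_record_store and RecordStore::get_subkey are as shown in this commit.

use super::*;

impl StorageManager {
    /// Hypothetical helper illustrating the cloned-handle accessor pattern.
    async fn example_peek_local_subkey(
        &self,
        opaque_record_key: &OpaqueRecordKey,
        subkey: ValueSubkey,
    ) -> VeilidAPIResult<Option<GetResult>> {
        // The accessor releases the sync Mutex immediately after cloning the handle,
        // so nothing is locked across the .await below.
        let local_record_store = self.get_local_record_store()?;
        local_record_store
            .get_subkey(opaque_record_key, subkey, false)
            .await
    }
}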
View file

@ -48,19 +48,21 @@ impl StorageManager {
pub(super) fn get_offline_subkey_writes_subkey(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<Option<GetResult>> {
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let Some(osw) = inner.offline_subkey_writes.get(opaque_record_key) else {
return Ok(None);
};
let Some(signed_value_data) = osw.subkey_value_data.get(&subkey).cloned() else {
return Ok(None);
let local_record_store = self.get_local_record_store()?;
let signed_value_data = {
let inner = self.inner.lock();
let Some(osw) = inner.offline_subkey_writes.get(opaque_record_key) else {
return Ok(None);
};
let Some(signed_value_data) = osw.subkey_value_data.get(&subkey).cloned() else {
return Ok(None);
};
signed_value_data
};
let opt_descriptor = if want_descriptor {
if let Some(descriptor) = local_record_store
@ -141,9 +143,8 @@ impl StorageManager {
/// so we can try again later. If the data associated with the write is no longer necessary
/// we can drop it.
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn finish_offline_subkey_writes_inner(
pub(super) fn finish_offline_subkey_writes(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
subkeys_written: ValueSubkeyRangeSet,
subkeys_still_offline: ValueSubkeyRangeSet,
@ -154,7 +155,12 @@ impl StorageManager {
);
// Get the offline subkey write record
match inner.offline_subkey_writes.entry(opaque_record_key.clone()) {
match self
.inner
.lock()
.offline_subkey_writes
.entry(opaque_record_key.clone())
{
hashlink::linked_hash_map::Entry::Occupied(mut o) => {
let finished = {
let osw = o.get_mut();

View file

@ -12,22 +12,22 @@ impl StorageManager {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
let mut inner = self.inner.lock().await;
let opaque_record_key = record_key.opaque();
let record_lock = self
.record_lock_table
.lock_record(opaque_record_key.clone())
.await;
// See if we have a local record already or not
if let Some(res) = self
.open_existing_record_inner(
&mut inner,
.open_existing_record_locked(
&record_lock,
record_key.clone(),
writer.clone(),
safety_selection.clone(),
)
.await?
{
drop(inner);
// We had an existing record, so check the network to see if we should
// update it with what we have here
let set_consensus = self.config().network.dht.set_value_count as usize;
@ -36,8 +36,7 @@ impl StorageManager {
opaque_record_key,
ValueSubkeyRangeSet::full(),
set_consensus,
)
.await;
);
return Ok(res);
}
@ -47,9 +46,6 @@ impl StorageManager {
apibail_try_again!("offline, try again later");
};
// Drop the mutex so we don't block during network access
drop(inner);
// No last descriptor, no last value
// Use the safety selection we opened the record with
let result = self
@ -71,11 +67,9 @@ impl StorageManager {
// Check again to see if we have a local record already or not
// because waiting for the outbound_inspect_value action could result in the key being opened
// via some parallel process
let mut inner = self.inner.lock().await;
if let Some(res) = self
.open_existing_record_inner(
&mut inner,
.open_existing_record_locked(
&record_lock,
record_key.clone(),
writer.clone(),
safety_selection.clone(),
@ -89,8 +83,8 @@ impl StorageManager {
}
// Open the new record
self.open_new_record_inner(
&mut inner,
self.open_new_record_locked(
&record_lock,
record_key,
writer,
result.inspect_result,
@ -102,35 +96,38 @@ impl StorageManager {
////////////////////////////////////////////////////////////////////////
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn open_existing_record_inner(
pub(super) async fn open_existing_record_locked(
&self,
inner: &mut StorageManagerInner,
record_lock: &RecordsLockGuard,
record_key: RecordKey,
writer: Option<KeyPair>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<Option<DHTRecordDescriptor>> {
let opaque_record_key = record_lock.single_record()?;
if record_key.opaque() != opaque_record_key {
apibail_internal!("wrong record lock");
}
// Get local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let local_record_store = self.get_local_record_store()?;
// See if we have a local record already or not
let cb = |r: &mut Record<LocalRecordDetail>| {
let cb = |descriptor: Arc<SignedValueDescriptor>, r: &mut LocalRecordDetail| {
// Process local record
// Keep the safety selection we opened the record with
r.detail_mut().safety_selection = safety_selection.clone();
r.safety_selection = safety_selection.clone();
// Return record details
(r.owner(), r.schema())
};
let opaque_record_key = record_key.opaque();
let (owner, schema) = match local_record_store.with_record_mut(&opaque_record_key, cb) {
Some(v) => v,
None => {
return Ok(None);
}
(descriptor.owner(), descriptor.schema().unwrap())
};
let (owner, schema) =
match local_record_store.with_record_detail_mut(&opaque_record_key, cb) {
Some(v) => v,
None => {
return Ok(None);
}
};
// Had local record
// If the writer we chose is also the owner, we have the owner secret
@ -157,7 +154,8 @@ impl StorageManager {
}
// Write open record
inner
self.inner
.lock()
.opened_records
.entry(opaque_record_key)
.and_modify(|e| {
@ -179,17 +177,28 @@ impl StorageManager {
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn open_new_record_inner(
pub(super) async fn open_new_record_locked(
&self,
inner: &mut StorageManagerInner,
records_lock: &RecordsLockGuard,
record_key: RecordKey,
writer: Option<KeyPair>,
inspect_result: InspectResult,
safety_selection: SafetySelection,
) -> VeilidAPIResult<DHTRecordDescriptor> {
let opaque_record_key = records_lock.single_record()?;
if record_key.opaque() != opaque_record_key {
apibail_internal!("wrong record lock");
}
let local_record_store = self.get_local_record_store()?;
// Ensure the record is closed
let opaque_record_key = record_key.opaque();
if inner.opened_records.contains_key(&opaque_record_key) {
if self
.inner
.lock()
.opened_records
.contains_key(&opaque_record_key)
{
panic!("new record should never be opened at this point");
}
@ -214,11 +223,6 @@ impl StorageManager {
};
let schema = signed_value_descriptor.schema()?;
// Get local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Make and store a new record for this descriptor
let record = Record::<LocalRecordDetail>::new(
Timestamp::now(),
@ -233,7 +237,7 @@ impl StorageManager {
let encryption_key = record_key.ref_value().encryption_key();
// Write open record
inner.opened_records.insert(
self.inner.lock().opened_records.insert(
opaque_record_key,
OpenedRecord::new(writer, safety_selection, encryption_key),
);

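An editorial sketch of the calling convention for the *_locked open helpers above: the caller takes an exclusive whole-record lock first and passes the guard down, and the helper cross-checks that the guard covers the record it was given. The function name example_try_open_existing is hypothetical; lock_record and open_existing_record_locked are the APIs shown in this commit.

use super::*;

impl StorageManager {
    /// Hypothetical caller demonstrating the whole-record lock convention.
    async fn example_try_open_existing(
        &self,
        record_key: RecordKey,
        safety_selection: SafetySelection,
    ) -> VeilidAPIResult<Option<DHTRecordDescriptor>> {
        // Exclusive lock on the whole record; per-subkey operations on this
        // record wait until the guard is dropped.
        let record_lock = self
            .record_lock_table
            .lock_record(record_key.opaque())
            .await;
        // The helper verifies the guard actually covers record_key before using it.
        self.open_existing_record_locked(&record_lock, record_key, None, safety_selection)
            .await
    }
}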
View file

@ -3,12 +3,11 @@ use super::*;
impl StorageManager {
/// Get the encryption key for an opened OpaqueRecordKey
/// Opaque record keys must have been opened with their full record key in order to be read
pub(super) async fn get_encryption_key_for_opaque_record_key(
pub(super) fn get_encryption_key_for_opaque_record_key(
&self,
opaque_record_key: &OpaqueRecordKey,
) -> VeilidAPIResult<Option<BareSharedSecret>> {
let inner = self.inner.lock().await;
let inner = self.inner.lock();
let Some(opened_record) = inner.opened_records.get(opaque_record_key) else {
apibail_generic!("decrypt_value_data: opened_records does not contain an expected key");
};

View file

@ -3,38 +3,58 @@ use weak_table::WeakValueHashMap;
use super::*;
pub struct RecordsLockGuard {
_record_locks: Vec<Arc<RecordLock>>,
record_locks: Vec<Arc<RecordLock>>,
_whole_record_lock_guards: Vec<AsyncRwLockWriteGuardArc<()>>,
}
impl RecordsLockGuard {
pub fn records(&self) -> Vec<OpaqueRecordKey> {
self.record_locks.iter().map(|x| x.record()).collect()
}
pub fn single_record(&self) -> VeilidAPIResult<OpaqueRecordKey> {
if self.record_locks.len() != 1 {
apibail_internal!("invalid record count");
}
Ok(self.record_locks.first().cloned().unwrap())
}
}
pub struct SubkeyLockGuard {
record_lock: Arc<RecordLock>,
_whole_record_lock_guard: AsyncRwLockReadGuardArc<()>,
_subkey_lock_guard: AsyncMutexGuardArc<()>,
subkey: ValueSubkey,
}
impl Drop for SubkeyLockGuard {
fn drop(&mut self) {
self.record_lock
.subkey_lock_table
.lock()
.remove(self.subkey);
impl SubkeyLockGuard {
pub fn record(&self) -> OpaqueRecordKey {
self.record_lock.record()
}
pub fn subkey(&self) -> ValueSubkey {
self.subkey
}
}
#[derive(Debug)]
struct RecordLock {
pub whole_record_lock: Arc<AsyncRwLock<()>>,
pub subkey_lock_table: Mutex<ValueSubkeyRangeSet>,
whole_record_lock: Arc<AsyncRwLock<()>>,
subkey_lock_table: Mutex<WeakValueHashMap<ValueSubkey, Weak<AsyncMutex<()>>>>,
record: OpaqueRecordKey,
}
impl RecordLock {
pub fn new() -> Self {
pub fn new(record: OpaqueRecordKey) -> Self {
Self {
whole_record_lock: Arc::new(AsyncRwLock::new(())),
subkey_lock_table: Mutex::new(ValueSubkeyRangeSet::new()),
subkey_lock_table: Mutex::new(WeakValueHashMap::new()),
record,
}
}
pub fn record(&self) -> OpaqueRecordKey {
self.record.clone()
}
}
#[derive(Debug)]
@ -62,8 +82,8 @@ impl RecordLockTable {
let mut inner = self.inner.lock();
let record_lock = inner
.record_lock_table
.entry(record)
.or_insert_with(|| Arc::new(RecordLock::new()));
.entry(record.clone())
.or_insert_with(|| Arc::new(RecordLock::new(record.clone())));
inner.record_lock_table.remove_expired();
record_lock
};
@ -72,7 +92,7 @@ impl RecordLockTable {
let whole_record_lock_guard = record_lock.whole_record_lock.write_arc().await;
RecordsLockGuard {
_record_locks: vec![record_lock],
record_locks: vec![record_lock],
_whole_record_lock_guards: vec![whole_record_lock_guard],
}
}
@ -88,8 +108,8 @@ impl RecordLockTable {
.map(|record| {
inner
.record_lock_table
.entry(record)
.or_insert_with(|| Arc::new(RecordLock::new()))
.entry(record.clone())
.or_insert_with(|| Arc::new(RecordLock::new(record.clone())))
})
.collect::<Vec<_>>();
inner.record_lock_table.remove_expired();
@ -104,7 +124,7 @@ impl RecordLockTable {
}
RecordsLockGuard {
_record_locks: record_locks,
record_locks,
_whole_record_lock_guards: whole_record_lock_guards,
}
}
@ -115,8 +135,8 @@ impl RecordLockTable {
let mut inner = self.inner.lock();
let record_lock = inner
.record_lock_table
.entry(record)
.or_insert_with(|| Arc::new(RecordLock::new()));
.entry(record.clone())
.or_insert_with(|| Arc::new(RecordLock::new(record.clone())));
inner.record_lock_table.remove_expired();
record_lock
};
@ -124,10 +144,10 @@ impl RecordLockTable {
// Wait on each lock to complete in order
let whole_record_lock_guard = record_lock.whole_record_lock.try_write_arc()?;
RecordsLockGuard {
_record_locks: vec![record_lock],
Some(RecordsLockGuard {
record_locks: vec![record_lock],
_whole_record_lock_guards: vec![whole_record_lock_guard],
}
})
}
pub fn try_lock_records(&self, mut records: Vec<OpaqueRecordKey>) -> Option<RecordsLockGuard> {
@ -141,8 +161,8 @@ impl RecordLockTable {
.map(|record| {
inner
.record_lock_table
.entry(record)
.or_insert_with(|| Arc::new(RecordLock::new()))
.entry(record.clone())
.or_insert_with(|| Arc::new(RecordLock::new(record.clone())))
})
.collect::<Vec<_>>();
inner.record_lock_table.remove_expired();
@ -157,11 +177,50 @@ impl RecordLockTable {
}
Some(RecordsLockGuard {
_record_locks: record_locks,
record_locks,
_whole_record_lock_guards: whole_record_lock_guards,
})
}
pub async fn lock_subkey(
&self,
record: OpaqueRecordKey,
subkey: ValueSubkey,
) -> SubkeyLockGuard {
// Get record lock
let record_lock = {
let mut inner = self.inner.lock();
let record_lock = inner
.record_lock_table
.entry(record.clone())
.or_insert_with(|| Arc::new(RecordLock::new(record.clone())));
inner.record_lock_table.remove_expired();
record_lock
};
// Attempt shared lock
let _whole_record_lock_guard = record_lock.whole_record_lock.read_arc().await;
// Get subkey lock
let subkey_lock = {
let mut subkey_lock_table = record_lock.subkey_lock_table.lock();
let subkey_lock = subkey_lock_table
.entry(subkey)
.or_insert_with(|| Arc::new(AsyncMutex::new(())));
subkey_lock_table.remove_expired();
subkey_lock
};
let _subkey_lock_guard = asyncmutex_lock_arc!(subkey_lock);
SubkeyLockGuard {
record_lock,
_whole_record_lock_guard,
_subkey_lock_guard,
subkey,
}
}
pub fn try_lock_subkey(
&self,
record: OpaqueRecordKey,
@ -172,8 +231,8 @@ impl RecordLockTable {
let mut inner = self.inner.lock();
let record_lock = inner
.record_lock_table
.entry(record)
.or_insert_with(|| Arc::new(RecordLock::new()));
.entry(record.clone())
.or_insert_with(|| Arc::new(RecordLock::new(record.clone())));
inner.record_lock_table.remove_expired();
record_lock
};
@ -182,16 +241,21 @@ impl RecordLockTable {
let _whole_record_lock_guard = record_lock.whole_record_lock.try_read_arc()?;
// Get subkey lock
{
let subkey_lock = {
let mut subkey_lock_table = record_lock.subkey_lock_table.lock();
if !subkey_lock_table.insert(subkey) {
return None;
}
}
let subkey_lock = subkey_lock_table
.entry(subkey)
.or_insert_with(|| Arc::new(AsyncMutex::new(())));
subkey_lock_table.remove_expired();
subkey_lock
};
let _subkey_lock_guard = asyncmutex_try_lock_arc!(subkey_lock)?;
Some(SubkeyLockGuard {
record_lock,
_whole_record_lock_guard,
_subkey_lock_guard,
subkey,
})
}

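A short sketch (editorial, not part of the diff) of the per-subkey locking added above: lock_subkey takes a shared read lock on the whole record plus an exclusive async mutex on the single subkey, and the returned SubkeyLockGuard carries both the record key and the subkey for downstream handlers. The method name example_read_subkey is hypothetical; the calls mirror the get_value path shown earlier in this commit.

use super::*;

impl StorageManager {
    /// Hypothetical reader built on the new SubkeyLockGuard API.
    async fn example_read_subkey(
        &self,
        record_key: RecordKey,
        subkey: ValueSubkey,
    ) -> VeilidAPIResult<GetResult> {
        // Shared lock on the record, exclusive lock on this one subkey;
        // other subkeys of the same record remain usable concurrently.
        let subkey_lock = self
            .record_lock_table
            .lock_subkey(record_key.opaque(), subkey)
            .await;
        // The guard supplies record() and subkey(), so they are not passed separately.
        self.handle_get_local_value(&subkey_lock, false).await
    }
}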
View file

@ -8,6 +8,7 @@
mod inbound_transactions;
mod inbound_watch;
mod opened_record;
mod record;
mod record_snapshot;
mod record_store_inner;
mod record_store_limits;
@ -17,6 +18,7 @@ mod subkey_transaction_changes;
pub(super) use inbound_transactions::*;
pub(super) use inbound_watch::*;
pub(super) use opened_record::*;
pub(super) use record::*;
pub(super) use record_snapshot::*;
pub(super) use record_store_inner::*;
pub(super) use record_store_limits::*;
@ -124,11 +126,7 @@ where
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn flush(&self) {
let opt_commit_action = {
let mut inner = self.inner.lock();
inner.with_record_index_mut(|record_index| record_index.prepare_commit_action())
};
let opt_commit_action = self.inner.lock().flush();
if let Some(commit_action) = opt_commit_action {
self.process_commit_action(commit_action).await;
};
@ -145,16 +143,7 @@ where
.lock_record(opaque_record_key.clone())
.await;
let opt_commit_action = {
let mut inner = self.inner.lock();
inner.with_record_index_mut(|record_index| {
record_index.create(opaque_record_key.clone(), record)?;
VeilidAPIResult::<Option<CommitAction<D>>>::Ok(
record_index.maybe_prepare_commit_action(),
)
})?
};
let opt_commit_action = self.inner.lock().new_record(opaque_record_key, record)?;
if let Some(commit_action) = opt_commit_action {
self.process_commit_action(commit_action).await;
};
@ -163,43 +152,138 @@ where
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn delete_record(
&mut self,
opaque_record_key: OpaqueRecordKey,
) -> VeilidAPIResult<()> {
pub async fn delete_record(&self, opaque_record_key: OpaqueRecordKey) -> VeilidAPIResult<()> {
let _record_lock = self
.record_lock_table
.lock_record(opaque_record_key.clone())
.await;
let opt_commit_action = {
let mut inner = self.inner.lock();
inner.with_record_index_mut(|record_index| {
record_index.delete(opaque_record_key.clone())?;
VeilidAPIResult::<Option<CommitAction<D>>>::Ok(
record_index.maybe_prepare_commit_action(),
)
})?
xxx move operations to inner and cleanup function too
inner.
// Remove all references to this record
self.cleanup_record_internal(rtk, record);
};
let opt_commit_action = self.inner.lock().delete_record(opaque_record_key)?;
if let Some(commit_action) = opt_commit_action {
self.process_commit_action(commit_action).await;
};
// Purge the record's space immediately along with any other dead records
self.purge_dead_records(false).await;
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn get_subkey(
&self,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<Option<GetResult>> {
let _record_lock = self
.record_lock_table
.lock_subkey(opaque_record_key.clone(), subkey)
.await;
let load_action_result = {
let mut inner = self.inner.lock();
inner.prepare_get_subkey(opaque_record_key, subkey)
};
match load_action_result {
LoadActionResult::NoRecord => Ok(None),
LoadActionResult::NoSubkey { descriptor } => Ok(Some(GetResult {
opt_value: None,
opt_descriptor: if want_descriptor {
Some(descriptor)
} else {
None
},
})),
LoadActionResult::Subkey {
descriptor,
mut load_action,
} => {
let res = load_action.load().await;
{
let mut inner = self.inner.lock();
inner.finish_get_subkey(load_action);
}
let opt_value = res?.map(|x| x.signed_value_data());
Ok(Some(GetResult {
opt_value,
opt_descriptor: if want_descriptor {
Some(descriptor)
} else {
None
},
}))
}
}
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn inspect_record(
&self,
opaque_record_key: &OpaqueRecordKey,
subkeys: &ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<Option<InspectResult>> {
let res = self.with_record(opaque_record_key, |record| {
// Get number of subkeys from schema and ensure we are getting the
// right number of sequence numbers between that and what we asked for
let schema_subkeys = record
.schema()
.truncate_subkeys(subkeys, Some(DHTSchema::MAX_SUBKEY_COUNT));
let opt_descriptor = if want_descriptor {
Some(record.descriptor().clone())
} else {
None
};
// Check if we can return some subkeys
if schema_subkeys.is_empty() {
// No overlapping keys
return Ok(None);
}
// Collect the requested subkey sequence numbers
let seqs = schema_subkeys
.iter()
.map(|subkey| record.subkey_seq(subkey))
.collect();
Ok(Some(InspectResult::new(
self,
subkeys.clone(),
"inspect_record",
schema_subkeys,
seqs,
opt_descriptor,
)?))
});
match res {
None => Ok(None),
Some(out) => out,
}
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn with_record<F, R>(&self, opaque_record_key: &OpaqueRecordKey, func: F) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
{
let mut inner = self.inner.lock();
inner.with_record(opaque_record_key, func)
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn with_record_detail_mut<R, F>(
&self,
opaque_record_key: &OpaqueRecordKey,
func: F,
) -> Option<R>
where
F: FnOnce(Arc<SignedValueDescriptor>, &mut D) -> R,
{
let mut inner = self.inner.lock();
inner.with_record_detail_mut(opaque_record_key, func)
}
//////////////////////////////////////////////////////////////////////////////////
async fn process_commit_action(&self, mut commit_action: CommitAction<D>) {
@ -209,9 +293,7 @@ xxx move operations to inner and cleanup function too
let res = {
let mut inner = self.inner.lock();
inner.with_record_index_mut(|record_index| {
record_index.finish_commit_action(commit_action)
})
inner.finish_commit_action(commit_action)
};
if let Err(e) = res {
@ -219,249 +301,164 @@ xxx move operations to inner and cleanup function too
}
}
fn cleanup_record_internal(&mut self, rtk: RecordTableKey, record: Record<D>) -> u64 {
// Remove transactions
self.record_transactions.remove(&rtk);
// #[instrument(level = "trace", target = "stor", skip_all)]
// pub(super) fn contains_record(&self, opaque_record_key: &OpaqueRecordKey) -> bool {
// let rtk = RecordTableKey {
// record_key: opaque_record_key.clone(),
// };
// self.record_index.contains_key(&rtk)
// }
// Remove watches
self.watched_records.remove(&rtk);
// #[instrument(level = "trace", target = "stor", skip_all)]
// pub(super) fn with_record<R, F>(
// &mut self,
// opaque_record_key: &OpaqueRecordKey,
// f: F,
// ) -> Option<R>
// where
// F: FnOnce(&Record<D>) -> R,
// {
// // Get record from index
// let mut out = None;
// let rtk = RecordTableKey {
// record_key: opaque_record_key.clone(),
// };
// if let Some(record) = self.record_index.get_mut(&rtk) {
// // Callback
// out = Some(f(record));
// Remove watch changes
self.changed_watched_values.remove(&rtk);
// // Touch
// record.touch();
// }
// if out.is_some() {
// // Marks as changed because the record was touched and we want to keep the
// // LRU ordering serialized
// self.changed_records.insert(rtk);
// }
}
// out
// }
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn contains_record(&self, opaque_record_key: &OpaqueRecordKey) -> bool {
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
self.record_index.contains_key(&rtk)
}
// #[instrument(level = "trace", target = "stor", skip_all)]
// pub(super) fn peek_record<R, F>(&self, opaque_record_key: &OpaqueRecordKey, f: F) -> Option<R>
// where
// F: FnOnce(&Record<D>) -> R,
// {
// // Get record from index
// let mut out = None;
// let rtk = RecordTableKey {
// record_key: opaque_record_key.clone(),
// };
// if let Some(record) = self.record_index.peek(&rtk) {
// // Callback
// out = Some(f(record));
// }
// out
// }
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn with_record<R, F>(
&mut self,
opaque_record_key: &OpaqueRecordKey,
f: F,
) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
if let Some(record) = self.record_index.get_mut(&rtk) {
// Callback
out = Some(f(record));
// #[instrument(level = "trace", target = "stor", skip_all)]
// pub(super) fn with_record_mut<R, F>(
// &mut self,
// opaque_record_key: &OpaqueRecordKey,
// f: F,
// ) -> Option<R>
// where
// F: FnOnce(&mut Record<D>) -> R,
// {
// // Get record from index
// let mut out = None;
// let rtk = RecordTableKey {
// record_key: opaque_record_key.clone(),
// };
// if let Some(record) = self.record_index.get_mut(&rtk) {
// // Callback
// out = Some(f(record));
// Touch
record.touch();
}
if out.is_some() {
// Marks as changed because the record was touched and we want to keep the
// LRU ordering serialized
self.changed_records.insert(rtk);
}
// // Touch
// record.touch();
// }
// if out.is_some() {
// // Marks as changed because the record was touched and we want to keep the
// // LRU ordering serialized
// self.changed_records.insert(rtk);
// }
out
}
// out
// }
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn peek_record<R, F>(&self, opaque_record_key: &OpaqueRecordKey, f: F) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
if let Some(record) = self.record_index.peek(&rtk) {
// Callback
out = Some(f(record));
}
out
}
// #[instrument(level = "trace", target = "stor", skip_all, err)]
// pub async fn peek_subkey(
// &self,
// opaque_record_key: &OpaqueRecordKey,
// subkey: ValueSubkey,
// want_descriptor: bool,
// ) -> VeilidAPIResult<Option<GetResult>> {
// // record from index
// let Some((subkey_count, has_subkey, opt_descriptor)) =
// self.peek_record(opaque_record_key, |record| {
// (
// record.subkey_count(),
// record.stored_subkeys().contains(subkey),
// if want_descriptor {
// Some(record.descriptor().clone())
// } else {
// None
// },
// )
// })
// else {
// // Record not available
// return Ok(None);
// };
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn with_record_mut<R, F>(
&mut self,
opaque_record_key: &OpaqueRecordKey,
f: F,
) -> Option<R>
where
F: FnOnce(&mut Record<D>) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
if let Some(record) = self.record_index.get_mut(&rtk) {
// Callback
out = Some(f(record));
// // Check if the subkey is in range
// if subkey as usize >= subkey_count {
// apibail_invalid_argument!("subkey out of range", "subkey", subkey);
// }
// Touch
record.touch();
}
if out.is_some() {
// Marks as changed because the record was touched and we want to keep the
// LRU ordering serialized
self.changed_records.insert(rtk);
}
// // See if we have this subkey stored
// if !has_subkey {
// // If not, return no value but maybe with descriptor
// return Ok(Some(GetResult {
// opt_value: None,
// opt_descriptor,
// }));
// }
out
}
// // If subkey exists in subkey cache, use that
// let stk = SubkeyTableKey {
// record_key: opaque_record_key.clone(),
// subkey,
// };
// if let Some(record_data) = self.subkey_cache.peek(&stk) {
// let out = record_data.signed_value_data().clone();
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn get_subkey(
&mut self,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<Option<GetResult>> {
// Get record from index
let Some((subkey_count, has_subkey, opt_descriptor)) =
self.with_record(opaque_record_key, |record| {
(
record.subkey_count(),
record.stored_subkeys().contains(subkey),
if want_descriptor {
Some(record.descriptor().clone())
} else {
None
},
)
})
else {
// Record not available
return Ok(None);
};
// return Ok(Some(GetResult {
// opt_value: Some(out),
// opt_descriptor,
// }));
// }
// // If not in cache, try to pull from table store if it is in our stored subkey set
// let Some(record_data) = self
// .subkey_table
// .load_json::<RecordData>(0, &stk.bytes())
// .await
// .map_err(VeilidAPIError::internal)?
// else {
// apibail_internal!("failed to peek subkey that was stored");
// };
// Check if the subkey is in range
if subkey as usize >= subkey_count {
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
}
// let out = record_data.signed_value_data().clone();
// See if we have this subkey stored
if !has_subkey {
// If not, return no value but maybe with descriptor
return Ok(Some(GetResult {
opt_value: None,
opt_descriptor,
}));
}
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey {
record_key: opaque_record_key.clone(),
subkey,
};
if let Some(record_data) = self.subkey_cache.get(&stk) {
let out = record_data.signed_value_data().clone();
return Ok(Some(GetResult {
opt_value: Some(out),
opt_descriptor,
}));
}
// If not in cache, try to pull from table store if it is in our stored subkey set
let Some(record_data) = self
.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
else {
apibail_internal!("failed to get subkey that was stored");
};
let out = record_data.signed_value_data().clone();
// Add to cache, do nothing with lru out
self.add_to_subkey_cache(stk, record_data);
Ok(Some(GetResult {
opt_value: Some(out),
opt_descriptor,
}))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn peek_subkey(
&self,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<Option<GetResult>> {
// record from index
let Some((subkey_count, has_subkey, opt_descriptor)) =
self.peek_record(opaque_record_key, |record| {
(
record.subkey_count(),
record.stored_subkeys().contains(subkey),
if want_descriptor {
Some(record.descriptor().clone())
} else {
None
},
)
})
else {
// Record not available
return Ok(None);
};
// Check if the subkey is in range
if subkey as usize >= subkey_count {
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
}
// See if we have this subkey stored
if !has_subkey {
// If not, return no value but maybe with descriptor
return Ok(Some(GetResult {
opt_value: None,
opt_descriptor,
}));
}
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey {
record_key: opaque_record_key.clone(),
subkey,
};
if let Some(record_data) = self.subkey_cache.peek(&stk) {
let out = record_data.signed_value_data().clone();
return Ok(Some(GetResult {
opt_value: Some(out),
opt_descriptor,
}));
}
// If not in cache, try to pull from table store if it is in our stored subkey set
let Some(record_data) = self
.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
else {
apibail_internal!("failed to peek subkey that was stored");
};
let out = record_data.signed_value_data().clone();
Ok(Some(GetResult {
opt_value: Some(out),
opt_descriptor,
}))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn set_subkeys_single_record(
&mut self,
&self,
opaque_record_key: &OpaqueRecordKey,
subkeys: SubkeyValueList,
) -> VeilidAPIResult<SubkeyTransactionChanges> {
@@ -483,7 +480,7 @@ xxx move operations to inner and cleanup function too
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn set_subkeys_multiple_records(
&mut self,
&self,
keys_and_subkeys: RecordSubkeyValueList,
) -> VeilidAPIResult<SubkeyTransactionChanges> {
// Start subkey table transaction
@@ -506,7 +503,7 @@ xxx move operations to inner and cleanup function too
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn commit_subkeys_tx(
&mut self,
&self,
subkey_transaction_changes: SubkeyTransactionChanges,
watch_update_mode: InboundWatchUpdateMode,
) -> VeilidAPIResult<RecordSubkeyValueList> {
@@ -566,7 +563,7 @@ xxx move operations to inner and cleanup function too
#[instrument(level = "trace", target = "stor", skip_all, err)]
async fn set_subkeys_in_tx(
&mut self,
&self,
subkey_table_tx: TableDBTransaction,
opaque_record_key: &OpaqueRecordKey,
subkeys: SubkeyValueList,
@@ -683,93 +680,6 @@ xxx move operations to inner and cleanup function too
res
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn inspect_record(
&mut self,
opaque_record_key: &OpaqueRecordKey,
subkeys: &ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<Option<InspectResult>> {
// Get record from index
let Some((schema_subkeys, opt_descriptor)) =
self.with_record(opaque_record_key, |record| {
// Get number of subkeys from schema and ensure we are getting the
// right number of sequence numbers between that and what we asked for
let schema_subkeys = record
.schema()
.truncate_subkeys(subkeys, Some(DHTSchema::MAX_SUBKEY_COUNT));
(
schema_subkeys,
if want_descriptor {
Some(record.descriptor().clone())
} else {
None
},
)
})
else {
// Record not available
return Ok(None);
};
// Check if we can return some subkeys
if schema_subkeys.is_empty() {
// No overlapping keys
return Ok(None);
}
// See if we have this inspection cached
if let Some(icv) = self.inspect_cache.get(opaque_record_key, &schema_subkeys) {
return Ok(Some(InspectResult::new(
self,
subkeys.clone(),
"inspect_record",
schema_subkeys.clone(),
icv.seqs,
opt_descriptor,
)?));
}
// Build sequence number list to return
#[allow(clippy::unnecessary_cast)]
let mut seqs = Vec::with_capacity(schema_subkeys.len() as usize);
for subkey in schema_subkeys.iter() {
let stk = SubkeyTableKey {
record_key: opaque_record_key.clone(),
subkey,
};
let seq = if let Some(record_data) = self.subkey_cache.peek(&stk) {
record_data.signed_value_data().value_data().seq()
} else {
// If not in cache, try to pull from table store if it is in our stored subkey set
// XXX: This would be better if it didn't have to pull the whole record data to get the seq.
self.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
.map(|record_data| record_data.signed_value_data().value_data().seq())
.unwrap_or_default()
};
seqs.push(seq)
}
// Save seqs cache
self.inspect_cache.put(
opaque_record_key.clone(),
schema_subkeys.clone(),
InspectCacheL2Value { seqs: seqs.clone() },
);
Ok(Some(InspectResult::new(
self,
subkeys.clone(),
"inspect_record",
schema_subkeys,
seqs,
opt_descriptor,
)?))
}
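The function above memoizes per-subkey sequence numbers keyed first by the record and then by the exact subkey set that was inspected. A self-contained sketch of that two-level memoization follows; the types are simplified stand-ins for the real inspect cache, not the crate's API.

use std::collections::HashMap;

// Hypothetical two-level memo: record key -> (subkey set -> sequence numbers).
struct InspectMemo {
    entries: HashMap<String, HashMap<Vec<u32>, Vec<u32>>>,
}

impl InspectMemo {
    fn get(&self, record: &str, subkeys: &[u32]) -> Option<Vec<u32>> {
        self.entries.get(record)?.get(subkeys).cloned()
    }
    fn put(&mut self, record: String, subkeys: Vec<u32>, seqs: Vec<u32>) {
        self.entries.entry(record).or_default().insert(subkeys, seqs);
    }
}

// Build the sequence list only on a miss, then memoize it for the next identical inspection.
fn inspect_seqs(
    memo: &mut InspectMemo,
    record: &str,
    subkeys: &[u32],
    seq_of: impl Fn(u32) -> u32,
) -> Vec<u32> {
    if let Some(seqs) = memo.get(record, subkeys) {
        return seqs;
    }
    let seqs: Vec<u32> = subkeys.iter().map(|&s| seq_of(s)).collect();
    memo.put(record.to_string(), subkeys.to_vec(), seqs.clone());
    seqs
}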
/// LRU out some records until we reclaim the amount of space requested
/// This will force a garbage collection of the space immediately
/// If zero is passed in here, a garbage collection of dead records will be performed

View file

@@ -39,3 +39,14 @@ impl LoadAction {
(self.subkey_table_key, self.opt_cached_record_data)
}
}
pub enum LoadActionResult {
NoRecord,
NoSubkey {
descriptor: Arc<SignedValueDescriptor>,
},
Subkey {
descriptor: Arc<SignedValueDescriptor>,
load_action: LoadAction,
},
}
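The enum makes the three outcomes of preparing a subkey load explicit, so a caller can answer "no record" and "descriptor only" without touching the database and only schedule a load for the third case. A self-contained sketch of that dispatch follows, using simplified stand-in types rather than the real SignedValueDescriptor and LoadAction; everything below is illustrative, not the crate's API.

use std::collections::HashMap;

// Hypothetical, simplified stand-ins used only for this sketch.
struct Descriptor;
struct PendingLoad {
    key: u32,
}

enum LoadOutcome {
    NoRecord,
    NoSubkey { descriptor: Descriptor },
    Subkey { descriptor: Descriptor, load: PendingLoad },
}

struct GetReply {
    value: Option<Vec<u8>>,
    descriptor: Option<Descriptor>,
}

fn resolve(outcome: LoadOutcome, table: &HashMap<u32, Vec<u8>>) -> Option<GetReply> {
    match outcome {
        // No record at all: nothing to return.
        LoadOutcome::NoRecord => None,
        // Record exists but the subkey was never stored: reply with the descriptor only.
        LoadOutcome::NoSubkey { descriptor } => Some(GetReply {
            value: None,
            descriptor: Some(descriptor),
        }),
        // Subkey is stored: only this arm needs a database read.
        LoadOutcome::Subkey { descriptor, load } => {
            let value = table.get(&load.key).cloned();
            Some(GetReply {
                value,
                descriptor: Some(descriptor),
            })
        }
    }
}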

View file

@@ -2,7 +2,6 @@ mod commit_action;
mod keys;
mod limited_size;
mod load_action;
mod record;
mod record_data;
mod record_index;
@@ -12,7 +11,6 @@ pub(super) use commit_action::*;
pub(super) use keys::*;
pub(super) use limited_size::*;
pub(super) use load_action::*;
pub(super) use record::*;
pub(super) use record_data::*;
pub(super) use record_index::*;
@@ -80,19 +78,79 @@ where
})
}
pub fn with_record_index<F, R>(&self, func: F) -> R
where
F: FnOnce(&RecordIndex<D>) -> R,
{
func(&self.record_index)
pub fn new_record(
&mut self,
opaque_record_key: OpaqueRecordKey,
record: Record<D>,
) -> VeilidAPIResult<Option<CommitAction<D>>> {
self.record_index
.create(opaque_record_key.clone(), record)?;
Ok(self.record_index.maybe_prepare_commit_action())
}
pub fn with_record_index_mut<F, R>(&mut self, func: F) -> R
pub fn delete_record(
&mut self,
opaque_record_key: OpaqueRecordKey,
) -> VeilidAPIResult<Option<CommitAction<D>>> {
self.record_index.delete(opaque_record_key.clone())?;
self.cleanup_record(opaque_record_key);
Ok(self.record_index.maybe_prepare_commit_action())
}
pub fn flush(&mut self) -> Option<CommitAction<D>> {
self.record_index.prepare_commit_action()
}
pub fn finish_commit_action(&mut self, commit_action: CommitAction<D>) -> VeilidAPIResult<()> {
self.record_index.finish_commit_action(commit_action)
}
pub fn prepare_get_subkey(
&mut self,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
) -> LoadActionResult {
self.record_index
.prepare_load_action(opaque_record_key.clone(), subkey)
}
pub fn finish_get_subkey(&mut self, load_action: LoadAction) {
self.record_index.finish_load_action(load_action);
}
pub fn with_record<F, R>(&mut self, opaque_record_key: &OpaqueRecordKey, func: F) -> Option<R>
where
F: FnOnce(&mut RecordIndex<D>) -> R,
F: FnOnce(&Record<D>) -> R,
{
func(&mut self.record_index)
self.record_index.with_record(opaque_record_key, func)
}
pub(super) fn with_record_detail_mut<R, F>(
&mut self,
opaque_record_key: &OpaqueRecordKey,
func: F,
) -> Option<R>
where
F: FnOnce(Arc<SignedValueDescriptor>, &mut D) -> R,
{
self.record_index
.with_record_detail_mut(opaque_record_key, func)
}
////////////////////////////////////////////////////////////
fn cleanup_record(&mut self, opaque_record_key: OpaqueRecordKey) {
let rtk = RecordTableKey {
record_key: opaque_record_key,
};
// Remove transactions
self.record_transactions.remove(&rtk);
// Remove watches
self.watched_records.remove(&rtk);
// Remove watch changes
self.changed_watched_values.remove(&rtk);
}
}
impl<D> RecordStore<D> where D: RecordDetail {}
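The new_record, delete_record, flush, and finish_commit_action surface above follows a prepare/commit split: the index is mutated in memory and hands back a commit action describing the database work, which the caller performs outside the lock and then reports as finished. A hedged, self-contained sketch of that shape with stand-in types (not the real CommitAction or table-store API) is below.

use std::sync::{Arc, Mutex};

// Hypothetical stand-ins: the "work" produced by a prepare step and the in-memory index.
struct CommitWork {
    writes: Vec<(u32, Vec<u8>)>,
}

struct Index {
    pending: Vec<(u32, Vec<u8>)>,
}

impl Index {
    fn create(&mut self, key: u32, value: Vec<u8>) -> Option<CommitWork> {
        self.pending.push((key, value));
        Some(CommitWork {
            writes: std::mem::take(&mut self.pending),
        })
    }
    fn finish(&mut self, _done: CommitWork) {
        // Record that the flushed entries are now durable; nothing remains pending in this sketch.
    }
}

async fn write_to_table(_k: u32, _v: &[u8]) {
    // Stand-in for an async table-store write.
}

async fn create_and_commit(index: Arc<Mutex<Index>>, key: u32, value: Vec<u8>) {
    // Prepare while holding the lock only briefly...
    let work = index.lock().unwrap().create(key, value);
    // ...do the slow I/O with the lock released...
    if let Some(work) = work {
        for (k, v) in &work.writes {
            write_to_table(*k, v).await;
        }
        // ...then report completion so the in-memory bookkeeping can settle.
        index.lock().unwrap().finish(work);
    }
}

The real CommitAction presumably carries more than raw writes; the sketch only illustrates the lock discipline implied by the prepare and finish pair.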

View file

@@ -166,8 +166,7 @@ where
};
let Some(record) = self.record_cache.remove(&rtk) else {
veilid_log!(self error "RecordIndex({}): Record missing with key {}", self.unlocked_inner.name, key);
apibail_internal!("record missing");
apibail_invalid_argument!("record missing", "key", key);
};
self.purge_record_and_subkeys(rtk, record, false);
@@ -179,14 +178,96 @@
Ok(())
}
/// Access a record
///
/// If the record exists, passes it to a function and marks the record as recently used
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn with_record<R, F>(
&mut self,
opaque_record_key: &OpaqueRecordKey,
func: F,
) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
if let Some(record) = self.record_cache.get_mut(&rtk) {
let old_record = record.clone();
// LRU touch
record.touch();
// Callback
out = Some(func(record));
let new_record = record.clone();
self.add_uncommitted_record_update(rtk, new_record, old_record);
}
out
}
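A short usage sketch of this closure-based accessor shape: the caller never holds a reference to the record across the call, so the index can do the LRU touch and queue the uncommitted update before returning. The Index and Record types below are simplified stand-ins, not the crate's types.

use std::collections::HashMap;

// Hypothetical, simplified illustration of the closure-based access pattern.
struct Record {
    subkey_count: usize,
    last_touched: u64,
}

struct Index {
    records: HashMap<String, Record>,
    now: u64,
}

impl Index {
    fn with_record<R>(&mut self, key: &str, f: impl FnOnce(&Record) -> R) -> Option<R> {
        let record = self.records.get_mut(key)?;
        // The LRU touch happens inside the accessor, not in the caller.
        record.last_touched = self.now;
        // The caller only ever sees a shared borrow for the duration of the closure.
        Some(f(record))
    }
}

fn subkey_count(index: &mut Index, key: &str) -> Option<usize> {
    index.with_record(key, |r| r.subkey_count)
}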
/// Modify a record's detail
///
/// If the record exists, passes a mutable reference of its detail to a function and marks the record as recently used
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn with_record_detail_mut<R, F>(
&mut self,
opaque_record_key: &OpaqueRecordKey,
func: F,
) -> Option<R>
where
F: FnOnce(Arc<SignedValueDescriptor>, &mut D) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
if let Some(record) = self.record_cache.get_mut(&rtk) {
let old_record = record.clone();
// LRU touch
record.touch();
// Callback
out = Some(func(record.descriptor(), record.detail_mut()));
let new_record = record.clone();
self.add_uncommitted_record_update(rtk, new_record, old_record);
}
out
}
/// Get a subkey value
///
/// Does not perform any database operations itself.
/// Returns a load action object that either already has the subkey data from the cache or can retrieve it from the database.
pub fn prepare_load_action(
&mut self,
key: OpaqueRecordKey,
subkey: ValueSubkey,
) -> Option<LoadAction> {
) -> LoadActionResult {
let rtk = RecordTableKey {
record_key: key.clone(),
};
let Some(record) = self.record_cache.get(&rtk) else {
return LoadActionResult::NoRecord;
};
if !record.stored_subkeys().contains(subkey) {
return LoadActionResult::NoSubkey {
descriptor: record.descriptor(),
};
}
let stk = SubkeyTableKey {
record_key: key.clone(),
subkey,
@@ -215,14 +296,18 @@ where
.flatten()
});
Some(LoadAction::new(
self.unlocked_inner.subkey_table.clone(),
stk,
opt_cached_record_data,
))
LoadActionResult::Subkey {
descriptor: record.descriptor(),
load_action: LoadAction::new(
self.unlocked_inner.subkey_table.clone(),
stk,
opt_cached_record_data,
),
}
}
/// Finalize a load action
///
/// If the load action pulled a value from the database, the value is stored in
/// the subkey cache, but only if it isn't already there
pub fn finish_load_action(&mut self, load_action: LoadAction) {
@@ -254,8 +339,7 @@ where
// Remove the old record from the cache
let Some(old_record) = self.record_cache.remove(&rtk) else {
veilid_log!(self error "RecordIndex({}): Record missing with key {}", self.unlocked_inner.name, key);
apibail_internal!("record missing");
apibail_invalid_argument!("record missing", "key", key);
};
// Make a copy of the record to edit

View file

@@ -22,7 +22,7 @@ pub(super) struct RehydrationRequest {
impl StorageManager {
/// Add a background rehydration request
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn add_rehydration_request(
pub fn add_rehydration_request(
&self,
opaque_record_key: OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
@@ -33,7 +33,7 @@ impl StorageManager {
consensus_count,
};
veilid_log!(self debug "Adding rehydration request: {} {:?}", opaque_record_key, req);
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
inner
.rehydration_requests
.entry(opaque_record_key)
@@ -59,6 +59,8 @@ impl StorageManager {
subkeys: ValueSubkeyRangeSet,
consensus_count: usize,
) -> VeilidAPIResult<RehydrateReport> {
let local_record_store = self.get_local_record_store()?;
veilid_log!(self debug "Checking for record rehydration: {} {} @ consensus {}", opaque_record_key, subkeys, consensus_count);
// Get subkey range for consideration
let subkeys = if subkeys.is_empty() {
@@ -84,9 +86,6 @@ impl StorageManager {
opened_record.safety_selection()
} else {
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let Some(safety_selection) = local_record_store
.with_record(&opaque_record_key, |rec| {
rec.detail().safety_selection.clone()
@@ -247,8 +246,6 @@ impl StorageManager {
local_inspect_result: InspectResult,
outbound_inspect_result: OutboundInspectValueResult,
) -> VeilidAPIResult<RehydrateReport> {
let mut inner = self.inner.lock().await;
// For each subkey, determine if we should rehydrate it
let mut rehydrated = ValueSubkeyRangeSet::new();
for (n, subkey) in local_inspect_result.subkeys().iter().enumerate() {
@@ -292,8 +289,7 @@ impl StorageManager {
.map(ValueSubkeyRangeSet::single)
.zip(outbound_inspect_result.subkey_fanout_results.into_iter());
Self::process_fanout_results_inner(
&mut inner,
self.process_fanout_results(
opaque_record_key.clone(),
results_iter,
false,

View file

@@ -706,7 +706,6 @@ impl StorageManager {
result: set_value::OutboundSetValueResult,
) -> Result<Option<ValueData>, VeilidAPIError> {
// Regain the lock after network access
let mut inner = self.inner.lock().await;
let opaque_record_key = record_key.opaque();
// Report on fanout result offline
@@ -724,8 +723,7 @@ impl StorageManager {
}
// Keep the list of nodes that returned a value for later reference
Self::process_fanout_results_inner(
&mut inner,
self.process_fanout_results(
opaque_record_key.clone(),
core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)),
true,

View file

@@ -178,23 +178,19 @@ impl StorageManager {
async fn process_single_result(&self, result: WorkItemResult) {
let consensus_width = self.config().network.dht.consensus_width as usize;
let mut inner = self.inner.lock().await;
// Debug print the result
veilid_log!(self debug "Offline write result: {:?}", result);
// Mark the offline subkey write as no longer in-flight
let subkeys_still_offline = result.work_item.subkeys.difference(&result.written_subkeys);
self.finish_offline_subkey_writes_inner(
&mut inner,
self.finish_offline_subkey_writes(
&result.work_item.opaque_record_key,
result.written_subkeys,
subkeys_still_offline,
);
// Keep the list of nodes that returned a value for later reference
Self::process_fanout_results_inner(
&mut inner,
self.process_fanout_results(
result.work_item.opaque_record_key,
result.fanout_results.into_iter().map(|x| (x.0, x.1)),
true,
@@ -279,8 +275,7 @@ impl StorageManager {
// Ensure nothing is left in-flight when returning even due to an error
{
let mut inner = self.inner.lock().await;
inner.offline_subkey_writes.retain(|_, v| {
self.inner.lock().offline_subkey_writes.retain(|_, v| {
v.subkeys = v.subkeys.union(&mem::take(&mut v.subkeys_in_flight));
!v.subkeys.is_empty()
});

View file

@@ -9,8 +9,6 @@ impl StorageManager {
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let mut inner = self.inner.lock().await;
self.save_metadata_inner(&mut inner).await?;
Ok(())
self.save_metadata().await
}
}

View file

@@ -176,16 +176,13 @@ impl StorageManager {
// Store transaction results
{
let mut inner = self.inner.lock().await;
let inner = &mut *inner;
// Snapshot local valuedata for transaction
{
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let local_record_store = self.get_local_record_store()?;
let transaction_state = inner
let transaction_state = self
.inner
.lock()
.outbound_transaction_manager
.get_transaction_state_mut(&transaction_handle)?;
@@ -214,8 +211,7 @@ impl StorageManager {
}
let max_subkey = result.descriptor.schema()?.max_subkey();
Self::process_fanout_results_inner(
inner,
self.process_fanout_results(
result.opaque_record_key.clone(),
core::iter::once((
ValueSubkeyRangeSet::single_range(0, max_subkey),
@@ -226,7 +222,9 @@ impl StorageManager {
);
}
if let Err(e) = inner
if let Err(e) = self
.inner
.lock()
.outbound_transaction_manager
.record_transact_begin_results(transaction_handle.clone(), results)
{

View file

@@ -605,9 +605,7 @@ impl StorageManager {
veilid_log!(self debug target:"dht", "WatchValue Fanout: {:#}", fanout_result);
// Keep the list of nodes that responded for later reference
let mut inner = self.inner.lock().await;
Self::process_fanout_results_inner(
&mut inner,
self.process_fanout_results(
record_key.opaque(),
core::iter::once((ValueSubkeyRangeSet::new(), fanout_result)),
false,
@@ -625,8 +623,9 @@ impl StorageManager {
) {
let opaque_record_key = watch_lock.tag();
let mut inner = self.inner.lock().await;
let Some(outbound_watch) = inner
let Some(outbound_watch) = self
.inner
.lock()
.outbound_watch_manager
.outbound_watches
.remove(&opaque_record_key)