Improved logic for 'allow_offline' and 'offline subkey writes'

Christien Rioux 2025-06-18 22:09:36 -04:00
parent 24a098728c
commit 9d4976b243
10 changed files with 756 additions and 237 deletions


@@ -0,0 +1,72 @@
use super::*;
impl_veilid_log_facility!("stor");
pub(super) struct ActiveSubkeyWriteGuard {
registry: VeilidComponentRegistry,
done: bool,
record_key: TypedRecordKey,
subkey: ValueSubkey,
}
impl ActiveSubkeyWriteGuard {
fn set_done(&mut self) {
self.done = true;
}
}
impl Drop for ActiveSubkeyWriteGuard {
fn drop(&mut self) {
if !self.done {
let registry = &self.registry;
veilid_log!(registry error "active subkey write finished without being marked done: {}:{}", self.record_key, self.subkey);
}
}
}
impl StorageManager {
// Returns Some(guard) if we were not already writing to this subkey
// Returns None if this subkey was already being written to
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn mark_active_subkey_write_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
) -> Option<ActiveSubkeyWriteGuard> {
let asw = inner.active_subkey_writes.entry(record_key).or_default();
if asw.contains(subkey) {
veilid_log!(self debug "already writing to this subkey: {}:{}", record_key, subkey);
None
} else {
// Add to our list of active subkey writes
asw.insert(subkey);
Some(ActiveSubkeyWriteGuard {
registry: self.registry(),
done: false,
record_key,
subkey,
})
}
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn unmark_active_subkey_write_inner(
&self,
inner: &mut StorageManagerInner,
mut guard: ActiveSubkeyWriteGuard,
) {
// Remove from active subkey writes
let asw = inner
.active_subkey_writes
.get_mut(&guard.record_key)
.unwrap();
if !asw.remove(guard.subkey) {
veilid_log!(self error "missing active subkey write: {}:{}", guard.record_key, guard.subkey);
}
if asw.is_empty() {
inner.active_subkey_writes.remove(&guard.record_key);
}
guard.set_done();
}
}
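The guard above follows the familiar "must be explicitly finished" Drop pattern: whoever marks a subkey write active is responsible for unmarking it, and a guard dropped without set_done() is reported as a logic error. A minimal standalone sketch of that pattern (illustrative only; MustFinishGuard and its names are invented here as a stand-in for ActiveSubkeyWriteGuard, not Veilid code):

// Guard that must be explicitly marked done before it is dropped;
// dropping it unfinished reports a logic error, like the Drop impl above.
struct MustFinishGuard {
    label: String,
    done: bool,
}

impl MustFinishGuard {
    fn new(label: &str) -> Self {
        Self { label: label.to_string(), done: false }
    }
    fn set_done(&mut self) {
        self.done = true;
    }
}

impl Drop for MustFinishGuard {
    fn drop(&mut self) {
        if !self.done {
            // In the commit this is a veilid_log!(... error ...) call.
            eprintln!("guard '{}' dropped without being marked done", self.label);
        }
    }
}

fn main() {
    let mut finished = MustFinishGuard::new("subkey 0:3");
    // ... do the tracked work here ...
    finished.set_done(); // drops silently

    let _leaked = MustFinishGuard::new("subkey 0:4");
    // dropped at end of scope without set_done(): prints the error line
}

In the commit itself, unmark_active_subkey_write_inner() is the only place that calls set_done(), so every code path that marks a subkey write must eventually route back through it.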


@@ -390,7 +390,7 @@ impl StorageManager {
// If we got a new value back then write it to the opened record
if Some(get_result_value.value_data().seq()) != opt_last_seq {
-Self::handle_set_local_value_inner(
self.handle_set_local_value_inner(
&mut inner,
record_key,
subkey,
@@ -415,8 +415,9 @@ impl StorageManager {
// See if this is a remote or local value
let (_is_local, last_get_result) = {
// See if the subkey we are getting has a last known local value
-let mut last_get_result =
-Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await?;
let mut last_get_result = self
.handle_get_local_value_inner(&mut inner, key, subkey, true)
.await?;
// If this is local, it must have a descriptor already
if last_get_result.opt_descriptor.is_some() {
if !want_descriptor {


@@ -1,6 +1,8 @@
mod active_subkey_writes;
mod debug;
mod get_value;
mod inspect_value;
mod offline_subkey_writes;
mod outbound_watch_manager;
mod record_store;
mod rehydrate;
@@ -12,11 +14,13 @@ mod watch_value;
use super::*;
use hashlink::LinkedHashMap;
use offline_subkey_writes::*;
use outbound_watch_manager::*;
use record_store::*;
use rehydrate::*;
use routing_table::*;
use rpc_processor::*;
use stop_token::future::FutureExt as _;
pub use record_store::{InboundWatchParameters, InboundWatchResult};
@@ -81,12 +85,12 @@ struct StorageManagerInner {
pub local_record_store: Option<RecordStore<LocalRecordDetail>>,
/// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish
pub remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
-/// Record subkeys that have not been pushed to the network because they were written to offline
-pub offline_subkey_writes:
-LinkedHashMap<TypedRecordKey, tasks::offline_subkey_writes::OfflineSubkeyWrite>,
/// Record subkeys to commit to the network in the background,
/// either because they were written to offline, or due to a rehydration action
pub offline_subkey_writes: LinkedHashMap<TypedRecordKey, OfflineSubkeyWrite>,
/// Record subkeys that are currently being written to in the foreground
pub active_subkey_writes: HashMap<TypedRecordKey, ValueSubkeyRangeSet>,
-/// Records that have rehydration requests
/// Records that have pending rehydration requests
pub rehydration_requests: HashMap<TypedRecordKey, RehydrationRequest>,
/// State management for outbound watches
pub outbound_watch_manager: OutboundWatchManager,
@@ -122,6 +126,7 @@ impl fmt::Debug for StorageManagerInner {
pub(crate) struct StorageManager {
registry: VeilidComponentRegistry,
inner: AsyncMutex<StorageManagerInner>,
startup_lock: Arc<StartupLock>,
// Background processes
save_metadata_task: TickTask<EyreReport>,
@@ -197,6 +202,7 @@ impl StorageManager {
let this = StorageManager {
registry,
inner: AsyncMutex::new(inner),
startup_lock: Arc::new(StartupLock::new()),
save_metadata_task: TickTask::new("save_metadata_task", SAVE_METADATA_INTERVAL_SECS),
flush_record_stores_task: TickTask::new(
@@ -276,6 +282,8 @@ impl StorageManager {
#[instrument(level = "debug", skip_all, err)]
async fn init_async(&self) -> EyreResult<()> {
let guard = self.startup_lock.startup()?;
veilid_log!(self debug "startup storage manager");
let table_store = self.table_store();
let config = self.config();
@@ -301,6 +309,8 @@ impl StorageManager {
// Start deferred results processors
self.background_operation_processor.init();
guard.success();
Ok(())
}
@@ -355,6 +365,13 @@ impl StorageManager {
async fn terminate_async(&self) {
veilid_log!(self debug "starting storage manager shutdown");
// Proceed with shutdown
let guard = self
.startup_lock
.shutdown()
.await
.expect("should be started up");
// Stop deferred result processor
self.background_operation_processor.terminate().await;
@@ -383,6 +400,8 @@ impl StorageManager {
*inner = Self::new_inner();
}
guard.success();
veilid_log!(self debug "finished storage manager shutdown");
}
@@ -507,6 +526,10 @@
owner: Option<KeyPair>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<DHTRecordDescriptor> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
// Validate schema
schema.validate()?;
@@ -533,6 +556,10 @@
writer: Option<KeyPair>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<DHTRecordDescriptor> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
let mut inner = self.inner.lock().await;
// See if we have a local record already or not
@@ -609,6 +636,10 @@
/// Close an opened local record
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn close_record(&self, record_key: TypedRecordKey) -> VeilidAPIResult<()> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
// Attempt to close the record, returning the opened record if it wasn't already closed
let mut inner = self.inner.lock().await;
Self::close_record_inner(&mut inner, record_key)?;
@@ -618,6 +649,10 @@
/// Close all opened records
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn close_all_records(&self) -> VeilidAPIResult<()> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
// Attempt to close the record, returning the opened record if it wasn't already closed
let mut inner = self.inner.lock().await;
let keys = inner.opened_records.keys().copied().collect::<Vec<_>>();
@@ -631,6 +666,10 @@
/// Delete a local record
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn delete_record(&self, record_key: TypedRecordKey) -> VeilidAPIResult<()> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
// Ensure the record is closed
let mut inner = self.inner.lock().await;
Self::close_record_inner(&mut inner, record_key)?;
@@ -652,6 +691,10 @@
subkey: ValueSubkey,
force_refresh: bool,
) -> VeilidAPIResult<Option<ValueData>> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
let mut inner = self.inner.lock().await;
let safety_selection = {
let Some(opened_record) = inner.opened_records.get(&record_key) else {
@@ -661,8 +704,9 @@
};
// See if the requested subkey is our local record store
-let last_get_result =
-Self::handle_get_local_value_inner(&mut inner, record_key, subkey, true).await?;
let last_get_result = self
.handle_get_local_value_inner(&mut inner, record_key, subkey, true)
.await?;
// Return the existing value if we have one unless we are forcing a refresh
if !force_refresh {
@@ -720,41 +764,6 @@ impl StorageManager {
Ok(out)
}
-// Returns false if we were not already writing
-// Returns true if this subkey was already being written to
-fn mark_active_subkey_write_inner(
-&self,
-inner: &mut StorageManagerInner,
-record_key: TypedRecordKey,
-subkey: ValueSubkey,
-) -> bool {
-let asw = inner.active_subkey_writes.entry(record_key).or_default();
-if asw.contains(subkey) {
-veilid_log!(self debug "Already writing to this subkey: {}:{}", record_key, subkey);
-true
-} else {
-// Add to our list of active subkey writes
-asw.insert(subkey);
-false
-}
-}
-fn unmark_active_subkey_write_inner(
-&self,
-inner: &mut StorageManagerInner,
-record_key: TypedRecordKey,
-subkey: ValueSubkey,
-) {
-// Remove from active subkey writes
-let asw = inner.active_subkey_writes.get_mut(&record_key).unwrap();
-if !asw.remove(subkey) {
-veilid_log!(self error "missing active subkey write: {}:{}", record_key, subkey);
-}
-if asw.is_empty() {
-inner.active_subkey_writes.remove(&record_key);
-}
-}
/// Set the value of a subkey on an opened local record
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn set_value(
@@ -764,6 +773,10 @@
data: Vec<u8>,
options: Option<SetDHTValueOptions>,
) -> VeilidAPIResult<Option<ValueData>> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
let mut inner = self.inner.lock().await;
// Get cryptosystem
@@ -795,8 +808,9 @@
};
// See if the subkey we are modifying has a last known local value
-let last_get_result =
-Self::handle_get_local_value_inner(&mut inner, record_key, subkey, true).await?;
let last_get_result = self
.handle_get_local_value_inner(&mut inner, record_key, subkey, true)
.await?;
// Get the descriptor and schema for the key
let Some(descriptor) = last_get_result.opt_descriptor else {
@@ -838,37 +852,43 @@
writer.secret,
)?);
-// Write the value locally first
-veilid_log!(self debug "Writing subkey locally: {}:{} len={}", record_key, subkey, signed_value_data.value_data().data().len() );
-Self::handle_set_local_value_inner(
-&mut inner,
-record_key,
-subkey,
-signed_value_data.clone(),
-InboundWatchUpdateMode::NoUpdate,
-)
-.await?;
-// Note that we are writing this subkey actively
-// If it appears we are already doing this, then put it to the offline queue
-let already_writing = self.mark_active_subkey_write_inner(&mut inner, record_key, subkey);
-if already_writing || !self.dht_is_online() {
-if allow_offline == AllowOffline(true) {
-veilid_log!(self debug "Writing subkey offline: {}:{} len={}", record_key, subkey, signed_value_data.value_data().data().len() );
// Check if we are offline
// This is a race, but an optimization to avoid fanout if it is likely to fail
if !self.dht_is_online() {
if allow_offline == AllowOffline(false) {
apibail_try_again!("offline, try again later");
}
veilid_log!(self debug "Writing subkey offline because we are offline: {}:{} len={}", record_key, subkey, signed_value_data.value_data().data().len() );
// Add to offline writes to flush
-Self::add_offline_subkey_write_inner(
self.add_offline_subkey_write_inner(
&mut inner,
record_key,
subkey,
safety_selection,
signed_value_data,
);
return Ok(None);
-} else {
-self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey);
};
// Note that we are writing this subkey in the foreground
// If it appears we are already doing this, then put it to the background/offline queue
let opt_guard = self.mark_active_subkey_write_inner(&mut inner, record_key, subkey);
if opt_guard.is_none() {
if allow_offline == AllowOffline(false) {
apibail_try_again!("offline, try again later");
}
-};
veilid_log!(self debug "Writing subkey offline due to concurrent foreground write: {}:{} len={}", record_key, subkey, signed_value_data.value_data().data().len() );
// Add to offline writes to flush
self.add_offline_subkey_write_inner(
&mut inner,
record_key,
subkey,
safety_selection,
signed_value_data,
);
return Ok(None);
}
let guard = opt_guard.unwrap();
// Drop the lock for network access
drop(inner);
@@ -891,21 +911,21 @@
// Failed to write, try again later
let mut inner = self.inner.lock().await;
// Remove from active subkey writes
self.unmark_active_subkey_write_inner(&mut inner, guard);
if allow_offline == AllowOffline(true) {
-Self::add_offline_subkey_write_inner(
self.add_offline_subkey_write_inner(
&mut inner,
record_key,
subkey,
safety_selection,
signed_value_data.clone(),
);
} else {
-self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey);
apibail_try_again!("offline, try again later");
}
-// Remove from active subkey writes
-self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey);
if matches!(e, VeilidAPIError::TryAgain { message: _ }) {
return Ok(None);
}
@@ -913,7 +933,53 @@
}
};
-let process = || async {
let out = if allow_offline == AllowOffline(true) {
// Process one fanout result in the foreground, and if necessary, more in the background
// This trades off possibly having a consensus conflict, which requires watching for ValueChanged
// for lower latency. Can only be done if we are allowing offline processing because
// the network could go down after the first fanout result is processed and before we complete fanout.
self.background_process_set_value_results(
res_rx,
record_key,
subkey,
signed_value_data,
safety_selection,
)
.await
} else {
// Process all fanout results in the foreground.
// Takes longer but ensures the value is fully committed to the network.
self.foreground_process_set_value_results(
res_rx,
record_key,
subkey,
signed_value_data,
safety_selection,
)
.await
};
// Remove active subkey write
let mut inner = self.inner.lock().await;
// Remove from active subkey writes
self.unmark_active_subkey_write_inner(&mut inner, guard);
if matches!(out, Err(VeilidAPIError::TryAgain { message: _ })) {
return Ok(None);
}
out
}
async fn background_process_set_value_results(
&self,
res_rx: flume::Receiver<VeilidAPIResult<set_value::OutboundSetValueResult>>,
record_key: TypedRecordKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<Option<ValueData>> {
// Wait for the first result
let Ok(result) = res_rx.recv_async().await else {
apibail_internal!("failed to receive results");
@@ -945,21 +1011,44 @@
}
Ok(out)
-};
-let out = process().await;
-// Remove active subkey write
-let mut inner = self.inner.lock().await;
-// Remove from active subkey writes
-self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey);
-if matches!(out, Err(VeilidAPIError::TryAgain { message: _ })) {
-return Ok(None);
-}
-out
}
async fn foreground_process_set_value_results(
&self,
res_rx: flume::Receiver<VeilidAPIResult<set_value::OutboundSetValueResult>>,
record_key: TypedRecordKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<Option<ValueData>> {
let Some(stop_token) = self.startup_lock.stop_token() else {
apibail_not_initialized!();
};
loop {
let timeout_res = res_rx.recv_async().timeout_at(stop_token.clone()).await;
let Ok(res) = timeout_res else {
apibail_not_initialized!();
};
let Ok(result) = res else {
apibail_internal!("failed to receive results");
};
let result = result?;
let is_incomplete = result.fanout_result.kind.is_incomplete();
let opt_value_data = self
.process_outbound_set_value_result(
record_key,
subkey,
signed_value_data.value_data().clone(),
safety_selection,
result,
)
.await?;
if !is_incomplete {
return Ok(opt_value_data);
}
}
}
/// Create, update or cancel an outbound watch to a DHT value
@@ -971,6 +1060,10 @@
expiration: Timestamp,
count: u32,
) -> VeilidAPIResult<bool> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
// Obtain the watch change lock
// (may need to wait for background operations to complete on the watch)
let watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await;
@@ -1069,6 +1162,10 @@
record_key: TypedRecordKey,
subkeys: ValueSubkeyRangeSet,
) -> VeilidAPIResult<bool> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
// Obtain the watch change lock
// (may need to wait for background operations to complete on the watch)
let watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await;
@@ -1138,6 +1235,10 @@
subkeys: ValueSubkeyRangeSet,
scope: DHTReportScope,
) -> VeilidAPIResult<DHTRecordReport> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
let subkeys = if subkeys.is_empty() {
ValueSubkeyRangeSet::full()
} else {
@@ -1496,7 +1597,7 @@
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
-pub async fn open_existing_record_inner(
async fn open_existing_record_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
@@ -1563,7 +1664,7 @@
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
-pub async fn open_new_record_inner(
async fn open_new_record_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
@@ -1621,7 +1722,7 @@
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
-pub async fn get_value_nodes(
async fn get_value_nodes(
&self,
record_key: TypedRecordKey,
) -> VeilidAPIResult<Option<Vec<NodeRef>>> {
@@ -1652,9 +1753,7 @@
}
#[instrument(level = "trace", target = "stor", skip_all)]
-pub(super) fn process_fanout_results_inner<
-I: IntoIterator<Item = (ValueSubkeyRangeSet, FanoutResult)>,
->(
fn process_fanout_results_inner<I: IntoIterator<Item = (ValueSubkeyRangeSet, FanoutResult)>>(
inner: &mut StorageManagerInner,
vcrypto: &CryptoSystemGuard<'_>,
record_key: TypedRecordKey,
@@ -1734,11 +1833,20 @@
#[instrument(level = "trace", target = "stor", skip_all, err)]
async fn handle_get_local_value_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<GetResult> {
// See if the value is in the offline subkey writes first,
// since it may not have been committed yet to the local record store
if let Some(get_result) =
self.get_offline_subkey_writes_subkey(inner, record_key, subkey, want_descriptor)?
{
return Ok(get_result);
}
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
@@ -1757,13 +1865,22 @@
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
-pub(super) async fn handle_set_local_value_inner(
async fn handle_set_local_value_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
watch_update_mode: InboundWatchUpdateMode,
) -> VeilidAPIResult<()> {
// See if this new data supersedes any offline subkey writes
self.remove_old_offline_subkey_writes_inner(
inner,
record_key,
subkey,
signed_value_data.clone(),
);
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
@@ -1831,7 +1948,7 @@
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
-pub(super) async fn handle_set_remote_value_inner(
async fn handle_set_remote_value_inner(
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
@@ -1869,7 +1986,7 @@
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
-pub(super) async fn handle_inspect_remote_value_inner(
async fn handle_inspect_remote_value_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
@@ -1911,27 +2028,7 @@
}
#[instrument(level = "trace", target = "stor", skip_all)]
-pub(super) fn add_offline_subkey_write_inner(
-inner: &mut StorageManagerInner,
-record_key: TypedRecordKey,
-subkey: ValueSubkey,
-safety_selection: SafetySelection,
-) {
-inner
-.offline_subkey_writes
-.entry(record_key)
-.and_modify(|x| {
-x.subkeys.insert(subkey);
-})
-.or_insert(tasks::offline_subkey_writes::OfflineSubkeyWrite {
-safety_selection,
-subkeys: ValueSubkeyRangeSet::single(subkey),
-subkeys_in_flight: ValueSubkeyRangeSet::new(),
-});
-}
-#[instrument(level = "trace", target = "stor", skip_all)]
-pub(super) fn process_deferred_results<T: Send + 'static>(
fn process_deferred_results<T: Send + 'static>(
&self,
receiver: flume::Receiver<T>,
handler: impl FnMut(T) -> PinBoxFutureStatic<bool> + Send + 'static,
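The background/foreground split added to set_value() above hinges on one scheduling idea: with AllowOffline(true), only the first fanout result is awaited before returning and the rest are drained in the background, while with AllowOffline(false) every result is consumed before the call returns. A minimal standalone sketch of that pattern (illustrative only; it uses std threads and mpsc instead of the flume/async machinery in the diff, and process_result() is a hypothetical stand-in for process_outbound_set_value_result()):

use std::sync::mpsc;
use std::thread;

// Stand-in for processing one fanout result: returns true when the
// operation is complete (the result was not "incomplete").
fn process_result(r: u32) -> bool {
    println!("processed result {r}");
    r >= 2
}

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    // Stand-in for the fanout producing a stream of results.
    thread::spawn(move || {
        for r in 0..4 {
            let _ = tx.send(r);
        }
    });

    let allow_offline = true; // analogous to AllowOffline(true)
    if allow_offline {
        // Low latency: handle the first result now, drain the rest in the background.
        if let Ok(first) = rx.recv() {
            process_result(first);
        }
        let background = thread::spawn(move || {
            for r in rx {
                if process_result(r) {
                    break;
                }
            }
        });
        background.join().unwrap(); // joined here only so the demo finishes printing
    } else {
        // Fully committed: handle every result before returning.
        for r in rx {
            if process_result(r) {
                break;
            }
        }
    }
}

The tradeoff is the one the comment in the diff spells out: returning after the first result is lower latency but can leave a consensus conflict to be resolved later via ValueChanged, which is only acceptable when offline queueing is allowed.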


@@ -0,0 +1,189 @@
use super::*;
impl_veilid_log_facility!("stor");
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OfflineSubkeyWrite {
/// Safety selection to use when writing this record to the network
pub safety_selection: SafetySelection,
/// The subkeys that are queued up needing to be sent to the network in the background
pub subkeys: ValueSubkeyRangeSet,
/// The subkeys currently being sent to the network in the background
#[serde(default)]
pub subkeys_in_flight: ValueSubkeyRangeSet,
/// The value data to send to the network if it is newer than what is in the local record store
#[serde(default)]
pub subkey_value_data: HashMap<ValueSubkey, Arc<SignedValueData>>,
}
impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn add_offline_subkey_write_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
signed_value_data: Arc<SignedValueData>,
) {
inner
.offline_subkey_writes
.entry(record_key)
.and_modify(|x| {
x.subkeys.insert(subkey);
x.subkey_value_data
.insert(subkey, signed_value_data.clone());
})
.or_insert(OfflineSubkeyWrite {
safety_selection,
subkeys: ValueSubkeyRangeSet::single(subkey),
subkeys_in_flight: ValueSubkeyRangeSet::new(),
subkey_value_data: {
let mut subkey_value_data = HashMap::new();
subkey_value_data.insert(subkey, signed_value_data);
subkey_value_data
},
});
}
pub(super) fn get_offline_subkey_writes_subkey(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<Option<GetResult>> {
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let Some(osw) = inner.offline_subkey_writes.get(&record_key) else {
return Ok(None);
};
let Some(signed_value_data) = osw.subkey_value_data.get(&subkey).cloned() else {
return Ok(None);
};
let opt_descriptor = if want_descriptor {
if let Some(descriptor) =
local_record_store.with_record(record_key, |record| record.descriptor().clone())
{
Some(descriptor)
} else {
// Record not available
return Ok(None);
}
} else {
None
};
Ok(Some(GetResult {
opt_value: Some(signed_value_data),
opt_descriptor,
}))
}
/// If an offline subkey write happens and then we find newer data on the network while
/// waiting to process the offline subkey write, we should continue with it but use the
/// newer data in place of the originally requested data. If the sequence number of the
/// network data is the same, we defer to what is already on the network.
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn remove_old_offline_subkey_writes_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
) {
// Get the offline subkey write record
match inner.offline_subkey_writes.entry(record_key) {
hashlink::linked_hash_map::Entry::Occupied(mut o) => {
let finished = {
let osw = o.get_mut();
match osw.subkey_value_data.entry(subkey) {
std::collections::hash_map::Entry::Occupied(o) => {
// If new data has greater or equal sequence number to the
// offline set value, drop the old data from the offline subkey write
let old_data = o.get().value_data();
let new_data = signed_value_data.value_data();
if old_data != new_data && new_data.seq() >= old_data.seq() {
o.remove();
// Also, remove the subkey from queued offline subkey writes
// but leave it in-flight if it is in flight. That will get
// handled by finish_offline_subkey_writes_inner
osw.subkeys.remove(subkey);
veilid_log!(self debug "offline write overwritten by newer or different data from network: record_key={} subkey={} seq={}", record_key, subkey, signed_value_data.value_data().seq());
}
}
std::collections::hash_map::Entry::Vacant(_) => {}
}
// If we have no new work to do, and not still doing work, then this record is done
let finished = osw.subkeys.is_empty() && osw.subkeys_in_flight.is_empty();
if !finished {
// Remove any subkey value data that is no longer needed
let osw = o.get_mut();
osw.subkey_value_data.retain(|k, _| {
osw.subkeys.contains(*k) || osw.subkeys_in_flight.contains(*k)
});
}
finished
};
if finished {
veilid_log!(self debug "Offline write finished key {}", record_key);
o.remove();
}
}
hashlink::linked_hash_map::Entry::Vacant(_) => {}
}
}
/// When we finish an offline subkey write, we mark subkeys as no longer in-flight
/// and if we didn't finish all the subkeys they are returned to the list of offline subkeys
/// so we can try again later. If the data associated with the write is no longer necessary
/// we can drop it.
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn finish_offline_subkey_writes_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkeys_written: ValueSubkeyRangeSet,
subkeys_still_offline: ValueSubkeyRangeSet,
) {
assert!(
subkeys_written.is_disjoint(&subkeys_still_offline),
"subkeys can not be written and still offline"
);
// Get the offline subkey write record
match inner.offline_subkey_writes.entry(record_key) {
hashlink::linked_hash_map::Entry::Occupied(mut o) => {
let finished = {
let osw = o.get_mut();
// Now any left over are still offline, so merge them with any subkeys that have been added while we were working
osw.subkeys = osw.subkeys.union(&subkeys_still_offline);
// Remove subkeys that were successfully written from in_flight status
osw.subkeys_in_flight = osw.subkeys_in_flight.difference(&subkeys_written);
// If we have no new work to do, and not still doing work, then this record is done
let finished = osw.subkeys.is_empty() && osw.subkeys_in_flight.is_empty();
if !finished {
// Remove any subkey value data that is no longer needed
let osw = o.get_mut();
osw.subkey_value_data.retain(|k, _| {
osw.subkeys.contains(*k) || osw.subkeys_in_flight.contains(*k)
});
}
finished
};
if finished {
veilid_log!(self debug "offline subkey write finished key {}", record_key);
o.remove();
}
}
hashlink::linked_hash_map::Entry::Vacant(_) => {
veilid_log!(self warn "can't finish missing offline subkey write: ignoring key {}", record_key);
}
}
}
}
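The core of remove_old_offline_subkey_writes_inner() above is a small supersede rule: data observed from the network replaces a queued offline write only if it differs from the queued data and carries an equal or higher sequence number. A standalone sketch of just that rule (illustrative only; SimpleValue is invented here as a stand-in for ValueData):

// Decide whether data seen from the network should drop a queued offline write.
#[derive(PartialEq, Debug, Clone)]
struct SimpleValue {
    seq: u32,
    data: Vec<u8>,
}

// True when the network data differs from the queued data and its seq is >= the queued seq,
// mirroring the `old_data != new_data && new_data.seq() >= old_data.seq()` check above.
fn network_supersedes_queued(queued: &SimpleValue, network: &SimpleValue) -> bool {
    queued != network && network.seq >= queued.seq
}

fn main() {
    let queued = SimpleValue { seq: 1, data: b"offline".to_vec() };
    let same = queued.clone();
    let newer = SimpleValue { seq: 2, data: b"network".to_vec() };
    let older = SimpleValue { seq: 0, data: b"stale".to_vec() };

    assert!(!network_supersedes_queued(&queued, &same));  // identical data: keep the queue entry
    assert!(network_supersedes_queued(&queued, &newer));  // newer seq wins: drop the queued write
    assert!(!network_supersedes_queued(&queued, &older)); // older seq: the offline write still applies
    println!("supersede rule holds");
}

Identical data is left alone so a redundant local write does not disturb the queue, and older network data never cancels a newer offline write.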


@@ -140,6 +140,40 @@ impl StorageManager {
.await;
}
async fn rehydrate_single_subkey_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
) -> bool {
// Get value to rehydrate with
let get_result = match self
.handle_get_local_value_inner(inner, record_key, subkey, false)
.await
{
Ok(v) => v,
Err(e) => {
veilid_log!(self debug "Missing local record for rehydrating subkey: record={} subkey={}: {}", record_key, subkey, e);
return false;
}
};
let data = match get_result.opt_value {
Some(v) => v,
None => {
veilid_log!(self debug "Missing local subkey data for rehydrating subkey: record={} subkey={}", record_key, subkey);
return false;
}
};
// Add to offline writes to flush
veilid_log!(self debug "Rehydrating: record={} subkey={}", record_key, subkey);
self.add_offline_subkey_write_inner(inner, record_key, subkey, safety_selection, data);
true
}
#[instrument(level = "trace", target = "stor", skip(self), ret, err)]
pub(super) async fn rehydrate_all_subkeys(
&self,
@@ -156,15 +190,13 @@
let mut rehydrated = ValueSubkeyRangeSet::new();
for (n, subkey) in local_inspect_result.subkeys().iter().enumerate() {
if local_inspect_result.seqs()[n].is_some() {
-// Add to offline writes to flush
-veilid_log!(self debug "Rehydrating: record={} subkey={}", record_key, subkey);
// Rehydrate subkey
if self
.rehydrate_single_subkey_inner(&mut inner, record_key, subkey, safety_selection)
.await
{
rehydrated.insert(subkey);
-Self::add_offline_subkey_write_inner(
-&mut inner,
-record_key,
-subkey,
-safety_selection,
-);
}
}
}
@@ -214,15 +246,13 @@
// Does the online subkey have enough consensus?
// If not, schedule it to be written in the background
if sfr.consensus_nodes.len() < consensus_count {
-// Add to offline writes to flush
-veilid_log!(self debug "Rehydrating: record={} subkey={}", record_key, subkey);
// Rehydrate subkey
if self
.rehydrate_single_subkey_inner(&mut inner, record_key, subkey, safety_selection)
.await
{
rehydrated.insert(subkey);
-Self::add_offline_subkey_write_inner(
-&mut inner,
-record_key,
-subkey,
-safety_selection,
-);
}
}
}


@@ -277,19 +277,19 @@ impl StorageManager {
pub(super) fn process_deferred_outbound_set_value_result(
&self,
res_rx: flume::Receiver<Result<set_value::OutboundSetValueResult, VeilidAPIError>>,
-key: TypedRecordKey,
record_key: TypedRecordKey,
subkey: ValueSubkey,
-last_value_data: ValueData,
requested_value_data: ValueData,
safety_selection: SafetySelection,
) {
let registry = self.registry();
-let last_value_data = Arc::new(Mutex::new(last_value_data));
let last_requested_value_data = Arc::new(Mutex::new(requested_value_data));
self.process_deferred_results(
res_rx,
Box::new(
move |result: VeilidAPIResult<set_value::OutboundSetValueResult>| -> PinBoxFutureStatic<bool> {
let registry = registry.clone();
-let last_value_data = last_value_data.clone();
let last_requested_value_data = last_requested_value_data.clone();
Box::pin(async move {
let this = registry.storage_manager();
@@ -301,8 +301,8 @@
}
};
let is_incomplete = result.fanout_result.kind.is_incomplete();
-let lvd = last_value_data.lock().clone();
let requested_value_data = last_requested_value_data.lock().clone();
-let value_data = match this.process_outbound_set_value_result(key, subkey, lvd, safety_selection, result).await {
let value_data = match this.process_outbound_set_value_result(record_key, subkey, requested_value_data, safety_selection, result).await {
Ok(Some(v)) => v,
Ok(None) => {
return is_incomplete;
@@ -320,7 +320,7 @@
// if the sequence number changed since our first partial update
// Send with a max count as this is not attached to any watch
let changed = {
-let mut lvd = last_value_data.lock();
let mut lvd = last_requested_value_data.lock();
if lvd.seq() != value_data.seq() {
*lvd = value_data.clone();
true
@@ -329,7 +329,7 @@
}
};
if changed {
-this.update_callback_value_change(key,ValueSubkeyRangeSet::single(subkey), u32::MAX, Some(value_data));
this.update_callback_value_change(record_key,ValueSubkeyRangeSet::single(subkey), u32::MAX, Some(value_data));
}
// Return done
@@ -345,7 +345,7 @@
&self,
record_key: TypedRecordKey,
subkey: ValueSubkey,
-last_value_data: ValueData,
requested_value_data: ValueData,
safety_selection: SafetySelection,
result: set_value::OutboundSetValueResult,
) -> Result<Option<ValueData>, VeilidAPIError> {
@@ -362,7 +362,13 @@
let was_offline = self.check_fanout_set_offline(record_key, subkey, &result.fanout_result);
if was_offline {
// Failed to write, try again later
-Self::add_offline_subkey_write_inner(&mut inner, record_key, subkey, safety_selection);
self.add_offline_subkey_write_inner(
&mut inner,
record_key,
subkey,
safety_selection,
result.signed_value_data.clone(),
);
}
// Keep the list of nodes that returned a value for later reference
@@ -376,11 +382,8 @@
.with(|c| c.network.dht.set_value_count as usize),
);
-// Return the new value if it differs from what was asked to set
-if result.signed_value_data.value_data() != &last_value_data {
-// Record the newer value and send and update since it is different than what we just set
-Self::handle_set_local_value_inner(
// Record the set value locally since it was successfully set online
self.handle_set_local_value_inner(
&mut inner,
record_key,
subkey,
@@ -389,6 +392,8 @@
)
.await?;
// Return the new value if it differs from what was asked to set
if result.signed_value_data.value_data() != &requested_value_data {
return Ok(Some(result.signed_value_data.value_data().clone()));
}
@@ -413,8 +418,9 @@
// See if this is a remote or local value
let (is_local, last_get_result) = {
// See if the subkey we are modifying has a last known local value
-let last_get_result =
-Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await?;
let last_get_result = self
.handle_get_local_value_inner(&mut inner, key, subkey, true)
.await?;
// If this is local, it must have a descriptor already
if last_get_result.opt_descriptor.is_some() {
(true, last_get_result)
@@ -484,7 +490,7 @@
// Do the set and return no new value
let res = if is_local {
-Self::handle_set_local_value_inner(
self.handle_set_local_value_inner(
&mut inner,
key,
subkey,


@@ -4,14 +4,6 @@ use stop_token::future::FutureExt as _;
impl_veilid_log_facility!("stor");
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub struct OfflineSubkeyWrite {
-pub safety_selection: SafetySelection,
-pub subkeys: ValueSubkeyRangeSet,
-#[serde(default)]
-pub subkeys_in_flight: ValueSubkeyRangeSet,
-}
#[derive(Debug)]
enum OfflineSubkeyWriteResult {
Finished(set_value::OutboundSetValueResult),
@@ -49,7 +41,8 @@
};
let get_result = {
let mut inner = self.inner.lock().await;
-Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await
self.handle_get_local_value_inner(&mut inner, key, subkey, true)
.await
};
let Ok(get_result) = get_result else {
veilid_log!(self debug "Offline subkey write had no subkey result: {}:{}", key, subkey);
@@ -85,7 +78,7 @@
// Record the newer value and send and update since it is different than what we just set
let mut inner = self.inner.lock().await;
-Self::handle_set_local_value_inner(
self.handle_set_local_value_inner(
&mut inner,
key,
subkey,
@@ -177,36 +170,14 @@
// Debug print the result
veilid_log!(self debug "Offline write result: {:?}", result);
-// Get the offline subkey write record
-match inner
-.offline_subkey_writes
-.entry(result.work_item.record_key)
-{
-hashlink::linked_hash_map::Entry::Occupied(mut o) => {
-let finished = {
-let osw = o.get_mut();
-// Mark in-flight subkeys as having been completed
-let subkeys_still_offline =
-result.work_item.subkeys.difference(&result.written_subkeys);
-// Now any left over are still offline, so merge them with any subkeys that have been added while we were working
-osw.subkeys = osw.subkeys.union(&subkeys_still_offline);
-// And clear the subkeys in flight since we're done with this key for now
-osw.subkeys_in_flight =
-osw.subkeys_in_flight.difference(&result.written_subkeys);
-// If we have no new work to do, and not still doing work, then this record is done
-osw.subkeys.is_empty() && osw.subkeys_in_flight.is_empty()
-};
-if finished {
-veilid_log!(self debug "Offline write finished key {}", result.work_item.record_key);
-o.remove();
-}
-}
-hashlink::linked_hash_map::Entry::Vacant(_) => {
-veilid_log!(self warn "offline write work items should always be on offline_subkey_writes entries that exist: ignoring key {}", result.work_item.record_key);
-}
-}
// Mark the offline subkey write as no longer in-flight
let subkeys_still_offline = result.work_item.subkeys.difference(&result.written_subkeys);
self.finish_offline_subkey_writes_inner(
&mut inner,
result.work_item.record_key,
result.written_subkeys,
subkeys_still_offline,
);
// Keep the list of nodes that returned a value for later reference
let crypto = self.crypto();


@@ -1171,8 +1171,8 @@ impl StorageManager {
// Set the local value
let mut report_value_change = false;
if let Some(value) = &value {
-let last_get_result =
-Self::handle_get_local_value_inner(inner, record_key, first_subkey, true)
let last_get_result = self
.handle_get_local_value_inner(inner, record_key, first_subkey, true)
.await?;
let descriptor = last_get_result.opt_descriptor.unwrap();
@@ -1211,7 +1211,7 @@
// Keep the value because it is newer than the one we have
if report_value_change {
-Self::handle_set_local_value_inner(
self.handle_set_local_value_inner(
inner,
record_key,
first_subkey,


@@ -1641,7 +1641,7 @@ impl VeilidAPI {
}
async fn debug_record_set(&self, args: Vec<String>) -> VeilidAPIResult<String> {
-let opt_arg_add = if args.len() >= 2 && get_dht_key_no_safety(&args[1]).is_some() {
let mut opt_arg_add = if args.len() >= 2 && get_dht_key_no_safety(&args[1]).is_some() {
1
} else {
0
@@ -1658,14 +1658,43 @@
)?;
let data =
get_debug_argument_at(&args, 2 + opt_arg_add, "debug_record_set", "data", get_data)?;
-let writer = get_debug_argument_at(
let writer = match get_debug_argument_at(
&args,
3 + opt_arg_add,
"debug_record_set",
"writer",
get_keypair,
) {
Ok(v) => {
opt_arg_add += 1;
Some(v)
}
Err(_) => None,
};
let allow_offline = if args.len() > 3 + opt_arg_add {
get_debug_argument_at(
&args,
3 + opt_arg_add,
"debug_record_set",
"allow_offline",
get_string,
)
-.ok();
.ok()
} else {
None
};
let allow_offline = if let Some(allow_offline) = allow_offline {
if &allow_offline == "online" || &allow_offline == "false" {
Some(AllowOffline(false))
} else if &allow_offline == "offline" || &allow_offline == "true" {
Some(AllowOffline(true))
} else {
return Ok(format!("Unknown allow_offline: {}", allow_offline));
}
} else {
None
};
// Do a record set
let value = match rc
@@ -1675,7 +1704,7 @@
data,
Some(SetDHTValueOptions {
writer,
-allow_offline: None,
allow_offline,
}),
)
.await
@@ -1710,7 +1739,7 @@
"subkey",
get_number::<u32>,
)?;
-let force_refresh = if args.len() >= 3 + opt_arg_add {
let force_refresh = if args.len() > 2 + opt_arg_add {
Some(get_debug_argument_at(
&args,
2 + opt_arg_add,
@@ -2222,7 +2251,7 @@ DHT Operations:
create <dhtschema> [<cryptokind> [<safety>]] - create a new dht record
open <key>[+<safety>] [<writer>] - open an existing dht record
close [<key>] - close an opened/created dht record
-set [<key>] <subkey> <data> - write a value to a dht record subkey
set [<key>] <subkey> <data> [<writer>] [offline|online] - write a value to a dht record subkey
get [<key>] <subkey> [force] - read a value from a dht record subkey
delete <key> - delete the local copy of a dht record (not from the network)
info [<key>] [subkey] - display information about a dht record or subkey


@@ -253,6 +253,130 @@ async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI):
await rc.delete_dht_record(key)
@pytest.mark.asyncio
async def test_open_writer_dht_value_no_offline(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2))
key = rec.key
owner = rec.owner
secret = rec.owner_secret
#print(f"key:{key}")
cs = await api_connection.get_crypto_system(rec.key.kind())
async with cs:
assert await cs.validate_key_pair(owner, secret)
other_keypair = await cs.generate_key_pair()
va = b"Qwertyuiop Asdfghjkl Zxcvbnm"
vb = b"1234567890"
vc = b"!@#$%^&*()"
# Test subkey writes
vdtemp = await rc.set_dht_value(key, ValueSubkey(1), va, veilid.SetDHTValueOptions(None, False))
assert vdtemp is None
vdtemp = await rc.get_dht_value(key, ValueSubkey(1), False)
assert vdtemp.data == va
assert vdtemp.seq == 0
assert vdtemp.writer == owner
vdtemp = await rc.get_dht_value(key, ValueSubkey(0), False)
assert vdtemp is None
vdtemp = await rc.set_dht_value(key, ValueSubkey(0), vb, veilid.SetDHTValueOptions(None, False))
assert vdtemp is None
await sync(rc, [rec])
vdtemp = await rc.get_dht_value(key, ValueSubkey(0), True)
assert vdtemp.data == vb
vdtemp = await rc.get_dht_value(key, ValueSubkey(1), True)
assert vdtemp.data == va
# Equal value should not trigger sequence number update
vdtemp = await rc.set_dht_value(key, ValueSubkey(1), va, veilid.SetDHTValueOptions(None, False))
assert vdtemp is None
# Different value should trigger sequence number update
vdtemp = await rc.set_dht_value(key, ValueSubkey(1), vb, veilid.SetDHTValueOptions(None, False))
assert vdtemp is None
# Now that we initialized some subkeys
# and verified they stored correctly
# Delete things locally and reopen and see if we can write
# with the same writer key
await rc.close_dht_record(key)
await rc.delete_dht_record(key)
rec = await rc.open_dht_record(key, veilid.KeyPair.from_parts(owner, secret))
assert rec is not None
assert rec.key == key
assert rec.owner == owner
assert rec.owner_secret == secret
assert rec.schema.kind == veilid.DHTSchemaKind.DFLT
assert rec.schema.o_cnt == 2
# Verify subkey 1 can be set before it is read, even though newer data is available online
vdtemp = await rc.set_dht_value(key, ValueSubkey(1), vc, veilid.SetDHTValueOptions(None, False))
assert vdtemp is not None
assert vdtemp.data == vb
assert vdtemp.seq == 1
assert vdtemp.writer == owner
# Verify subkey 1 can be set a second time and it updates because seq is newer
vdtemp = await rc.set_dht_value(key, ValueSubkey(1), vc, veilid.SetDHTValueOptions(None, False))
assert vdtemp is None
# Verify the network got the subkey update with a refresh check
vdtemp = await rc.get_dht_value(key, ValueSubkey(1), True)
assert vdtemp is not None
assert vdtemp.data == vc
assert vdtemp.seq == 2
assert vdtemp.writer == owner
# Delete things locally and reopen and see if we can write
# with a different writer key (should fail)
await rc.close_dht_record(key)
await rc.delete_dht_record(key)
rec = await rc.open_dht_record(key, other_keypair)
assert rec is not None
assert rec.key == key
assert rec.owner == owner
assert rec.owner_secret is None
assert rec.schema.kind == veilid.DHTSchemaKind.DFLT
assert rec.schema.o_cnt == 2
# Verify subkey 1 can NOT be set because we have the wrong writer
with pytest.raises(veilid.VeilidAPIError):
await rc.set_dht_value(key, ValueSubkey(1), va, veilid.SetDHTValueOptions(None, False))
# Verify subkey 0 can NOT be set because we have the wrong writer
with pytest.raises(veilid.VeilidAPIError):
await rc.set_dht_value(key, ValueSubkey(0), va, veilid.SetDHTValueOptions(None, False))
# Verify subkey 0 can be set because override with the right writer
# Should have prior sequence number as its returned value because it exists online at seq 0
vdtemp = await rc.set_dht_value(key, ValueSubkey(0), va, veilid.SetDHTValueOptions(veilid.KeyPair.from_parts(owner, secret), False))
assert vdtemp is not None
assert vdtemp.data == vb
assert vdtemp.seq == 0
assert vdtemp.writer == owner
# Should update the second time to seq 1
vdtemp = await rc.set_dht_value(key, ValueSubkey(0), va, veilid.SetDHTValueOptions(veilid.KeyPair.from_parts(owner, secret), False))
assert vdtemp is None
# Clean up
await rc.close_dht_record(key)
await rc.delete_dht_record(key)
@pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running")
@pytest.mark.asyncio
async def test_watch_dht_values():
@@ -793,7 +917,7 @@ async def test_dht_write_read_full_subkeys_local():
# Secret to encrypt test data
SECRET = veilid.SharedSecret.from_bytes(b"A"*32)
# Max subkey size
-MAX_SUBKEY_SIZE = min(32768, 1024*1024/SUBKEY_COUNT)
MAX_SUBKEY_SIZE = min(32768, 1024*1024//SUBKEY_COUNT)
# MAX_SUBKEY_SIZE = 256
# write dht records on server 0