From 9d4976b243b9ba0b84996912b79485455a269400 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Wed, 18 Jun 2025 22:09:36 -0400 Subject: [PATCH] Improved logic for 'allow_offline' and 'offline subkey writes' --- .../storage_manager/active_subkey_writes.rs | 72 ++++ veilid-core/src/storage_manager/get_value.rs | 7 +- veilid-core/src/storage_manager/mod.rs | 375 +++++++++++------- .../storage_manager/offline_subkey_writes.rs | 189 +++++++++ veilid-core/src/storage_manager/rehydrate.rs | 66 ++- veilid-core/src/storage_manager/set_value.rs | 56 +-- .../tasks/offline_subkey_writes.rs | 51 +-- .../src/storage_manager/watch_value.rs | 8 +- veilid-core/src/veilid_api/debug.rs | 43 +- veilid-python/tests/test_dht.py | 126 +++++- 10 files changed, 756 insertions(+), 237 deletions(-) create mode 100644 veilid-core/src/storage_manager/active_subkey_writes.rs create mode 100644 veilid-core/src/storage_manager/offline_subkey_writes.rs diff --git a/veilid-core/src/storage_manager/active_subkey_writes.rs b/veilid-core/src/storage_manager/active_subkey_writes.rs new file mode 100644 index 00000000..b7ec2e82 --- /dev/null +++ b/veilid-core/src/storage_manager/active_subkey_writes.rs @@ -0,0 +1,72 @@ +use super::*; + +impl_veilid_log_facility!("stor"); + +pub(super) struct ActiveSubkeyWriteGuard { + registry: VeilidComponentRegistry, + done: bool, + record_key: TypedRecordKey, + subkey: ValueSubkey, +} + +impl ActiveSubkeyWriteGuard { + fn set_done(&mut self) { + self.done = true; + } +} + +impl Drop for ActiveSubkeyWriteGuard { + fn drop(&mut self) { + if !self.done { + let registry = &self.registry; + veilid_log!(registry error "active subkey write finished without being marked done: {}:{}", self.record_key, self.subkey); + } + } +} + +impl StorageManager { + // Returns false if we were not already writing + // Returns true if this subkey was already being written to + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn mark_active_subkey_write_inner( + &self, + inner: &mut StorageManagerInner, + record_key: TypedRecordKey, + subkey: ValueSubkey, + ) -> Option { + let asw = inner.active_subkey_writes.entry(record_key).or_default(); + if asw.contains(subkey) { + veilid_log!(self debug "already writing to this subkey: {}:{}", record_key, subkey); + None + } else { + // Add to our list of active subkey writes + asw.insert(subkey); + Some(ActiveSubkeyWriteGuard { + registry: self.registry(), + done: false, + record_key, + subkey, + }) + } + } + + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn unmark_active_subkey_write_inner( + &self, + inner: &mut StorageManagerInner, + mut guard: ActiveSubkeyWriteGuard, + ) { + // Remove from active subkey writes + let asw = inner + .active_subkey_writes + .get_mut(&guard.record_key) + .unwrap(); + if !asw.remove(guard.subkey) { + veilid_log!(self error "missing active subkey write: {}:{}", guard.record_key, guard.subkey); + } + if asw.is_empty() { + inner.active_subkey_writes.remove(&guard.record_key); + } + guard.set_done(); + } +} diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 6e0362c6..2fd104ae 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -390,7 +390,7 @@ impl StorageManager { // If we got a new value back then write it to the opened record if Some(get_result_value.value_data().seq()) != opt_last_seq { - Self::handle_set_local_value_inner( + self.handle_set_local_value_inner( &mut inner, record_key, subkey, @@ -415,8 +415,9 @@ impl StorageManager { // See if this is a remote or local value let (_is_local, last_get_result) = { // See if the subkey we are getting has a last known local value - let mut last_get_result = - Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await?; + let mut last_get_result = self + .handle_get_local_value_inner(&mut inner, key, subkey, true) + .await?; // If this is local, it must have a descriptor already if last_get_result.opt_descriptor.is_some() { if !want_descriptor { diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 7379260c..5c38cb98 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,6 +1,8 @@ +mod active_subkey_writes; mod debug; mod get_value; mod inspect_value; +mod offline_subkey_writes; mod outbound_watch_manager; mod record_store; mod rehydrate; @@ -12,11 +14,13 @@ mod watch_value; use super::*; use hashlink::LinkedHashMap; +use offline_subkey_writes::*; use outbound_watch_manager::*; use record_store::*; use rehydrate::*; use routing_table::*; use rpc_processor::*; +use stop_token::future::FutureExt as _; pub use record_store::{InboundWatchParameters, InboundWatchResult}; @@ -81,12 +85,12 @@ struct StorageManagerInner { pub local_record_store: Option>, /// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish pub remote_record_store: Option>, - /// Record subkeys that have not been pushed to the network because they were written to offline - pub offline_subkey_writes: - LinkedHashMap, + /// Record subkeys to commit to the network in the background, + /// either because they were written to offline, or due to a rehydration action + pub offline_subkey_writes: LinkedHashMap, /// Record subkeys that are currently being written to in the foreground pub active_subkey_writes: HashMap, - /// Records that have rehydration requests + /// Records that have pending rehydration requests pub rehydration_requests: HashMap, /// State management for outbound watches pub outbound_watch_manager: OutboundWatchManager, @@ -122,6 +126,7 @@ impl fmt::Debug for StorageManagerInner { pub(crate) struct StorageManager { registry: VeilidComponentRegistry, inner: AsyncMutex, + startup_lock: Arc, // Background processes save_metadata_task: TickTask, @@ -197,6 +202,7 @@ impl StorageManager { let this = StorageManager { registry, inner: AsyncMutex::new(inner), + startup_lock: Arc::new(StartupLock::new()), save_metadata_task: TickTask::new("save_metadata_task", SAVE_METADATA_INTERVAL_SECS), flush_record_stores_task: TickTask::new( @@ -276,6 +282,8 @@ impl StorageManager { #[instrument(level = "debug", skip_all, err)] async fn init_async(&self) -> EyreResult<()> { + let guard = self.startup_lock.startup()?; + veilid_log!(self debug "startup storage manager"); let table_store = self.table_store(); let config = self.config(); @@ -301,6 +309,8 @@ impl StorageManager { // Start deferred results processors self.background_operation_processor.init(); + guard.success(); + Ok(()) } @@ -355,6 +365,13 @@ impl StorageManager { async fn terminate_async(&self) { veilid_log!(self debug "starting storage manager shutdown"); + // Proceed with shutdown + let guard = self + .startup_lock + .shutdown() + .await + .expect("should be started up"); + // Stop deferred result processor self.background_operation_processor.terminate().await; @@ -383,6 +400,8 @@ impl StorageManager { *inner = Self::new_inner(); } + guard.success(); + veilid_log!(self debug "finished storage manager shutdown"); } @@ -507,6 +526,10 @@ impl StorageManager { owner: Option, safety_selection: SafetySelection, ) -> VeilidAPIResult { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + // Validate schema schema.validate()?; @@ -533,6 +556,10 @@ impl StorageManager { writer: Option, safety_selection: SafetySelection, ) -> VeilidAPIResult { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + let mut inner = self.inner.lock().await; // See if we have a local record already or not @@ -609,6 +636,10 @@ impl StorageManager { /// Close an opened local record #[instrument(level = "trace", target = "stor", skip_all)] pub async fn close_record(&self, record_key: TypedRecordKey) -> VeilidAPIResult<()> { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + // Attempt to close the record, returning the opened record if it wasn't already closed let mut inner = self.inner.lock().await; Self::close_record_inner(&mut inner, record_key)?; @@ -618,6 +649,10 @@ impl StorageManager { /// Close all opened records #[instrument(level = "trace", target = "stor", skip_all)] pub async fn close_all_records(&self) -> VeilidAPIResult<()> { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + // Attempt to close the record, returning the opened record if it wasn't already closed let mut inner = self.inner.lock().await; let keys = inner.opened_records.keys().copied().collect::>(); @@ -631,6 +666,10 @@ impl StorageManager { /// Delete a local record #[instrument(level = "trace", target = "stor", skip_all)] pub async fn delete_record(&self, record_key: TypedRecordKey) -> VeilidAPIResult<()> { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + // Ensure the record is closed let mut inner = self.inner.lock().await; Self::close_record_inner(&mut inner, record_key)?; @@ -652,6 +691,10 @@ impl StorageManager { subkey: ValueSubkey, force_refresh: bool, ) -> VeilidAPIResult> { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + let mut inner = self.inner.lock().await; let safety_selection = { let Some(opened_record) = inner.opened_records.get(&record_key) else { @@ -661,8 +704,9 @@ impl StorageManager { }; // See if the requested subkey is our local record store - let last_get_result = - Self::handle_get_local_value_inner(&mut inner, record_key, subkey, true).await?; + let last_get_result = self + .handle_get_local_value_inner(&mut inner, record_key, subkey, true) + .await?; // Return the existing value if we have one unless we are forcing a refresh if !force_refresh { @@ -720,41 +764,6 @@ impl StorageManager { Ok(out) } - // Returns false if we were not already writing - // Returns true if this subkey was already being written to - fn mark_active_subkey_write_inner( - &self, - inner: &mut StorageManagerInner, - record_key: TypedRecordKey, - subkey: ValueSubkey, - ) -> bool { - let asw = inner.active_subkey_writes.entry(record_key).or_default(); - if asw.contains(subkey) { - veilid_log!(self debug "Already writing to this subkey: {}:{}", record_key, subkey); - true - } else { - // Add to our list of active subkey writes - asw.insert(subkey); - false - } - } - - fn unmark_active_subkey_write_inner( - &self, - inner: &mut StorageManagerInner, - record_key: TypedRecordKey, - subkey: ValueSubkey, - ) { - // Remove from active subkey writes - let asw = inner.active_subkey_writes.get_mut(&record_key).unwrap(); - if !asw.remove(subkey) { - veilid_log!(self error "missing active subkey write: {}:{}", record_key, subkey); - } - if asw.is_empty() { - inner.active_subkey_writes.remove(&record_key); - } - } - /// Set the value of a subkey on an opened local record #[instrument(level = "trace", target = "stor", skip_all)] pub async fn set_value( @@ -764,6 +773,10 @@ impl StorageManager { data: Vec, options: Option, ) -> VeilidAPIResult> { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + let mut inner = self.inner.lock().await; // Get cryptosystem @@ -795,8 +808,9 @@ impl StorageManager { }; // See if the subkey we are modifying has a last known local value - let last_get_result = - Self::handle_get_local_value_inner(&mut inner, record_key, subkey, true).await?; + let last_get_result = self + .handle_get_local_value_inner(&mut inner, record_key, subkey, true) + .await?; // Get the descriptor and schema for the key let Some(descriptor) = last_get_result.opt_descriptor else { @@ -838,38 +852,44 @@ impl StorageManager { writer.secret, )?); - // Write the value locally first - veilid_log!(self debug "Writing subkey locally: {}:{} len={}", record_key, subkey, signed_value_data.value_data().data().len() ); - Self::handle_set_local_value_inner( - &mut inner, - record_key, - subkey, - signed_value_data.clone(), - InboundWatchUpdateMode::NoUpdate, - ) - .await?; - - // Note that we are writing this subkey actively - // If it appears we are already doing this, then put it to the offline queue - let already_writing = self.mark_active_subkey_write_inner(&mut inner, record_key, subkey); - - if already_writing || !self.dht_is_online() { - if allow_offline == AllowOffline(true) { - veilid_log!(self debug "Writing subkey offline: {}:{} len={}", record_key, subkey, signed_value_data.value_data().data().len() ); - // Add to offline writes to flush - Self::add_offline_subkey_write_inner( - &mut inner, - record_key, - subkey, - safety_selection, - ); - return Ok(None); - } else { - self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey); + // Check if we are offline + // This is a race, but an optimization to avoid fanout if it is likely to fail + if !self.dht_is_online() { + if allow_offline == AllowOffline(false) { apibail_try_again!("offline, try again later"); } + veilid_log!(self debug "Writing subkey offline because we are offline: {}:{} len={}", record_key, subkey, signed_value_data.value_data().data().len() ); + // Add to offline writes to flush + self.add_offline_subkey_write_inner( + &mut inner, + record_key, + subkey, + safety_selection, + signed_value_data, + ); + return Ok(None); }; + // Note that we are writing this subkey in the foreground + // If it appears we are already doing this, then put it to the background/offline queue + let opt_guard = self.mark_active_subkey_write_inner(&mut inner, record_key, subkey); + if opt_guard.is_none() { + if allow_offline == AllowOffline(false) { + apibail_try_again!("offline, try again later"); + } + veilid_log!(self debug "Writing subkey offline due to concurrent foreground write: {}:{} len={}", record_key, subkey, signed_value_data.value_data().data().len() ); + // Add to offline writes to flush + self.add_offline_subkey_write_inner( + &mut inner, + record_key, + subkey, + safety_selection, + signed_value_data, + ); + return Ok(None); + } + let guard = opt_guard.unwrap(); + // Drop the lock for network access drop(inner); @@ -891,21 +911,21 @@ impl StorageManager { // Failed to write, try again later let mut inner = self.inner.lock().await; + // Remove from active subkey writes + self.unmark_active_subkey_write_inner(&mut inner, guard); + if allow_offline == AllowOffline(true) { - Self::add_offline_subkey_write_inner( + self.add_offline_subkey_write_inner( &mut inner, record_key, subkey, safety_selection, + signed_value_data.clone(), ); } else { - self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey); apibail_try_again!("offline, try again later"); } - // Remove from active subkey writes - self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey); - if matches!(e, VeilidAPIError::TryAgain { message: _ }) { return Ok(None); } @@ -913,16 +933,110 @@ impl StorageManager { } }; - let process = || async { - // Wait for the first result - let Ok(result) = res_rx.recv_async().await else { + let out = if allow_offline == AllowOffline(true) { + // Process one fanout result in the foreground, and if necessary, more in the background + // This trades off possibly having a consensus conflict, which requires watching for ValueChanged + // for lower latency. Can only be done if we are allowing offline processing because + // the network could go down after the first fanout result is processed and before we complete fanout. + self.background_process_set_value_results( + res_rx, + record_key, + subkey, + signed_value_data, + safety_selection, + ) + .await + } else { + // Process all fanout results in the foreground. + // Takes longer but ensures the value is fully committed to the network. + self.foreground_process_set_value_results( + res_rx, + record_key, + subkey, + signed_value_data, + safety_selection, + ) + .await + }; + + // Remove active subkey write + let mut inner = self.inner.lock().await; + + // Remove from active subkey writes + self.unmark_active_subkey_write_inner(&mut inner, guard); + + if matches!(out, Err(VeilidAPIError::TryAgain { message: _ })) { + return Ok(None); + } + + out + } + + async fn background_process_set_value_results( + &self, + res_rx: flume::Receiver>, + record_key: TypedRecordKey, + subkey: ValueSubkey, + signed_value_data: Arc, + safety_selection: SafetySelection, + ) -> VeilidAPIResult> { + // Wait for the first result + let Ok(result) = res_rx.recv_async().await else { + apibail_internal!("failed to receive results"); + }; + let result = result?; + let partial = result.fanout_result.kind.is_incomplete(); + + // Process the returned result + let out = self + .process_outbound_set_value_result( + record_key, + subkey, + signed_value_data.value_data().clone(), + safety_selection, + result, + ) + .await?; + + // If there's more to process, do it in the background + if partial { + self.process_deferred_outbound_set_value_result( + res_rx, + record_key, + subkey, + out.clone() + .unwrap_or_else(|| signed_value_data.value_data().clone()), + safety_selection, + ); + } + + Ok(out) + } + + async fn foreground_process_set_value_results( + &self, + res_rx: flume::Receiver>, + record_key: TypedRecordKey, + subkey: ValueSubkey, + signed_value_data: Arc, + safety_selection: SafetySelection, + ) -> VeilidAPIResult> { + let Some(stop_token) = self.startup_lock.stop_token() else { + apibail_not_initialized!(); + }; + + loop { + let timeout_res = res_rx.recv_async().timeout_at(stop_token.clone()).await; + let Ok(res) = timeout_res else { + apibail_not_initialized!(); + }; + let Ok(result) = res else { apibail_internal!("failed to receive results"); }; let result = result?; - let partial = result.fanout_result.kind.is_incomplete(); + let is_incomplete = result.fanout_result.kind.is_incomplete(); - // Process the returned result - let out = self + let opt_value_data = self .process_outbound_set_value_result( record_key, subkey, @@ -931,35 +1045,10 @@ impl StorageManager { result, ) .await?; - - // If there's more to process, do it in the background - if partial { - self.process_deferred_outbound_set_value_result( - res_rx, - record_key, - subkey, - out.clone() - .unwrap_or_else(|| signed_value_data.value_data().clone()), - safety_selection, - ); + if !is_incomplete { + return Ok(opt_value_data); } - - Ok(out) - }; - - let out = process().await; - - // Remove active subkey write - let mut inner = self.inner.lock().await; - - // Remove from active subkey writes - self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey); - - if matches!(out, Err(VeilidAPIError::TryAgain { message: _ })) { - return Ok(None); } - - out } /// Create, update or cancel an outbound watch to a DHT value @@ -971,6 +1060,10 @@ impl StorageManager { expiration: Timestamp, count: u32, ) -> VeilidAPIResult { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + // Obtain the watch change lock // (may need to wait for background operations to complete on the watch) let watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await; @@ -1069,6 +1162,10 @@ impl StorageManager { record_key: TypedRecordKey, subkeys: ValueSubkeyRangeSet, ) -> VeilidAPIResult { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + // Obtain the watch change lock // (may need to wait for background operations to complete on the watch) let watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await; @@ -1138,6 +1235,10 @@ impl StorageManager { subkeys: ValueSubkeyRangeSet, scope: DHTReportScope, ) -> VeilidAPIResult { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + let subkeys = if subkeys.is_empty() { ValueSubkeyRangeSet::full() } else { @@ -1496,7 +1597,7 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn open_existing_record_inner( + async fn open_existing_record_inner( &self, inner: &mut StorageManagerInner, record_key: TypedRecordKey, @@ -1563,7 +1664,7 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn open_new_record_inner( + async fn open_new_record_inner( &self, inner: &mut StorageManagerInner, record_key: TypedRecordKey, @@ -1621,7 +1722,7 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn get_value_nodes( + async fn get_value_nodes( &self, record_key: TypedRecordKey, ) -> VeilidAPIResult>> { @@ -1652,9 +1753,7 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn process_fanout_results_inner< - I: IntoIterator, - >( + fn process_fanout_results_inner>( inner: &mut StorageManagerInner, vcrypto: &CryptoSystemGuard<'_>, record_key: TypedRecordKey, @@ -1734,11 +1833,20 @@ impl StorageManager { #[instrument(level = "trace", target = "stor", skip_all, err)] async fn handle_get_local_value_inner( + &self, inner: &mut StorageManagerInner, record_key: TypedRecordKey, subkey: ValueSubkey, want_descriptor: bool, ) -> VeilidAPIResult { + // See if the value is in the offline subkey writes first, + // since it may not have been committed yet to the local record store + if let Some(get_result) = + self.get_offline_subkey_writes_subkey(inner, record_key, subkey, want_descriptor)? + { + return Ok(get_result); + } + // See if it's in the local record store let Some(local_record_store) = inner.local_record_store.as_mut() else { apibail_not_initialized!(); @@ -1757,13 +1865,22 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_set_local_value_inner( + async fn handle_set_local_value_inner( + &self, inner: &mut StorageManagerInner, record_key: TypedRecordKey, subkey: ValueSubkey, signed_value_data: Arc, watch_update_mode: InboundWatchUpdateMode, ) -> VeilidAPIResult<()> { + // See if this new data supercedes any offline subkey writes + self.remove_old_offline_subkey_writes_inner( + inner, + record_key, + subkey, + signed_value_data.clone(), + ); + // See if it's in the local record store let Some(local_record_store) = inner.local_record_store.as_mut() else { apibail_not_initialized!(); @@ -1831,7 +1948,7 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_set_remote_value_inner( + async fn handle_set_remote_value_inner( inner: &mut StorageManagerInner, record_key: TypedRecordKey, subkey: ValueSubkey, @@ -1869,7 +1986,7 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_inspect_remote_value_inner( + async fn handle_inspect_remote_value_inner( &self, inner: &mut StorageManagerInner, record_key: TypedRecordKey, @@ -1911,27 +2028,7 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn add_offline_subkey_write_inner( - inner: &mut StorageManagerInner, - record_key: TypedRecordKey, - subkey: ValueSubkey, - safety_selection: SafetySelection, - ) { - inner - .offline_subkey_writes - .entry(record_key) - .and_modify(|x| { - x.subkeys.insert(subkey); - }) - .or_insert(tasks::offline_subkey_writes::OfflineSubkeyWrite { - safety_selection, - subkeys: ValueSubkeyRangeSet::single(subkey), - subkeys_in_flight: ValueSubkeyRangeSet::new(), - }); - } - - #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn process_deferred_results( + fn process_deferred_results( &self, receiver: flume::Receiver, handler: impl FnMut(T) -> PinBoxFutureStatic + Send + 'static, diff --git a/veilid-core/src/storage_manager/offline_subkey_writes.rs b/veilid-core/src/storage_manager/offline_subkey_writes.rs new file mode 100644 index 00000000..efa6d6b5 --- /dev/null +++ b/veilid-core/src/storage_manager/offline_subkey_writes.rs @@ -0,0 +1,189 @@ +use super::*; + +impl_veilid_log_facility!("stor"); + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct OfflineSubkeyWrite { + /// Safety selection to use when writing this record to the network + pub safety_selection: SafetySelection, + /// The subkeys that are queued up needing to be sent to the network in the background + pub subkeys: ValueSubkeyRangeSet, + /// The subkeys currently being sent to the network in the background + #[serde(default)] + pub subkeys_in_flight: ValueSubkeyRangeSet, + /// The value data to send to the network if it is newer than what is in the local record store + #[serde(default)] + pub subkey_value_data: HashMap>, +} + +impl StorageManager { + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn add_offline_subkey_write_inner( + &self, + inner: &mut StorageManagerInner, + record_key: TypedRecordKey, + subkey: ValueSubkey, + safety_selection: SafetySelection, + signed_value_data: Arc, + ) { + inner + .offline_subkey_writes + .entry(record_key) + .and_modify(|x| { + x.subkeys.insert(subkey); + x.subkey_value_data + .insert(subkey, signed_value_data.clone()); + }) + .or_insert(OfflineSubkeyWrite { + safety_selection, + subkeys: ValueSubkeyRangeSet::single(subkey), + subkeys_in_flight: ValueSubkeyRangeSet::new(), + subkey_value_data: { + let mut subkey_value_data = HashMap::new(); + subkey_value_data.insert(subkey, signed_value_data); + subkey_value_data + }, + }); + } + + pub(super) fn get_offline_subkey_writes_subkey( + &self, + inner: &mut StorageManagerInner, + record_key: TypedRecordKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> VeilidAPIResult> { + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + let Some(osw) = inner.offline_subkey_writes.get(&record_key) else { + return Ok(None); + }; + let Some(signed_value_data) = osw.subkey_value_data.get(&subkey).cloned() else { + return Ok(None); + }; + let opt_descriptor = if want_descriptor { + if let Some(descriptor) = + local_record_store.with_record(record_key, |record| record.descriptor().clone()) + { + Some(descriptor) + } else { + // Record not available + return Ok(None); + } + } else { + None + }; + Ok(Some(GetResult { + opt_value: Some(signed_value_data), + opt_descriptor, + })) + } + + /// If an offline subkey write happens and then we find newer data on the network while + /// waiting to process the offline subkey write, we should continue with it but use the + /// newer data in place of the originally requested data. If the sequence number of the + /// network data is the same, we defer to what is already on the network. + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn remove_old_offline_subkey_writes_inner( + &self, + inner: &mut StorageManagerInner, + record_key: TypedRecordKey, + subkey: ValueSubkey, + signed_value_data: Arc, + ) { + // Get the offline subkey write record + match inner.offline_subkey_writes.entry(record_key) { + hashlink::linked_hash_map::Entry::Occupied(mut o) => { + let finished = { + let osw = o.get_mut(); + match osw.subkey_value_data.entry(subkey) { + std::collections::hash_map::Entry::Occupied(o) => { + // If new data has greater or equal sequence number to the + // offline set value, drop the old data from the offline subkey write + let old_data = o.get().value_data(); + let new_data = signed_value_data.value_data(); + if old_data != new_data && new_data.seq() >= old_data.seq() { + o.remove(); + // Also, remove the subkey from queued offline subkey writes + // but leave it in-flight if it is in flight. That will get + // handled by finish_offline_subkey_writes_inner + osw.subkeys.remove(subkey); + + veilid_log!(self debug "offline write overwritten by newer or different data from network: record_key={} subkey={} seq={}", record_key, subkey, signed_value_data.value_data().seq()); + } + } + std::collections::hash_map::Entry::Vacant(_) => {} + } + + // If we have no new work to do, and not still doing work, then this record is done + let finished = osw.subkeys.is_empty() && osw.subkeys_in_flight.is_empty(); + if !finished { + // Remove any subkey value data that is no longer needed + let osw = o.get_mut(); + osw.subkey_value_data.retain(|k, _| { + osw.subkeys.contains(*k) || osw.subkeys_in_flight.contains(*k) + }); + } + finished + }; + if finished { + veilid_log!(self debug "Offline write finished key {}", record_key); + o.remove(); + } + } + hashlink::linked_hash_map::Entry::Vacant(_) => {} + } + } + + /// When we finish a offline subkey write, we mark subkeys as no longer in-flight + /// and if we didn't finish all the subkeys they are returned to the list of offline subkeys + /// so we can try again later. If the data associated with the write is no longer necessary + /// we can drop it. + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn finish_offline_subkey_writes_inner( + &self, + inner: &mut StorageManagerInner, + record_key: TypedRecordKey, + subkeys_written: ValueSubkeyRangeSet, + subkeys_still_offline: ValueSubkeyRangeSet, + ) { + assert!( + subkeys_written.is_disjoint(&subkeys_still_offline), + "subkeys can not be written and still offline" + ); + + // Get the offline subkey write record + match inner.offline_subkey_writes.entry(record_key) { + hashlink::linked_hash_map::Entry::Occupied(mut o) => { + let finished = { + let osw = o.get_mut(); + + // Now any left over are still offline, so merge them with any subkeys that have been added while we were working + osw.subkeys = osw.subkeys.union(&subkeys_still_offline); + + // Remove subkeys that were successfully written from in_flight status + osw.subkeys_in_flight = osw.subkeys_in_flight.difference(&subkeys_written); + + // If we have no new work to do, and not still doing work, then this record is done + let finished = osw.subkeys.is_empty() && osw.subkeys_in_flight.is_empty(); + if !finished { + // Remove any subkey value data that is no longer needed + let osw = o.get_mut(); + osw.subkey_value_data.retain(|k, _| { + osw.subkeys.contains(*k) || osw.subkeys_in_flight.contains(*k) + }); + } + finished + }; + if finished { + veilid_log!(self debug "offline subkey write finished key {}", record_key); + o.remove(); + } + } + hashlink::linked_hash_map::Entry::Vacant(_) => { + veilid_log!(self warn "can't finish missing offline subkey write: ignoring key {}", record_key); + } + } + } +} diff --git a/veilid-core/src/storage_manager/rehydrate.rs b/veilid-core/src/storage_manager/rehydrate.rs index afb39cc9..68eb1b49 100644 --- a/veilid-core/src/storage_manager/rehydrate.rs +++ b/veilid-core/src/storage_manager/rehydrate.rs @@ -140,6 +140,40 @@ impl StorageManager { .await; } + async fn rehydrate_single_subkey_inner( + &self, + inner: &mut StorageManagerInner, + record_key: TypedRecordKey, + subkey: ValueSubkey, + safety_selection: SafetySelection, + ) -> bool { + // Get value to rehydrate with + let get_result = match self + .handle_get_local_value_inner(inner, record_key, subkey, false) + .await + { + Ok(v) => v, + Err(e) => { + veilid_log!(self debug "Missing local record for rehydrating subkey: record={} subkey={}: {}", record_key, subkey, e); + return false; + } + }; + + let data = match get_result.opt_value { + Some(v) => v, + None => { + veilid_log!(self debug "Missing local subkey data for rehydrating subkey: record={} subkey={}", record_key, subkey); + return false; + } + }; + + // Add to offline writes to flush + veilid_log!(self debug "Rehydrating: record={} subkey={}", record_key, subkey); + self.add_offline_subkey_write_inner(inner, record_key, subkey, safety_selection, data); + + true + } + #[instrument(level = "trace", target = "stor", skip(self), ret, err)] pub(super) async fn rehydrate_all_subkeys( &self, @@ -156,15 +190,13 @@ impl StorageManager { let mut rehydrated = ValueSubkeyRangeSet::new(); for (n, subkey) in local_inspect_result.subkeys().iter().enumerate() { if local_inspect_result.seqs()[n].is_some() { - // Add to offline writes to flush - veilid_log!(self debug "Rehydrating: record={} subkey={}", record_key, subkey); - rehydrated.insert(subkey); - Self::add_offline_subkey_write_inner( - &mut inner, - record_key, - subkey, - safety_selection, - ); + // Rehydrate subkey + if self + .rehydrate_single_subkey_inner(&mut inner, record_key, subkey, safety_selection) + .await + { + rehydrated.insert(subkey); + } } } @@ -214,15 +246,13 @@ impl StorageManager { // Does the online subkey have enough consensus? // If not, schedule it to be written in the background if sfr.consensus_nodes.len() < consensus_count { - // Add to offline writes to flush - veilid_log!(self debug "Rehydrating: record={} subkey={}", record_key, subkey); - rehydrated.insert(subkey); - Self::add_offline_subkey_write_inner( - &mut inner, - record_key, - subkey, - safety_selection, - ); + // Rehydrate subkey + if self + .rehydrate_single_subkey_inner(&mut inner, record_key, subkey, safety_selection) + .await + { + rehydrated.insert(subkey); + } } } diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index c8b43368..65f79195 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -277,19 +277,19 @@ impl StorageManager { pub(super) fn process_deferred_outbound_set_value_result( &self, res_rx: flume::Receiver>, - key: TypedRecordKey, + record_key: TypedRecordKey, subkey: ValueSubkey, - last_value_data: ValueData, + requested_value_data: ValueData, safety_selection: SafetySelection, ) { let registry = self.registry(); - let last_value_data = Arc::new(Mutex::new(last_value_data)); + let last_requested_value_data = Arc::new(Mutex::new(requested_value_data)); self.process_deferred_results( res_rx, Box::new( move |result: VeilidAPIResult| -> PinBoxFutureStatic { let registry = registry.clone(); - let last_value_data = last_value_data.clone(); + let last_requested_value_data = last_requested_value_data.clone(); Box::pin(async move { let this = registry.storage_manager(); @@ -301,8 +301,8 @@ impl StorageManager { } }; let is_incomplete = result.fanout_result.kind.is_incomplete(); - let lvd = last_value_data.lock().clone(); - let value_data = match this.process_outbound_set_value_result(key, subkey, lvd, safety_selection, result).await { + let requested_value_data = last_requested_value_data.lock().clone(); + let value_data = match this.process_outbound_set_value_result(record_key, subkey, requested_value_data, safety_selection, result).await { Ok(Some(v)) => v, Ok(None) => { return is_incomplete; @@ -320,7 +320,7 @@ impl StorageManager { // if the sequence number changed since our first partial update // Send with a max count as this is not attached to any watch let changed = { - let mut lvd = last_value_data.lock(); + let mut lvd = last_requested_value_data.lock(); if lvd.seq() != value_data.seq() { *lvd = value_data.clone(); true @@ -329,7 +329,7 @@ impl StorageManager { } }; if changed { - this.update_callback_value_change(key,ValueSubkeyRangeSet::single(subkey), u32::MAX, Some(value_data)); + this.update_callback_value_change(record_key,ValueSubkeyRangeSet::single(subkey), u32::MAX, Some(value_data)); } // Return done @@ -345,7 +345,7 @@ impl StorageManager { &self, record_key: TypedRecordKey, subkey: ValueSubkey, - last_value_data: ValueData, + requested_value_data: ValueData, safety_selection: SafetySelection, result: set_value::OutboundSetValueResult, ) -> Result, VeilidAPIError> { @@ -362,7 +362,13 @@ impl StorageManager { let was_offline = self.check_fanout_set_offline(record_key, subkey, &result.fanout_result); if was_offline { // Failed to write, try again later - Self::add_offline_subkey_write_inner(&mut inner, record_key, subkey, safety_selection); + self.add_offline_subkey_write_inner( + &mut inner, + record_key, + subkey, + safety_selection, + result.signed_value_data.clone(), + ); } // Keep the list of nodes that returned a value for later reference @@ -376,19 +382,18 @@ impl StorageManager { .with(|c| c.network.dht.set_value_count as usize), ); + // Record the set value locally since it was successfully set online + self.handle_set_local_value_inner( + &mut inner, + record_key, + subkey, + result.signed_value_data.clone(), + InboundWatchUpdateMode::UpdateAll, + ) + .await?; + // Return the new value if it differs from what was asked to set - if result.signed_value_data.value_data() != &last_value_data { - // Record the newer value and send and update since it is different than what we just set - - Self::handle_set_local_value_inner( - &mut inner, - record_key, - subkey, - result.signed_value_data.clone(), - InboundWatchUpdateMode::UpdateAll, - ) - .await?; - + if result.signed_value_data.value_data() != &requested_value_data { return Ok(Some(result.signed_value_data.value_data().clone())); } @@ -413,8 +418,9 @@ impl StorageManager { // See if this is a remote or local value let (is_local, last_get_result) = { // See if the subkey we are modifying has a last known local value - let last_get_result = - Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await?; + let last_get_result = self + .handle_get_local_value_inner(&mut inner, key, subkey, true) + .await?; // If this is local, it must have a descriptor already if last_get_result.opt_descriptor.is_some() { (true, last_get_result) @@ -484,7 +490,7 @@ impl StorageManager { // Do the set and return no new value let res = if is_local { - Self::handle_set_local_value_inner( + self.handle_set_local_value_inner( &mut inner, key, subkey, diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index 18a7a599..9c1b0bbd 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -4,14 +4,6 @@ use stop_token::future::FutureExt as _; impl_veilid_log_facility!("stor"); -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct OfflineSubkeyWrite { - pub safety_selection: SafetySelection, - pub subkeys: ValueSubkeyRangeSet, - #[serde(default)] - pub subkeys_in_flight: ValueSubkeyRangeSet, -} - #[derive(Debug)] enum OfflineSubkeyWriteResult { Finished(set_value::OutboundSetValueResult), @@ -49,7 +41,8 @@ impl StorageManager { }; let get_result = { let mut inner = self.inner.lock().await; - Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await + self.handle_get_local_value_inner(&mut inner, key, subkey, true) + .await }; let Ok(get_result) = get_result else { veilid_log!(self debug "Offline subkey write had no subkey result: {}:{}", key, subkey); @@ -85,7 +78,7 @@ impl StorageManager { // Record the newer value and send and update since it is different than what we just set let mut inner = self.inner.lock().await; - Self::handle_set_local_value_inner( + self.handle_set_local_value_inner( &mut inner, key, subkey, @@ -177,36 +170,14 @@ impl StorageManager { // Debug print the result veilid_log!(self debug "Offline write result: {:?}", result); - // Get the offline subkey write record - match inner - .offline_subkey_writes - .entry(result.work_item.record_key) - { - hashlink::linked_hash_map::Entry::Occupied(mut o) => { - let finished = { - let osw = o.get_mut(); - - // Mark in-flight subkeys as having been completed - let subkeys_still_offline = - result.work_item.subkeys.difference(&result.written_subkeys); - // Now any left over are still offline, so merge them with any subkeys that have been added while we were working - osw.subkeys = osw.subkeys.union(&subkeys_still_offline); - // And clear the subkeys in flight since we're done with this key for now - osw.subkeys_in_flight = - osw.subkeys_in_flight.difference(&result.written_subkeys); - - // If we have no new work to do, and not still doing work, then this record is done - osw.subkeys.is_empty() && osw.subkeys_in_flight.is_empty() - }; - if finished { - veilid_log!(self debug "Offline write finished key {}", result.work_item.record_key); - o.remove(); - } - } - hashlink::linked_hash_map::Entry::Vacant(_) => { - veilid_log!(self warn "offline write work items should always be on offline_subkey_writes entries that exist: ignoring key {}", result.work_item.record_key); - } - } + // Mark the offline subkey write as no longer in-flight + let subkeys_still_offline = result.work_item.subkeys.difference(&result.written_subkeys); + self.finish_offline_subkey_writes_inner( + &mut inner, + result.work_item.record_key, + result.written_subkeys, + subkeys_still_offline, + ); // Keep the list of nodes that returned a value for later reference let crypto = self.crypto(); diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 3ca9c6cc..9a98d8f4 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -1171,9 +1171,9 @@ impl StorageManager { // Set the local value let mut report_value_change = false; if let Some(value) = &value { - let last_get_result = - Self::handle_get_local_value_inner(inner, record_key, first_subkey, true) - .await?; + let last_get_result = self + .handle_get_local_value_inner(inner, record_key, first_subkey, true) + .await?; let descriptor = last_get_result.opt_descriptor.unwrap(); let schema = descriptor.schema()?; @@ -1211,7 +1211,7 @@ impl StorageManager { // Keep the value because it is newer than the one we have if report_value_change { - Self::handle_set_local_value_inner( + self.handle_set_local_value_inner( inner, record_key, first_subkey, diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index fe015171..0ba4f367 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -1641,7 +1641,7 @@ impl VeilidAPI { } async fn debug_record_set(&self, args: Vec) -> VeilidAPIResult { - let opt_arg_add = if args.len() >= 2 && get_dht_key_no_safety(&args[1]).is_some() { + let mut opt_arg_add = if args.len() >= 2 && get_dht_key_no_safety(&args[1]).is_some() { 1 } else { 0 @@ -1658,14 +1658,43 @@ impl VeilidAPI { )?; let data = get_debug_argument_at(&args, 2 + opt_arg_add, "debug_record_set", "data", get_data)?; - let writer = get_debug_argument_at( + let writer = match get_debug_argument_at( &args, 3 + opt_arg_add, "debug_record_set", "writer", get_keypair, - ) - .ok(); + ) { + Ok(v) => { + opt_arg_add += 1; + Some(v) + } + Err(_) => None, + }; + let allow_offline = if args.len() > 3 + opt_arg_add { + get_debug_argument_at( + &args, + 3 + opt_arg_add, + "debug_record_set", + "allow_offline", + get_string, + ) + .ok() + } else { + None + }; + + let allow_offline = if let Some(allow_offline) = allow_offline { + if &allow_offline == "online" || &allow_offline == "false" { + Some(AllowOffline(false)) + } else if &allow_offline == "offline" || &allow_offline == "true" { + Some(AllowOffline(true)) + } else { + return Ok(format!("Unknown allow_offline: {}", allow_offline)); + } + } else { + None + }; // Do a record set let value = match rc @@ -1675,7 +1704,7 @@ impl VeilidAPI { data, Some(SetDHTValueOptions { writer, - allow_offline: None, + allow_offline, }), ) .await @@ -1710,7 +1739,7 @@ impl VeilidAPI { "subkey", get_number::, )?; - let force_refresh = if args.len() >= 3 + opt_arg_add { + let force_refresh = if args.len() > 2 + opt_arg_add { Some(get_debug_argument_at( &args, 2 + opt_arg_add, @@ -2222,7 +2251,7 @@ DHT Operations: create [ []] - create a new dht record open [+] [] - open an existing dht record close [] - close an opened/created dht record - set [] - write a value to a dht record subkey + set [] [] [offline|online]- write a value to a dht record subkey get [] [force] - read a value from a dht record subkey delete - delete the local copy of a dht record (not from the network) info [] [subkey] - display information about a dht record or subkey diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 735a7c90..582fce5d 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -253,6 +253,130 @@ async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI): await rc.delete_dht_record(key) +@pytest.mark.asyncio +async def test_open_writer_dht_value_no_offline(api_connection: veilid.VeilidAPI): + rc = await api_connection.new_routing_context() + async with rc: + rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2)) + key = rec.key + owner = rec.owner + secret = rec.owner_secret + #print(f"key:{key}") + + cs = await api_connection.get_crypto_system(rec.key.kind()) + async with cs: + assert await cs.validate_key_pair(owner, secret) + other_keypair = await cs.generate_key_pair() + + va = b"Qwertyuiop Asdfghjkl Zxcvbnm" + vb = b"1234567890" + vc = b"!@#$%^&*()" + + # Test subkey writes + vdtemp = await rc.set_dht_value(key, ValueSubkey(1), va, veilid.SetDHTValueOptions(None, False)) + assert vdtemp is None + + vdtemp = await rc.get_dht_value(key, ValueSubkey(1), False) + assert vdtemp.data == va + assert vdtemp.seq == 0 + assert vdtemp.writer == owner + + vdtemp = await rc.get_dht_value(key, ValueSubkey(0), False) + assert vdtemp is None + + vdtemp = await rc.set_dht_value(key, ValueSubkey(0), vb, veilid.SetDHTValueOptions(None, False)) + assert vdtemp is None + + await sync(rc, [rec]) + + vdtemp = await rc.get_dht_value(key, ValueSubkey(0), True) + assert vdtemp.data == vb + + vdtemp = await rc.get_dht_value(key, ValueSubkey(1), True) + assert vdtemp.data == va + + # Equal value should not trigger sequence number update + vdtemp = await rc.set_dht_value(key, ValueSubkey(1), va, veilid.SetDHTValueOptions(None, False)) + assert vdtemp is None + + # Different value should trigger sequence number update + vdtemp = await rc.set_dht_value(key, ValueSubkey(1), vb, veilid.SetDHTValueOptions(None, False)) + assert vdtemp is None + + # Now that we initialized some subkeys + # and verified they stored correctly + # Delete things locally and reopen and see if we can write + # with the same writer key + + await rc.close_dht_record(key) + await rc.delete_dht_record(key) + + rec = await rc.open_dht_record(key, veilid.KeyPair.from_parts(owner, secret)) + assert rec is not None + assert rec.key == key + assert rec.owner == owner + assert rec.owner_secret == secret + assert rec.schema.kind == veilid.DHTSchemaKind.DFLT + assert rec.schema.o_cnt == 2 + + # Verify subkey 1 can be set before it is get but newer is available online + vdtemp = await rc.set_dht_value(key, ValueSubkey(1), vc, veilid.SetDHTValueOptions(None, False)) + assert vdtemp is not None + assert vdtemp.data == vb + assert vdtemp.seq == 1 + assert vdtemp.writer == owner + + # Verify subkey 1 can be set a second time and it updates because seq is newer + vdtemp = await rc.set_dht_value(key, ValueSubkey(1), vc, veilid.SetDHTValueOptions(None, False)) + assert vdtemp is None + + # Verify the network got the subkey update with a refresh check + vdtemp = await rc.get_dht_value(key, ValueSubkey(1), True) + assert vdtemp is not None + assert vdtemp.data == vc + assert vdtemp.seq == 2 + assert vdtemp.writer == owner + + # Delete things locally and reopen and see if we can write + # with a different writer key (should fail) + + await rc.close_dht_record(key) + await rc.delete_dht_record(key) + + rec = await rc.open_dht_record(key, other_keypair) + assert rec is not None + assert rec.key == key + assert rec.owner == owner + assert rec.owner_secret is None + assert rec.schema.kind == veilid.DHTSchemaKind.DFLT + assert rec.schema.o_cnt == 2 + + # Verify subkey 1 can NOT be set because we have the wrong writer + with pytest.raises(veilid.VeilidAPIError): + await rc.set_dht_value(key, ValueSubkey(1), va, veilid.SetDHTValueOptions(None, False)) + + # Verify subkey 0 can NOT be set because we have the wrong writer + with pytest.raises(veilid.VeilidAPIError): + await rc.set_dht_value(key, ValueSubkey(0), va, veilid.SetDHTValueOptions(None, False)) + + # Verify subkey 0 can be set because override with the right writer + # Should have prior sequence number as its returned value because it exists online at seq 0 + vdtemp = await rc.set_dht_value(key, ValueSubkey(0), va, veilid.SetDHTValueOptions(veilid.KeyPair.from_parts(owner, secret), False)) + assert vdtemp is not None + assert vdtemp.data == vb + assert vdtemp.seq == 0 + assert vdtemp.writer == owner + + # Should update the second time to seq 1 + vdtemp = await rc.set_dht_value(key, ValueSubkey(0), va, veilid.SetDHTValueOptions(veilid.KeyPair.from_parts(owner, secret), False)) + assert vdtemp is None + + # Clean up + await rc.close_dht_record(key) + await rc.delete_dht_record(key) + + + @pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running") @pytest.mark.asyncio async def test_watch_dht_values(): @@ -793,7 +917,7 @@ async def test_dht_write_read_full_subkeys_local(): # Secret to encrypt test data SECRET = veilid.SharedSecret.from_bytes(b"A"*32) # Max subkey size - MAX_SUBKEY_SIZE = min(32768, 1024*1024/SUBKEY_COUNT) + MAX_SUBKEY_SIZE = min(32768, 1024*1024//SUBKEY_COUNT) # MAX_SUBKEY_SIZE = 256 # write dht records on server 0