diff --git a/veilid-core/src/crypto/mod.rs b/veilid-core/src/crypto/mod.rs index 6e5d3ba3..ab30155f 100644 --- a/veilid-core/src/crypto/mod.rs +++ b/veilid-core/src/crypto/mod.rs @@ -56,7 +56,6 @@ struct CryptoInner { dh_cache_misses: usize, dh_cache_hits: usize, dh_cache_lru: usize, - dh_cache_flush_future: Option>, } impl fmt::Debug for CryptoInner { @@ -66,7 +65,6 @@ impl fmt::Debug for CryptoInner { .field("dh_cache_misses", &self.dh_cache_misses) .field("dh_cache_hits", &self.dh_cache_hits) .field("dh_cache_lru", &self.dh_cache_lru) - // .field("flush_future", &self.flush_future) // .field("crypto_vld0", &self.crypto_vld0) // .field("crypto_none", &self.crypto_none) .finish() @@ -104,7 +102,6 @@ impl Crypto { dh_cache_misses: 0, dh_cache_hits: 0, dh_cache_lru: 0, - dh_cache_flush_future: None, } } @@ -157,25 +154,6 @@ impl Crypto { #[instrument(level = "trace", target = "crypto", skip_all, err)] async fn post_init_async(&self) -> EyreResult<()> { - // Schedule flushing - let registry = self.registry(); - let flush_future = interval("crypto flush", 60000, move || { - let crypto = registry.crypto(); - async move { - if let Err(e) = crypto - .flush() - .measure_debug( - TimestampDuration::new_ms(100), - veilid_log_dbg!(crypto, "Crypto::flush"), - ) - .await - { - veilid_log!(crypto warn "flush failed: {}", e); - } - } - }); - self.inner.lock().dh_cache_flush_future = Some(flush_future); - Ok(()) } @@ -191,10 +169,6 @@ impl Crypto { } async fn pre_terminate_async(&self) { - let flush_future = self.inner.lock().dh_cache_flush_future.take(); - if let Some(f) = flush_future { - f.await; - } veilid_log!(self trace "starting termination flush"); match self.flush().await { Ok(_) => { diff --git a/veilid-core/src/logging/duration_recorder.rs b/veilid-core/src/logging/duration_recorder.rs index f534fec9..bacb022e 100644 --- a/veilid-core/src/logging/duration_recorder.rs +++ b/veilid-core/src/logging/duration_recorder.rs @@ -32,15 +32,7 @@ where let out = closure(); let duration = TimestampDuration::since(start); if duration > limit { - #[cfg(feature = "verbose-tracing")] - let msg = format!( - "Excessive duration: {}\n{:?}", - duration, - backtrace::Backtrace::new() - ); - #[cfg(not(feature = "verbose-tracing"))] let msg = format!("Excessive duration: {}", duration); - callback(msg); } @@ -98,15 +90,7 @@ where let out = Box::pin(self).await; let duration = TimestampDuration::since(start); if duration > limit { - #[cfg(feature = "verbose-tracing")] - let msg = format!( - "Excessive duration: {}\n{:?}", - duration, - backtrace::Backtrace::new() - ); - #[cfg(not(feature = "verbose-tracing"))] let msg = format!("Excessive duration: {}", duration); - callback(msg); } out diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index b6a0d124..13170057 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -518,7 +518,7 @@ impl ConnectionTable { out += &format!( " {}{}\n", conn.debug_print(cur_ts), - if is_priority_flow { "PRIORITY" } else { "" } + if is_priority_flow { " PRIORITY" } else { "" } ); } } diff --git a/veilid-core/src/storage_manager/record_lock_table.rs b/veilid-core/src/storage_manager/record_lock_table.rs index 2d4469ef..bf651cc0 100644 --- a/veilid-core/src/storage_manager/record_lock_table.rs +++ b/veilid-core/src/storage_manager/record_lock_table.rs @@ -268,6 +268,7 @@ impl RecordLockTable { } } + #[expect(dead_code)] pub fn try_lock_subkey( &self, 
record: OpaqueRecordKey, diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/limited_size.rs b/veilid-core/src/storage_manager/record_store/record_store_inner/limited_size.rs index be407287..290689e5 100644 --- a/veilid-core/src/storage_manager/record_store/record_store_inner/limited_size.rs +++ b/veilid-core/src/storage_manager/record_store/record_store_inner/limited_size.rs @@ -11,9 +11,9 @@ pub enum LimitError { #[derive(ThisError, Debug, Clone, Copy, Eq, PartialEq)] pub enum NumericError { - #[error("numeric overflow")] + #[error("numeric overflow: current={current} added={added}")] Overflow { current: T, added: T }, - #[error("numeric underflow")] + #[error("numeric underflow: current={current} removed={removed}")] Underflow { current: T, removed: T }, } @@ -114,6 +114,10 @@ impl LimitedSize { new_value } + pub fn limit(&self) -> Option<T> { + self.limit + } + pub fn check_limit(&self) -> bool { if let Some(uncommitted_value) = self.uncommitted_value { if let Some(limit) = self.limit { @@ -143,6 +147,7 @@ impl LimitedSize { Ok(self.value) } + #[expect(dead_code)] pub fn rollback(&mut self) -> T { if let Some(uv) = self.uncommitted_value.take() { veilid_log!(self trace "Rollback ({}): {} (drop {})", self.description, self.value, uv); diff --git a/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs b/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs index c1873104..fd53bb56 100644 --- a/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs +++ b/veilid-core/src/storage_manager/record_store/record_store_inner/record_index.rs @@ -195,7 +195,7 @@ where &mut self, opaque_record_key: &OpaqueRecordKey, func: F, - ) -> Option<R> + ) -> VeilidAPIResult<Option<R>> where F: FnOnce(&Record) -> R, { @@ -215,6 +215,15 @@ where let new_record = record.clone(); + // TODO: solve proper making of room and always commit where appropriate + // TODO: break out estimation code and just pass in a u64 size to sub_from_total_storage so we don't calculate it twice all over the place + self.make_room_for_record(new_record, opt_old_record_size)?; + + self.sub_from_total_storage(&rtk, &old_record); + self.add_to_total_storage(&rtk, &new_record); + if let Err(e) = self.total_storage_space.commit() { + veilid_log!(self error "RecordIndex({}): failed to commit total storage space: {}", self.unlocked_inner.name, e); + } self.add_uncommitted_record_update(rtk, new_record, old_record); } @@ -274,6 +283,8 @@ where let new_record = record.clone(); + self.sub_from_total_storage(&rtk, &old_record); + self.add_to_total_storage(&rtk, &new_record); self.add_uncommitted_record_update(rtk, new_record, old_record); } @@ -723,14 +734,14 @@ where }; let total_storage_space = self.total_storage_space.get(); - + let opaque_record_key = record.0.record_key.clone(); self.purge_record_and_subkeys(record.0, record.1, true); // Reducing the total size will always succeed let new_total_storage_space = self.total_storage_space.commit().unwrap(); ( - None, + Some(opaque_record_key), ReclaimedSpace { reclaimed: total_storage_space - new_total_storage_space, total: total_storage_space, } @@ -1165,6 +1176,9 @@ where return; } }; + + veilid_log!(self trace "Adding {} to total storage", record_size); + if let Err(e) = self.total_storage_space.add(record_size) { veilid_log!(self error "RecordIndex({}): Unexpected storage space overflow: {}", self.unlocked_inner.name, e); // haha if we ever hit this line, i would be very very surprised @@ -1180,6 +1194,9 @@ where return; } }; + + veilid_log!(self trace "Subtracting {} from total storage", record_size); + if
let Err(e) = self.total_storage_space.sub(record_size) { veilid_log!(self error "RecordIndex({}): Unexpected storage space underflow: {}", self.unlocked_inner.name, e); self.total_storage_space.set(0); @@ -1210,35 +1227,44 @@ where record_size: u64, opt_old_record_size: Option, ) -> VeilidAPIResult<()> { - if let Some(old_record_size) = opt_old_record_size { - self.total_storage_space - .sub(old_record_size) - .map_err(VeilidAPIError::internal)?; - } - self.total_storage_space - .add(record_size) - .map_err(VeilidAPIError::internal)?; + // Get starting size and limit + let mut storage_size_current = self.total_storage_space.get(); + let Some(storage_size_limit) = self.total_storage_space.limit() else { + // No limit, just go for it + return Ok(()); + }; - while !self.total_storage_space.check_limit() { - let Some((dead_k, dead_v)) = self.record_cache.remove_lru() else { - self.total_storage_space.rollback(); + // Add the delta we need to make room for + if let Some(old_record_size) = opt_old_record_size { + storage_size_current = storage_size_current.saturating_sub(old_record_size); + } + storage_size_current = storage_size_current.saturating_add(record_size); + + // Figure out how many records from the LRU need to go to fit the delta + let mut dead_count = 0usize; + let mut lru_iter = self.record_cache.iter(); + + while storage_size_current > storage_size_limit { + let Some((dead_k, dead_v)) = lru_iter.next() else { apibail_generic!("can not make enough room in record store"); }; - self.purge_record_and_subkeys(dead_k, dead_v, true); + + let lru_record_size = self.estimate_record_storage_size(dead_k, dead_v)?; + + storage_size_current = storage_size_current.saturating_sub(lru_record_size); + dead_count += 1; } - self.total_storage_space - .sub(record_size) - .map_err(VeilidAPIError::internal)?; - if let Some(old_record_size) = opt_old_record_size { - self.total_storage_space - .add(old_record_size) - .map_err(VeilidAPIError::internal)?; + // Purge the required number of records + for _n in 0..dead_count { + let (dead_k, dead_v) = self.record_cache.remove_lru().unwrap(); + self.purge_record_and_subkeys(dead_k, dead_v, true); } self.total_storage_space .commit() .map_err(VeilidAPIError::internal)?; + Ok(()) } diff --git a/veilid-core/src/storage_manager/rehydrate.rs b/veilid-core/src/storage_manager/rehydrate.rs index 797935b2..f236ce1c 100644 --- a/veilid-core/src/storage_manager/rehydrate.rs +++ b/veilid-core/src/storage_manager/rehydrate.rs @@ -52,7 +52,7 @@ impl StorageManager { /// If a newer copy of a subkey's data is available online, the background /// write will pick up the newest subkey data as it does the SetValue fanout /// and will drive the newest values to consensus. 
- #[instrument(level = "trace", target = "stor", skip(self), ret, err)] + #[instrument(level = "trace", target = "stor", skip(self), ret)] pub(super) async fn rehydrate_record( &self, opaque_record_key: OpaqueRecordKey, @@ -86,7 +86,7 @@ .get_transaction_by_record(&opaque_record_key) .is_some() { - apibail_try_again!("record is currently in transaction"); + apibail_try_again!("not rehydrating while record is in transaction"); } if let Some(opened_record) = inner.opened_records.get(&opaque_record_key) { opened_record.safety_selection() diff --git a/veilid-core/src/storage_manager/tasks/rehydrate_records.rs b/veilid-core/src/storage_manager/tasks/rehydrate_records.rs index f66055cc..b674aaa8 100644 --- a/veilid-core/src/storage_manager/tasks/rehydrate_records.rs +++ b/veilid-core/src/storage_manager/tasks/rehydrate_records.rs @@ -26,14 +26,16 @@ impl StorageManager { let _report = match res { Ok(v) => v, Err(e) => { - veilid_log!(self debug "Rehydration request failed: {}", e); if matches!(e, VeilidAPIError::TryAgain { message: _ }) { + veilid_log!(self debug "Rehydration request skipped: {}", e); // Try again later self.add_rehydration_request( req.0, req.1.subkeys, req.1.consensus_count, ); + } else { + veilid_log!(self error "Rehydration request failed: {}", e); } return; } diff --git a/veilid-core/src/storage_manager/transaction.rs b/veilid-core/src/storage_manager/transaction.rs index ca30614b..03b7a58c 100644 --- a/veilid-core/src/storage_manager/transaction.rs +++ b/veilid-core/src/storage_manager/transaction.rs @@ -67,8 +67,8 @@ impl StorageManager { let records_lock = self .record_lock_table - .try_lock_records(record_keys.iter().map(|x| x.opaque()).collect()) - .ok_or_else(|| VeilidAPIError::try_again("record busy"))?; + .lock_records(record_keys.iter().map(|x| x.opaque()).collect()) + .await; // Early rejection if dht is not online if !self.dht_is_online() { @@ -284,8 +284,8 @@ impl StorageManager { let records_lock = self .record_lock_table - .try_lock_records(transaction_handle.keys().to_vec()) - .ok_or_else(|| VeilidAPIError::try_again("record busy"))?; + .lock_records(transaction_handle.keys().to_vec()) + .await; self.end_transaction_locked(&records_lock, transaction_handle.clone()) .await?; @@ -521,8 +521,8 @@ impl StorageManager { }; let records_lock = self .record_lock_table - .try_lock_records(transaction_handle.keys().to_vec()) - .ok_or_else(|| VeilidAPIError::try_again("record busy"))?; + .lock_records(transaction_handle.keys().to_vec()) + .await; // Early exit if transaction is already gone if !self @@ -638,11 +638,6 @@ impl StorageManager { .record_lock_table .lock_subkey(record_key.opaque(), subkey); - // let _subkey_lock = self - // .record_lock_table - // .try_lock_subkey(record_key.opaque(), subkey) - // .ok_or_else(|| VeilidAPIError::try_again("record subkey busy"))?; - // Early rejection if dht is not online if !self.dht_is_online() { apibail_try_again!("dht is not online"); } @@ -727,11 +722,6 @@ impl StorageManager { .record_lock_table .lock_subkey(record_key.opaque(), subkey); - // let _subkey_lock = self - // .record_lock_table - // .try_lock_subkey(record_key.opaque(), subkey) - // .ok_or_else(|| VeilidAPIError::try_again("record subkey busy"))?; - let opaque_record_key = record_key.opaque(); // Early rejection if dht is not online diff --git a/veilid-python/tests/test_dht_transactions.py b/veilid-python/tests/test_dht_transactions.py index 2f350dec..b1c52309 100644 --- a/veilid-python/tests/test_dht_transactions.py +++ 
b/veilid-python/tests/test_dht_transactions.py @@ -925,7 +925,7 @@ async def test_dht_transaction_write_read_full_parallel_local(): async with cs: # Number of records - COUNT = 8 + COUNT = 48 # Number of subkeys per record SUBKEY_COUNT = 32 # BareNonce to encrypt test data @@ -947,7 +947,7 @@ async def test_dht_transaction_write_read_full_parallel_local(): records.append(desc) # Make encrypted data that is consistent and hard to compress - subkey_data = bytes(chr(ord("A")+n)*MAX_SUBKEY_SIZE, 'ascii') + subkey_data = bytes(chr(ord("A")+n%32)*MAX_SUBKEY_SIZE, 'ascii') subkey_data = await cs.crypt_no_auth(subkey_data, NONCE, SECRET) subkey_data_list.append(subkey_data) @@ -964,7 +964,15 @@ async def test_dht_transaction_write_read_full_parallel_local(): async def setter(key: RecordKey, subkey: ValueSubkey, data: bytes): start = time.time() - await transaction.set(key, subkey, data) + cnt = 0 + while True: + try: + await transaction.set(key, subkey, data) + break + except veilid.VeilidAPIErrorTryAgain: + cnt += 1 + print(f' retry #{cnt} setting {key} #{subkey}') + continue print(f'set {key} #{subkey}: {time.time()-start}') init_set_futures.add(setter(key, ValueSubkey(i), subkey_data))
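If other transaction tests end up needing the same retry-on-busy behavior, the inline loop added to setter() above could be factored into a small helper. The sketch below is only a suggestion and is not part of this change; the name set_with_retry and the optional backoff delay are assumptions, while transaction.set() and veilid.VeilidAPIErrorTryAgain are taken from the test as written.

    import asyncio

    import veilid


    async def set_with_retry(transaction, key, subkey, data, backoff_secs=0.0):
        # Hypothetical helper mirroring the retry loop in setter():
        # keep calling transaction.set() until the record is no longer busy.
        attempt = 0
        while True:
            try:
                await transaction.set(key, subkey, data)
                return attempt
            except veilid.VeilidAPIErrorTryAgain:
                attempt += 1
                print(f' retry #{attempt} setting {key} #{subkey}')
                if backoff_secs > 0.0:
                    # Assumed optional pause; the test above retries immediately.
                    await asyncio.sleep(backoff_secs)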