[ci skip] fix bugs

Christien Rioux 2025-11-30 22:34:44 -05:00
parent 9025ab0428
commit d24145ccd3
10 changed files with 79 additions and 89 deletions

View file

@@ -56,7 +56,6 @@ struct CryptoInner {
dh_cache_misses: usize,
dh_cache_hits: usize,
dh_cache_lru: usize,
dh_cache_flush_future: Option<PinBoxFutureStatic<()>>,
}
impl fmt::Debug for CryptoInner {
@@ -66,7 +65,6 @@ impl fmt::Debug for CryptoInner {
.field("dh_cache_misses", &self.dh_cache_misses)
.field("dh_cache_hits", &self.dh_cache_hits)
.field("dh_cache_lru", &self.dh_cache_lru)
// .field("flush_future", &self.flush_future)
// .field("crypto_vld0", &self.crypto_vld0)
// .field("crypto_none", &self.crypto_none)
.finish()
@@ -104,7 +102,6 @@ impl Crypto {
dh_cache_misses: 0,
dh_cache_hits: 0,
dh_cache_lru: 0,
dh_cache_flush_future: None,
}
}
@@ -157,25 +154,6 @@ impl Crypto {
#[instrument(level = "trace", target = "crypto", skip_all, err)]
async fn post_init_async(&self) -> EyreResult<()> {
// Schedule flushing
let registry = self.registry();
let flush_future = interval("crypto flush", 60000, move || {
let crypto = registry.crypto();
async move {
if let Err(e) = crypto
.flush()
.measure_debug(
TimestampDuration::new_ms(100),
veilid_log_dbg!(crypto, "Crypto::flush"),
)
.await
{
veilid_log!(crypto warn "flush failed: {}", e);
}
}
});
self.inner.lock().dh_cache_flush_future = Some(flush_future);
Ok(())
}
@@ -191,10 +169,6 @@ impl Crypto {
}
async fn pre_terminate_async(&self) {
let flush_future = self.inner.lock().dh_cache_flush_future.take();
if let Some(f) = flush_future {
f.await;
}
veilid_log!(self trace "starting termination flush");
match self.flush().await {
Ok(_) => {

View file

@@ -32,15 +32,7 @@ where
let out = closure();
let duration = TimestampDuration::since(start);
if duration > limit {
#[cfg(feature = "verbose-tracing")]
let msg = format!(
"Excessive duration: {}\n{:?}",
duration,
backtrace::Backtrace::new()
);
#[cfg(not(feature = "verbose-tracing"))]
let msg = format!("Excessive duration: {}", duration);
callback(msg);
}
@@ -98,15 +90,7 @@ where
let out = Box::pin(self).await;
let duration = TimestampDuration::since(start);
if duration > limit {
#[cfg(feature = "verbose-tracing")]
let msg = format!(
"Excessive duration: {}\n{:?}",
duration,
backtrace::Backtrace::new()
);
#[cfg(not(feature = "verbose-tracing"))]
let msg = format!("Excessive duration: {}", duration);
callback(msg);
}
out
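The two hunks above drop the verbose-tracing backtrace capture from both the sync and async measure paths, leaving only the duration in the reported message. As a point of reference, here is a minimal standalone sketch of the same measure-and-report pattern, using std::time instead of Veilid's TimestampDuration; the names and signature are illustrative, not the crate's API:

    use std::time::{Duration, Instant};

    // Run a closure, and report via `callback` if it ran longer than `limit`.
    fn measure<R>(limit: Duration, callback: impl FnOnce(String), closure: impl FnOnce() -> R) -> R {
        let start = Instant::now();
        let out = closure();
        let duration = start.elapsed();
        if duration > limit {
            // Only the duration is reported; the backtrace capture was removed above.
            callback(format!("Excessive duration: {:?}", duration));
        }
        out
    }

    fn main() {
        let answer = measure(
            Duration::from_millis(1),
            |msg| eprintln!("{msg}"),
            || {
                std::thread::sleep(Duration::from_millis(5));
                42
            },
        );
        assert_eq!(answer, 42);
    }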

View file

@@ -518,7 +518,7 @@ impl ConnectionTable {
out += &format!(
" {}{}\n",
conn.debug_print(cur_ts),
if is_priority_flow { "PRIORITY" } else { "" }
if is_priority_flow { " PRIORITY" } else { "" }
);
}
}

View file

@@ -268,6 +268,7 @@ impl RecordLockTable {
}
}
#[expect(dead_code)]
pub fn try_lock_subkey(
&self,
record: OpaqueRecordKey,

View file

@@ -11,9 +11,9 @@ pub enum LimitError<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> {
#[derive(ThisError, Debug, Clone, Copy, Eq, PartialEq)]
pub enum NumericError<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> {
#[error("numeric overflow")]
#[error("numeric overflow: current={current} added={added}")]
Overflow { current: T, added: T },
#[error("numeric underflow")]
#[error("numeric underflow: current={current} removed={removed}")]
Underflow { current: T, removed: T },
}
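The reworked messages carry the operand values, so a logged failure is self-describing. A minimal standalone check of how thiserror renders them (a non-generic analogue, not the Veilid type):

    use thiserror::Error;

    #[derive(Error, Debug, Clone, Copy, PartialEq, Eq)]
    enum NumericError {
        #[error("numeric overflow: current={current} added={added}")]
        Overflow { current: u64, added: u64 },
        #[error("numeric underflow: current={current} removed={removed}")]
        Underflow { current: u64, removed: u64 },
    }

    fn main() {
        let e = NumericError::Underflow { current: 3, removed: 7 };
        // Prints: numeric underflow: current=3 removed=7
        println!("{}", e);
    }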
@@ -114,6 +114,10 @@ impl<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> LimitedSize<T> {
new_value
}
pub fn limit(&self) -> Option<T> {
self.limit
}
pub fn check_limit(&self) -> bool {
if let Some(uncommitted_value) = self.uncommitted_value {
if let Some(limit) = self.limit {
@@ -143,6 +147,7 @@ impl<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> LimitedSize<T> {
Ok(self.value)
}
#[expect(dead_code)]
pub fn rollback(&mut self) -> T {
if let Some(uv) = self.uncommitted_value.take() {
veilid_log!(self trace "Rollback ({}): {} (drop {})", self.description, self.value, uv);

View file

@@ -195,7 +195,7 @@ where
&mut self,
opaque_record_key: &OpaqueRecordKey,
func: F,
) -> Option<R>
) -> VeilidAPIResult<Option<R>>
where
F: FnOnce(&Record<D>) -> R,
{
@@ -215,6 +215,15 @@ where
let new_record = record.clone();
xxx continue here, solve proper making of room and always commit where appropriate
xxx also should break out estimation code and just pass in u64 size to sub_from_total_storage so we dont calculate it twice all over the place
self.make_room_for_record(new_record, opt_old_record_size)
self.sub_from_total_storage(&rtk, &old_record);
self.add_to_total_storage(&rtk, &new_record);
if let Err(e) = self.total_storage_space.commit() {
}
self.add_uncommitted_record_update(rtk, new_record, old_record);
}
@@ -274,6 +283,8 @@ where
let new_record = record.clone();
self.sub_from_total_storage(&rtk, &old_record);
self.add_to_total_storage(&rtk, &new_record);
self.add_uncommitted_record_update(rtk, new_record, old_record);
}
@@ -723,14 +734,14 @@ where
};
let total_storage_space = self.total_storage_space.get();
let opaque_record_key = record.0.record_key.clone();
self.purge_record_and_subkeys(record.0, record.1, true);
// Reducing the total size will always succeed
let new_total_storage_space = self.total_storage_space.commit().unwrap();
(
None,
Some(opaque_record_key),
ReclaimedSpace {
reclaimed: total_storage_space - new_total_storage_space,
total: total_storage_space,
@@ -1165,6 +1176,9 @@ where
return;
}
};
veilid_log!(self trace "Adding {} to total storage", record_size);
if let Err(e) = self.total_storage_space.add(record_size) {
veilid_log!(self error "RecordIndex({}): Unexpected storage space overflow: {}", self.unlocked_inner.name, e);
// haha if we ever hit this line, i would be very very surprised
@@ -1180,6 +1194,9 @@ where
return;
}
};
veilid_log!(self trace "Subtracting {} from total storage", record_size);
if let Err(e) = self.total_storage_space.sub(record_size) {
veilid_log!(self error "RecordIndex({}): Unexpected storage space underflow: {}", self.unlocked_inner.name, e);
self.total_storage_space.set(0);
@@ -1210,35 +1227,44 @@ where
record_size: u64,
opt_old_record_size: Option<u64>,
) -> VeilidAPIResult<()> {
if let Some(old_record_size) = opt_old_record_size {
self.total_storage_space
.sub(old_record_size)
.map_err(VeilidAPIError::internal)?;
}
self.total_storage_space
.add(record_size)
.map_err(VeilidAPIError::internal)?;
// Get starting size and limit
let mut storage_size_current = self.total_storage_space.get();
let Some(storage_size_limit) = self.total_storage_space.limit() else {
// No limit, just go for it
return Ok(());
};
while !self.total_storage_space.check_limit() {
let Some((dead_k, dead_v)) = self.record_cache.remove_lru() else {
self.total_storage_space.rollback();
// Add the delta we need to make room for
if let Some(old_record_size) = opt_old_record_size {
storage_size_current = storage_size_current.saturating_sub(old_record_size);
}
storage_size_current = storage_size_current.saturating_add(record_size);
// Figure out how many records from the LRU need to go to fit the delta
let mut dead_count = 0usize;
let mut lru_iter = self.record_cache.iter();
while storage_size_current > storage_size_limit {
let Some((dead_k, dead_v)) = lru_iter.next() else {
apibail_generic!("can not make enough room in record store");
};
self.purge_record_and_subkeys(dead_k, dead_v, true);
let lru_record_size = self.estimate_record_storage_size(dead_k, dead_v)?;
storage_size_current = storage_size_current.saturating_sub(lru_record_size);
dead_count += 1;
}
self.total_storage_space
.sub(record_size)
.map_err(VeilidAPIError::internal)?;
if let Some(old_record_size) = opt_old_record_size {
self.total_storage_space
.add(old_record_size)
.map_err(VeilidAPIError::internal)?;
// Purge the required number of records
for _n in 0..dead_count {
let (dead_k, dead_v) = self.record_cache.remove_lru().unwrap();
self.purge_record_and_subkeys(dead_k, dead_v, true);
}
self.total_storage_space
.commit()
.map_err(VeilidAPIError::internal)?;
Ok(())
}
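The rewritten make_room_for_record now sizes the eviction first, walking the LRU iterator against a projected total, and only purges once it knows enough room can be reclaimed. Below is a minimal standalone sketch of that two-pass shape; the collection and names are stand-ins for the record cache and size estimator, not the store's real types:

    use std::collections::VecDeque;

    struct Lru {
        // Front = least recently used. Each entry carries its estimated size.
        entries: VecDeque<(String, u64)>,
    }

    fn make_room(
        lru: &mut Lru,
        current: u64,
        limit: u64,
        record_size: u64,
        opt_old_record_size: Option<u64>,
    ) -> Result<usize, String> {
        // Apply the delta we need to make room for.
        let mut projected = current;
        if let Some(old) = opt_old_record_size {
            projected = projected.saturating_sub(old);
        }
        projected = projected.saturating_add(record_size);

        // Pass 1: walk the LRU order to count how many entries must go.
        let mut dead_count = 0usize;
        let mut iter = lru.entries.iter();
        while projected > limit {
            let Some((_k, size)) = iter.next() else {
                return Err("can not make enough room".to_string());
            };
            projected = projected.saturating_sub(*size);
            dead_count += 1;
        }

        // Pass 2: actually evict that many entries.
        for _ in 0..dead_count {
            let (_k, _size) = lru.entries.pop_front().expect("counted above");
            // purge_record_and_subkeys(...) would happen here in the real store.
        }
        Ok(dead_count)
    }

    fn main() {
        let mut lru = Lru {
            entries: VecDeque::from([("a".to_string(), 300), ("b".to_string(), 300)]),
        };
        // 900 + 500 = 1400 > 1000, so both 300-byte entries must go: 1400 - 600 = 800.
        let evicted = make_room(&mut lru, 900, 1000, 500, None).unwrap();
        assert_eq!(evicted, 2);
        assert!(lru.entries.is_empty());
    }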

View file

@@ -52,7 +52,7 @@ impl StorageManager {
/// If a newer copy of a subkey's data is available online, the background
/// write will pick up the newest subkey data as it does the SetValue fanout
/// and will drive the newest values to consensus.
#[instrument(level = "trace", target = "stor", skip(self), ret, err)]
#[instrument(level = "trace", target = "stor", skip(self), ret)]
pub(super) async fn rehydrate_record(
&self,
opaque_record_key: OpaqueRecordKey,
@@ -86,7 +86,7 @@ impl StorageManager {
.get_transaction_by_record(&opaque_record_key)
.is_some()
{
apibail_try_again!("record is currently in transaction");
apibail_try_again!("not rehydrating while records is in transaction");
}
if let Some(opened_record) = inner.opened_records.get(&opaque_record_key) {
opened_record.safety_selection()

View file

@@ -26,14 +26,16 @@ impl StorageManager {
let _report = match res {
Ok(v) => v,
Err(e) => {
veilid_log!(self debug "Rehydration request failed: {}", e);
if matches!(e, VeilidAPIError::TryAgain { message: _ }) {
veilid_log!(self debug "Rehydration request skipped: {}", e);
// Try again later
self.add_rehydration_request(
req.0,
req.1.subkeys,
req.1.consensus_count,
);
} else {
veilid_log!(self error "Rehydration request failed: {}", e);
}
return;
}

View file

@@ -67,8 +67,8 @@ impl StorageManager {
let records_lock = self
.record_lock_table
.try_lock_records(record_keys.iter().map(|x| x.opaque()).collect())
.ok_or_else(|| VeilidAPIError::try_again("record busy"))?;
.lock_records(record_keys.iter().map(|x| x.opaque()).collect())
.await;
// Early rejection if dht is not online
if !self.dht_is_online() {
@@ -284,8 +284,8 @@ impl StorageManager {
let records_lock = self
.record_lock_table
.try_lock_records(transaction_handle.keys().to_vec())
.ok_or_else(|| VeilidAPIError::try_again("record busy"))?;
.lock_records(transaction_handle.keys().to_vec())
.await;
self.end_transaction_locked(&records_lock, transaction_handle.clone())
.await?;
@@ -521,8 +521,8 @@ impl StorageManager {
};
let records_lock = self
.record_lock_table
.try_lock_records(transaction_handle.keys().to_vec())
.ok_or_else(|| VeilidAPIError::try_again("record busy"))?;
.lock_records(transaction_handle.keys().to_vec())
.await;
// Early exit if transaction is already gone
if !self
@@ -638,11 +638,6 @@ impl StorageManager {
.record_lock_table
.lock_subkey(record_key.opaque(), subkey);
// let _subkey_lock = self
// .record_lock_table
// .try_lock_subkey(record_key.opaque(), subkey)
// .ok_or_else(|| VeilidAPIError::try_again("record subkey busy"))?;
// Early rejection if dht is not online
if !self.dht_is_online() {
apibail_try_again!("dht is not online");
@@ -727,11 +722,6 @@ impl StorageManager {
.record_lock_table
.lock_subkey(record_key.opaque(), subkey);
// let _subkey_lock = self
// .record_lock_table
// .try_lock_subkey(record_key.opaque(), subkey)
// .ok_or_else(|| VeilidAPIError::try_again("record subkey busy"))?;
let opaque_record_key = record_key.opaque();
// Early rejection if dht is not online
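These call sites switch from the fallible try_lock_records(...).ok_or_else(|| VeilidAPIError::try_again("record busy")) to an awaited lock_records(...). The sketch below is not Veilid's RecordLockTable; it only illustrates the fail-fast versus wait tradeoff with a tokio mutex (assumed dependency):

    use tokio::sync::Mutex;

    #[tokio::main]
    async fn main() {
        let record = Mutex::new(0u32);

        // Old style: fail immediately and let the caller retry ("record busy").
        match record.try_lock() {
            Ok(mut guard) => *guard += 1,
            Err(_) => eprintln!("record busy, try again later"),
        }

        // New style: wait until the lock is free; no TryAgain surfaces to the caller.
        let mut guard = record.lock().await;
        *guard += 1;
    }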

View file

@@ -925,7 +925,7 @@ async def test_dht_transaction_write_read_full_parallel_local():
async with cs:
# Number of records
COUNT = 8
COUNT = 48
# Number of subkeys per record
SUBKEY_COUNT = 32
# BareNonce to encrypt test data
@@ -947,7 +947,7 @@ async def test_dht_transaction_write_read_full_parallel_local():
records.append(desc)
# Make encrypted data that is consistent and hard to compress
subkey_data = bytes(chr(ord("A")+n)*MAX_SUBKEY_SIZE, 'ascii')
subkey_data = bytes(chr(ord("A")+n%32)*MAX_SUBKEY_SIZE, 'ascii')
subkey_data = await cs.crypt_no_auth(subkey_data, NONCE, SECRET)
subkey_data_list.append(subkey_data)
@@ -964,7 +964,15 @@ async def test_dht_transaction_write_read_full_parallel_local():
async def setter(key: RecordKey, subkey: ValueSubkey, data: bytes):
start = time.time()
await transaction.set(key, subkey, data)
cnt = 0
while True:
try:
await transaction.set(key, subkey, data)
break
except veilid.VeilidAPIErrorTryAgain:
cnt += 1
print(f' retry #{cnt} setting {key} #{subkey}')
continue
print(f'set {key} #{subkey}: {time.time()-start}')
init_set_futures.add(setter(key, ValueSubkey(i), subkey_data))