mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-12-24 03:51:15 -05:00
proper cleanup for deleted records
This commit is contained in:
parent
3ea19b40b7
commit
68ed357aba
8 changed files with 192 additions and 33 deletions
|
|
@ -59,20 +59,31 @@ impl StorageManager {
|
|||
|
||||
pub async fn purge_local_records(&self, reclaim: Option<u64>) -> String {
|
||||
let local_record_store = {
|
||||
let mut inner = self.inner.lock();
|
||||
let inner = self.inner.lock();
|
||||
let Some(local_record_store) = inner.local_record_store.clone() else {
|
||||
return "not initialized".to_owned();
|
||||
};
|
||||
if !inner.opened_records.is_empty() {
|
||||
return "records still opened".to_owned();
|
||||
}
|
||||
inner.offline_subkey_writes.clear();
|
||||
|
||||
local_record_store
|
||||
};
|
||||
let reclaimed_space = local_record_store
|
||||
.reclaim_space(reclaim.unwrap_or(u64::MAX))
|
||||
.await;
|
||||
let record_locks = self
|
||||
.record_lock_table
|
||||
.lock_records(
|
||||
reclaimed_space.dead_records,
|
||||
StorageManagerRecordLockPurpose::Delete,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Err(e) = self.cleanup_records_locked(&record_locks).await {
|
||||
veilid_log!(self error "Error cleaning up records in local purge: {}", e);
|
||||
}
|
||||
|
||||
format!(
|
||||
"Local records purged: purged {} bytes, now {} bytes total",
|
||||
reclaimed_space.reclaimed, reclaimed_space.total
|
||||
|
|
|
|||
|
|
@ -22,6 +22,53 @@ impl StorageManager {
|
|||
self.close_record_locked(&record_lock)?;
|
||||
|
||||
// Remove the record from the local store
|
||||
local_record_store.delete_record(opaque_record_key).await
|
||||
local_record_store.delete_record(&opaque_record_key).await?;
|
||||
|
||||
let record_locks: StorageManagerRecordsLockGuard = record_lock.into();
|
||||
|
||||
// Clean up the record from the storage manager
|
||||
self.cleanup_records_locked(&record_locks).await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "stor", skip_all)]
|
||||
pub(super) async fn cleanup_records_locked(
|
||||
&self,
|
||||
records_lock: &StorageManagerRecordsLockGuard,
|
||||
) -> VeilidAPIResult<()> {
|
||||
let Ok(_guard) = self.startup_lock.enter() else {
|
||||
apibail_not_initialized!();
|
||||
};
|
||||
let local_record_store = self.get_local_record_store()?;
|
||||
|
||||
// Ensure the records are closed
|
||||
let opaque_record_keys = records_lock.records();
|
||||
let mut inner = self.inner.lock();
|
||||
for opaque_record_key in &opaque_record_keys {
|
||||
if local_record_store.contains_record(opaque_record_key) {
|
||||
apibail_internal!(
|
||||
"can't clean up record that is still in local record store: {}",
|
||||
opaque_record_key
|
||||
);
|
||||
}
|
||||
|
||||
if inner.opened_records.contains_key(opaque_record_key) {
|
||||
apibail_internal!(
|
||||
"can't clean up record that is still opened: {}",
|
||||
opaque_record_key
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let dead_records_set: HashSet<OpaqueRecordKey> = opaque_record_keys.into_iter().collect();
|
||||
|
||||
inner
|
||||
.offline_subkey_writes
|
||||
.retain(|k, _| !dead_records_set.contains(k));
|
||||
|
||||
inner
|
||||
.rehydration_requests
|
||||
.retain(|k, _| !dead_records_set.contains(k));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,3 +37,11 @@ impl<R: LockPurpose, S: LockPurpose> fmt::Display for RecordsLockGuard<R, S> {
|
|||
write!(f, "[{}]", records)
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: LockPurpose, S: LockPurpose> From<RecordLockGuard<R, S>> for RecordsLockGuard<R, S> {
|
||||
fn from(value: RecordLockGuard<R, S>) -> Self {
|
||||
Self {
|
||||
record_lock_guards: vec![value],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,10 +51,11 @@ pub(super) trait RecordDetail:
|
|||
}
|
||||
|
||||
/// Reclaimed space return value
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub(super) struct ReclaimedSpace {
|
||||
pub reclaimed: u64,
|
||||
pub total: u64,
|
||||
pub dead_records: Vec<OpaqueRecordKey>,
|
||||
}
|
||||
|
||||
/// Record store interface
|
||||
|
|
@ -168,7 +169,7 @@ where
|
|||
}
|
||||
|
||||
#[instrument(level = "trace", target = "stor", skip_all, err)]
|
||||
pub async fn delete_record(&self, opaque_record_key: OpaqueRecordKey) -> VeilidAPIResult<()> {
|
||||
pub async fn delete_record(&self, opaque_record_key: &OpaqueRecordKey) -> VeilidAPIResult<()> {
|
||||
let _record_lock = self
|
||||
.record_store_lock_table
|
||||
.lock_record(
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ where
|
|||
|
||||
pub fn delete_record(
|
||||
&mut self,
|
||||
opaque_record_key: OpaqueRecordKey,
|
||||
opaque_record_key: &OpaqueRecordKey,
|
||||
) -> VeilidAPIResult<Option<CommitAction<D>>> {
|
||||
self.record_index.delete_record(opaque_record_key.clone())?;
|
||||
self.cleanup_record(opaque_record_key);
|
||||
|
|
@ -217,10 +217,11 @@ where
|
|||
let mut total_reclaimed_space = ReclaimedSpace {
|
||||
reclaimed: 0,
|
||||
total: self.record_index.total_storage_space(),
|
||||
dead_records: vec![],
|
||||
};
|
||||
|
||||
while total_reclaimed_space.reclaimed < space {
|
||||
let (opt_deleted_key, reclaimed_space) = match self.record_index.delete_lru() {
|
||||
let mut reclaimed_space = match self.record_index.delete_lru() {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
veilid_log!(self error "Error reclaiming space: {}", e);
|
||||
|
|
@ -228,14 +229,19 @@ where
|
|||
}
|
||||
};
|
||||
|
||||
if reclaimed_space.dead_records.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
for dead_record in &reclaimed_space.dead_records {
|
||||
self.cleanup_record(dead_record);
|
||||
}
|
||||
|
||||
total_reclaimed_space.reclaimed += reclaimed_space.reclaimed;
|
||||
total_reclaimed_space.total = reclaimed_space.total;
|
||||
|
||||
let Some(deleted_key) = opt_deleted_key else {
|
||||
break;
|
||||
};
|
||||
|
||||
self.cleanup_record(deleted_key);
|
||||
total_reclaimed_space
|
||||
.dead_records
|
||||
.append(&mut reclaimed_space.dead_records);
|
||||
}
|
||||
|
||||
(
|
||||
|
|
@ -246,10 +252,10 @@ where
|
|||
|
||||
////////////////////////////////////////////////////////////
|
||||
|
||||
fn cleanup_record(&mut self, opaque_record_key: OpaqueRecordKey) {
|
||||
fn cleanup_record(&mut self, opaque_record_key: &OpaqueRecordKey) {
|
||||
if self
|
||||
.record_index
|
||||
.peek_record(&opaque_record_key, |_| {})
|
||||
.peek_record(opaque_record_key, |_| {})
|
||||
.is_some()
|
||||
{
|
||||
veilid_log!(self error "Record should not exist in index: {}", opaque_record_key);
|
||||
|
|
@ -257,7 +263,7 @@ where
|
|||
}
|
||||
|
||||
let rtk = RecordTableKey {
|
||||
record_key: opaque_record_key,
|
||||
record_key: opaque_record_key.clone(),
|
||||
};
|
||||
|
||||
// Remove transactions
|
||||
|
|
|
|||
|
|
@ -826,17 +826,15 @@ where
|
|||
|
||||
/// Delete the least recently used record
|
||||
/// Returns which record was deleted and amount of space reclaimed
|
||||
pub fn delete_lru(&mut self) -> VeilidAPIResult<(Option<OpaqueRecordKey>, ReclaimedSpace)> {
|
||||
pub fn delete_lru(&mut self) -> VeilidAPIResult<ReclaimedSpace> {
|
||||
let total_storage_space = self.record_cache_space.with_value(|x| x)?;
|
||||
|
||||
let Some(record) = self.record_cache.remove_lru() else {
|
||||
return Ok((
|
||||
None,
|
||||
ReclaimedSpace {
|
||||
reclaimed: 0,
|
||||
total: total_storage_space,
|
||||
},
|
||||
));
|
||||
return Ok(ReclaimedSpace {
|
||||
reclaimed: 0,
|
||||
total: total_storage_space,
|
||||
dead_records: vec![],
|
||||
});
|
||||
};
|
||||
|
||||
let opaque_record_key = record.0.record_key.clone();
|
||||
|
|
@ -844,13 +842,11 @@ where
|
|||
|
||||
let new_total_storage_space = self.record_cache_space.with_value(|x| x)?;
|
||||
|
||||
Ok((
|
||||
Some(opaque_record_key),
|
||||
ReclaimedSpace {
|
||||
reclaimed: total_storage_space - new_total_storage_space,
|
||||
total: total_storage_space,
|
||||
},
|
||||
))
|
||||
Ok(ReclaimedSpace {
|
||||
reclaimed: total_storage_space - new_total_storage_space,
|
||||
total: total_storage_space,
|
||||
dead_records: vec![opaque_record_key],
|
||||
})
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
|||
|
|
@ -340,7 +340,7 @@ describe('VeilidRoutingContext', () => {
|
|||
|
||||
describe(`DHT transactions kitchen sink for ${cryptoKind}`, () => {
|
||||
const DHT_RECORD_COUNT = 8
|
||||
const DHT_SUBKEY_SIZE = 37268
|
||||
const DHT_SUBKEY_SIZE = 32768
|
||||
const DHT_SUBKEY_COUNT = 32
|
||||
const data: Uint8Array[] = [];
|
||||
for (let rec = 0; rec < DHT_RECORD_COUNT; rec++) {
|
||||
|
|
@ -393,6 +393,96 @@ describe('VeilidRoutingContext', () => {
|
|||
await tx.commit();
|
||||
});
|
||||
|
||||
it('should create transaction, add sets, and rollback', async () => {
|
||||
const tx = await veilidClient.transactDHTRecords(dhtRecords);
|
||||
await expect(tx).toBeInstanceOf(VeilidDHTTransaction);
|
||||
await tx.rollback();
|
||||
});
|
||||
|
||||
it('should create transaction, add sets, and commit', async () => {
|
||||
const tx = await veilidClient.transactDHTRecords(dhtRecords);
|
||||
await expect(tx).toBeInstanceOf(VeilidDHTTransaction);
|
||||
|
||||
for (let rec = 0; rec < DHT_RECORD_COUNT; rec++) {
|
||||
const res = await tx.set(dhtRecords[rec], 0, data[rec]);
|
||||
await expect(res).toBeUndefined();
|
||||
}
|
||||
|
||||
await tx.commit();
|
||||
});
|
||||
|
||||
it('should create transaction, add sets, gets, and commit', async () => {
|
||||
const tx = await veilidClient.transactDHTRecords(dhtRecords);
|
||||
await expect(tx).toBeInstanceOf(VeilidDHTTransaction);
|
||||
|
||||
for (let rec = 0; rec < DHT_RECORD_COUNT; rec++) {
|
||||
const res = await tx.set(dhtRecords[rec], 0, data[rec]);
|
||||
await expect(res).toBeUndefined();
|
||||
}
|
||||
|
||||
for (let rec = 0; rec < DHT_RECORD_COUNT; rec++) {
|
||||
const res = await tx.get(dhtRecords[rec], 0);
|
||||
await expect(res).toBeUndefined();
|
||||
}
|
||||
|
||||
await tx.commit();
|
||||
});
|
||||
|
||||
it('should create transaction, add sets, commit then a new transaction and gets, and commit', async () => {
|
||||
const tx = await veilidClient.transactDHTRecords(dhtRecords);
|
||||
await expect(tx).toBeInstanceOf(VeilidDHTTransaction);
|
||||
|
||||
for (let rec = 0; rec < DHT_RECORD_COUNT; rec++) {
|
||||
const res = await tx.set(dhtRecords[rec], 0, data[rec]);
|
||||
await expect(res).toBeUndefined();
|
||||
}
|
||||
|
||||
await tx.commit();
|
||||
|
||||
const tx2 = await veilidClient.transactDHTRecords(dhtRecords);
|
||||
await expect(tx2).toBeInstanceOf(VeilidDHTTransaction);
|
||||
|
||||
for (let rec = 0; rec < DHT_RECORD_COUNT; rec++) {
|
||||
const res = await tx2.get(dhtRecords[rec], 0);
|
||||
await expect(res).toBeDefined();
|
||||
await expect(res?.seq).toEqual(0);
|
||||
await expect(res?.data).toEqual(data[rec]);
|
||||
}
|
||||
|
||||
await tx2.commit();
|
||||
});
|
||||
|
||||
it('should create empty transaction, add gets and commit', async () => {
|
||||
const tx = await veilidClient.transactDHTRecords(dhtRecords);
|
||||
await expect(tx).toBeInstanceOf(VeilidDHTTransaction);
|
||||
|
||||
for (let rec = 0; rec < DHT_RECORD_COUNT; rec++) {
|
||||
const res = await tx.get(dhtRecords[rec], 0);
|
||||
await expect(res).toBeUndefined();
|
||||
}
|
||||
|
||||
await tx.commit();
|
||||
});
|
||||
|
||||
it('should create empty transaction, fail non-transactional sets and then rollback', async () => {
|
||||
const tx = await veilidClient.transactDHTRecords(dhtRecords);
|
||||
await expect(tx).toBeInstanceOf(VeilidDHTTransaction);
|
||||
|
||||
for (let rec = 0; rec < DHT_RECORD_COUNT; rec++) {
|
||||
await expect(async () => { await routingContext.setDHTValue(dhtRecords[rec], 0, data[rec]); }).toThrow();
|
||||
}
|
||||
|
||||
await tx.rollback();
|
||||
});
|
||||
|
||||
|
||||
// it('should create transaction, fill all records, commit, and then get all records', async () => {
|
||||
// const tx = await veilidClient.transactDHTRecords(dhtRecords);
|
||||
// await expect(tx).toBeInstanceOf(VeilidDHTTransaction);
|
||||
// await tx.commit();
|
||||
// });
|
||||
|
||||
|
||||
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import { VeilidWASMConfig, veilidClient } from 'veilid-wasm';
|
||||
|
||||
export const DEBUGGING = false;
|
||||
export const DEBUGGING = true;
|
||||
|
||||
export const veilidCoreInitConfig: VeilidWASMConfig = {
|
||||
logging: {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue