more record index

Christien Rioux 2025-11-21 19:29:42 -05:00
parent f9b371486b
commit 8cdc3be460
6 changed files with 436 additions and 105 deletions

View file

@ -32,4 +32,7 @@ impl RecordDetail for LocalRecordDetail {
fn is_new(&self) -> bool {
self.nodes.is_empty()
}
fn total_size(&self) -> usize {
size_of::<Self>() + (self.subkeys.ranges_len() * size_of::<ValueSubkey>() * 2)
}
}
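
The new total_size() here estimates heap usage from the range bookkeeping: each stored subkey range is assumed to cost two ValueSubkey bounds. A minimal standalone sketch of that accounting, using hypothetical stand-in types rather than the real ValueSubkeyRangeSet:

// Hypothetical stand-ins for illustration only.
type ValueSubkey = u32;

struct SubkeyRanges {
    ranges: Vec<(ValueSubkey, ValueSubkey)>, // inclusive (start, end) pairs
}

impl SubkeyRanges {
    fn ranges_len(&self) -> usize {
        self.ranges.len()
    }
}

fn estimated_range_bytes(subkeys: &SubkeyRanges) -> usize {
    // Each range contributes two ValueSubkey bounds, matching the
    // ranges_len() * size_of::<ValueSubkey>() * 2 term above.
    subkeys.ranges_len() * std::mem::size_of::<ValueSubkey>() * 2
}

fn main() {
    let subkeys = SubkeyRanges {
        ranges: vec![(0, 3), (7, 7)],
    };
    assert_eq!(estimated_range_bytes(&subkeys), 2 * 4 * 2);
}
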

View file

@ -53,6 +53,7 @@ pub(super) trait RecordDetail:
fmt::Debug + Clone + PartialEq + Eq + Serialize + for<'d> Deserialize<'d>
{
fn is_new(&self) -> bool;
fn total_size(&self) -> usize;
}
/// Record store interface

View file

@ -9,10 +9,11 @@ where
subkey_count: usize,
stored_subkeys: ValueSubkeyRangeSet,
#[serde(default)]
subkey_seqs: Vec<u32>, xxx figure out where load_db handles this. we shouldn't need to load_db in the record_index directly...
subkey_seqs: Vec<u32>,
#[serde(default)]
subkey_sizes: Vec<u16>,
last_touched_ts: Timestamp,
#[serde(skip)]
record_data_size: usize,
#[serde(bound(deserialize = "D: RecordDetail"))]
detail: D,
@ -41,6 +42,14 @@ where
})
}
pub fn post_deserialize(&mut self) {
self.record_data_size = self
.subkey_sizes
.iter()
.copied()
.fold(0, |a, b| a + (b as usize))
}
pub fn is_new(&self) -> bool {
self.stored_subkeys.is_empty() && self.record_data_size == 0 && self.detail.is_new()
}
@ -61,12 +70,54 @@ where
&self.stored_subkeys
}
pub fn store_subkey(&mut self, subkey: ValueSubkey) {
self.stored_subkeys.insert(subkey);
pub fn needs_cleanup(&self) -> bool {
if self.subkey_seqs.len() != self.subkey_count
|| self.subkey_sizes.len() != self.subkey_count
{
return true;
}
false
}
pub fn store_subkeys(&mut self, subkeys: &ValueSubkeyRangeSet) {
self.stored_subkeys = self.stored_subkeys.union(subkeys);
pub fn cleanup(&mut self, subkey_info: Vec<(ValueSeqNum, u16)>) {
self.subkey_seqs = vec![0; self.subkey_count];
self.subkey_sizes = vec![0; self.subkey_count];
self.record_data_size = 0;
self.stored_subkeys.clear();
for (n, (seq, size)) in subkey_info.iter().copied().enumerate() {
self.subkey_seqs[n] = u32::from(seq);
self.subkey_sizes[n] = size;
self.record_data_size += size as usize;
if seq.is_some() {
self.stored_subkeys.insert(n as ValueSubkey);
}
}
}
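
cleanup trusts only what the subkey scan reports: every per-subkey vector is rebuilt from scratch and the cached data total is recomputed, so any drift between the index and the subkey table disappears. A small self-contained sketch of the same rebuild, with Option<u32> standing in for ValueSeqNum and a BTreeSet standing in for ValueSubkeyRangeSet:

use std::collections::BTreeSet;

// Hypothetical stand-in for the record index fields touched by cleanup().
struct IndexSketch {
    subkey_count: usize,
    subkey_seqs: Vec<u32>,
    subkey_sizes: Vec<u16>,
    record_data_size: usize,
    stored_subkeys: BTreeSet<u32>,
}

impl IndexSketch {
    // Rebuild all per-subkey bookkeeping from a scan of (seq, size) per slot;
    // seq == None means the subkey was not found on disk.
    fn cleanup(&mut self, subkey_info: Vec<(Option<u32>, u16)>) {
        self.subkey_seqs = vec![0; self.subkey_count];
        self.subkey_sizes = vec![0; self.subkey_count];
        self.record_data_size = 0;
        self.stored_subkeys.clear();
        for (n, (seq, size)) in subkey_info.into_iter().enumerate() {
            self.subkey_seqs[n] = seq.unwrap_or(0);
            self.subkey_sizes[n] = size;
            self.record_data_size += size as usize;
            if seq.is_some() {
                self.stored_subkeys.insert(n as u32);
            }
        }
    }
}

fn main() {
    let mut idx = IndexSketch {
        subkey_count: 3,
        subkey_seqs: vec![],
        subkey_sizes: vec![],
        record_data_size: 0,
        stored_subkeys: BTreeSet::new(),
    };
    // Subkey 1 is missing on disk, so it stays out of stored_subkeys.
    idx.cleanup(vec![(Some(4), 100), (None, 0), (Some(1), 50)]);
    assert_eq!(idx.record_data_size, 150);
    assert!(!idx.stored_subkeys.contains(&1));
    assert_eq!((idx.subkey_seqs[2], idx.subkey_sizes[2]), (1, 50));
}
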
pub fn store_subkey(&mut self, subkey: ValueSubkey, data: Arc<SignedValueData>) {
let seq = data.value_data().seq();
let size = data.data_size() as u16;
self.stored_subkeys.insert(subkey);
self.subkey_seqs.resize(self.subkey_count, 0);
self.subkey_seqs[subkey as usize] = u32::from(seq);
self.subkey_sizes.resize(self.subkey_count, 0);
let old_size = self.subkey_sizes[subkey as usize];
self.subkey_sizes[subkey as usize] = size;
if size > old_size {
self.record_data_size += (size - old_size) as usize;
} else if size < old_size {
self.record_data_size -= (old_size - size) as usize;
}
}
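
store_subkey now keeps record_data_size current incrementally: instead of rescanning every subkey, it applies only the difference between the old and new stored sizes. A standalone sketch of that delta accounting, using a stand-in struct that holds just the two fields involved:

struct SizeSketch {
    subkey_sizes: Vec<u16>,
    record_data_size: usize,
}

impl SizeSketch {
    // Record a new size for `subkey` and adjust the running total by the delta,
    // growing or shrinking it exactly as the diff above does.
    fn store_size(&mut self, subkey: usize, size: u16) {
        let old_size = self.subkey_sizes[subkey];
        self.subkey_sizes[subkey] = size;
        if size > old_size {
            self.record_data_size += (size - old_size) as usize;
        } else {
            self.record_data_size -= (old_size - size) as usize;
        }
    }
}

fn main() {
    let mut s = SizeSketch { subkey_sizes: vec![100, 0], record_data_size: 100 };
    s.store_size(0, 40); // shrink an existing subkey
    s.store_size(1, 25); // grow from empty
    assert_eq!(s.record_data_size, 65);
}
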
pub fn subkey_size(&self, subkey: ValueSubkey) -> u16 {
self.subkey_sizes[subkey as usize]
}
pub fn subkey_seq(&self, subkey: ValueSubkey) -> ValueSeqNum {
ValueSeqNum::from(self.subkey_seqs[subkey as usize])
}
pub fn touch(&mut self) {
@ -77,10 +128,6 @@ where
self.last_touched_ts
}
pub fn set_record_data_size(&mut self, size: usize) {
self.record_data_size = size;
}
pub fn record_data_size(&self) -> usize {
self.record_data_size
}
@ -91,9 +138,20 @@ where
}
pub fn total_size(&self) -> usize {
(mem::size_of::<Self>() - mem::size_of::<Arc<SignedValueDescriptor>>())
// self, minus fields that are counted via their own total_size()
(size_of::<Self>() - size_of::<Arc<SignedValueDescriptor>>() - size_of::<D>())
// descriptor
+ self.descriptor.total_size()
// stored_subkeys
+ self.stored_subkeys.ranges_len() * size_of::<usize>() * 2
// subkey_seqs
+ self.subkey_seqs.len() * size_of::<u32>()
// subkey_sizes
+ self.subkey_sizes.len() * size_of::<u16>()
// record data
+ self.record_data_size
// detail
+ self.detail.total_size()
}
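
The accounting above subtracts the inline size of the descriptor and detail fields and then adds their own total_size() results back, so each field's footprint is counted exactly once by the component that knows it best. A toy sketch of the pattern, with hypothetical types:

use std::mem::size_of;

// A field that knows its own deep size, including heap data it owns.
struct Blob {
    bytes: Vec<u8>,
}

impl Blob {
    fn total_size(&self) -> usize {
        size_of::<Self>() + self.bytes.len()
    }
}

struct Outer {
    id: u64,
    blob: Blob,
}

impl Outer {
    // Subtract the inline size of the field that reports its own total,
    // then add that deep total back, so the field is not counted twice.
    fn total_size(&self) -> usize {
        (size_of::<Self>() - size_of::<Blob>()) + self.blob.total_size()
    }
}

fn main() {
    let o = Outer { id: 7, blob: Blob { bytes: vec![0u8; 32] } };
    // The 32 heap bytes are counted once, via Blob::total_size().
    assert_eq!(o.total_size(), size_of::<Outer>() + 32);
    println!("record {}: {} bytes", o.id, o.total_size());
}
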
pub fn detail(&self) -> &D {

View file

@ -52,6 +52,8 @@ where
}
}
xxx continue here
impl<D> RecordStoreInner<D>
where
D: RecordDetail,
@ -73,70 +75,6 @@ where
Ok(inner)
}
async fn load_db(&mut self) -> EyreResult<()> {
// Pull record index from table into a vector to ensure we sort them
let record_table_keys = self.unlocked_inner.record_table.get_keys(0).await?;
let mut record_index_sorted: Vec<(RecordTableKey, Record<D>)> =
Vec::with_capacity(record_table_keys.len());
for k in record_table_keys {
if let Some(record) = self
.unlocked_inner
.record_table
.load_json::<Record<D>>(0, &k)
.await?
{
let rtk = RecordTableKey::try_from(k.as_ref())?;
record_index_sorted.push((rtk, record));
}
}
// Sort the record index by reverse last touched time (newest first)
record_index_sorted.sort_by(|a, b| b.1.last_touched().cmp(&a.1.last_touched()));
// Truncate the record list to the max record count
let rt_xact = self.unlocked_inner.record_table.transact();
let st_xact = self.unlocked_inner.subkey_table.transact();
let record_index_limit = self.record_index.capacity();
if record_index_sorted.len() > record_index_limit {
// Purge excess records
self.purge_records(
record_index_sorted[record_index_limit..].iter(),
&rt_xact,
&st_xact,
)
.await;
record_index_sorted.truncate(record_index_limit);
}
// Figure out which records might overflow any of our limits
for (n, (_, record)) in record_index_sorted.iter().enumerate() {
// Total the storage space
self.total_storage_space
.add((mem::size_of::<RecordTableKey>() + record.total_size()) as u64)
.unwrap();
if !self.total_storage_space.check_limit() {
// Revert the total storage space because the commit would fail
self.total_storage_space.rollback();
// Purge excess records
self.purge_records(record_index_sorted[n..].iter(), &rt_xact, &st_xact)
.await;
record_index_sorted.truncate(n);
break;
}
self.total_storage_space.commit().unwrap();
}
// Commit purge if anything was added
rt_xact.commit().await?;
st_xact.commit().await?;
Ok(())
}
/// Transactionally delete a list of records
/// Only called during initial db load
async fn purge_records<'a, I: Iterator<Item = &'a (RecordTableKey, Record<D>)>>(

View file

@ -144,6 +144,86 @@ where
}
}
async fn load_db(&mut self) -> EyreResult<()> {
let start_ts = Timestamp::now();
veilid_log!(self info "Loading record index: {}", self.unlocked_inner.name);
// Start transactions for cleanups
let rt_xact = self.unlocked_inner.record_table.transact();
let st_xact = self.unlocked_inner.subkey_table.transact();
// Pull record index from table into a vector to ensure we sort them
// If they don't load, delete them.
let record_table_keys = self.unlocked_inner.record_table.get_keys(0).await?;
let mut record_index_sorted: Vec<(RecordTableKey, Record<D>)> =
Vec::with_capacity(record_table_keys.len());
for k in record_table_keys {
let Ok(rtk) = RecordTableKey::try_from(k.as_slice()) else {
rt_xact.delete(0, &k).await?;
continue;
};
let Ok(record) = self.load_record_from_db(&rtk).await else {
rt_xact.delete(0, &k).await?;
continue;
};
record_index_sorted.push((rtk, record));
}
// Sort the record index by reverse last touched time (newest first)
record_index_sorted.sort_by(|a, b| b.1.last_touched().cmp(&a.1.last_touched()));
// Truncate the record list to the max record count
let record_index_limit = self.record_cache.capacity();
if record_index_sorted.len() > record_index_limit {
// Drop excess records
for (rtk, record) in record_index_sorted[record_index_limit..].iter() {
self.delete_record_from_db_transaction(rtk, record, &rt_xact, &st_xact)
.await?;
}
record_index_sorted.truncate(record_index_limit);
}
// Figure out which records might overflow any of our limits
for (n, (_, record)) in record_index_sorted.iter().enumerate() {
// Total the storage space
self.add_to_total_storage(&record);
// See if we need to drop records to fit
if !self.total_storage_space.check_limit() {
// Revert from the storage total
self.sub_from_total_storage(&record);
self.total_storage_space.commit().unwrap();
// Drop excess records
for (rtk, record) in record_index_sorted[n..].iter() {
self.delete_record_from_db_transaction(rtk, record, &rt_xact, &st_xact)
.await?;
}
record_index_sorted.truncate(n);
break;
}
self.total_storage_space.commit().unwrap();
}
// Commit purges
rt_xact.commit().await?;
st_xact.commit().await?;
// Now insert records in reverse order from oldest to newest to preserve LRU
for (rtk, record) in record_index_sorted.into_iter().rev() {
self.record_cache.insert(rtk, record);
}
let end_ts = Timestamp::now();
veilid_log!(self info "Finished loading {} in {}", self.unlocked_inner.name, end_ts.duration_since(start_ts));
Ok(())
}
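
The final loop reinserts the sorted records oldest-first so that, once loading finishes, the most recently touched record is also the most recently inserted and therefore the last to be evicted. A tiny sketch of why the reversal matters, using a toy most-recently-used-at-the-back list in place of the real record cache:

// Toy LRU: back of the Vec is most recently used, front is evicted first.
struct ToyLru {
    order: Vec<&'static str>,
}

impl ToyLru {
    fn insert(&mut self, key: &'static str) {
        self.order.retain(|k| *k != key);
        self.order.push(key);
    }
    fn evict(&mut self) -> Option<&'static str> {
        if self.order.is_empty() { None } else { Some(self.order.remove(0)) }
    }
}

fn main() {
    // Records sorted newest-first, as load_db sorts them by last-touched time.
    let newest_first = ["rec_c", "rec_b", "rec_a"];

    // Inserting in reverse (oldest first) leaves the newest at the MRU end...
    let mut lru = ToyLru { order: vec![] };
    for &key in newest_first.iter().rev() {
        lru.insert(key);
    }
    // ...so the oldest record ("rec_a") is the first eviction candidate.
    assert_eq!(lru.evict(), Some("rec_a"));
}
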
/// Create a new record
pub fn create(&mut self, key: OpaqueRecordKey, record: Record<D>) -> VeilidAPIResult<()> {
let rtk = RecordTableKey {
@ -157,9 +237,11 @@ where
}
// Make room here or reject create
self.make_room_for_record(&record)?;
self.make_room_for_record(&record, None)?;
// Add to record cache
self.add_to_total_storage(&record);
let mut opt_lru_out = None;
if let Some(prev_record) =
self.record_cache
@ -178,6 +260,10 @@ where
self.purge_record_and_subkeys(lru_out.0, lru_out.1, true);
}
self.total_storage_space
.commit()
.map_err(VeilidAPIError::internal)?;
// Add uncommitted record create
self.add_uncommitted_record_create(rtk, record);
@ -196,25 +282,32 @@ where
};
self.purge_record_and_subkeys(rtk, record, false);
self.total_storage_space
.commit()
.map_err(VeilidAPIError::internal)?;
Ok(())
}
/// Update a record subkey
pub fn update(
pub async fn update(
&mut self,
key: OpaqueRecordKey,
subkey: ValueSubkey,
value: RecordData,
new_data: RecordData,
) -> VeilidAPIResult<()> {
let rtk = RecordTableKey {
record_key: key.clone(),
};
let stk = SubkeyTableKey {
record_key: key.clone(),
subkey,
};
let opt_old_data = self.load_subkey_cached(&stk).await?;
let rtk = RecordTableKey {
record_key: key.clone(),
};
let Some(record) = self.record_cache.get_mut(&rtk) else {
veilid_log!(self error "RecordIndex({}): Record missing with key {}", self.unlocked_inner.name, key);
apibail_internal!("record missing");
@ -222,10 +315,31 @@ where
let old_record = record.clone();
record.touch();
record.store_subkey(subkey);
record.store_subkey(subkey, new_data.signed_value_data());
self.add_uncommitted_record_update(rtk, record.clone(), old_record);
record.touch();
let new_record = record.clone();
// Make room here or reject update
self.make_room_for_record(&new_record, Some(&old_record))?;
// Adjust total storage
// Should not fail because we made room
self.sub_from_total_storage(&old_record);
self.add_to_total_storage(&new_record);
self.total_storage_space
.commit()
.map_err(VeilidAPIError::internal)?;
// Cache the new subkey data
self.cache_subkey(stk.clone(), new_data.clone());
// Queue the db updates
self.add_uncommitted_subkey_update(stk, new_data, opt_old_data);
self.add_uncommitted_record_update(rtk, new_record, old_record);
Ok(())
}
/// Write out a transaction to be committed
@ -365,7 +479,7 @@ where
for (stk, usc) in uncommitted_subkey_changes.iter().rev() {
match usc {
UncommittedSubkeyChange::Create { data } => {
let opt_prev_data = self.subkey_cache.remove(&stk);
let opt_prev_data = self.uncache_subkey(&stk);
// Validate
if let Some(prev_data) = opt_prev_data {
@ -383,10 +497,10 @@ where
if opt_old_data.is_some() {
// Skip for now
} else {
let prev_data = self.subkey_cache.remove(&stk);
let opt_prev_data = self.uncache_subkey(&stk);
// Validate
if prev_data.as_ref() != Some(new_data) {
if opt_prev_data.as_ref() != Some(new_data) {
veilid_log!(self error "UncommittedSubkeyChange::Update rollback: {} had unexpected previous value upon removal", &stk);
}
}
@ -406,7 +520,7 @@ where
opt_old_data,
} => {
if let Some(old_data) = opt_old_data {
let prev_data = self.subkey_cache.insert(stk.clone(), old_data);
let prev_data = self.cache_subkey(stk.clone(), old_data);
// Validate
if prev_data != Some(new_data) {
@ -419,7 +533,7 @@ where
UncommittedSubkeyChange::Delete { opt_data, is_lru } => {
if let Some(data) = opt_data {
// Put the data back in the cache
let prev_data = self.subkey_cache.insert(stk.clone(), data);
let prev_data = self.cache_subkey(stk.clone(), data);
// Validate
if prev_data.is_some() {
@ -460,6 +574,219 @@ where
//////////////////////////////////////////////////////////////////////////////////////////
/// Loads a record directly from the database, bypassing caches
/// Automatically cleans up the record if it is desynchronized
async fn load_record_from_db(&self, rtk: &RecordTableKey) -> VeilidAPIResult<Record<D>> {
let Some(mut record) = self
.unlocked_inner
.record_table
.load_json::<Record<D>>(0, &rtk.bytes())
.await?
else {
apibail_internal!(format!("missing record: {}", rtk));
};
record.post_deserialize();
if record.needs_cleanup() {
self.cleanup_record(rtk, &mut record).await?;
}
Ok(record)
}
/// Deletes a record directly from the database via a transaction
/// Requires that it is not in the record index and has no subkeys cached
async fn delete_record_from_db_transaction(
&self,
rtk: &RecordTableKey,
record: &Record<D>,
rt_xact: &TableDBTransaction,
st_xact: &TableDBTransaction,
) -> VeilidAPIResult<()> {
if self.record_cache.contains_key(rtk) {
apibail_internal!(format!(
"should have removed record from cache already: {}",
rtk
));
}
let stored_subkeys = record.stored_subkeys();
for sk in stored_subkeys.iter() {
let stk = SubkeyTableKey {
record_key: rtk.record_key.clone(),
subkey: sk,
};
if self.subkey_cache.contains_key(&stk) {
apibail_internal!(format!(
"should have removed subkey from cache already: {}",
stk
));
}
st_xact.delete(0, &stk.bytes()).await?;
}
rt_xact.delete(0, &rtk.bytes()).await?;
Ok(())
}
/// Resynchronizes record with in-database copies of all subkey data
async fn cleanup_record(
&self,
rtk: &RecordTableKey,
record: &mut Record<D>,
) -> VeilidAPIResult<()> {
let mut subkey_info = vec![];
let mut stk = SubkeyTableKey {
record_key: rtk.record_key.clone(),
subkey: 0,
};
for subkey in 0..record.subkey_count() {
stk.subkey = subkey as ValueSubkey;
if let Ok(Some(recorddata)) = self.load_subkey_from_db(&stk).await {
subkey_info.push((
recorddata.signed_value_data().value_data().seq(),
recorddata.data_size() as u16,
));
} else {
subkey_info.push((ValueSeqNum::NONE, 0u16));
}
}
record.cleanup(subkey_info);
self.unlocked_inner
.record_table
.store_json::<Record<D>>(0, &rtk.bytes(), &record)
.await?;
Ok(())
}
/// Loads a subkey from the database directly, bypassing the cache
/// Performs no verifications
async fn load_subkey_from_db(
&self,
stk: &SubkeyTableKey,
) -> VeilidAPIResult<Option<RecordData>> {
self.unlocked_inner
.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
}
/// Adds subkey data to the cache, performing all of the accounting around the operation
/// Evicts enough other subkeys from the cache to make room and meet limits
/// Return the data that was previously in the cache
fn cache_subkey(&mut self, stk: SubkeyTableKey, data: RecordData) -> Option<RecordData> {
self.add_to_subkey_cache_size(&data);
let mut opt_lru_out = None;
let opt_prev_data = self
.subkey_cache
.insert_with_callback(stk, data, |lruk, lruv| {
opt_lru_out = Some((lruk, lruv));
});
if let Some(lru_out) = opt_lru_out {
self.sub_from_subkey_cache_size(&lru_out.1);
}
if let Some(prev_data) = &opt_prev_data {
self.sub_from_subkey_cache_size(prev_data);
}
while !self.subkey_cache_total_size.check_limit() {
let Some((_dead_stk, dead_data)) = self.subkey_cache.remove_lru() else {
veilid_log!(self error "can not make enough room in subkey cache, purging cache");
self.subkey_cache_total_size.set(0);
self.subkey_cache_total_size.commit().unwrap();
self.subkey_cache.clear();
return opt_prev_data;
};
self.sub_from_subkey_cache_size(&dead_data);
}
self.subkey_cache_total_size.commit().unwrap();
opt_prev_data
}
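
cache_subkey folds all of the size accounting into one place: add the incoming entry's size, credit back anything displaced (an LRU callback victim or a previous value under the same key), then evict from the LRU end until the byte limit is satisfied again. A compact standalone sketch of that evict-until-under-limit loop, using a plain map plus an order list instead of the real LRU cache and limit counter:

use std::collections::HashMap;

// Toy byte-limited cache: the front of `order` is least recently used.
struct ToyByteCache {
    limit: usize,
    total: usize,
    map: HashMap<String, Vec<u8>>,
    order: Vec<String>,
}

impl ToyByteCache {
    fn insert(&mut self, key: String, data: Vec<u8>) -> Option<Vec<u8>> {
        self.total += data.len();
        // Credit back any previous value stored under the same key.
        let prev = self.map.insert(key.clone(), data);
        if let Some(prev_data) = &prev {
            self.total -= prev_data.len();
        }
        self.order.retain(|k| *k != key);
        self.order.push(key);
        // Evict least recently used entries until we are back under the limit.
        while self.total > self.limit {
            let Some(dead_key) = self.order.first().cloned() else { break };
            if let Some(dead_data) = self.map.remove(&dead_key) {
                self.total -= dead_data.len();
            }
            self.order.remove(0);
        }
        prev
    }
}

fn main() {
    let mut cache = ToyByteCache { limit: 8, total: 0, map: HashMap::new(), order: vec![] };
    cache.insert("a".into(), vec![0; 5]);
    cache.insert("b".into(), vec![0; 5]); // pushes the total to 10, evicts "a"
    assert!(!cache.map.contains_key("a"));
    assert_eq!(cache.total, 5);
}
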
/// Removes subkey data from the cache, performing all of the accounting around the operation
/// Return the data that was previously in the cache
fn uncache_subkey(&mut self, stk: &SubkeyTableKey) -> Option<RecordData> {
let opt_data = self.subkey_cache.remove(stk);
if let Some(data) = &opt_data {
self.sub_from_subkey_cache_size(data);
}
self.subkey_cache_total_size.commit().unwrap();
opt_data
}
/// Loads a subkey from the cache, or from disk if not in the cache
/// Performs cleanups on the subkey's record if inconsistent
async fn load_subkey_cached(
&mut self,
stk: &SubkeyTableKey,
) -> VeilidAPIResult<Option<RecordData>> {
let rtk = RecordTableKey {
record_key: stk.record_key.clone(),
};
let Some(record) = self.record_cache.get(&rtk) else {
apibail_internal!(format!("missing record for subkey: {}", &stk));
};
let valid_seq = record.subkey_seq(stk.subkey);
let valid_size = record.subkey_size(stk.subkey);
let opt_record_data = match self.subkey_cache.get(stk).cloned() {
Some(record_data) => Some(record_data),
None => {
let opt_record_data = self.load_subkey_from_db(stk).await?;
if let Some(record_data) = opt_record_data.clone() {
self.cache_subkey(stk.clone(), record_data);
}
opt_record_data
}
};
// Validate seq and size before returning it
let seq = opt_record_data
.as_ref()
.map(|x| x.signed_value_data().value_data().seq())
.unwrap_or_default();
let size = opt_record_data
.as_ref()
.map(|x| x.data_size() as u16)
.unwrap_or_default();
if seq == valid_seq && size == valid_size {
return Ok(opt_record_data);
}
// Cleanup required, remove the record for processing
let mut record = self
.record_cache
.remove(&rtk)
.ok_or_else(|| VeilidAPIError::internal("record missing from cache"))?;
self.sub_from_total_storage(&record);
let mut uncache_stk = stk.clone();
for subkey in 0..record.subkey_count() {
uncache_stk.subkey = subkey as ValueSubkey;
self.uncache_subkey(&uncache_stk);
}
// Clean up record and add it back to the cache
self.cleanup_record(&rtk, &mut record).await?;
self.add_to_total_storage(&record);
self.record_cache.insert(rtk, record);
// Load subkey, cache, and return it
let opt_record_data = self.load_subkey_from_db(stk).await?;
if let Some(record_data) = opt_record_data.clone() {
self.cache_subkey(stk.clone(), record_data);
}
Ok(opt_record_data)
}
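
The validation at the heart of load_subkey_cached is simple: whatever comes back from the cache or disk must match the sequence number and size recorded in the index, with missing data treated as zero seq and size the way unwrap_or_default() does above; anything else triggers a full record cleanup. A minimal sketch of just that consistency check, with plain tuples standing in for the real types:

// Index side: what the record believes about one subkey.
struct Expected {
    seq: u32,
    size: u16,
}

// Returns true when the loaded data (or its absence) matches the index.
fn subkey_consistent(expected: &Expected, loaded: Option<(u32, u16)>) -> bool {
    let (seq, size) = loaded.unwrap_or_default();
    seq == expected.seq && size == expected.size
}

fn main() {
    let expected = Expected { seq: 3, size: 128 };
    assert!(subkey_consistent(&expected, Some((3, 128))));
    // Stale data or a missing subkey would force cleanup_record() in the diff above.
    assert!(!subkey_consistent(&expected, Some((2, 128))));
    assert!(!subkey_consistent(&expected, None));
}
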
fn record_storage_size(record: &Record<D>) -> u64 {
(mem::size_of::<RecordTableKey>() + record.total_size()) as u64
}
@ -498,9 +825,19 @@ where
}
}
fn make_room_for_record(&mut self, record: &Record<D>) -> VeilidAPIResult<()> {
fn make_room_for_record(
&mut self,
record: &Record<D>,
opt_old_record: Option<&Record<D>>,
) -> VeilidAPIResult<()> {
let record_size = Self::record_storage_size(&record);
let opt_old_record_size = opt_old_record.map(|x| Self::record_storage_size(x));
if let Some(old_record_size) = opt_old_record_size {
self.total_storage_space
.sub(old_record_size)
.map_err(VeilidAPIError::internal)?;
}
self.total_storage_space
.add(record_size)
.map_err(VeilidAPIError::internal)?;
@ -516,6 +853,11 @@ where
self.total_storage_space
.sub(record_size)
.map_err(VeilidAPIError::internal)?;
if let Some(old_record_size) = opt_old_record_size {
self.total_storage_space
.add(old_record_size)
.map_err(VeilidAPIError::internal)?;
}
self.total_storage_space
.commit()
@ -535,7 +877,7 @@ where
subkey: sk,
};
let opt_data = self.subkey_cache.remove(&stk);
let opt_data = self.uncache_subkey(&stk);
self.add_uncommitted_subkey_delete(stk, opt_data, is_lru);
}
@ -549,7 +891,6 @@ where
record: Record<D>,
is_lru: bool,
) {
self.sub_from_total_storage(&record);
self.uncommitted_record_changes
.insert(rtk, UncommittedRecordChange::Delete { record, is_lru });
}
@ -560,8 +901,6 @@ where
new_record: Record<D>,
old_record: Record<D>,
) {
self.sub_from_total_storage(&old_record);
self.add_to_total_storage(&new_record);
self.uncommitted_record_changes.insert(
rtk,
UncommittedRecordChange::Update {
@ -572,7 +911,6 @@ where
}
fn add_uncommitted_record_create(&mut self, rtk: RecordTableKey, record: Record<D>) {
self.add_to_total_storage(&record);
self.uncommitted_record_changes
.insert(rtk, UncommittedRecordChange::Create { record });
}
@ -583,9 +921,6 @@ where
opt_data: Option<RecordData>,
is_lru: bool,
) {
if let Some(data) = &opt_data {
self.sub_from_subkey_cache_size(data);
}
self.uncommitted_subkey_changes
.insert(stk, UncommittedSubkeyChange::Delete { opt_data, is_lru });
}
@ -596,11 +931,6 @@ where
new_data: RecordData,
opt_old_data: Option<RecordData>,
) {
if let Some(old_data) = &opt_old_data {
self.sub_from_subkey_cache_size(old_data);
}
self.add_to_subkey_cache_size(&new_data);
self.uncommitted_subkey_changes.insert(
stk,
UncommittedSubkeyChange::Update {
@ -611,8 +941,6 @@ where
}
fn add_uncommitted_subkey_create(&mut self, stk: SubkeyTableKey, data: RecordData) {
self.add_to_subkey_cache_size(&data);
self.uncommitted_subkey_changes
.insert(stk, UncommittedSubkeyChange::Create { data });
}

View file

@ -7,4 +7,7 @@ impl RecordDetail for RemoteRecordDetail {
fn is_new(&self) -> bool {
true
}
fn total_size(&self) -> usize {
size_of::<Self>()
}
}