From 3b687aed507a32beb8df570fcbf2cb702d39f2c3 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 9 Apr 2023 13:29:20 -0400 Subject: [PATCH] record store --- veilid-core/src/storage_manager/mod.rs | 42 +-- .../src/storage_manager/record_store.rs | 301 +++++++++++++----- .../storage_manager/record_store_limits.rs | 10 +- .../src/storage_manager/value_record.rs | 30 +- veilid-core/src/veilid_api/types.rs | 17 + veilid-core/src/veilid_config.rs | 23 ++ 6 files changed, 302 insertions(+), 121 deletions(-) diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 79e5369f..05dd515e 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -67,22 +67,22 @@ impl StorageManager { fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { RecordStoreLimits { subkey_cache_size: todo!(), + max_subkey_size: MAX_SUBKEY_SIZE, + max_record_total_size: MAX_RECORD_DATA_SIZE, max_records: None, max_subkey_cache_memory_mb: Some(xxx), - max_disk_space_mb: None, - max_subkey_size: MAX_SUBKEY_SIZE, - max_record_data_size: MAX_RECORD_DATA_SIZE, + max_storage_space_mb: None, } } fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { RecordStoreLimits { subkey_cache_size: todo!(), + max_subkey_size: MAX_SUBKEY_SIZE, + max_record_total_size: MAX_RECORD_DATA_SIZE, max_records: Some(xxx), max_subkey_cache_memory_mb: Some(xxx), - max_disk_space_mb: Some(xxx), - max_subkey_size: MAX_SUBKEY_SIZE, - max_record_data_size: MAX_RECORD_DATA_SIZE, + max_storage_space_mb: Some(xxx), } } @@ -112,18 +112,25 @@ impl StorageManager { debug!("startup storage manager"); let mut inner = self.inner.lock(); - let local_limits = Self::local_limits_from_config(config.clone()); - let remote_limits = Self::remote_limits_from_config(config.clone()); - inner.local_record_store = Some(RecordStore::new( + let local_limits = Self::local_limits_from_config(self.unlocked_inner.config.clone()); + let remote_limits = Self::remote_limits_from_config(self.unlocked_inner.config.clone()); + + let mut local_record_store = RecordStore::new( self.unlocked_inner.table_store.clone(), "local", local_limits, - )); - inner.remote_record_store = Some(RecordStore::new( + ); + local_record_store.init().await?; + + let mut remote_record_store = RecordStore::new( self.unlocked_inner.table_store.clone(), "remote", remote_limits, - )); + ); + remote_record_store.init().await?; + + inner.local_record_store = Some(local_record_store); + inner.remote_record_store = Some(remote_record_store); Ok(()) } @@ -137,18 +144,13 @@ impl StorageManager { debug!("finished storage manager shutdown"); } - async fn new_local_record( - &self, - key: TypedKey, - record: ValueRecord, - ) -> Result<(), VeilidAPIError> { + async fn new_local_record(&self, key: TypedKey, record: Record) -> Result<(), VeilidAPIError> { // add value record to record store let mut inner = self.inner.lock(); let Some(local_record_store) = inner.local_record_store.as_mut() else { apibail_generic!("not initialized"); - }; - local_record_store.new_record(key, record) + local_record_store.new_record(key, record).await } pub async fn create_record( @@ -169,7 +171,7 @@ impl StorageManager { // Add new local value record let cur_ts = get_aligned_timestamp(); - let record = ValueRecord::new(cur_ts, Some(secret), schema, safety_selection); + let record = Record::new(cur_ts, Some(secret), schema, safety_selection); self.new_local_record(key, record) .await .map_err(VeilidAPIError::internal)?; diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index 49f469eb..8c41b0f4 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -14,11 +14,15 @@ pub struct RecordStore { record_table: Option, subkey_table: Option, - record_index: LruCache, - subkey_cache: LruCache, + record_index: LruCache, + subkey_cache: LruCache, + subkey_cache_total_size: usize, + total_storage_space: usize, - dead_records: Vec<(RecordTableKey, ValueRecord)>, - changed_records: HashSet<(RecordTableKey, Timestamp)>, + dead_records: Vec<(RecordTableKey, Record)>, + changed_records: HashSet, + + purge_dead_records_mutex: AsyncMutex<()>, } impl RecordStore { @@ -32,8 +36,11 @@ impl RecordStore { subkey_table: None, record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)), subkey_cache: LruCache::new(subkey_cache_size), + subkey_cache_total_size: 0, + total_storage_space: 0, dead_records: Vec::new(), changed_records: HashSet::new(), + purge_dead_records_mutex: AsyncMutex::new(()), } } @@ -49,10 +56,10 @@ impl RecordStore { // Pull record index from table into a vector to ensure we sort them let record_table_keys = record_table.get_keys(0)?; - let mut record_index_saved: Vec<(RecordTableKey, ValueRecord)> = + let mut record_index_saved: Vec<(RecordTableKey, Record)> = Vec::with_capacity(record_table_keys.len()); for rtk in record_table_keys { - if let Some(vr) = record_table.load_rkyv::(0, &rtk)? { + if let Some(vr) = record_table.load_rkyv::(0, &rtk)? { let rik = RecordTableKey::try_from(rtk.as_ref())?; record_index_saved.push((rik, vr)); } @@ -62,10 +69,22 @@ impl RecordStore { record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched())); let mut dead_records = Vec::new(); for ri in record_index_saved { - self.record_index.insert(ri.0, ri.1, |k, v| { + // total the storage space + self.total_storage_space += mem::size_of::(); + self.total_storage_space += ri.1.total_size(); + + // add to index and ensure we deduplicate in the case of an error + if let Some(v) = self.record_index.insert(ri.0, ri.1, |k, v| { // If the configuration change, we only want to keep the 'limits.max_records' records dead_records.push((k, v)); - }); + }) { + // This shouldn't happen, but deduplicate anyway + log_stor!(warn "duplicate record in table: {}", ri.0); + dead_records.push((ri.0, v)); + } + } + for (k, v) in dead_records { + self.add_dead_record(k, v); } self.record_table = Some(record_table); @@ -73,16 +92,59 @@ impl RecordStore { Ok(()) } - fn add_dead_record(&mut self, key: RecordTableKey, record: ValueRecord) { + fn add_dead_record(&mut self, key: RecordTableKey, record: Record) { self.dead_records.push((key, record)); } fn mark_record_changed(&mut self, key: RecordTableKey) { - let cur_ts = get_aligned_timestamp(); - self.changed_records.insert((key, cur_ts)); + self.changed_records.insert(key); } - async fn purge_dead_records(&mut self) { + fn add_to_subkey_cache(&mut self, key: SubkeyTableKey, record_data: RecordData) { + // Write to subkey cache + let mut dead_size = 0usize; + if let Some(old_record_data) = self.subkey_cache.insert(key, record_data, |_, v| { + // LRU out + dead_size += v.total_size(); + }) { + // Old data + dead_size += old_record_data.total_size(); + } + self.subkey_cache_total_size -= dead_size; + self.subkey_cache_total_size += record_data.total_size(); + + // Purge over size limit + if let Some(max_subkey_cache_memory_mb) = self.limits.max_subkey_cache_memory_mb { + while self.subkey_cache_total_size > (max_subkey_cache_memory_mb * 1_048_576usize) { + if let Some((_, v)) = self.subkey_cache.remove_lru() { + self.subkey_cache_total_size -= v.total_size(); + } else { + break; + } + } + } + } + + fn remove_from_subkey_cache(&mut self, key: SubkeyTableKey) { + if let Some(dead_record_data) = self.subkey_cache.remove(&key) { + self.subkey_cache_total_size -= dead_record_data.total_size(); + } + } + + async fn purge_dead_records(&mut self, lazy: bool) { + let lock = if lazy { + match self.purge_dead_records_mutex.try_lock().await { + Ok(v) => v, + Err(_) => { + // If not ready now, just skip it if we're lazy + return; + } + } + } else { + // Not lazy, must wait + self.purge_dead_records_mutex.lock().await; + }; + // Delete dead keys if self.dead_records.is_empty() { return; @@ -102,15 +164,19 @@ impl RecordStore { let subkey_count = v.subkey_count() as u32; for sk in 0..subkey_count { // From table - let sck = SubkeyTableKey { + let stk = SubkeyTableKey { key: k.key, subkey: sk, }; - st_xact.delete(0, &sck.bytes()); + st_xact.delete(0, &stk.bytes()); // From cache - self.subkey_cache.remove(&sck); + self.remove_from_subkey_cache(stk); } + + // Remove from total size + self.total_storage_space -= mem::size_of::(); + self.total_storage_space -= v.total_size(); } if let Err(e) = rt_xact.commit().await { log_stor!(error "failed to commit record table transaction: {}", e); @@ -120,9 +186,9 @@ impl RecordStore { } } - async fn flush_records(&mut self) { + async fn flush_changed_records(&mut self) { // touch records - if self.changed_records.empty() { + if self.changed_records.is_empty() { return; } @@ -130,12 +196,13 @@ impl RecordStore { let subkey_table = self.subkey_table.clone().unwrap(); let rt_xact = record_table.transact(); - let mut changed_records = mem::take(&mut self.changed_records); - for (rik, ts) in changed_records { - // Flush changed records - if let Some(r) = self.record_index.peek(&rik) { - record_table.store_rkyv(0, &rtk)?; - xxx + let changed_records = mem::take(&mut self.changed_records); + for rtk in changed_records { + // Get the changed record and save it to the table + if let Some(r) = self.record_index.peek(&rtk) { + if let Err(e) = rt_xact.store_rkyv(0, &rtk.bytes(), r) { + log_stor!(error "failed to save record: {}", e); + } } } if let Err(e) = rt_xact.commit().await { @@ -144,14 +211,18 @@ impl RecordStore { } pub async fn tick(&mut self, last_ts: Timestamp, cur_ts: Timestamp) { - self.flush_records().await; - self.purge_dead_records().await; + self.flush_changed_records().await; + self.purge_dead_records(true).await; } - pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> Result<(), VeilidAPIError> { - let rik = RecordTableKey { key }; - if self.record_index.contains_key(&rik) { - apibail_generic!("record already exists"); + pub async fn new_record( + &mut self, + key: TypedKey, + record: Record, + ) -> Result<(), VeilidAPIError> { + let rtk = RecordTableKey { key }; + if self.record_index.contains_key(&rtk) { + apibail_internal!("record already exists"); } // Get record table @@ -159,54 +230,78 @@ impl RecordStore { apibail_internal!("record store not initialized"); }; + // If over size limit, dont create record + let new_total_storage_space = + self.total_storage_space + mem::size_of::() + record.total_size(); + if let Some(max_storage_space_mb) = &self.limits.max_storage_space_mb { + if new_total_storage_space > (max_storage_space_mb * 1_048_576usize) { + apibail_try_again!(); + } + } + // Save to record table record_table - .store_rkyv(0, &rik, &r) + .store_rkyv(0, &rtk.bytes(), &record) .await .map_err(VeilidAPIError::internal)?; - // Cache it - self.record_cache.insert(key, value, |k, v| { - self.add_dead_record(k, v); - }); + // Save to record index + let mut dead_records = Vec::new(); + if let Some(v) = self.record_index.insert(rtk, record, |k, v| { + dead_records.push((k, v)); + }) { + // Shouldn't happen but log it + log_stor!(warn "new duplicate record in table: {}", rtk); + self.add_dead_record(rtk, v); + } + for dr in dead_records { + self.add_dead_record(dr.0, dr.1); + } + + // Update storage space + self.total_storage_space = new_total_storage_space; Ok(()) } pub fn with_record(&mut self, key: TypedKey, f: F) -> Option where - F: FnOnce(&ValueRecord) -> R, + F: FnOnce(&Record) -> R, { // Get record from index - let rck = RecordTableKey { key }; - if let Some(r) = self.record_index.get_mut(&rck) { + let rtk = RecordTableKey { key }; + if let Some(record) = self.record_index.get_mut(&rtk) { // Touch - r.touch(get_aligned_timestamp()); - self.mark_record_changed(&rck); + record.touch(get_aligned_timestamp()); + self.mark_record_changed(rtk); // Callback - return Some(f(key, r)); + return Some(f(record)); } None } - pub fn get_subkey( + pub async fn get_subkey( &mut self, key: TypedKey, subkey: ValueSubkey, - ) -> Result, VeilidAPIError> { + ) -> Result, VeilidAPIError> { // record from index - let rck = RecordTableKey { key }; - let Some(r) = self.record_index.get_mut(&rck) else { - apibail_invalid_argument!("no record at this key", "key", key); - }; + let rtk = RecordTableKey { key }; + let subkey_count = { + let Some(record) = self.record_index.get_mut(&rtk) else { + apibail_invalid_argument!("no record at this key", "key", key); + }; - // Touch - r.touch(get_aligned_timestamp()); - self.mark_record_changed(&rck); + // Touch + record.touch(get_aligned_timestamp()); + + record.subkey_count() + }; + self.mark_record_changed(rtk); // Check if the subkey is in range - if subkey >= r.subkey_count() { + if subkey as usize >= subkey_count { apibail_invalid_argument!("subkey out of range", "subkey", subkey); } @@ -216,22 +311,21 @@ impl RecordStore { }; // If subkey exists in subkey cache, use that - let skck = SubkeyTableKey { key, subkey }; - if let Some(rd) = self.subkey_cache.get_mut(&skck) { - let out = rd.clone(); + let stk = SubkeyTableKey { key, subkey }; + if let Some(record_data) = self.subkey_cache.get_mut(&stk) { + let out = record_data.clone(); return Ok(Some(out)); } // If not in cache, try to pull from table store - let k = skck.bytes(); - if let Some(rd) = subkey_table - .load_rkyv::(0, &k) + if let Some(record_data) = subkey_table + .load_rkyv::(0, &stk.bytes()) .map_err(VeilidAPIError::internal)? { - let out = rd.clone(); + let out = record_data.clone(); // Add to cache, do nothing with lru out - self.subkey_cache.insert(skck, rd, |_| {}); + self.add_to_subkey_cache(stk, record_data); return Ok(Some(out)); }; @@ -239,29 +333,33 @@ impl RecordStore { return Ok(None); } - pub fn set_subkey( + pub async fn set_subkey( &mut self, key: TypedKey, subkey: ValueSubkey, - data: ValueRecordData, + record_data: RecordData, ) -> Result<(), VeilidAPIError> { // Check size limit for data - if data.data.len() > self.limits.max_subkey_size { + if record_data.value_data.data().len() > self.limits.max_subkey_size { return Err(VeilidAPIError::generic("record subkey too large")); } // Get record from index - let rck = RecordTableKey { key }; - let Some(r) = self.record_index.get_mut(&rck) else { - apibail_invalid_argument!("no record at this key", "key", key); - }; + let rtk = RecordTableKey { key }; + let (subkey_count, total_size) = { + let Some(record) = self.record_index.get_mut(&rtk) else { + apibail_invalid_argument!("no record at this key", "key", key); + }; - // Touch - r.touch(get_aligned_timestamp()); - self.mark_record_changed(&rck); + // Touch + record.touch(get_aligned_timestamp()); + + (record.subkey_count(), record.total_size()) + }; + self.mark_record_changed(rtk); // Check if the subkey is in range - if subkey >= r.subkey_count() { + if subkey as usize >= subkey_count { apibail_invalid_argument!("subkey out of range", "subkey", subkey); } @@ -271,40 +369,71 @@ impl RecordStore { }; // Get the previous subkey and ensure we aren't going over the record size limit - let mut prior_subkey_size = 0usize; + let mut prior_record_data_size = 0usize; // If subkey exists in subkey cache, use that - let skck = SubkeyTableKey { key, subkey }; - if let Some(rd) = self.subkey_cache.peek(&skck) { - prior_subkey_size = rd.data.data().len(); + let stk = SubkeyTableKey { key, subkey }; + let stk_bytes = stk.bytes(); + + if let Some(record_data) = self.subkey_cache.peek(&stk) { + prior_record_data_size = record_data.total_size(); } else { // If not in cache, try to pull from table store - let k = skck.bytes(); - if let Some(rd) = subkey_table - .load_rkyv::(0, &k) + if let Some(record_data) = subkey_table + .load_rkyv::(0, &stk_bytes) .map_err(VeilidAPIError::internal)? { - prior_subkey_size = rd.data.data().len(); + prior_record_data_size = record_data.total_size(); } } - // Check new data size - let new_data_size = r.data_size() + data.data().len() - priod_subkey_size; - if new_data_size > self.limits.max_record_data_size { - return Err(VeilidAPIError::generic("dht record too large")); + // Check new total record size + let new_record_data_size = record_data.total_size(); + let new_total_size = total_size + new_record_data_size - prior_record_data_size; + if new_total_size > self.limits.max_record_total_size { + apibail_generic!("dht record too large"); + } + + // Check new total storage space + let new_total_storage_space = + self.total_storage_space + new_record_data_size - prior_record_data_size; + if let Some(max_storage_space_mb) = self.limits.max_storage_space_mb { + if new_total_storage_space > (max_storage_space_mb * 1_048_576usize) { + apibail_try_again!(); + } } // Write subkey - let k = skck.bytes(); - subkey_table.store_rkyv(0, &k, &data)?; + subkey_table + .store_rkyv(0, &stk_bytes, &record_data) + .await + .map_err(VeilidAPIError::internal)?; // Write to subkey cache - let skck = SubkeyTableKey { key, subkey }; - self.subkey_cache.insert(skck, data, |_, _| {}); + self.add_to_subkey_cache(stk, record_data); // Update record - r.set_data_size(new_data_size); + let Some(record) = self.record_index.get_mut(&rtk) else { + apibail_invalid_argument!("no record at this key", "key", key); + }; + record.set_record_data_size(new_record_data_size); Ok(()) } + + /// LRU out some records until we reclaim the amount of space requested + /// This will force a garbage collection of the space immediately + /// If zero is passed in here, a garbage collection will be performed of dead records + /// without removing any live records + pub async fn reclaim_space(&mut self, space: usize) { + let mut reclaimed = 0usize; + while reclaimed < space { + if let Some((k, v)) = self.record_index.remove_lru() { + reclaimed += mem::size_of::(); + reclaimed += v.total_size(); + self.add_dead_record(k, v); + } + } + self.purge_dead_records(false).await; + } } diff --git a/veilid-core/src/storage_manager/record_store_limits.rs b/veilid-core/src/storage_manager/record_store_limits.rs index cabc00ea..5dfb25d4 100644 --- a/veilid-core/src/storage_manager/record_store_limits.rs +++ b/veilid-core/src/storage_manager/record_store_limits.rs @@ -3,14 +3,14 @@ pub struct RecordStoreLimits { /// Number of subkeys to keep in the memory cache pub subkey_cache_size: usize, - /// Maximum size of a subkey + /// Maximum size of an individual subkey pub max_subkey_size: usize, - /// Maximum total record data size: - pub max_record_data_size: usize, + /// Maximum total record data size per record + pub max_record_total_size: usize, /// Limit on the total number of records in the table store pub max_records: Option, /// Limit on the amount of subkey cache memory to use before evicting cache items pub max_subkey_cache_memory_mb: Option, - /// Limit on the amount of disk space to use for subkey data - pub max_disk_space_mb: Option, + /// Limit on the amount of storage space to use for subkey data and record data + pub max_storage_space_mb: Option, } diff --git a/veilid-core/src/storage_manager/value_record.rs b/veilid-core/src/storage_manager/value_record.rs index ac5c0c56..0ffd44d3 100644 --- a/veilid-core/src/storage_manager/value_record.rs +++ b/veilid-core/src/storage_manager/value_record.rs @@ -15,11 +15,17 @@ use serde::*; RkyvDeserialize, )] #[archive_attr(repr(C), derive(CheckBytes))] -pub struct ValueRecordData { - pub data: ValueData, +pub struct RecordData { + pub value_data: ValueData, pub signature: Signature, } +impl RecordData { + pub fn total_size(&self) -> usize { + mem::size_of::() + self.value_data.data().len() + } +} + #[derive( Clone, Debug, @@ -33,15 +39,15 @@ pub struct ValueRecordData { RkyvDeserialize, )] #[archive_attr(repr(C), derive(CheckBytes))] -pub struct ValueRecord { +pub struct Record { last_touched_ts: Timestamp, secret: Option, schema: DHTSchema, safety_selection: SafetySelection, - data_size: usize, + record_data_size: usize, } -impl ValueRecord { +impl Record { pub fn new( cur_ts: Timestamp, secret: Option, @@ -53,7 +59,7 @@ impl ValueRecord { secret, schema, safety_selection, - data_size: 0, + record_data_size: 0, } } @@ -69,11 +75,15 @@ impl ValueRecord { self.last_touched_ts } - pub fn set_data_size(&mut self, size: usize) { - self.data_size = size; + pub fn set_record_data_size(&mut self, size: usize) { + self.record_data_size = size; } - pub fn data_size(&self) -> usize { - self.data_size + pub fn record_data_size(&self) -> usize { + self.record_data_size + } + + pub fn total_size(&self) -> usize { + mem::size_of::() + self.schema.data_size() + self.record_data_size } } diff --git a/veilid-core/src/veilid_api/types.rs b/veilid-core/src/veilid_api/types.rs index acdf0b15..6b204941 100644 --- a/veilid-core/src/veilid_api/types.rs +++ b/veilid-core/src/veilid_api/types.rs @@ -2479,6 +2479,10 @@ impl DHTSchemaDFLT { pub fn subkey_count(&self) -> usize { self.o_cnt as usize } + /// Get the data size of this schema beyond the size of the structure itself + pub fn data_size(&self) -> usize { + 0 + } } impl TryFrom<&[u8]> for DHTSchemaDFLT { @@ -2550,6 +2554,11 @@ impl DHTSchemaSMPL { .iter() .fold(self.o_cnt as usize, |acc, x| acc + (x.m_cnt as usize)) } + + /// Get the data size of this schema beyond the size of the structure itself + pub fn data_size(&self) -> usize { + self.members.len() * mem::size_of:: + } } impl TryFrom<&[u8]> for DHTSchemaSMPL { @@ -2619,6 +2628,14 @@ impl DHTSchema { DHTSchema::SMPL(s) => s.subkey_count(), } } + + /// Get the data size of this schema beyond the size of the structure itself + pub fn data_size(&self) -> usize { + match self { + DHTSchema::DFLT(d) => d.data_size(), + DHTSchema::SMPL(s) => s.data_size(), + } + } } impl Default for DHTSchema { diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs index ce5f07c5..32d205c5 100644 --- a/veilid-core/src/veilid_config.rs +++ b/veilid-core/src/veilid_config.rs @@ -290,6 +290,29 @@ pub struct VeilidConfigDHT { pub min_peer_count: u32, pub min_peer_refresh_time_ms: u32, pub validate_dial_info_receipt_time_ms: u32, + + pub local_subkey_cache_size: fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { + RecordStoreLimits { + subkey_cache_size: todo!(), + max_records: None, + max_subkey_cache_memory_mb: Some(xxx), + max_storage_space_mb: None, + max_subkey_size: MAX_SUBKEY_SIZE, + max_record_total_size: MAX_RECORD_DATA_SIZE, + } + } + + fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { + RecordStoreLimits { + subkey_cache_size: todo!(), + max_records: Some(xxx), + max_subkey_cache_memory_mb: Some(xxx), + max_storage_space_mb: Some(xxx), + max_subkey_size: MAX_SUBKEY_SIZE, + max_record_total_size: MAX_RECORD_DATA_SIZE, + } + } + } /// Configure RPC