record store

This commit is contained in:
John Smith 2023-04-09 13:29:20 -04:00
parent 777efaff24
commit 3b687aed50
6 changed files with 302 additions and 121 deletions

View File

@ -67,22 +67,22 @@ impl StorageManager {
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
RecordStoreLimits {
subkey_cache_size: todo!(),
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
max_records: None,
max_subkey_cache_memory_mb: Some(xxx),
max_disk_space_mb: None,
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_data_size: MAX_RECORD_DATA_SIZE,
max_storage_space_mb: None,
}
}
fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
RecordStoreLimits {
subkey_cache_size: todo!(),
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
max_records: Some(xxx),
max_subkey_cache_memory_mb: Some(xxx),
max_disk_space_mb: Some(xxx),
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_data_size: MAX_RECORD_DATA_SIZE,
max_storage_space_mb: Some(xxx),
}
}
@ -112,18 +112,25 @@ impl StorageManager {
debug!("startup storage manager");
let mut inner = self.inner.lock();
let local_limits = Self::local_limits_from_config(config.clone());
let remote_limits = Self::remote_limits_from_config(config.clone());
inner.local_record_store = Some(RecordStore::new(
let local_limits = Self::local_limits_from_config(self.unlocked_inner.config.clone());
let remote_limits = Self::remote_limits_from_config(self.unlocked_inner.config.clone());
let mut local_record_store = RecordStore::new(
self.unlocked_inner.table_store.clone(),
"local",
local_limits,
));
inner.remote_record_store = Some(RecordStore::new(
);
local_record_store.init().await?;
let mut remote_record_store = RecordStore::new(
self.unlocked_inner.table_store.clone(),
"remote",
remote_limits,
));
);
remote_record_store.init().await?;
inner.local_record_store = Some(local_record_store);
inner.remote_record_store = Some(remote_record_store);
Ok(())
}
@ -137,18 +144,13 @@ impl StorageManager {
debug!("finished storage manager shutdown");
}
async fn new_local_record(
&self,
key: TypedKey,
record: ValueRecord,
) -> Result<(), VeilidAPIError> {
async fn new_local_record(&self, key: TypedKey, record: Record) -> Result<(), VeilidAPIError> {
// add value record to record store
let mut inner = self.inner.lock();
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_generic!("not initialized");
};
local_record_store.new_record(key, record)
local_record_store.new_record(key, record).await
}
pub async fn create_record(
@ -169,7 +171,7 @@ impl StorageManager {
// Add new local value record
let cur_ts = get_aligned_timestamp();
let record = ValueRecord::new(cur_ts, Some(secret), schema, safety_selection);
let record = Record::new(cur_ts, Some(secret), schema, safety_selection);
self.new_local_record(key, record)
.await
.map_err(VeilidAPIError::internal)?;

View File

@ -14,11 +14,15 @@ pub struct RecordStore {
record_table: Option<TableDB>,
subkey_table: Option<TableDB>,
record_index: LruCache<RecordTableKey, ValueRecord>,
subkey_cache: LruCache<SubkeyTableKey, ValueRecordData>,
record_index: LruCache<RecordTableKey, Record>,
subkey_cache: LruCache<SubkeyTableKey, RecordData>,
subkey_cache_total_size: usize,
total_storage_space: usize,
dead_records: Vec<(RecordTableKey, ValueRecord)>,
changed_records: HashSet<(RecordTableKey, Timestamp)>,
dead_records: Vec<(RecordTableKey, Record)>,
changed_records: HashSet<RecordTableKey>,
purge_dead_records_mutex: AsyncMutex<()>,
}
impl RecordStore {
@ -32,8 +36,11 @@ impl RecordStore {
subkey_table: None,
record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)),
subkey_cache: LruCache::new(subkey_cache_size),
subkey_cache_total_size: 0,
total_storage_space: 0,
dead_records: Vec::new(),
changed_records: HashSet::new(),
purge_dead_records_mutex: AsyncMutex::new(()),
}
}
@ -49,10 +56,10 @@ impl RecordStore {
// Pull record index from table into a vector to ensure we sort them
let record_table_keys = record_table.get_keys(0)?;
let mut record_index_saved: Vec<(RecordTableKey, ValueRecord)> =
let mut record_index_saved: Vec<(RecordTableKey, Record)> =
Vec::with_capacity(record_table_keys.len());
for rtk in record_table_keys {
if let Some(vr) = record_table.load_rkyv::<ValueRecord>(0, &rtk)? {
if let Some(vr) = record_table.load_rkyv::<Record>(0, &rtk)? {
let rik = RecordTableKey::try_from(rtk.as_ref())?;
record_index_saved.push((rik, vr));
}
@ -62,10 +69,22 @@ impl RecordStore {
record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched()));
let mut dead_records = Vec::new();
for ri in record_index_saved {
self.record_index.insert(ri.0, ri.1, |k, v| {
// total the storage space
self.total_storage_space += mem::size_of::<RecordTableKey>();
self.total_storage_space += ri.1.total_size();
// add to index and ensure we deduplicate in the case of an error
if let Some(v) = self.record_index.insert(ri.0, ri.1, |k, v| {
// If the configuration change, we only want to keep the 'limits.max_records' records
dead_records.push((k, v));
});
}) {
// This shouldn't happen, but deduplicate anyway
log_stor!(warn "duplicate record in table: {}", ri.0);
dead_records.push((ri.0, v));
}
}
for (k, v) in dead_records {
self.add_dead_record(k, v);
}
self.record_table = Some(record_table);
@ -73,16 +92,59 @@ impl RecordStore {
Ok(())
}
fn add_dead_record(&mut self, key: RecordTableKey, record: ValueRecord) {
fn add_dead_record(&mut self, key: RecordTableKey, record: Record) {
self.dead_records.push((key, record));
}
fn mark_record_changed(&mut self, key: RecordTableKey) {
let cur_ts = get_aligned_timestamp();
self.changed_records.insert((key, cur_ts));
self.changed_records.insert(key);
}
async fn purge_dead_records(&mut self) {
fn add_to_subkey_cache(&mut self, key: SubkeyTableKey, record_data: RecordData) {
// Write to subkey cache
let mut dead_size = 0usize;
if let Some(old_record_data) = self.subkey_cache.insert(key, record_data, |_, v| {
// LRU out
dead_size += v.total_size();
}) {
// Old data
dead_size += old_record_data.total_size();
}
self.subkey_cache_total_size -= dead_size;
self.subkey_cache_total_size += record_data.total_size();
// Purge over size limit
if let Some(max_subkey_cache_memory_mb) = self.limits.max_subkey_cache_memory_mb {
while self.subkey_cache_total_size > (max_subkey_cache_memory_mb * 1_048_576usize) {
if let Some((_, v)) = self.subkey_cache.remove_lru() {
self.subkey_cache_total_size -= v.total_size();
} else {
break;
}
}
}
}
fn remove_from_subkey_cache(&mut self, key: SubkeyTableKey) {
if let Some(dead_record_data) = self.subkey_cache.remove(&key) {
self.subkey_cache_total_size -= dead_record_data.total_size();
}
}
async fn purge_dead_records(&mut self, lazy: bool) {
let lock = if lazy {
match self.purge_dead_records_mutex.try_lock().await {
Ok(v) => v,
Err(_) => {
// If not ready now, just skip it if we're lazy
return;
}
}
} else {
// Not lazy, must wait
self.purge_dead_records_mutex.lock().await;
};
// Delete dead keys
if self.dead_records.is_empty() {
return;
@ -102,15 +164,19 @@ impl RecordStore {
let subkey_count = v.subkey_count() as u32;
for sk in 0..subkey_count {
// From table
let sck = SubkeyTableKey {
let stk = SubkeyTableKey {
key: k.key,
subkey: sk,
};
st_xact.delete(0, &sck.bytes());
st_xact.delete(0, &stk.bytes());
// From cache
self.subkey_cache.remove(&sck);
self.remove_from_subkey_cache(stk);
}
// Remove from total size
self.total_storage_space -= mem::size_of::<RecordTableKey>();
self.total_storage_space -= v.total_size();
}
if let Err(e) = rt_xact.commit().await {
log_stor!(error "failed to commit record table transaction: {}", e);
@ -120,9 +186,9 @@ impl RecordStore {
}
}
async fn flush_records(&mut self) {
async fn flush_changed_records(&mut self) {
// touch records
if self.changed_records.empty() {
if self.changed_records.is_empty() {
return;
}
@ -130,12 +196,13 @@ impl RecordStore {
let subkey_table = self.subkey_table.clone().unwrap();
let rt_xact = record_table.transact();
let mut changed_records = mem::take(&mut self.changed_records);
for (rik, ts) in changed_records {
// Flush changed records
if let Some(r) = self.record_index.peek(&rik) {
record_table.store_rkyv(0, &rtk)?;
xxx
let changed_records = mem::take(&mut self.changed_records);
for rtk in changed_records {
// Get the changed record and save it to the table
if let Some(r) = self.record_index.peek(&rtk) {
if let Err(e) = rt_xact.store_rkyv(0, &rtk.bytes(), r) {
log_stor!(error "failed to save record: {}", e);
}
}
}
if let Err(e) = rt_xact.commit().await {
@ -144,14 +211,18 @@ impl RecordStore {
}
pub async fn tick(&mut self, last_ts: Timestamp, cur_ts: Timestamp) {
self.flush_records().await;
self.purge_dead_records().await;
self.flush_changed_records().await;
self.purge_dead_records(true).await;
}
pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> Result<(), VeilidAPIError> {
let rik = RecordTableKey { key };
if self.record_index.contains_key(&rik) {
apibail_generic!("record already exists");
pub async fn new_record(
&mut self,
key: TypedKey,
record: Record,
) -> Result<(), VeilidAPIError> {
let rtk = RecordTableKey { key };
if self.record_index.contains_key(&rtk) {
apibail_internal!("record already exists");
}
// Get record table
@ -159,54 +230,78 @@ impl RecordStore {
apibail_internal!("record store not initialized");
};
// If over size limit, dont create record
let new_total_storage_space =
self.total_storage_space + mem::size_of::<RecordTableKey>() + record.total_size();
if let Some(max_storage_space_mb) = &self.limits.max_storage_space_mb {
if new_total_storage_space > (max_storage_space_mb * 1_048_576usize) {
apibail_try_again!();
}
}
// Save to record table
record_table
.store_rkyv(0, &rik, &r)
.store_rkyv(0, &rtk.bytes(), &record)
.await
.map_err(VeilidAPIError::internal)?;
// Cache it
self.record_cache.insert(key, value, |k, v| {
self.add_dead_record(k, v);
});
// Save to record index
let mut dead_records = Vec::new();
if let Some(v) = self.record_index.insert(rtk, record, |k, v| {
dead_records.push((k, v));
}) {
// Shouldn't happen but log it
log_stor!(warn "new duplicate record in table: {}", rtk);
self.add_dead_record(rtk, v);
}
for dr in dead_records {
self.add_dead_record(dr.0, dr.1);
}
// Update storage space
self.total_storage_space = new_total_storage_space;
Ok(())
}
pub fn with_record<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&ValueRecord) -> R,
F: FnOnce(&Record) -> R,
{
// Get record from index
let rck = RecordTableKey { key };
if let Some(r) = self.record_index.get_mut(&rck) {
let rtk = RecordTableKey { key };
if let Some(record) = self.record_index.get_mut(&rtk) {
// Touch
r.touch(get_aligned_timestamp());
self.mark_record_changed(&rck);
record.touch(get_aligned_timestamp());
self.mark_record_changed(rtk);
// Callback
return Some(f(key, r));
return Some(f(record));
}
None
}
pub fn get_subkey<R, F>(
pub async fn get_subkey<R, F>(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
) -> Result<Option<ValueRecordData>, VeilidAPIError> {
) -> Result<Option<RecordData>, VeilidAPIError> {
// record from index
let rck = RecordTableKey { key };
let Some(r) = self.record_index.get_mut(&rck) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
let rtk = RecordTableKey { key };
let subkey_count = {
let Some(record) = self.record_index.get_mut(&rtk) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
// Touch
r.touch(get_aligned_timestamp());
self.mark_record_changed(&rck);
// Touch
record.touch(get_aligned_timestamp());
record.subkey_count()
};
self.mark_record_changed(rtk);
// Check if the subkey is in range
if subkey >= r.subkey_count() {
if subkey as usize >= subkey_count {
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
}
@ -216,22 +311,21 @@ impl RecordStore {
};
// If subkey exists in subkey cache, use that
let skck = SubkeyTableKey { key, subkey };
if let Some(rd) = self.subkey_cache.get_mut(&skck) {
let out = rd.clone();
let stk = SubkeyTableKey { key, subkey };
if let Some(record_data) = self.subkey_cache.get_mut(&stk) {
let out = record_data.clone();
return Ok(Some(out));
}
// If not in cache, try to pull from table store
let k = skck.bytes();
if let Some(rd) = subkey_table
.load_rkyv::<ValueRecordData>(0, &k)
if let Some(record_data) = subkey_table
.load_rkyv::<RecordData>(0, &stk.bytes())
.map_err(VeilidAPIError::internal)?
{
let out = rd.clone();
let out = record_data.clone();
// Add to cache, do nothing with lru out
self.subkey_cache.insert(skck, rd, |_| {});
self.add_to_subkey_cache(stk, record_data);
return Ok(Some(out));
};
@ -239,29 +333,33 @@ impl RecordStore {
return Ok(None);
}
pub fn set_subkey<R, F>(
pub async fn set_subkey<R, F>(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
data: ValueRecordData,
record_data: RecordData,
) -> Result<(), VeilidAPIError> {
// Check size limit for data
if data.data.len() > self.limits.max_subkey_size {
if record_data.value_data.data().len() > self.limits.max_subkey_size {
return Err(VeilidAPIError::generic("record subkey too large"));
}
// Get record from index
let rck = RecordTableKey { key };
let Some(r) = self.record_index.get_mut(&rck) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
let rtk = RecordTableKey { key };
let (subkey_count, total_size) = {
let Some(record) = self.record_index.get_mut(&rtk) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
// Touch
r.touch(get_aligned_timestamp());
self.mark_record_changed(&rck);
// Touch
record.touch(get_aligned_timestamp());
(record.subkey_count(), record.total_size())
};
self.mark_record_changed(rtk);
// Check if the subkey is in range
if subkey >= r.subkey_count() {
if subkey as usize >= subkey_count {
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
}
@ -271,40 +369,71 @@ impl RecordStore {
};
// Get the previous subkey and ensure we aren't going over the record size limit
let mut prior_subkey_size = 0usize;
let mut prior_record_data_size = 0usize;
// If subkey exists in subkey cache, use that
let skck = SubkeyTableKey { key, subkey };
if let Some(rd) = self.subkey_cache.peek(&skck) {
prior_subkey_size = rd.data.data().len();
let stk = SubkeyTableKey { key, subkey };
let stk_bytes = stk.bytes();
if let Some(record_data) = self.subkey_cache.peek(&stk) {
prior_record_data_size = record_data.total_size();
} else {
// If not in cache, try to pull from table store
let k = skck.bytes();
if let Some(rd) = subkey_table
.load_rkyv::<ValueRecordData>(0, &k)
if let Some(record_data) = subkey_table
.load_rkyv::<RecordData>(0, &stk_bytes)
.map_err(VeilidAPIError::internal)?
{
prior_subkey_size = rd.data.data().len();
prior_record_data_size = record_data.total_size();
}
}
// Check new data size
let new_data_size = r.data_size() + data.data().len() - priod_subkey_size;
if new_data_size > self.limits.max_record_data_size {
return Err(VeilidAPIError::generic("dht record too large"));
// Check new total record size
let new_record_data_size = record_data.total_size();
let new_total_size = total_size + new_record_data_size - prior_record_data_size;
if new_total_size > self.limits.max_record_total_size {
apibail_generic!("dht record too large");
}
// Check new total storage space
let new_total_storage_space =
self.total_storage_space + new_record_data_size - prior_record_data_size;
if let Some(max_storage_space_mb) = self.limits.max_storage_space_mb {
if new_total_storage_space > (max_storage_space_mb * 1_048_576usize) {
apibail_try_again!();
}
}
// Write subkey
let k = skck.bytes();
subkey_table.store_rkyv(0, &k, &data)?;
subkey_table
.store_rkyv(0, &stk_bytes, &record_data)
.await
.map_err(VeilidAPIError::internal)?;
// Write to subkey cache
let skck = SubkeyTableKey { key, subkey };
self.subkey_cache.insert(skck, data, |_, _| {});
self.add_to_subkey_cache(stk, record_data);
// Update record
r.set_data_size(new_data_size);
let Some(record) = self.record_index.get_mut(&rtk) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
record.set_record_data_size(new_record_data_size);
Ok(())
}
/// LRU out some records until we reclaim the amount of space requested
/// This will force a garbage collection of the space immediately
/// If zero is passed in here, a garbage collection will be performed of dead records
/// without removing any live records
pub async fn reclaim_space(&mut self, space: usize) {
let mut reclaimed = 0usize;
while reclaimed < space {
if let Some((k, v)) = self.record_index.remove_lru() {
reclaimed += mem::size_of::<RecordTableKey>();
reclaimed += v.total_size();
self.add_dead_record(k, v);
}
}
self.purge_dead_records(false).await;
}
}

View File

@ -3,14 +3,14 @@
pub struct RecordStoreLimits {
/// Number of subkeys to keep in the memory cache
pub subkey_cache_size: usize,
/// Maximum size of a subkey
/// Maximum size of an individual subkey
pub max_subkey_size: usize,
/// Maximum total record data size:
pub max_record_data_size: usize,
/// Maximum total record data size per record
pub max_record_total_size: usize,
/// Limit on the total number of records in the table store
pub max_records: Option<usize>,
/// Limit on the amount of subkey cache memory to use before evicting cache items
pub max_subkey_cache_memory_mb: Option<usize>,
/// Limit on the amount of disk space to use for subkey data
pub max_disk_space_mb: Option<usize>,
/// Limit on the amount of storage space to use for subkey data and record data
pub max_storage_space_mb: Option<usize>,
}

View File

@ -15,11 +15,17 @@ use serde::*;
RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct ValueRecordData {
pub data: ValueData,
pub struct RecordData {
pub value_data: ValueData,
pub signature: Signature,
}
impl RecordData {
pub fn total_size(&self) -> usize {
mem::size_of::<ValueData>() + self.value_data.data().len()
}
}
#[derive(
Clone,
Debug,
@ -33,15 +39,15 @@ pub struct ValueRecordData {
RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct ValueRecord {
pub struct Record {
last_touched_ts: Timestamp,
secret: Option<SecretKey>,
schema: DHTSchema,
safety_selection: SafetySelection,
data_size: usize,
record_data_size: usize,
}
impl ValueRecord {
impl Record {
pub fn new(
cur_ts: Timestamp,
secret: Option<SecretKey>,
@ -53,7 +59,7 @@ impl ValueRecord {
secret,
schema,
safety_selection,
data_size: 0,
record_data_size: 0,
}
}
@ -69,11 +75,15 @@ impl ValueRecord {
self.last_touched_ts
}
pub fn set_data_size(&mut self, size: usize) {
self.data_size = size;
pub fn set_record_data_size(&mut self, size: usize) {
self.record_data_size = size;
}
pub fn data_size(&self) -> usize {
self.data_size
pub fn record_data_size(&self) -> usize {
self.record_data_size
}
pub fn total_size(&self) -> usize {
mem::size_of::<Record>() + self.schema.data_size() + self.record_data_size
}
}

View File

@ -2479,6 +2479,10 @@ impl DHTSchemaDFLT {
pub fn subkey_count(&self) -> usize {
self.o_cnt as usize
}
/// Get the data size of this schema beyond the size of the structure itself
pub fn data_size(&self) -> usize {
0
}
}
impl TryFrom<&[u8]> for DHTSchemaDFLT {
@ -2550,6 +2554,11 @@ impl DHTSchemaSMPL {
.iter()
.fold(self.o_cnt as usize, |acc, x| acc + (x.m_cnt as usize))
}
/// Get the data size of this schema beyond the size of the structure itself
pub fn data_size(&self) -> usize {
self.members.len() * mem::size_of::<DHTSchemaSMPLMember>
}
}
impl TryFrom<&[u8]> for DHTSchemaSMPL {
@ -2619,6 +2628,14 @@ impl DHTSchema {
DHTSchema::SMPL(s) => s.subkey_count(),
}
}
/// Get the data size of this schema beyond the size of the structure itself
pub fn data_size(&self) -> usize {
match self {
DHTSchema::DFLT(d) => d.data_size(),
DHTSchema::SMPL(s) => s.data_size(),
}
}
}
impl Default for DHTSchema {

View File

@ -290,6 +290,29 @@ pub struct VeilidConfigDHT {
pub min_peer_count: u32,
pub min_peer_refresh_time_ms: u32,
pub validate_dial_info_receipt_time_ms: u32,
pub local_subkey_cache_size: fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
RecordStoreLimits {
subkey_cache_size: todo!(),
max_records: None,
max_subkey_cache_memory_mb: Some(xxx),
max_storage_space_mb: None,
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
}
}
fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
RecordStoreLimits {
subkey_cache_size: todo!(),
max_records: Some(xxx),
max_subkey_cache_memory_mb: Some(xxx),
max_storage_space_mb: Some(xxx),
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
}
}
}
/// Configure RPC