checkpoint

John Smith 2023-04-03 20:58:10 -04:00
parent 39ade462c2
commit 7eded89b11
6 changed files with 319 additions and 105 deletions

View File

@ -126,18 +126,6 @@ macro_rules! byte_array_type {
Self { bytes }
}
pub fn try_from_vec(v: Vec<u8>) -> Result<Self, VeilidAPIError> {
let vl = v.len();
Ok(Self {
bytes: v.try_into().map_err(|_| {
VeilidAPIError::generic(format!(
"Expected a Vec of length {} but it was {}",
$size, vl
))
})?,
})
}
pub fn bit(&self, index: usize) -> bool {
assert!(index < ($size * 8));
let bi = index / 8;
@ -250,6 +238,20 @@ macro_rules! byte_array_type {
Self::try_decode(value)
}
}
impl TryFrom<&[u8]> for $name {
type Error = VeilidAPIError;
fn try_from(v: &[u8]) -> Result<Self, Self::Error> {
let vl = v.len();
Ok(Self {
bytes: v.try_into().map_err(|_| {
VeilidAPIError::generic(format!(
"Expected a slice of length {} but it was {}",
$size, vl
))
})?,
})
}
}
};
}
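For context, a minimal usage sketch of the new conversion, assuming the macro is instantiated elsewhere for a fixed-size key type such as PublicKey (the concrete instantiations are not part of this diff):

// Hypothetical helper; `PublicKey` stands in for any byte_array_type!-generated type.
fn parse_public_key(raw: &[u8]) -> Result<PublicKey, VeilidAPIError> {
    // Succeeds only when `raw` has exactly the generated type's length;
    // otherwise the "Expected a slice of length ..." error above is returned.
    PublicKey::try_from(raw)
}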

View File

@ -40,6 +40,21 @@ impl TableStore {
if let Err(e) = self.delete("routing_table").await {
error!("failed to delete 'routing_table': {}", e);
}
if let Err(e) = self.delete("routing_table").await {
error!("failed to delete 'routing_table': {}", e);
}
if let Err(e) = self.delete("local_records").await {
error!("failed to delete 'local_records': {}", e);
}
if let Err(e) = self.delete("local_subkeys").await {
error!("failed to delete 'local_subkeys': {}", e);
}
if let Err(e) = self.delete("remote_records").await {
error!("failed to delete 'remote_records': {}", e);
}
if let Err(e) = self.delete("remote_subkeys").await {
error!("failed to delete 'remote_subkeys': {}", e);
}
}
pub(crate) async fn init(&self) -> EyreResult<()> {

View File

@ -8,8 +8,12 @@ use value_record::*;
use super::*;
use crate::rpc_processor::*;
/// Locked structure for storage manager
struct StorageManagerInner {
record_store: RecordStore,
/// Records that have been 'created' or 'opened' by this node
local_record_store: Option<RecordStore>,
/// Records that have been pushed to this node for distribution by other nodes
remote_record_store: Option<RecordStore>,
}
struct StorageManagerUnlockedInner {
@ -47,7 +51,28 @@ impl StorageManager {
}
fn new_inner() -> StorageManagerInner {
StorageManagerInner {
record_store: RecordStore::new(table_store),
local_record_store: None,
remote_record_store: None,
}
}
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
RecordStoreLimits {
record_cache_size: todo!(),
subkey_cache_size: todo!(),
max_records: None,
max_subkey_cache_memory_mb: Some(xxx),
max_disk_space_mb: None,
}
}
fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
RecordStoreLimits {
record_cache_size: todo!(),
subkey_cache_size: todo!(),
max_records: Some(xxx),
max_subkey_cache_memory_mb: Some(xxx),
max_disk_space_mb: Some(xxx)
}
}
@ -59,6 +84,7 @@ impl StorageManager {
block_store: BlockStore,
rpc_processor: RPCProcessor,
) -> StorageManager {
StorageManager {
unlocked_inner: Arc::new(Self::new_unlocked_inner(
config,
@ -68,7 +94,7 @@ impl StorageManager {
block_store,
rpc_processor,
)),
inner: Arc::new(Mutex::new(Self::new_inner())),
inner: Arc::new(Mutex::new(Self::new_inner()))
}
}
@ -76,7 +102,12 @@ impl StorageManager {
pub async fn init(&self) -> EyreResult<()> {
debug!("startup storage manager");
let mut inner = self.inner.lock();
// xxx
let local_limits = Self::local_limits_from_config(self.unlocked_inner.config.clone());
let remote_limits = Self::remote_limits_from_config(self.unlocked_inner.config.clone());
inner.local_record_store = Some(RecordStore::new(self.unlocked_inner.table_store.clone(), "local", local_limits));
inner.remote_record_store = Some(RecordStore::new(self.unlocked_inner.table_store.clone(), "remote", remote_limits));
Ok(())
}

View File

@ -4,14 +4,62 @@ use hashlink::LruCache;
pub type RecordIndex = u32;
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct RecordCacheKey {
record_idx: RecordIndex,
struct RecordIndexKey {
pub key: TypedKey,
}
impl RecordIndexKey {
pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4] {
let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4];
bytes[0..4].copy_from_slice(&self.key.kind.0);
bytes[4..PUBLIC_KEY_LENGTH + 4].copy_from_slice(&self.key.value.bytes);
bytes
}
}
impl TryFrom<&[u8]> for RecordIndexKey {
type Error = EyreReport;
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
if bytes.len() != PUBLIC_KEY_LENGTH + 4 {
bail!("invalid bytes length");
}
let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?;
let value =
PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?;
let key = TypedKey::new(kind, value);
Ok(RecordIndexKey { key })
}
}
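A quick round-trip sketch for the encoding above; it uses only the items just defined plus the EyreResult alias already used in this file:

// Encode a RecordIndexKey to its table key bytes and parse it back.
fn record_index_key_roundtrip(key: TypedKey) -> EyreResult<()> {
    let rik = RecordIndexKey { key };
    let bytes = rik.bytes(); // 4-byte kind followed by the public key value
    let decoded = RecordIndexKey::try_from(&bytes[..])?;
    assert_eq!(rik, decoded);
    Ok(())
}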
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct SubkeyCacheKey {
record_idx: RecordIndex,
subkey: ValueSubkey,
pub key: TypedKey,
pub subkey: ValueSubkey,
}
impl SubkeyCacheKey {
pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4 + 4] {
let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4 + 4];
bytes[0..4].copy_from_slice(&self.key.kind.0);
bytes[4..PUBLIC_KEY_LENGTH + 4].copy_from_slice(&self.key.value.bytes);
bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4].copy_from_slice(&self.subkey.to_le_bytes());
bytes
}
}
impl TryFrom<&[u8]> for SubkeyCacheKey {
type Error = EyreReport;
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
if bytes.len() != PUBLIC_KEY_LENGTH + 4 + 4 {
bail!("invalid bytes length");
}
let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?;
let value =
PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?;
let subkey = ValueSubkey::from_le_bytes(
bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4]
.try_into()
.wrap_err("invalid subkey")?,
);
let key = TypedKey::new(kind, value);
Ok(SubkeyCacheKey { key, subkey })
}
}
pub struct RecordStore {
@ -21,15 +69,12 @@ pub struct RecordStore {
record_table: Option<TableDB>,
subkey_table: Option<TableDB>,
record_index: HashMap<TypedKey, RecordIndex>,
free_record_index_list: Vec<RecordIndex>,
record_cache: LruCache<RecordIndex, ValueRecord>,
record_index: LruCache<RecordIndexKey, ValueRecord>,
subkey_cache: LruCache<SubkeyCacheKey, ValueRecordData>,
}
impl RecordStore {
pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self {
let record_cache_size = limits.record_cache_size as usize;
let subkey_cache_size = limits.subkey_cache_size as usize;
Self {
table_store,
@ -37,9 +82,7 @@ impl RecordStore {
limits,
record_table: None,
subkey_table: None,
record_index: HashMap::new(),
free_record_index_list: Vec::new(), // xxx can this be auto-recovered? should we ever compact the allocated indexes?
record_cache: LruCache::new(record_cache_size),
record_index: LruCache::new(limits.max_records.map(|mr| mr as usize).unwrap_or(usize::MAX)),
subkey_cache: LruCache::new(subkey_cache_size),
}
}
@ -54,82 +97,55 @@ impl RecordStore {
.open(&format!("{}_subkeys", self.name), 1)
.await?;
// xxx get record index and free record index list
// Pull record index from table into a vector to ensure we sort them
let record_table_keys = record_table.get_keys(0)?;
let mut record_index_saved: Vec<(Vec<u8>, ValueRecord)> =
Vec::with_capacity(record_table_keys.len());
for rtk in record_table_keys {
if let Some(vr) = record_table.load_rkyv::<ValueRecord>(0, &rtk)? {
record_index_saved.push((rtk, vr));
}
}
// Sort the record index by last touched time and insert in sorted order
record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched()));
let mut dead_records = Vec::new();
for ri in record_index_saved {
let rik = RecordIndexKey::try_from(ri.0.as_slice())?;
self.record_index.insert(rik, ri.1, |k, v| {
// If the configuration changed, we only want to keep the 'limits.max_records' records
dead_records.push((k, v));
});
}
// Delete dead keys
if !dead_records.is_empty() {
let rt_xact = record_table.transact();
let st_xact = subkey_table.transact();
for (k, v) in dead_records {
// Delete record
rt_xact.delete(0, &k.bytes())?;
// Delete subkeys
let subkey_count = v.subkey_count();
for sk in 0..subkey_count {
let sck = SubkeyCacheKey {
key: k.key,
subkey: sk as ValueSubkey,
};
st_xact.delete(0, &sck.bytes())?;
}
}
rt_xact.commit().await?;
st_xact.commit().await?;
}
self.record_table = Some(record_table);
self.subkey_table = Some(subkey_table);
Ok(())
}
fn key_bytes(key: TypedKey) -> [u8; PUBLIC_KEY_LENGTH + 4] {
let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4];
bytes[0..4] = key.kind.0;
bytes[4..PUBLIC_KEY_LENGTH + 4] = key.value.bytes;
bytes
}
pub fn with_record<R, F: FnOnce(TypedKey, &ValueRecord) -> R>(
&mut self,
key: TypedKey,
f: F,
) -> EyreResult<Option<R>> {
// Get record table
let Some(record_table) = self.record_table.clone() else {
bail!("record store not initialized");
};
// If record exists in cache, use that
if let Some(r) = self.record_cache.get(&key) {
// Callback
return Ok(Some(f(key, r)));
}
// If not in cache, try to pull from table store
let k = Self::key_bytes(key);
if let Some(r) = record_table.load_rkyv(0, &k)? {
// Callback
let out = f(key, &r);
// Add to cache, do nothing with lru out
self.record_cache.insert(key, r, |_| {});
return Ok(Some(out));
};
return Ok(None);
}
pub fn with_record_mut<R, F: FnOnce(TypedKey, &mut ValueRecord) -> R>(
&mut self,
key: TypedKey,
f: F,
) -> EyreResult<Option<R>> {
// Get record table
let Some(record_table) = self.record_table.clone() else {
bail!("record store not initialized");
};
// If record exists in cache, use that
if let Some(r) = self.record_cache.get_mut(&key) {
// Callback
return Ok(Some(f(key, r)));
}
// If not in cache, try to pull from table store
let k = Self::key_bytes(key);
if let Some(r) = record_table.load_rkyv(0, &k)? {
// Callback
let out = f(key, &mut r);
// Save changes back to record table
record_table.store_rkyv(0, &k, &r).await?;
// Add to cache, do nothing with lru out
self.record_cache.insert(key, r, |_| {});
return Ok(Some(out));
};
Ok(None)
}
// xxx fix up new record
pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> EyreResult<()> {
if self.with_record(key, |_| {})?.is_some() {
@ -149,4 +165,140 @@ impl RecordStore {
Ok(())
}
pub fn with_record<R, F>(&mut self, key: TypedKey, f: F) -> EyreResult<Option<R>>
where
F: FnOnce(&mut RecordStore, TypedKey, &ValueRecord) -> R,
{
// Get record table
let Some(record_table) = self.record_table.clone() else {
bail!("record store not initialized");
};
// If record exists in cache, use that
let rck = RecordIndexKey { key };
if let Some(r) = self.record_index.get(&rck) {
// Callback
return Ok(Some(f(self, key, r)));
}
// If not in cache, try to pull from table store
let k = rck.bytes();
if let Some(r) = record_table.load_rkyv(0, &k)? {
// Callback
let out = f(self, key, &r);
// Add to cache, do nothing with lru out
self.record_index.insert(rck, r, |_| {});
return Ok(Some(out));
};
return Ok(None);
}
pub fn with_record_mut<R, F>(&mut self, key: TypedKey, f: F) -> EyreResult<Option<R>>
where
F: FnOnce(&mut RecordStore, TypedKey, &mut ValueRecord) -> R,
{
// Get record table
let Some(record_table) = self.record_table.clone() else {
bail!("record store not initialized");
};
// If record exists in cache, use that
let rck = RecordIndexKey { key };
if let Some(r) = self.record_index.get_mut(&rck) {
// Callback
return Ok(Some(f(self, key, r)));
}
// If not in cache, try to pull from table store
let k = rck.bytes();
if let Some(mut r) = record_table.load_rkyv(0, &k)? {
// Callback
let out = f(self, key, &mut r);
// Save changes back to record table
record_table.store_rkyv(0, &k, &r).await?;
// Add to cache, do nothing with lru out
self.record_index.insert(rck, r, |_| {});
return Ok(Some(out));
};
Ok(None)
}
pub fn with_subkey<R, F>(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
f: F,
) -> EyreResult<Option<R>>
where
F: FnOnce(&mut RecordStore, TypedKey, ValueSubkey, &ValueRecordData) -> R,
{
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
bail!("record store not initialized");
};
// If subkey exists in subkey cache, use that
let skck = SubkeyCacheKey { key, subkey };
if let Some(rd) = self.subkey_cache.get(&skck) {
// Callback
return Ok(Some(f(self, key, subkey, rd)));
}
// If not in cache, try to pull from table store
let k = skck.bytes();
if let Some(rd) = subkey_table.load_rkyv(0, &k)? {
// Callback
let out = f(self, key, subkey, &rd);
// Add to cache, do nothing with lru out
self.subkey_cache.insert(skck, rd, |_| {});
return Ok(Some(out));
};
return Ok(None);
}
pub fn with_subkey_mut<R, F>(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
f: F,
) -> EyreResult<Option<R>>
where
F: FnOnce(&mut RecordStore, TypedKey, ValueSubkey, &mut ValueRecordData) -> R,
{
// Get record table
let Some(subkey_table) = self.subkey_table.clone() else {
bail!("record store not initialized");
};
// If subkey exists in cache, use that
let skck = SubkeyCacheKey { key, subkey };
if let Some(rd) = self.subkey_cache.get_mut(&skck) {
// Callback
return Ok(Some(f(self, key, subkey, rd)));
}
// If not in cache, try to pull from table store
let k = skck.bytes();
if let Some(mut rd) = subkey_table.load_rkyv(0, &k)? {
// Callback
let out = f(self, key, subkey, &mut rd);
// Save changes back to record table
subkey_table.store_rkyv(0, &k, &rd).await?;
// Add to cache, do nothing with lru out
self.subkey_cache.insert(skck, rd, |_| {});
return Ok(Some(out));
};
Ok(None)
}
}
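A hedged usage sketch of the accessors above; `store` is assumed to be an initialized RecordStore and `key` a record that was previously stored:

// Read-only access goes through with_record; the closure receives the store,
// the key, and a reference to the cached or freshly loaded ValueRecord.
fn log_subkey_count(store: &mut RecordStore, key: TypedKey) -> EyreResult<()> {
    let count = store.with_record(key, |_store, _key, record| record.subkey_count())?;
    match count {
        Some(n) => debug!("record has {} subkeys", n),
        None => debug!("record not found"),
    }
    Ok(())
}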

View File

@ -3,9 +3,12 @@ use super::*;
/// Configuration for the record store
#[derive(Debug, Default, Copy, Clone)]
pub struct RecordStoreLimits {
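/// Number of records to keep in the memory cache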
pub record_cache_size: u32,
/// Number of subkeys to keep in the memory cache
pub subkey_cache_size: u32,
/// Limit on the total number of records in the table store
pub max_records: Option<u32>,
pub max_cache_memory_mb: Option<u32>,
/// Limit on the amount of subkey cache memory to use before evicting cache items
pub max_subkey_cache_memory_mb: Option<u32>,
/// Limit on the amount of disk space to use for subkey data
pub max_disk_space_mb: Option<u32>,
}
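As an illustration of how these limits compose, a hypothetical set of values for a local record store; the real numbers come from VeilidConfig and are still left as todo!()/xxx placeholders in this checkpoint:

// Made-up values for illustration only.
fn example_local_limits() -> RecordStoreLimits {
    RecordStoreLimits {
        record_cache_size: 128,                // records kept in memory
        subkey_cache_size: 1024,               // subkeys kept in memory
        max_records: None,                     // locally opened records are not capped
        max_subkey_cache_memory_mb: Some(256), // subkey cache memory budget
        max_disk_space_mb: None,               // no disk cap for local data
    }
}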

View File

@ -32,27 +32,38 @@ pub struct ValueRecordData {
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct ValueRecord {
last_touched_ts: Timestamp,
secret: Option<SecretKey>,
schema: DHTSchema,
safety_selection: SafetySelection,
total_size: usize,
subkeys: Vec<ValueRecordData>,
data_size: usize,
}
impl ValueRecord {
pub fn new(
cur_ts: Timestamp,
secret: Option<SecretKey>,
schema: DHTSchema,
safety_selection: SafetySelection,
) -> Self {
// Get number of subkeys
let subkey_count = schema.subkey_count();
Self {
last_touched_ts: cur_ts,
secret,
schema,
safety_selection,
subkeys: vec![ValueRecordData::default(); subkey_count],
data_size: 0,
}
}
pub fn subkey_count(&self) -> usize {
self.schema.subkey_count()
}
pub fn touch(&mut self, cur_ts: Timestamp) {
self.last_touched_ts = cur_ts
}
pub fn last_touched(&self) -> Timestamp {
self.last_touched_ts
}
}
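Finally, a small sketch of how a ValueRecord's timestamp drives the LRU ordering used in RecordStore::init above; `cur_ts`, `schema`, and `safety_selection` are assumed to be supplied by the caller:

// Create a record with no per-record encryption secret and mark it as touched.
fn make_touched_record(
    cur_ts: Timestamp,
    schema: DHTSchema,
    safety_selection: SafetySelection,
) -> ValueRecord {
    let mut record = ValueRecord::new(cur_ts, None, schema, safety_selection);
    // last_touched() now reports cur_ts; RecordStore::init sorts records by this
    // value so the least recently touched ones are the first eviction candidates.
    record.touch(cur_ts);
    record
}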