mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-10-01 01:26:08 -04:00
prop keystore changes checkpoint
This commit is contained in:
parent
dea8be6522
commit
135b66298c
@ -170,12 +170,12 @@ impl Crypto {
|
||||
|
||||
// load caches if they are valid for this node id
|
||||
let mut db = table_store.open("crypto_caches", 1).await?;
|
||||
let caches_valid = match db.load(0, b"cache_validity_key")? {
|
||||
let caches_valid = match db.load(0, b"cache_validity_key").await? {
|
||||
Some(v) => v == cache_validity_key,
|
||||
None => false,
|
||||
};
|
||||
if caches_valid {
|
||||
if let Some(b) = db.load(0, b"dh_cache")? {
|
||||
if let Some(b) = db.load(0, b"dh_cache").await? {
|
||||
let mut inner = self.inner.lock();
|
||||
bytes_to_cache(&b, &mut inner.dh_cache);
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ use keyvaluedb_sqlite::*;
|
||||
use std::path::PathBuf;
|
||||
|
||||
struct TableStoreInner {
|
||||
opened: BTreeMap<String, Weak<Mutex<TableDBUnlockedInner>>>,
|
||||
opened: BTreeMap<String, Weak<TableDBUnlockedInner>>,
|
||||
}
|
||||
|
||||
/// Veilid Table Storage
|
||||
|
@ -127,16 +127,11 @@ impl TableDB {
|
||||
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
||||
<T as RkyvArchive>::Archived: RkyvDeserialize<T, VeilidSharedDeserializeMap>,
|
||||
{
|
||||
let db = self.unlocked_inner.database.clone();
|
||||
let out = db.get(col, key).await.wrap_err("failed to get key")?;
|
||||
let b = match out {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
return Ok(None);
|
||||
}
|
||||
let out = match self.load(col, key).await? {
|
||||
Some(v) => from_rkyv(v)?,
|
||||
None => None,
|
||||
};
|
||||
let obj = from_rkyv(b)?;
|
||||
Ok(Some(obj))
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Read an serde-json key from a column in the TableDB immediately
|
||||
@ -144,32 +139,47 @@ impl TableDB {
|
||||
where
|
||||
T: for<'de> serde::Deserialize<'de>,
|
||||
{
|
||||
let db = self.unlocked_inner.database.clone();
|
||||
let out = db.get(col, key).await.wrap_err("failed to get key")?;
|
||||
let b = match out {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
return Ok(None);
|
||||
}
|
||||
let out = match self.load(col, key).await? {
|
||||
Some(v) => serde_json::from_slice(&v)?,
|
||||
None => None,
|
||||
};
|
||||
let obj = serde_json::from_slice(&b)?;
|
||||
Ok(Some(obj))
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Delete key with from a column in the TableDB
|
||||
xxx fix me
|
||||
pub async fn delete(&self, col: u32, key: &[u8]) -> EyreResult<bool> {
|
||||
pub async fn delete(&self, col: u32, key: &[u8]) -> EyreResult<Option<Vec<u8>>> {
|
||||
let db = self.unlocked_inner.database.clone();
|
||||
let found = db.get(col, key).await.wrap_err("failed to get key")?;
|
||||
match found {
|
||||
None => Ok(false),
|
||||
Some(_) => {
|
||||
let mut dbt = db.transaction();
|
||||
dbt.delete(col, key);
|
||||
db.write(dbt).await.wrap_err("failed to delete key")?;
|
||||
Ok(true)
|
||||
let old_value = db.delete(col, key).await.wrap_err("failed to delete key")?;
|
||||
Ok(old_value)
|
||||
}
|
||||
|
||||
/// Delete rkyv key with from a column in the TableDB
|
||||
pub async fn delete_rkyv<T>(&self, col: u32, key: &[u8]) -> EyreResult<Option<T>>
|
||||
where
|
||||
T: RkyvArchive,
|
||||
<T as RkyvArchive>::Archived:
|
||||
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
||||
<T as RkyvArchive>::Archived: RkyvDeserialize<T, VeilidSharedDeserializeMap>,
|
||||
{
|
||||
let db = self.unlocked_inner.database.clone();
|
||||
let old_value = match db.delete(col, key).await.wrap_err("failed to delete key")? {
|
||||
Some(v) => from_rkyv(v)?,
|
||||
None => None,
|
||||
};
|
||||
Ok(old_value)
|
||||
}
|
||||
|
||||
/// Delete serde-json key with from a column in the TableDB
|
||||
pub async fn delete_json<T>(&self, col: u32, key: &[u8]) -> EyreResult<Option<T>>
|
||||
where
|
||||
T: for<'de> serde::Deserialize<'de>,
|
||||
{
|
||||
let db = self.unlocked_inner.database.clone();
|
||||
let old_value = match db.delete(col, key).await.wrap_err("failed to delete key")? {
|
||||
Some(v) => serde_json::from_slice(&v)?,
|
||||
None => None,
|
||||
};
|
||||
Ok(old_value)
|
||||
}
|
||||
}
|
||||
|
||||
@ -217,7 +227,7 @@ impl TableDBTransaction {
|
||||
.take()
|
||||
.ok_or_else(|| eyre!("transaction already completed"))?
|
||||
};
|
||||
let db = self.db.unlocked_inner.lock().database.clone();
|
||||
let db = self.db.unlocked_inner.database.clone();
|
||||
db.write(dbt)
|
||||
.await
|
||||
.wrap_err("commit failed, transaction lost")
|
||||
|
@ -339,11 +339,11 @@ impl RoutingTable {
|
||||
// Deserialize bucket map and all entries from the table store
|
||||
let tstore = self.unlocked_inner.network_manager().table_store();
|
||||
let tdb = tstore.open("routing_table", 1).await?;
|
||||
let Some(serialized_bucket_map): Option<BTreeMap<CryptoKind, Vec<Vec<u8>>>> = tdb.load_rkyv(0, b"serialized_bucket_map")? else {
|
||||
let Some(serialized_bucket_map): Option<BTreeMap<CryptoKind, Vec<Vec<u8>>>> = tdb.load_rkyv(0, b"serialized_bucket_map").await? else {
|
||||
log_rtab!(debug "no bucket map in saved routing table");
|
||||
return Ok(());
|
||||
};
|
||||
let Some(all_entry_bytes): Option<Vec<Vec<u8>>> = tdb.load_rkyv(0, b"all_entry_bytes")? else {
|
||||
let Some(all_entry_bytes): Option<Vec<Vec<u8>>> = tdb.load_rkyv(0, b"all_entry_bytes").await? else {
|
||||
log_rtab!(debug "no all_entry_bytes in saved routing table");
|
||||
return Ok(());
|
||||
};
|
||||
|
@ -23,7 +23,7 @@ impl RouteSpecStoreContent {
|
||||
let table_store = routing_table.network_manager().table_store();
|
||||
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
|
||||
let mut content: RouteSpecStoreContent =
|
||||
rsstdb.load_rkyv(0, b"content")?.unwrap_or_default();
|
||||
rsstdb.load_rkyv(0, b"content").await?.unwrap_or_default();
|
||||
|
||||
// Look up all route hop noderefs since we can't serialize those
|
||||
let mut dead_ids = Vec::new();
|
||||
|
@ -177,7 +177,7 @@ impl StorageManager {
|
||||
/// Handle a recieved 'Get Value' query
|
||||
pub async fn inbound_get_value(&self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> Result<NetworkResult<SubkeyResult>, VeilidAPIError> {
|
||||
let mut inner = self.lock().await?;
|
||||
let res = match inner.handle_get_remote_value(key, subkey, want_descriptor) {
|
||||
let res = match inner.handle_get_remote_value(key, subkey, want_descriptor).await {
|
||||
Ok(res) => res,
|
||||
Err(VeilidAPIError::Internal { message }) => {
|
||||
apibail_internal!(message);
|
||||
|
@ -240,7 +240,7 @@ impl StorageManager {
|
||||
};
|
||||
|
||||
// See if the requested subkey is our local record store
|
||||
let last_subkey_result = inner.handle_get_local_value(key, subkey, true)?;
|
||||
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
|
||||
|
||||
// Return the existing value if we have one unless we are forcing a refresh
|
||||
if !force_refresh {
|
||||
@ -319,7 +319,7 @@ impl StorageManager {
|
||||
};
|
||||
|
||||
// See if the subkey we are modifying has a last known local value
|
||||
let last_subkey_result = inner.handle_get_local_value(key, subkey, true)?;
|
||||
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
|
||||
|
||||
// Get the descriptor and schema for the key
|
||||
let Some(descriptor) = last_subkey_result.descriptor else {
|
||||
|
@ -74,11 +74,11 @@ where
|
||||
.await?;
|
||||
|
||||
// Pull record index from table into a vector to ensure we sort them
|
||||
let record_table_keys = record_table.get_keys(0)?;
|
||||
let record_table_keys = record_table.get_keys(0).await?;
|
||||
let mut record_index_saved: Vec<(RecordTableKey, Record<D>)> =
|
||||
Vec::with_capacity(record_table_keys.len());
|
||||
for rtk in record_table_keys {
|
||||
if let Some(vr) = record_table.load_rkyv::<Record<D>>(0, &rtk)? {
|
||||
if let Some(vr) = record_table.load_rkyv::<Record<D>>(0, &rtk).await? {
|
||||
let rik = RecordTableKey::try_from(rtk.as_ref())?;
|
||||
record_index_saved.push((rik, vr));
|
||||
}
|
||||
@ -352,7 +352,7 @@ where
|
||||
// self.with_record(key, |record| record.descriptor().clone())
|
||||
// }
|
||||
|
||||
pub fn get_subkey(
|
||||
pub async fn get_subkey(
|
||||
&mut self,
|
||||
key: TypedKey,
|
||||
subkey: ValueSubkey,
|
||||
@ -393,6 +393,7 @@ where
|
||||
// If not in cache, try to pull from table store
|
||||
if let Some(record_data) = subkey_table
|
||||
.load_rkyv::<RecordData>(0, &stk.bytes())
|
||||
.await
|
||||
.map_err(VeilidAPIError::internal)?
|
||||
{
|
||||
let out = record_data.signed_value_data().clone();
|
||||
@ -458,6 +459,7 @@ where
|
||||
// If not in cache, try to pull from table store
|
||||
if let Some(record_data) = subkey_table
|
||||
.load_rkyv::<RecordData>(0, &stk_bytes)
|
||||
.await
|
||||
.map_err(VeilidAPIError::internal)?
|
||||
{
|
||||
prior_record_data_size = record_data.total_size();
|
||||
|
@ -168,7 +168,7 @@ impl StorageManager {
|
||||
let mut inner = self.lock().await?;
|
||||
|
||||
// See if the subkey we are modifying has a last known local value
|
||||
let last_subkey_result = inner.handle_get_local_value(key, subkey, true)?;
|
||||
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
|
||||
|
||||
// Make sure this value would actually be newer
|
||||
if let Some(last_value) = &last_subkey_result.value {
|
||||
|
@ -154,7 +154,7 @@ impl StorageManagerInner {
|
||||
|
||||
async fn load_metadata(&mut self) -> EyreResult<()> {
|
||||
if let Some(metadata_db) = &self.metadata_db {
|
||||
self.offline_subkey_writes = metadata_db.load_rkyv(0, b"offline_subkey_writes")?.unwrap_or_default();
|
||||
self.offline_subkey_writes = metadata_db.load_rkyv(0, b"offline_subkey_writes").await?.unwrap_or_default();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@ -324,7 +324,7 @@ impl StorageManagerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn handle_get_local_value(
|
||||
pub async fn handle_get_local_value(
|
||||
&mut self,
|
||||
key: TypedKey,
|
||||
subkey: ValueSubkey,
|
||||
@ -334,7 +334,7 @@ impl StorageManagerInner {
|
||||
let Some(local_record_store) = self.local_record_store.as_mut() else {
|
||||
apibail_not_initialized!();
|
||||
};
|
||||
if let Some(subkey_result) = local_record_store.get_subkey(key, subkey, want_descriptor)? {
|
||||
if let Some(subkey_result) = local_record_store.get_subkey(key, subkey, want_descriptor).await? {
|
||||
return Ok(subkey_result);
|
||||
}
|
||||
|
||||
@ -363,7 +363,7 @@ impl StorageManagerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn handle_get_remote_value(
|
||||
pub async fn handle_get_remote_value(
|
||||
&mut self,
|
||||
key: TypedKey,
|
||||
subkey: ValueSubkey,
|
||||
@ -373,7 +373,7 @@ impl StorageManagerInner {
|
||||
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
|
||||
apibail_not_initialized!();
|
||||
};
|
||||
if let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, want_descriptor)? {
|
||||
if let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, want_descriptor).await? {
|
||||
return Ok(subkey_result);
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ pub async fn test_store_delete_load(ts: TableStore) {
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
db.load(0, b"foo").unwrap(),
|
||||
db.load(0, b"foo").await.unwrap(),
|
||||
None,
|
||||
"should not load missing key"
|
||||
);
|
||||
@ -67,11 +67,14 @@ pub async fn test_store_delete_load(ts: TableStore) {
|
||||
"should store new key"
|
||||
);
|
||||
assert_eq!(
|
||||
db.load(0, b"foo").unwrap(),
|
||||
db.load(0, b"foo").await.unwrap(),
|
||||
None,
|
||||
"should not load missing key"
|
||||
);
|
||||
assert_eq!(db.load(1, b"foo").unwrap(), Some(b"1234567890".to_vec()));
|
||||
assert_eq!(
|
||||
db.load(1, b"foo").await.unwrap(),
|
||||
Some(b"1234567890".to_vec())
|
||||
);
|
||||
|
||||
assert!(
|
||||
db.store(1, b"bar", b"FNORD").await.is_ok(),
|
||||
@ -96,16 +99,22 @@ pub async fn test_store_delete_load(ts: TableStore) {
|
||||
"should store new key"
|
||||
);
|
||||
|
||||
assert_eq!(db.load(1, b"bar").unwrap(), Some(b"FNORD".to_vec()));
|
||||
assert_eq!(db.load(1, b"bar").await.unwrap(), Some(b"FNORD".to_vec()));
|
||||
assert_eq!(
|
||||
db.load(0, b"bar").unwrap(),
|
||||
db.load(0, b"bar").await.unwrap(),
|
||||
Some(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ".to_vec())
|
||||
);
|
||||
assert_eq!(db.load(2, b"bar").unwrap(), Some(b"QWERTYUIOP".to_vec()));
|
||||
assert_eq!(db.load(2, b"baz").unwrap(), Some(b"QWERTY".to_vec()));
|
||||
assert_eq!(
|
||||
db.load(2, b"bar").await.unwrap(),
|
||||
Some(b"QWERTYUIOP".to_vec())
|
||||
);
|
||||
assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec()));
|
||||
|
||||
assert_eq!(db.delete(1, b"bar").await.unwrap(), true);
|
||||
assert_eq!(db.delete(1, b"bar").await.unwrap(), false);
|
||||
assert_eq!(
|
||||
db.delete(1, b"bar").await.unwrap(),
|
||||
Some(b"QWERTYUIOP".to_vec())
|
||||
);
|
||||
assert_eq!(db.delete(1, b"bar").await.unwrap(), None);
|
||||
assert!(
|
||||
db.delete(4, b"bar").await.is_err(),
|
||||
"can't delete from column that doesn't exist"
|
||||
@ -114,17 +123,20 @@ pub async fn test_store_delete_load(ts: TableStore) {
|
||||
drop(db);
|
||||
let db = ts.open("test", 3).await.expect("should have opened");
|
||||
|
||||
assert_eq!(db.load(1, b"bar").unwrap(), None);
|
||||
assert_eq!(db.load(1, b"bar").await.unwrap(), None);
|
||||
assert_eq!(
|
||||
db.load(0, b"bar").unwrap(),
|
||||
db.load(0, b"bar").await.unwrap(),
|
||||
Some(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ".to_vec())
|
||||
);
|
||||
assert_eq!(db.load(2, b"bar").unwrap(), Some(b"QWERTYUIOP".to_vec()));
|
||||
assert_eq!(db.load(2, b"baz").unwrap(), Some(b"QWERTY".to_vec()));
|
||||
assert_eq!(
|
||||
db.load(2, b"bar").await.unwrap(),
|
||||
Some(b"QWERTYUIOP".to_vec())
|
||||
);
|
||||
assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec()));
|
||||
}
|
||||
|
||||
pub async fn test_frozen(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
||||
trace!("test_frozen");
|
||||
pub async fn test_rkyv(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
||||
trace!("test_rkyv");
|
||||
|
||||
let _ = ts.delete("test");
|
||||
let db = ts.open("test", 3).await.expect("should have opened");
|
||||
@ -132,9 +144,17 @@ pub async fn test_frozen(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
||||
|
||||
assert!(db.store_rkyv(0, b"asdf", &keypair).await.is_ok());
|
||||
|
||||
assert_eq!(db.load_rkyv::<KeyPair>(0, b"qwer").unwrap(), None);
|
||||
assert_eq!(db.load_rkyv::<KeyPair>(0, b"qwer").await.unwrap(), None);
|
||||
|
||||
let d = match db.load_rkyv::<KeyPair>(0, b"asdf") {
|
||||
let d = match db.load_rkyv::<KeyPair>(0, b"asdf").await {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
panic!("couldn't decode: {}", e);
|
||||
}
|
||||
};
|
||||
assert_eq!(d, Some(keypair), "keys should be equal");
|
||||
|
||||
let d = match db.delete_rkyv::<KeyPair>(0, b"asdf").await {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
panic!("couldn't decode: {}", e);
|
||||
@ -148,7 +168,45 @@ pub async fn test_frozen(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
||||
);
|
||||
|
||||
assert!(
|
||||
db.load_rkyv::<TypedKey>(1, b"foo").is_err(),
|
||||
db.load_rkyv::<TypedKey>(1, b"foo").await.is_err(),
|
||||
"should fail to unfreeze"
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn test_json(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
||||
trace!("test_json");
|
||||
|
||||
let _ = ts.delete("test");
|
||||
let db = ts.open("test", 3).await.expect("should have opened");
|
||||
let keypair = vcrypto.generate_keypair();
|
||||
|
||||
assert!(db.store_json(0, b"asdf", &keypair).await.is_ok());
|
||||
|
||||
assert_eq!(db.load_json::<KeyPair>(0, b"qwer").await.unwrap(), None);
|
||||
|
||||
let d = match db.load_json::<KeyPair>(0, b"asdf").await {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
panic!("couldn't decode: {}", e);
|
||||
}
|
||||
};
|
||||
assert_eq!(d, Some(keypair), "keys should be equal");
|
||||
|
||||
let d = match db.delete_json::<KeyPair>(0, b"asdf").await {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
panic!("couldn't decode: {}", e);
|
||||
}
|
||||
};
|
||||
assert_eq!(d, Some(keypair), "keys should be equal");
|
||||
|
||||
assert!(
|
||||
db.store(1, b"foo", b"1234567890").await.is_ok(),
|
||||
"should store new key"
|
||||
);
|
||||
|
||||
assert!(
|
||||
db.load_json::<TypedKey>(1, b"foo").await.is_err(),
|
||||
"should fail to unfreeze"
|
||||
);
|
||||
}
|
||||
@ -162,7 +220,8 @@ pub async fn test_all() {
|
||||
let vcrypto = crypto.get(ck).unwrap();
|
||||
test_delete_open_delete(ts.clone()).await;
|
||||
test_store_delete_load(ts.clone()).await;
|
||||
test_frozen(vcrypto, ts.clone()).await;
|
||||
test_rkyv(vcrypto.clone(), ts.clone()).await;
|
||||
test_json(vcrypto, ts.clone()).await;
|
||||
let _ = ts.delete("test").await;
|
||||
}
|
||||
|
||||
|
@ -785,7 +785,7 @@ pub extern "C" fn table_db_get_keys(id: u32, col: u32) -> *mut c_char {
|
||||
let table_dbs = TABLE_DBS.lock();
|
||||
let Some(table_db) = table_dbs.get(&id) else {
|
||||
return std::ptr::null_mut();
|
||||
};
|
||||
}; xxx continue here and run all tests
|
||||
let Ok(keys) = table_db.clone().get_keys(col) else {
|
||||
return std::ptr::null_mut();
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user