checkpoint

This commit is contained in:
John Smith 2023-05-20 14:53:55 -04:00
parent f574e63d90
commit dea8be6522
3 changed files with 37 additions and 38 deletions

2
external/keyvaluedb vendored

@ -1 +1 @@
Subproject commit 4a21429c5ea10ba550b793818e691793d7d8016b Subproject commit e92252f573c3ba98eebf71a5d115c44cff52af3d

View File

@ -1,11 +1,11 @@
use super::*; use super::*;
use crate::intf::table_db::TableDBInner; use crate::intf::table_db::TableDBUnlockedInner;
pub use crate::intf::table_db::{TableDB, TableDBTransaction}; pub use crate::intf::table_db::{TableDB, TableDBTransaction};
use keyvaluedb_sqlite::*; use keyvaluedb_sqlite::*;
use std::path::PathBuf; use std::path::PathBuf;
struct TableStoreInner { struct TableStoreInner {
opened: BTreeMap<String, Weak<Mutex<TableDBInner>>>, opened: BTreeMap<String, Weak<Mutex<TableDBUnlockedInner>>>,
} }
/// Veilid Table Storage /// Veilid Table Storage

View File

@ -10,19 +10,19 @@ cfg_if! {
} }
} }
pub struct TableDBInner { pub struct TableDBUnlockedInner {
table: String, table: String,
table_store: TableStore, table_store: TableStore,
database: Database, database: Database,
} }
impl fmt::Debug for TableDBInner { impl fmt::Debug for TableDBUnlockedInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TableDBInner(table={})", self.table) write!(f, "TableDBInner(table={})", self.table)
} }
} }
impl Drop for TableDBInner { impl Drop for TableDBUnlockedInner {
fn drop(&mut self) { fn drop(&mut self) {
self.table_store.on_table_db_drop(self.table.clone()); self.table_store.on_table_db_drop(self.table.clone());
} }
@ -30,60 +30,58 @@ impl Drop for TableDBInner {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct TableDB { pub struct TableDB {
inner: Arc<Mutex<TableDBInner>>, unlocked_inner: Arc<TableDBUnlockedInner>,
} }
impl TableDB { impl TableDB {
pub(super) fn new(table: String, table_store: TableStore, database: Database) -> Self { pub(super) fn new(table: String, table_store: TableStore, database: Database) -> Self {
Self { Self {
inner: Arc::new(Mutex::new(TableDBInner { unlocked_inner: Arc::new(TableDBUnlockedInner {
table, table,
table_store, table_store,
database, database,
})), }),
} }
} }
pub(super) fn try_new_from_weak_inner(weak_inner: Weak<Mutex<TableDBInner>>) -> Option<Self> { pub(super) fn try_new_from_weak_inner(weak_inner: Weak<TableDBUnlockedInner>) -> Option<Self> {
weak_inner.upgrade().map(|table_db_inner| Self { weak_inner.upgrade().map(|table_db_unlocked_inner| Self {
inner: table_db_inner, unlocked_inner: table_db_unlocked_inner,
}) })
} }
pub(super) fn weak_inner(&self) -> Weak<Mutex<TableDBInner>> { pub(super) fn weak_inner(&self) -> Weak<TableDBUnlockedInner> {
Arc::downgrade(&self.inner) Arc::downgrade(&self.unlocked_inner)
} }
/// Get the total number of columns in the TableDB /// Get the total number of columns in the TableDB
pub fn get_column_count(&self) -> EyreResult<u32> { pub fn get_column_count(&self) -> EyreResult<u32> {
let db = &self.inner.lock().database; let db = &self.unlocked_inner.database;
db.num_columns().wrap_err("failed to get column count: {}") db.num_columns().wrap_err("failed to get column count: {}")
} }
/// Get the list of keys in a column of the TableDB /// Get the list of keys in a column of the TableDB
pub fn get_keys(&self, col: u32) -> EyreResult<Vec<Box<[u8]>>> { pub async fn get_keys(&self, col: u32) -> EyreResult<Vec<Box<[u8]>>> {
let db = &self.inner.lock().database; let db = self.unlocked_inner.database.clone();
let mut out: Vec<Box<[u8]>> = Vec::new(); let mut out: Vec<Box<[u8]>> = Vec::new();
db.iter(col, None, &mut |kv| { db.iter(col, None, |kv| {
out.push(kv.0.clone().into_boxed_slice()); out.push(kv.0.clone().into_boxed_slice());
Ok(true) Ok(Option::<()>::None)
}) })
.await
.wrap_err("failed to get keys for column")?; .wrap_err("failed to get keys for column")?;
Ok(out) Ok(out)
} }
/// Start a TableDB write transaction. The transaction object must be committed or rolled back before dropping. /// Start a TableDB write transaction. The transaction object must be committed or rolled back before dropping.
pub fn transact(&self) -> TableDBTransaction { pub fn transact(&self) -> TableDBTransaction {
let dbt = { let dbt = self.unlocked_inner.database.transaction();
let db = &self.inner.lock().database;
db.transaction()
};
TableDBTransaction::new(self.clone(), dbt) TableDBTransaction::new(self.clone(), dbt)
} }
/// Store a key with a value in a column in the TableDB. Performs a single transaction immediately. /// Store a key with a value in a column in the TableDB. Performs a single transaction immediately.
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> { pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> {
let db = self.inner.lock().database.clone(); let db = self.unlocked_inner.database.clone();
let mut dbt = db.transaction(); let mut dbt = db.transaction();
dbt.put(col, key, value); dbt.put(col, key, value);
db.write(dbt).await.wrap_err("failed to store key") db.write(dbt).await.wrap_err("failed to store key")
@ -96,7 +94,7 @@ impl TableDB {
{ {
let v = to_rkyv(value)?; let v = to_rkyv(value)?;
let db = self.inner.lock().database.clone(); let db = self.unlocked_inner.database.clone();
let mut dbt = db.transaction(); let mut dbt = db.transaction();
dbt.put(col, key, v.as_slice()); dbt.put(col, key, v.as_slice());
db.write(dbt).await.wrap_err("failed to store key") db.write(dbt).await.wrap_err("failed to store key")
@ -109,28 +107,28 @@ impl TableDB {
{ {
let v = serde_json::to_vec(value)?; let v = serde_json::to_vec(value)?;
let db = self.inner.lock().database.clone(); let db = self.unlocked_inner.database.clone();
let mut dbt = db.transaction(); let mut dbt = db.transaction();
dbt.put(col, key, v.as_slice()); dbt.put(col, key, v.as_slice());
db.write(dbt).await.wrap_err("failed to store key") db.write(dbt).await.wrap_err("failed to store key")
} }
/// Read a key from a column in the TableDB immediately. /// Read a key from a column in the TableDB immediately.
pub fn load(&self, col: u32, key: &[u8]) -> EyreResult<Option<Vec<u8>>> { pub async fn load(&self, col: u32, key: &[u8]) -> EyreResult<Option<Vec<u8>>> {
let db = self.inner.lock().database.clone(); let db = self.unlocked_inner.database.clone();
db.get(col, key).wrap_err("failed to get key") db.get(col, key).await.wrap_err("failed to get key")
} }
/// Read an rkyv key from a column in the TableDB immediately /// Read an rkyv key from a column in the TableDB immediately
pub fn load_rkyv<T>(&self, col: u32, key: &[u8]) -> EyreResult<Option<T>> pub async fn load_rkyv<T>(&self, col: u32, key: &[u8]) -> EyreResult<Option<T>>
where where
T: RkyvArchive, T: RkyvArchive,
<T as RkyvArchive>::Archived: <T as RkyvArchive>::Archived:
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>, for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
<T as RkyvArchive>::Archived: RkyvDeserialize<T, VeilidSharedDeserializeMap>, <T as RkyvArchive>::Archived: RkyvDeserialize<T, VeilidSharedDeserializeMap>,
{ {
let db = self.inner.lock().database.clone(); let db = self.unlocked_inner.database.clone();
let out = db.get(col, key).wrap_err("failed to get key")?; let out = db.get(col, key).await.wrap_err("failed to get key")?;
let b = match out { let b = match out {
Some(v) => v, Some(v) => v,
None => { None => {
@ -142,12 +140,12 @@ impl TableDB {
} }
/// Read an serde-json key from a column in the TableDB immediately /// Read an serde-json key from a column in the TableDB immediately
pub fn load_json<T>(&self, col: u32, key: &[u8]) -> EyreResult<Option<T>> pub async fn load_json<T>(&self, col: u32, key: &[u8]) -> EyreResult<Option<T>>
where where
T: for<'de> serde::Deserialize<'de>, T: for<'de> serde::Deserialize<'de>,
{ {
let db = self.inner.lock().database.clone(); let db = self.unlocked_inner.database.clone();
let out = db.get(col, key).wrap_err("failed to get key")?; let out = db.get(col, key).await.wrap_err("failed to get key")?;
let b = match out { let b = match out {
Some(v) => v, Some(v) => v,
None => { None => {
@ -159,9 +157,10 @@ impl TableDB {
} }
/// Delete key with from a column in the TableDB /// Delete key with from a column in the TableDB
xxx fix me
pub async fn delete(&self, col: u32, key: &[u8]) -> EyreResult<bool> { pub async fn delete(&self, col: u32, key: &[u8]) -> EyreResult<bool> {
let db = self.inner.lock().database.clone(); let db = self.unlocked_inner.database.clone();
let found = db.get(col, key).wrap_err("failed to get key")?; let found = db.get(col, key).await.wrap_err("failed to get key")?;
match found { match found {
None => Ok(false), None => Ok(false),
Some(_) => { Some(_) => {
@ -218,7 +217,7 @@ impl TableDBTransaction {
.take() .take()
.ok_or_else(|| eyre!("transaction already completed"))? .ok_or_else(|| eyre!("transaction already completed"))?
}; };
let db = self.db.inner.lock().database.clone(); let db = self.db.unlocked_inner.lock().database.clone();
db.write(dbt) db.write(dbt)
.await .await
.wrap_err("commit failed, transaction lost") .wrap_err("commit failed, transaction lost")