mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-04-16 22:13:14 -04:00
fix tabledb race condition
This commit is contained in:
parent
ab3cf25647
commit
0cf745a302
@ -15,6 +15,7 @@ mod native;
|
||||
use native::*;
|
||||
|
||||
use keyvaluedb::*;
|
||||
use weak_table::WeakValueHashMap;
|
||||
|
||||
impl_veilid_log_facility!("tstore");
|
||||
|
||||
@ -72,7 +73,7 @@ pub struct TableInfo {
|
||||
|
||||
#[must_use]
|
||||
struct TableStoreInner {
|
||||
opened: BTreeMap<String, Weak<TableDBUnlockedInner>>,
|
||||
opened: WeakValueHashMap<String, Weak<TableDBUnlockedInner>>,
|
||||
encryption_key: Option<TypedSharedSecret>,
|
||||
all_table_names: HashMap<String, String>,
|
||||
all_tables_db: Option<Database>,
|
||||
@ -115,7 +116,7 @@ impl_veilid_component!(TableStore);
|
||||
impl TableStore {
|
||||
fn new_inner() -> TableStoreInner {
|
||||
TableStoreInner {
|
||||
opened: BTreeMap::new(),
|
||||
opened: WeakValueHashMap::new(),
|
||||
encryption_key: None,
|
||||
all_table_names: HashMap::new(),
|
||||
all_tables_db: None,
|
||||
@ -528,6 +529,7 @@ impl TableStore {
|
||||
self.flush().await;
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
inner.opened.shrink_to_fit();
|
||||
if !inner.opened.is_empty() {
|
||||
panic!(
|
||||
"all open databases should have been closed: {:?}",
|
||||
@ -539,15 +541,6 @@ impl TableStore {
|
||||
inner.encryption_key = None;
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "tstore", skip_all)]
|
||||
pub(crate) fn on_table_db_drop(&self, table: String) {
|
||||
veilid_log!(self trace "dropping table db: {}", table);
|
||||
let mut inner = self.inner.lock();
|
||||
if inner.opened.remove(&table).is_none() {
|
||||
unreachable!("should have removed an item");
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create a TableDB database table. If the column count is greater than an
|
||||
/// existing TableDB's column count, the database will be upgraded to add the missing columns.
|
||||
#[instrument(level = "trace", target = "tstore", skip_all)]
|
||||
@ -566,25 +559,20 @@ impl TableStore {
|
||||
|
||||
// See if this table is already opened, if so the column count must be the same
|
||||
{
|
||||
let mut inner = self.inner.lock();
|
||||
if let Some(table_db_weak_inner) = inner.opened.get(&table_name) {
|
||||
match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone(), column_count) {
|
||||
Some(tdb) => {
|
||||
// Ensure column count isnt bigger
|
||||
let existing_col_count = tdb.get_column_count()?;
|
||||
if column_count > existing_col_count {
|
||||
return Err(VeilidAPIError::generic(format!(
|
||||
"database must be closed before increasing column count {} -> {}",
|
||||
existing_col_count, column_count,
|
||||
)));
|
||||
}
|
||||
let inner = self.inner.lock();
|
||||
if let Some(table_db_unlocked_inner) = inner.opened.get(&table_name) {
|
||||
let tdb = TableDB::new_from_unlocked_inner(table_db_unlocked_inner, column_count);
|
||||
|
||||
return Ok(tdb);
|
||||
}
|
||||
None => {
|
||||
inner.opened.remove(&table_name);
|
||||
}
|
||||
};
|
||||
// Ensure column count isnt bigger
|
||||
let existing_col_count = tdb.get_column_count()?;
|
||||
if column_count > existing_col_count {
|
||||
return Err(VeilidAPIError::generic(format!(
|
||||
"database must be closed before increasing column count {} -> {}",
|
||||
existing_col_count, column_count,
|
||||
)));
|
||||
}
|
||||
|
||||
return Ok(tdb);
|
||||
}
|
||||
}
|
||||
|
||||
@ -637,7 +625,7 @@ impl TableStore {
|
||||
// Keep track of opened DBs
|
||||
inner
|
||||
.opened
|
||||
.insert(table_name.clone(), table_db.weak_unlocked_inner());
|
||||
.insert(table_name.clone(), table_db.unlocked_inner());
|
||||
|
||||
Ok(table_db)
|
||||
}
|
||||
|
@ -38,13 +38,6 @@ impl fmt::Debug for TableDBUnlockedInner {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TableDBUnlockedInner {
|
||||
fn drop(&mut self) {
|
||||
let table_store = self.registry.table_store();
|
||||
table_store.on_table_db_drop(self.table.clone());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use]
|
||||
pub struct TableDB {
|
||||
@ -87,26 +80,24 @@ impl TableDB {
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn try_new_from_weak_inner(
|
||||
weak_inner: Weak<TableDBUnlockedInner>,
|
||||
pub(super) fn new_from_unlocked_inner(
|
||||
unlocked_inner: Arc<TableDBUnlockedInner>,
|
||||
opened_column_count: u32,
|
||||
) -> Option<Self> {
|
||||
weak_inner.upgrade().map(|table_db_unlocked_inner| {
|
||||
let db = &table_db_unlocked_inner.database;
|
||||
let total_columns = db.num_columns().unwrap();
|
||||
Self {
|
||||
opened_column_count: if opened_column_count == 0 {
|
||||
total_columns
|
||||
} else {
|
||||
opened_column_count
|
||||
},
|
||||
unlocked_inner: table_db_unlocked_inner,
|
||||
}
|
||||
})
|
||||
) -> Self {
|
||||
let db = &unlocked_inner.database;
|
||||
let total_columns = db.num_columns().unwrap();
|
||||
Self {
|
||||
opened_column_count: if opened_column_count == 0 {
|
||||
total_columns
|
||||
} else {
|
||||
opened_column_count
|
||||
},
|
||||
unlocked_inner,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn weak_unlocked_inner(&self) -> Weak<TableDBUnlockedInner> {
|
||||
Arc::downgrade(&self.unlocked_inner)
|
||||
pub(super) fn unlocked_inner(&self) -> Arc<TableDBUnlockedInner> {
|
||||
self.unlocked_inner.clone()
|
||||
}
|
||||
|
||||
/// Get the internal name of the table
|
||||
|
Loading…
x
Reference in New Issue
Block a user