From 0cf745a302685bc3ec122157b8d9b28d4512e43b Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 18 Mar 2025 13:39:24 -0400 Subject: [PATCH] fix tabledb race condition --- veilid-core/src/table_store/mod.rs | 48 ++++++++++--------------- veilid-core/src/table_store/table_db.rs | 39 ++++++++------------ 2 files changed, 33 insertions(+), 54 deletions(-) diff --git a/veilid-core/src/table_store/mod.rs b/veilid-core/src/table_store/mod.rs index ed15f538..afc7d82d 100644 --- a/veilid-core/src/table_store/mod.rs +++ b/veilid-core/src/table_store/mod.rs @@ -15,6 +15,7 @@ mod native; use native::*; use keyvaluedb::*; +use weak_table::WeakValueHashMap; impl_veilid_log_facility!("tstore"); @@ -72,7 +73,7 @@ pub struct TableInfo { #[must_use] struct TableStoreInner { - opened: BTreeMap>, + opened: WeakValueHashMap>, encryption_key: Option, all_table_names: HashMap, all_tables_db: Option, @@ -115,7 +116,7 @@ impl_veilid_component!(TableStore); impl TableStore { fn new_inner() -> TableStoreInner { TableStoreInner { - opened: BTreeMap::new(), + opened: WeakValueHashMap::new(), encryption_key: None, all_table_names: HashMap::new(), all_tables_db: None, @@ -528,6 +529,7 @@ impl TableStore { self.flush().await; let mut inner = self.inner.lock(); + inner.opened.shrink_to_fit(); if !inner.opened.is_empty() { panic!( "all open databases should have been closed: {:?}", @@ -539,15 +541,6 @@ impl TableStore { inner.encryption_key = None; } - #[instrument(level = "trace", target = "tstore", skip_all)] - pub(crate) fn on_table_db_drop(&self, table: String) { - veilid_log!(self trace "dropping table db: {}", table); - let mut inner = self.inner.lock(); - if inner.opened.remove(&table).is_none() { - unreachable!("should have removed an item"); - } - } - /// Get or create a TableDB database table. If the column count is greater than an /// existing TableDB's column count, the database will be upgraded to add the missing columns. #[instrument(level = "trace", target = "tstore", skip_all)] @@ -566,25 +559,20 @@ impl TableStore { // See if this table is already opened, if so the column count must be the same { - let mut inner = self.inner.lock(); - if let Some(table_db_weak_inner) = inner.opened.get(&table_name) { - match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone(), column_count) { - Some(tdb) => { - // Ensure column count isnt bigger - let existing_col_count = tdb.get_column_count()?; - if column_count > existing_col_count { - return Err(VeilidAPIError::generic(format!( - "database must be closed before increasing column count {} -> {}", - existing_col_count, column_count, - ))); - } + let inner = self.inner.lock(); + if let Some(table_db_unlocked_inner) = inner.opened.get(&table_name) { + let tdb = TableDB::new_from_unlocked_inner(table_db_unlocked_inner, column_count); - return Ok(tdb); - } - None => { - inner.opened.remove(&table_name); - } - }; + // Ensure column count isnt bigger + let existing_col_count = tdb.get_column_count()?; + if column_count > existing_col_count { + return Err(VeilidAPIError::generic(format!( + "database must be closed before increasing column count {} -> {}", + existing_col_count, column_count, + ))); + } + + return Ok(tdb); } } @@ -637,7 +625,7 @@ impl TableStore { // Keep track of opened DBs inner .opened - .insert(table_name.clone(), table_db.weak_unlocked_inner()); + .insert(table_name.clone(), table_db.unlocked_inner()); Ok(table_db) } diff --git a/veilid-core/src/table_store/table_db.rs b/veilid-core/src/table_store/table_db.rs index 19ac1f4a..a561da95 100644 --- a/veilid-core/src/table_store/table_db.rs +++ b/veilid-core/src/table_store/table_db.rs @@ -38,13 +38,6 @@ impl fmt::Debug for TableDBUnlockedInner { } } -impl Drop for TableDBUnlockedInner { - fn drop(&mut self) { - let table_store = self.registry.table_store(); - table_store.on_table_db_drop(self.table.clone()); - } -} - #[derive(Debug, Clone)] #[must_use] pub struct TableDB { @@ -87,26 +80,24 @@ impl TableDB { } } - pub(super) fn try_new_from_weak_inner( - weak_inner: Weak, + pub(super) fn new_from_unlocked_inner( + unlocked_inner: Arc, opened_column_count: u32, - ) -> Option { - weak_inner.upgrade().map(|table_db_unlocked_inner| { - let db = &table_db_unlocked_inner.database; - let total_columns = db.num_columns().unwrap(); - Self { - opened_column_count: if opened_column_count == 0 { - total_columns - } else { - opened_column_count - }, - unlocked_inner: table_db_unlocked_inner, - } - }) + ) -> Self { + let db = &unlocked_inner.database; + let total_columns = db.num_columns().unwrap(); + Self { + opened_column_count: if opened_column_count == 0 { + total_columns + } else { + opened_column_count + }, + unlocked_inner, + } } - pub(super) fn weak_unlocked_inner(&self) -> Weak { - Arc::downgrade(&self.unlocked_inner) + pub(super) fn unlocked_inner(&self) -> Arc { + self.unlocked_inner.clone() } /// Get the internal name of the table