[ci skip] fix WAL file explosion

This commit is contained in:
Christien Rioux 2025-12-24 01:01:30 -05:00
parent 0aa0c9788f
commit e3bb172cec
10 changed files with 100 additions and 24 deletions

View file

@ -3,6 +3,7 @@
- veilid-core:
- Change route test failure during allocation to a `VeilidAPIError::TryAgain`
- Fix connectivity problem resulting in `couldn't look up relay for inbound relay` in logs
- Update keyvaluedb to 0.1.5 to fix sqlite WAL file explosion
- veilid-tools:
- Add `debug-locks` feature implementation for AsyncRwLock, AsyncSemaphore, AsyncMutex

22
Cargo.lock generated
View file

@ -3202,18 +3202,18 @@ dependencies = [
[[package]]
name = "keyvaluedb"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cf5f716c938c5720860919731f933740c437f7eb1e35d194b877f85a6be992f"
checksum = "f82f6a6d061554e431c5d30117f5fb8192ce2aed4c67a37cf8a11430a1182017"
dependencies = [
"smallvec",
]
[[package]]
name = "keyvaluedb-memorydb"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e15911696a5b7e2a88f40d38c9c33d8ac0722bc5b88ad6ecdd0b3dada0eef575"
checksum = "58507837ab456eb0b3e16f83abbabda214cd85e78878da03f0d62a5b77e05ad3"
dependencies = [
"keyvaluedb",
"parking_lot 0.12.5",
@ -3221,9 +3221,9 @@ dependencies = [
[[package]]
name = "keyvaluedb-sqlite"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81123fd2c40c4406a3bfd1f4ee2e6810d2dd9ef9254d10ddceb67a3156337344"
checksum = "1e5a996ef8caa7a6e7cf2e333388edbd048389a1d8d301464b57baa4fa4fd38b"
dependencies = [
"async-sqlite",
"hex",
@ -3234,9 +3234,9 @@ dependencies = [
[[package]]
name = "keyvaluedb-web"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "744d5c74aed22b16ffd5c48fc99f5bb9c1889766c653db5181148451725c8cd5"
checksum = "6369c136e5778a4f4c9d0c2587c9c22b8b708d139fbed39098247174a014ba87"
dependencies = [
"flume",
"futures",
@ -6665,7 +6665,7 @@ dependencies = [
"cfg-if 1.0.4",
"chrono",
"clap 4.5.53",
"config 0.13.4",
"config 0.14.1",
"console",
"crossbeam-channel",
"cursive",
@ -6682,7 +6682,7 @@ dependencies = [
"log",
"lru",
"owning_ref",
"parking_lot 0.11.2",
"parking_lot 0.12.5",
"rustyline-async",
"serde",
"serde_derive",
@ -7054,7 +7054,7 @@ dependencies = [
"lazy_static",
"parking_lot 0.12.5",
"paste",
"send_wrapper 0.4.0",
"send_wrapper 0.6.0",
"serde",
"serde-wasm-bindgen",
"serde_bytes",

View file

@ -119,7 +119,7 @@ thiserror = "1.0.69"
# Data structures
enumset = { version = "1.1.5", features = ["serde"] }
keyvaluedb = "0.1.4"
keyvaluedb = "0.1.5"
range-set-blaze = "0.1.16"
weak-table = "0.3.2"
hashlink = { package = "veilid-hashlink", version = "0.1.1", features = [
@ -207,7 +207,7 @@ futures-util = { version = "0.3.31", default-features = false, features = [
# Data structures
keyring-manager = "0.6.0"
keyvaluedb-sqlite = "0.1.4"
keyvaluedb-sqlite = "0.1.5"
# Network
async-tungstenite = { version = "0.27.0", features = ["async-tls"] }
@ -253,7 +253,7 @@ ws_stream_wasm = "0.7.4"
wasm-logger = "0.2.0"
# Data Structures
keyvaluedb-web = "0.1.4"
keyvaluedb-web = "0.1.5"
### Configuration for WASM32 'web-sys' crate
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies.web-sys]

View file

@ -61,6 +61,7 @@ mod table_store;
mod veilid_api;
mod veilid_config;
pub(crate) use self::attachment_manager::TickEvent;
pub(crate) use self::component::*;
pub(crate) use self::core_context::RegisteredComponents;
#[allow(unused_imports)]

View file

@ -1,6 +1,5 @@
pub mod rolling_transfers;
use super::*;
use crate::attachment_manager::TickEvent;
impl NetworkManager {
pub fn setup_tasks(&self) {

View file

@ -8,8 +8,6 @@ pub mod private_route_management;
pub mod relay_management;
pub mod update_statistics;
use crate::attachment_manager::TickEvent;
use super::*;
impl_veilid_log_facility!("rtab");

View file

@ -28,8 +28,6 @@ mod transaction_command;
mod types;
mod watch_value;
use crate::attachment_manager::TickEvent;
use super::*;
use hashlink::{LinkedHashMap, LruCache};

View file

@ -1,6 +1,8 @@
use super::*;
mod table_db;
mod tasks;
pub use table_db::*;
pub mod tests;
@ -20,6 +22,7 @@ use weak_table::WeakValueHashMap;
impl_veilid_log_facility!("tstore");
const ALL_TABLE_NAMES: &[u8] = b"all_table_names";
const FLUSH_TABLES_INTERVAL_SECS: u32 = 60;
/// Description of column
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
@ -80,6 +83,8 @@ struct TableStoreInner {
encryption_key: Option<SharedSecret>,
all_table_names: HashMap<String, String>,
all_tables_db: Option<Database>,
/// Tick subscription
tick_subscription: Option<EventBusSubscription>,
}
impl fmt::Debug for TableStoreInner {
@ -88,7 +93,6 @@ impl fmt::Debug for TableStoreInner {
.field("opened", &self.opened)
.field("encryption_key", &self.encryption_key)
.field("all_table_names", &self.all_table_names)
//.field("all_tables_db", &self.all_tables_db)
.finish()
}
}
@ -101,6 +105,7 @@ pub struct TableStore {
inner: Mutex<TableStoreInner>, // Sync mutex here because TableDB drops can happen at any time
table_store_driver: TableStoreDriver,
async_lock: Arc<AsyncMutex<()>>,
flush_tables_task: TickTask<EyreReport>,
}
impl fmt::Debug for TableStore {
@ -123,18 +128,24 @@ impl TableStore {
encryption_key: None,
all_table_names: HashMap::new(),
all_tables_db: None,
tick_subscription: None,
}
}
pub(crate) fn new(registry: VeilidComponentRegistry) -> Self {
let inner = Self::new_inner();
let table_store_driver = TableStoreDriver::new(registry.clone());
Self {
let this = Self {
registry,
inner: Mutex::new(inner),
table_store_driver,
async_lock: Arc::new(AsyncMutex::new(())),
}
flush_tables_task: TickTask::new("flush_tables_task", FLUSH_TABLES_INTERVAL_SECS),
};
this.setup_tasks();
this
}
// Flush internal control state
@ -534,11 +545,30 @@ impl TableStore {
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn post_init_async(&self) -> EyreResult<()> {
// Register event handlers
let tick_subscription = impl_subscribe_event_bus_async!(self, Self, tick_event_handler);
let mut inner = self.inner.lock();
// Schedule tick
inner.tick_subscription = Some(tick_subscription);
Ok(())
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn pre_terminate_async(&self) {}
async fn pre_terminate_async(&self) {
// Stop background operations
{
let mut inner = self.inner.lock();
if let Some(sub) = inner.tick_subscription.take() {
self.event_bus().unsubscribe(sub);
}
}
// Cancel all tasks associated with the tick future
self.cancel_tasks().await;
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn terminate_async(&self) {
@ -622,7 +652,7 @@ impl TableStore {
// Flush table names to disk
self.flush().await;
// Clean up the new database before it gets added to the opened list
// Clean up the new database before it gets added to the opened list
db.cleanup().await.map_err(VeilidAPIError::internal)?;
// If more columns are available, open the low level db with the max column count but restrict the tabledb object to the number requested
@ -775,4 +805,11 @@ impl TableStore {
self.flush().await;
Ok(())
}
async fn tick_event_handler(&self, evt: Arc<TickEvent>) {
let lag = evt.last_tick_ts.map(|x| evt.cur_tick_ts.duration_since(x));
if let Err(e) = self.tick(lag).await {
error!("Error in table store tick: {}", e);
}
}
}

View file

@ -0,0 +1,15 @@
use super::*;
impl TableStore {
// Flush records stores to disk and remove dead records
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn flush_tables_task_routine(
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
self.flush().await;
Ok(())
}
}

View file

@ -0,0 +1,27 @@
pub mod flush_tables;
use super::*;
impl TableStore {
pub(super) fn setup_tasks(&self) {
// Set flush tables tick task
veilid_log!(self debug "starting flush tables task");
impl_setup_task_async!(self, Self, flush_tables_task, flush_tables_task_routine);
}
#[instrument(parent = None, level = "trace", target = "tstore", name = "TableStore::tick", skip_all, err)]
pub async fn tick(&self, _lag: Option<TimestampDuration>) -> EyreResult<()> {
// Run the flush tables task
self.flush_tables_task.tick().await?;
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) async fn cancel_tasks(&self) {
veilid_log!(self debug "stopping flush tables task");
if let Err(e) = self.flush_tables_task.stop().await {
veilid_log!(self warn "flush_tables_task not stopped: {}", e);
}
}
}