diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs
index 924a2531..609fe0ee 100644
--- a/veilid-core/src/network_manager/mod.rs
+++ b/veilid-core/src/network_manager/mod.rs
@@ -219,7 +219,7 @@ impl NetworkManager {
             )),
         };
 
-        this.start_tasks();
+        this.setup_tasks();
 
         this
     }
@@ -379,7 +379,7 @@ impl NetworkManager {
         debug!("starting network manager shutdown");
 
         // Cancel all tasks
-        self.stop_tasks().await;
+        self.cancel_tasks().await;
 
         // Shutdown network components if they started up
         debug!("shutting down network components");
diff --git a/veilid-core/src/network_manager/tasks/mod.rs b/veilid-core/src/network_manager/tasks/mod.rs
index 03f76a42..35e3e99c 100644
--- a/veilid-core/src/network_manager/tasks/mod.rs
+++ b/veilid-core/src/network_manager/tasks/mod.rs
@@ -4,7 +4,7 @@ pub mod rolling_transfers;
 use super::*;
 
 impl NetworkManager {
-    pub(crate) fn start_tasks(&self) {
+    pub(crate) fn setup_tasks(&self) {
         // Set rolling transfers tick task
         {
             let this = self.clone();
@@ -67,7 +67,7 @@
         Ok(())
     }
 
-    pub(crate) async fn stop_tasks(&self) {
+    pub(crate) async fn cancel_tasks(&self) {
         debug!("stopping rolling transfers task");
         if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
             warn!("rolling_transfers_task not stopped: {}", e);
diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs
index 021d9c40..50c4db74 100644
--- a/veilid-core/src/routing_table/mod.rs
+++ b/veilid-core/src/routing_table/mod.rs
@@ -211,7 +211,7 @@ impl RoutingTable {
             unlocked_inner,
         };
 
-        this.start_tasks();
+        this.setup_tasks();
 
         this
     }
@@ -262,7 +262,7 @@ impl RoutingTable {
         debug!("starting routing table terminate");
 
         // Stop tasks
-        self.stop_tasks().await;
+        self.cancel_tasks().await;
 
         // Load bucket entries from table db if possible
         debug!("saving routing table entries");
diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs
index b7841b78..cf7f7cdc 100644
--- a/veilid-core/src/routing_table/tasks/mod.rs
+++ b/veilid-core/src/routing_table/tasks/mod.rs
@@ -9,7 +9,7 @@ pub mod rolling_transfers;
 use super::*;
 
 impl RoutingTable {
-    pub(crate) fn start_tasks(&self) {
+    pub(crate) fn setup_tasks(&self) {
         // Set rolling transfers tick task
         {
             let this = self.clone();
@@ -176,7 +176,7 @@ impl RoutingTable {
         Ok(())
     }
 
-    pub(crate) async fn stop_tasks(&self) {
+    pub(crate) async fn cancel_tasks(&self) {
         // Cancel all tasks being ticked
         debug!("stopping rolling transfers task");
         if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index 7c04adca..68720be4 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -3,6 +3,7 @@ mod record;
 mod record_data;
 mod record_store;
 mod record_store_limits;
+mod tasks;
 mod types;
 
 use keys::*;
@@ -20,6 +21,8 @@ use crate::rpc_processor::*;
 const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN;
 /// The maximum total size of all subkeys of a record
 const MAX_RECORD_DATA_SIZE: usize = 1_048_576;
+/// Frequency to flush record stores to disk
+const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
 
 /// Locked structure for storage manager
 struct StorageManagerInner {
@@ -31,6 +34,8 @@ struct StorageManagerInner {
     remote_record_store: Option,
     /// RPC processor if it is available
     rpc_processor: Option,
+    /// Background processing task (not part of attachment manager tick tree so it happens when detached too)
+    tick_future: Option>,
 }
 
 struct StorageManagerUnlockedInner {
@@ -39,6 +44,9 @@
     protected_store: ProtectedStore,
     table_store: TableStore,
     block_store: BlockStore,
+
+    // Background processes
+    flush_record_stores_task: TickTask,
 }
 
 #[derive(Clone)]
@@ -61,6 +69,7 @@ impl StorageManager {
             protected_store,
             table_store,
             block_store,
+            flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS),
         }
     }
     fn new_inner() -> StorageManagerInner {
@@ -69,6 +78,7 @@
             local_record_store: None,
             remote_record_store: None,
             rpc_processor: None,
+            tick_future: None,
         }
     }
@@ -107,7 +117,7 @@
         table_store: TableStore,
         block_store: BlockStore,
     ) -> StorageManager {
-        StorageManager {
+        let this = StorageManager {
             unlocked_inner: Arc::new(Self::new_unlocked_inner(
                 config,
                 crypto,
@@ -116,7 +126,7 @@
                 block_store,
             )),
             inner: Arc::new(AsyncMutex::new(Self::new_inner())),
-        }
+        };
+
+        this.setup_tasks();
+
+        this
     }
 
     #[instrument(level = "debug", skip_all, err)]
@@ -144,6 +158,18 @@
         inner.local_record_store = Some(local_record_store);
         inner.remote_record_store = Some(remote_record_store);
 
+        // Schedule tick
+        let this = self.clone();
+        let tick_future = interval(1000, move || {
+            let this = this.clone();
+            async move {
+                if let Err(e) = this.tick().await {
+                    warn!("storage manager tick failed: {}", e);
+                }
+            }
+        });
+        inner.tick_future = Some(tick_future);
+
         inner.initialized = true;
 
         Ok(())
     }
@@ -151,8 +177,18 @@ pub async fn terminate(&self) {
         debug!("starting storage manager shutdown");
 
+        let mut inner = self.inner.lock().await;
+        // Stop ticker
+        let tick_future = inner.tick_future.take();
+        if let Some(f) = tick_future {
+            f.await;
+        }
+
+        // Cancel all tasks
+        self.cancel_tasks().await;
+
         // Release the storage manager
         *inner = Self::new_inner();
diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs
index c0b67668..9490fc27 100644
--- a/veilid-core/src/storage_manager/record_store.rs
+++ b/veilid-core/src/storage_manager/record_store.rs
@@ -134,7 +134,7 @@ impl RecordStore {
     async fn purge_dead_records(&mut self, lazy: bool) {
         let purge_dead_records_mutex = self.purge_dead_records_mutex.clone();
-        let lock = if lazy {
+        let _lock = if lazy {
             match mutex_try_lock!(purge_dead_records_mutex) {
                 Some(v) => v,
                 None => {
@@ -212,9 +212,10 @@
         }
     }
 
-    pub async fn tick(&mut self, last_ts: Timestamp, cur_ts: Timestamp) {
+    pub async fn tick(&mut self) -> EyreResult<()> {
         self.flush_changed_records().await;
         self.purge_dead_records(true).await;
+        Ok(())
     }
 
     pub async fn new_record(
diff --git a/veilid-core/src/storage_manager/tasks/flush_record_stores.rs b/veilid-core/src/storage_manager/tasks/flush_record_stores.rs
new file mode 100644
index 00000000..c2fb1b0d
--- /dev/null
+++ b/veilid-core/src/storage_manager/tasks/flush_record_stores.rs
@@ -0,0 +1,21 @@
+use super::*;
+
+impl StorageManager {
+    // Flush records stores to disk and remove dead records
+    #[instrument(level = "trace", skip(self), err)]
+    pub(crate) async fn flush_record_stores_task_routine(
+        self,
+        stop_token: StopToken,
+        _last_ts: Timestamp,
+        _cur_ts: Timestamp,
+    ) -> EyreResult<()> {
+        let mut inner = self.inner.lock().await;
+        if let Some(local_record_store) = &mut inner.local_record_store {
+            local_record_store.tick().await?;
+        }
+        if let Some(remote_record_store) = &mut inner.remote_record_store {
+            remote_record_store.tick().await?;
+        }
+        Ok(())
+    }
+}
diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs
new file mode 100644
index 00000000..cd90a82e
--- /dev/null
+++ b/veilid-core/src/storage_manager/tasks/mod.rs
@@ -0,0 +1,43 @@
+pub mod flush_record_stores;
+
+use super::*;
+
+impl StorageManager {
+    pub(crate) fn setup_tasks(&self) {
+        // Set rolling transfers tick task
+        debug!("starting flush record stores task");
+        {
+            let this = self.clone();
+            self.unlocked_inner
+                .flush_record_stores_task
+                .set_routine(move |s, l, t| {
+                    Box::pin(
+                        this.clone()
+                            .flush_record_stores_task_routine(
+                                s,
+                                Timestamp::new(l),
+                                Timestamp::new(t),
+                            )
+                            .instrument(trace_span!(
+                                parent: None,
+                                "StorageManager flush record stores task routine"
+                            )),
+                    )
+                });
+        }
+    }
+
+    pub async fn tick(&self) -> EyreResult<()> {
+        // Run the rolling transfers task
+        self.unlocked_inner.flush_record_stores_task.tick().await?;
+
+        Ok(())
+    }
+
+    pub(crate) async fn cancel_tasks(&self) {
+        debug!("stopping flush record stores task");
+        if let Err(e) = self.unlocked_inner.flush_record_stores_task.stop().await {
+            warn!("flush_record_stores_task not stopped: {}", e);
+        }
+    }
+}
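
Note: the sketch below is illustrative only and is not part of the patch. It mirrors the setup_tasks / tick / cancel_tasks lifecycle this change wires into StorageManager, but swaps Veilid's TickTask and interval helpers for plain tokio primitives; every name in it (StorageManagerStub, RecordStoreStub, the one-second flush loop) is hypothetical and simplified, and it assumes tokio with the "full" feature set.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Mutex;
use tokio::task::JoinHandle;

#[derive(Default)]
struct RecordStoreStub {
    dirty: bool,
}

impl RecordStoreStub {
    // Stand-in for flush_changed_records() + purge_dead_records(true).
    async fn tick(&mut self) {
        if self.dirty {
            println!("flushing record store to disk");
            self.dirty = false;
        }
    }
}

#[derive(Clone)]
struct StorageManagerStub {
    inner: Arc<Mutex<RecordStoreStub>>,
}

impl StorageManagerStub {
    fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(RecordStoreStub::default())),
        }
    }

    // Counterpart of flush_record_stores_task_routine: lock the inner state
    // and tick the record store so changed records get written out.
    async fn tick(&self) {
        self.inner.lock().await.tick().await;
    }

    // Counterpart of setup_tasks() plus the interval() call in init(): spawn
    // a background ticker that keeps flushing once per second.
    fn setup_tasks(&self) -> JoinHandle<()> {
        let this = self.clone();
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(Duration::from_secs(1));
            loop {
                ticker.tick().await;
                this.tick().await;
            }
        })
    }

    // Counterpart of terminate()/cancel_tasks(): stop the ticker before the
    // manager state is released.
    async fn cancel_tasks(&self, ticker: JoinHandle<()>) {
        ticker.abort();
        let _ = ticker.await;
    }
}

#[tokio::main]
async fn main() {
    let manager = StorageManagerStub::new();
    let ticker = manager.setup_tasks();

    // Dirty the store, give the background task time to flush it, then shut down.
    manager.inner.lock().await.dirty = true;
    tokio::time::sleep(Duration::from_millis(1500)).await;
    manager.cancel_tasks(ticker).await;
}

The point the sketch demonstrates is the same as the patch: the flush loop is scheduled by the storage manager itself rather than hanging off the attachment manager's tick tree, so record stores keep flushing even while the node is detached.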