storage manager background tasks

This commit is contained in:
John Smith 2023-04-23 21:40:53 -04:00
parent 31edb8c059
commit 62615ad657
8 changed files with 113 additions and 12 deletions

View File

@ -219,7 +219,7 @@ impl NetworkManager {
)), )),
}; };
this.start_tasks(); this.setup_tasks();
this this
} }
@ -379,7 +379,7 @@ impl NetworkManager {
debug!("starting network manager shutdown"); debug!("starting network manager shutdown");
// Cancel all tasks // Cancel all tasks
self.stop_tasks().await; self.cancel_tasks().await;
// Shutdown network components if they started up // Shutdown network components if they started up
debug!("shutting down network components"); debug!("shutting down network components");

View File

@ -4,7 +4,7 @@ pub mod rolling_transfers;
use super::*; use super::*;
impl NetworkManager { impl NetworkManager {
pub(crate) fn start_tasks(&self) { pub(crate) fn setup_tasks(&self) {
// Set rolling transfers tick task // Set rolling transfers tick task
{ {
let this = self.clone(); let this = self.clone();
@ -67,7 +67,7 @@ impl NetworkManager {
Ok(()) Ok(())
} }
pub(crate) async fn stop_tasks(&self) { pub(crate) async fn cancel_tasks(&self) {
debug!("stopping rolling transfers task"); debug!("stopping rolling transfers task");
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
warn!("rolling_transfers_task not stopped: {}", e); warn!("rolling_transfers_task not stopped: {}", e);

View File

@ -211,7 +211,7 @@ impl RoutingTable {
unlocked_inner, unlocked_inner,
}; };
this.start_tasks(); this.setup_tasks();
this this
} }
@ -262,7 +262,7 @@ impl RoutingTable {
debug!("starting routing table terminate"); debug!("starting routing table terminate");
// Stop tasks // Stop tasks
self.stop_tasks().await; self.cancel_tasks().await;
// Load bucket entries from table db if possible // Load bucket entries from table db if possible
debug!("saving routing table entries"); debug!("saving routing table entries");

View File

@ -9,7 +9,7 @@ pub mod rolling_transfers;
use super::*; use super::*;
impl RoutingTable { impl RoutingTable {
pub(crate) fn start_tasks(&self) { pub(crate) fn setup_tasks(&self) {
// Set rolling transfers tick task // Set rolling transfers tick task
{ {
let this = self.clone(); let this = self.clone();
@ -176,7 +176,7 @@ impl RoutingTable {
Ok(()) Ok(())
} }
pub(crate) async fn stop_tasks(&self) { pub(crate) async fn cancel_tasks(&self) {
// Cancel all tasks being ticked // Cancel all tasks being ticked
debug!("stopping rolling transfers task"); debug!("stopping rolling transfers task");
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {

View File

@ -3,6 +3,7 @@ mod record;
mod record_data; mod record_data;
mod record_store; mod record_store;
mod record_store_limits; mod record_store_limits;
mod tasks;
mod types; mod types;
use keys::*; use keys::*;
@ -20,6 +21,8 @@ use crate::rpc_processor::*;
const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN; const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN;
/// The maximum total size of all subkeys of a record /// The maximum total size of all subkeys of a record
const MAX_RECORD_DATA_SIZE: usize = 1_048_576; const MAX_RECORD_DATA_SIZE: usize = 1_048_576;
/// Frequency to flush record stores to disk
const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
/// Locked structure for storage manager /// Locked structure for storage manager
struct StorageManagerInner { struct StorageManagerInner {
@ -31,6 +34,8 @@ struct StorageManagerInner {
remote_record_store: Option<RecordStore>, remote_record_store: Option<RecordStore>,
/// RPC processor if it is available /// RPC processor if it is available
rpc_processor: Option<RPCProcessor>, rpc_processor: Option<RPCProcessor>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
tick_future: Option<SendPinBoxFuture<()>>,
} }
struct StorageManagerUnlockedInner { struct StorageManagerUnlockedInner {
@ -39,6 +44,9 @@ struct StorageManagerUnlockedInner {
protected_store: ProtectedStore, protected_store: ProtectedStore,
table_store: TableStore, table_store: TableStore,
block_store: BlockStore, block_store: BlockStore,
// Background processes
flush_record_stores_task: TickTask<EyreReport>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -61,6 +69,7 @@ impl StorageManager {
protected_store, protected_store,
table_store, table_store,
block_store, block_store,
flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS),
} }
} }
fn new_inner() -> StorageManagerInner { fn new_inner() -> StorageManagerInner {
@ -69,6 +78,7 @@ impl StorageManager {
local_record_store: None, local_record_store: None,
remote_record_store: None, remote_record_store: None,
rpc_processor: None, rpc_processor: None,
tick_future: None,
} }
} }
@ -107,7 +117,7 @@ impl StorageManager {
table_store: TableStore, table_store: TableStore,
block_store: BlockStore, block_store: BlockStore,
) -> StorageManager { ) -> StorageManager {
StorageManager { let this = StorageManager {
unlocked_inner: Arc::new(Self::new_unlocked_inner( unlocked_inner: Arc::new(Self::new_unlocked_inner(
config, config,
crypto, crypto,
@ -116,7 +126,11 @@ impl StorageManager {
block_store, block_store,
)), )),
inner: Arc::new(AsyncMutex::new(Self::new_inner())), inner: Arc::new(AsyncMutex::new(Self::new_inner())),
} };
this.setup_tasks();
this
} }
#[instrument(level = "debug", skip_all, err)] #[instrument(level = "debug", skip_all, err)]
@ -144,6 +158,18 @@ impl StorageManager {
inner.local_record_store = Some(local_record_store); inner.local_record_store = Some(local_record_store);
inner.remote_record_store = Some(remote_record_store); inner.remote_record_store = Some(remote_record_store);
// Schedule tick
let this = self.clone();
let tick_future = interval(1000, move || {
let this = this.clone();
async move {
if let Err(e) = this.tick().await {
warn!("storage manager tick failed: {}", e);
}
}
});
inner.tick_future = Some(tick_future);
inner.initialized = true; inner.initialized = true;
Ok(()) Ok(())
@ -151,8 +177,18 @@ impl StorageManager {
pub async fn terminate(&self) { pub async fn terminate(&self) {
debug!("starting storage manager shutdown"); debug!("starting storage manager shutdown");
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
// Stop ticker
let tick_future = inner.tick_future.take();
if let Some(f) = tick_future {
f.await;
}
// Cancel all tasks
self.cancel_tasks().await;
// Release the storage manager // Release the storage manager
*inner = Self::new_inner(); *inner = Self::new_inner();

View File

@ -134,7 +134,7 @@ impl RecordStore {
async fn purge_dead_records(&mut self, lazy: bool) { async fn purge_dead_records(&mut self, lazy: bool) {
let purge_dead_records_mutex = self.purge_dead_records_mutex.clone(); let purge_dead_records_mutex = self.purge_dead_records_mutex.clone();
let lock = if lazy { let _lock = if lazy {
match mutex_try_lock!(purge_dead_records_mutex) { match mutex_try_lock!(purge_dead_records_mutex) {
Some(v) => v, Some(v) => v,
None => { None => {
@ -212,9 +212,10 @@ impl RecordStore {
} }
} }
pub async fn tick(&mut self, last_ts: Timestamp, cur_ts: Timestamp) { pub async fn tick(&mut self) -> EyreResult<()> {
self.flush_changed_records().await; self.flush_changed_records().await;
self.purge_dead_records(true).await; self.purge_dead_records(true).await;
Ok(())
} }
pub async fn new_record( pub async fn new_record(

View File

@ -0,0 +1,21 @@
use super::*;
impl StorageManager {
// Flush records stores to disk and remove dead records
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn flush_record_stores_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let mut inner = self.inner.lock().await;
if let Some(local_record_store) = &mut inner.local_record_store {
local_record_store.tick().await?;
}
if let Some(remote_record_store) = &mut inner.remote_record_store {
remote_record_store.tick().await?;
}
Ok(())
}
}

View File

@ -0,0 +1,43 @@
pub mod flush_record_stores;
use super::*;
impl StorageManager {
pub(crate) fn setup_tasks(&self) {
// Set rolling transfers tick task
debug!("starting flush record stores task");
{
let this = self.clone();
self.unlocked_inner
.flush_record_stores_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.flush_record_stores_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"StorageManager flush record stores task routine"
)),
)
});
}
}
pub async fn tick(&self) -> EyreResult<()> {
// Run the rolling transfers task
self.unlocked_inner.flush_record_stores_task.tick().await?;
Ok(())
}
pub(crate) async fn cancel_tasks(&self) {
debug!("stopping flush record stores task");
if let Err(e) = self.unlocked_inner.flush_record_stores_task.stop().await {
warn!("flush_record_stores_task not stopped: {}", e);
}
}
}