diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cfd4165..7b6a8ad2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ - Add the `veilid_features()` API, which lists the compile-time features that were enabled when `veilid-core` was built (available in language bindings as well). ([!401](https://gitlab.com/veilid/veilid/-/issues/400)) - When `veilid-core` starts up, log the version number, and the compile-time features that were enabled when it was built. ([!401](https://gitlab.com/veilid/veilid/-/issues/400)) - Closed issue #448: https://gitlab.com/veilid/veilid/-/issues/448 + - Add background flush for routing table and route spec store, to address issue #449: https://gitlab.com/veilid/veilid/-/issues/449 - veilid-flutter: - Bindings updated for API changes diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 64d3bb82..fb7583c6 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -52,6 +52,9 @@ pub const RELAY_SELECTION_PERCENTILE: f32 = 85.0; /// How frequently we tick the private route management routine pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1; +/// How frequently we flush the routing table and route spec store to storage +pub const ROUTING_TABLE_FLUSH_INTERVAL_SECS: u32 = 30; + // Connectionless protocols like UDP are dependent on a NAT translation timeout // We ping relays to maintain our UDP NAT state with a RELAY_KEEPALIVE_PING_INTERVAL_SECS=10 frequency // since 30 seconds is a typical UDP NAT state timeout. @@ -113,6 +116,8 @@ pub(crate) struct RoutingTable { route_spec_store: RouteSpecStore, /// Buckets to kick on our next kick task kick_queue: Mutex>, + /// Background process for flushing the table to disk + flush_task: TickTask, /// Background process for computing statistics rolling_transfers_task: TickTask, /// Background process for computing statistics @@ -163,6 +168,7 @@ impl RoutingTable { inner, route_spec_store, kick_queue: Mutex::new(BTreeSet::default()), + flush_task: TickTask::new("flush_task", ROUTING_TABLE_FLUSH_INTERVAL_SECS), rolling_transfers_task: TickTask::new( "rolling_transfers_task", ROLLING_TRANSFERS_INTERVAL_SECS, @@ -266,22 +272,9 @@ impl RoutingTable { async fn terminate_async(&self) { veilid_log!(self debug "starting routing table terminate"); - // Load bucket entries from table db if possible - veilid_log!(self debug "saving routing table entries"); - if let Err(e) = self.save_buckets().await { - error!("failed to save routing table entries: {}", e); - } + veilid_log!(self debug "routing table termination flush"); + self.flush().await; - veilid_log!(self debug "saving route spec store"); - let rss = { - let mut inner = self.inner.write(); - inner.route_spec_store.take() - }; - if let Some(rss) = rss { - if let Err(e) = rss.save().await { - error!("couldn't save route spec store: {}", e); - } - } veilid_log!(self debug "shutting down routing table"); let mut inner = self.inner.write(); @@ -290,6 +283,16 @@ impl RoutingTable { veilid_log!(self debug "finished routing table terminate"); } + pub async fn flush(&self) { + if let Err(e) = self.save_buckets().await { + error!("failed to save routing table entries: {}", e); + } + + if let Err(e) = self.route_spec_store().save().await { + error!("couldn't save route spec store: {}", e); + } + } + /////////////////////////////////////////////////////////////////// pub fn node_id(&self, kind: CryptoKind) -> TypedKey { diff --git a/veilid-core/src/routing_table/routing_table_inner/mod.rs b/veilid-core/src/routing_table/routing_table_inner/mod.rs index d0dda3e8..083f8ebf 100644 --- a/veilid-core/src/routing_table/routing_table_inner/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/mod.rs @@ -56,8 +56,6 @@ pub struct RoutingTableInner { pub(super) self_transfer_stats: TransferStatsDownUp, /// Peers we have recently communicated with pub(super) recent_peers: LruCache, - /// Storage for private/safety RouteSpecs - pub(super) route_spec_store: Option, /// Async tagged critical sections table /// Tag: "tick" -> in ticker pub(super) critical_sections: AsyncTagLockTable<&'static str>, @@ -82,7 +80,6 @@ impl RoutingTableInner { self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats: TransferStatsDownUp::default(), recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), - route_spec_store: None, critical_sections: AsyncTagLockTable::new(), opt_active_watch_keepalive_ts: None, } diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index d97e543a..db02376c 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -160,6 +160,8 @@ impl RoutingTable { } } + self.flush().await; + Ok(()) } } diff --git a/veilid-core/src/routing_table/tasks/flush.rs b/veilid-core/src/routing_table/tasks/flush.rs new file mode 100644 index 00000000..c85bedcd --- /dev/null +++ b/veilid-core/src/routing_table/tasks/flush.rs @@ -0,0 +1,17 @@ +use super::*; + +impl RoutingTable { + // Save routing table to disk + #[instrument(level = "trace", skip(self), err)] + pub async fn flush_task_routine( + &self, + _stop_token: StopToken, + last_ts: Timestamp, + cur_ts: Timestamp, + ) -> EyreResult<()> { + // Simple task, just writes everything to the tablestore + self.flush().await; + + Ok(()) + } +} diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs index 68ed929f..b06eab6c 100644 --- a/veilid-core/src/routing_table/tasks/mod.rs +++ b/veilid-core/src/routing_table/tasks/mod.rs @@ -1,5 +1,6 @@ pub mod bootstrap; pub mod closest_peers_refresh; +pub mod flush; pub mod kick_buckets; pub mod peer_minimum_refresh; pub mod ping_validator; @@ -13,6 +14,9 @@ impl_veilid_log_facility!("rtab"); impl RoutingTable { pub fn setup_tasks(&self) { + // Set flush tick task + impl_setup_task!(self, Self, flush_task, flush_task_routine); + // Set rolling transfers tick task impl_setup_task!( self, @@ -121,6 +125,9 @@ impl RoutingTable { return Ok(()); }; + // Do flush every ROUTING_TABLE_FLUSH_INTERVAL_SECS secs + self.flush_task.tick().await?; + // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs self.rolling_transfers_task.tick().await?; @@ -225,6 +232,10 @@ impl RoutingTable { pub async fn cancel_tasks(&self) { // Cancel all tasks being ticked + veilid_log!(self debug "stopping flush task"); + if let Err(e) = self.flush_task.stop().await { + veilid_log!(self warn "flush_task not stopped: {}", e); + } veilid_log!(self debug "stopping rolling transfers task"); if let Err(e) = self.rolling_transfers_task.stop().await { veilid_log!(self warn "rolling_transfers_task not stopped: {}", e);