Serialize routing table

Christien Rioux 2025-05-08 09:39:51 -04:00
parent 4e1afffc02
commit 29eeebd9c1
6 changed files with 49 additions and 18 deletions

View file

@@ -23,6 +23,7 @@
  - Add the `veilid_features()` API, which lists the compile-time features that were enabled when `veilid-core` was built (available in language bindings as well). ([!401](https://gitlab.com/veilid/veilid/-/issues/400))
  - When `veilid-core` starts up, log the version number and the compile-time features that were enabled when it was built. ([!401](https://gitlab.com/veilid/veilid/-/issues/400))
  - Closed issue #448: https://gitlab.com/veilid/veilid/-/issues/448
  - Add background flush for routing table and route spec store, to address issue #449: https://gitlab.com/veilid/veilid/-/issues/449 (see the sketch below)
- veilid-flutter:
  - Bindings updated for API changes
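The background flush added for issue #449 boils down to a periodic persistence pass plus one final flush on shutdown. A minimal sketch of the idea in plain tokio (`flush_to_disk` is a hypothetical stand-in; the real code drives `RoutingTable::flush()` from veilid's `TickTask`, as the diffs below show):

```rust
use std::time::Duration;

// Hypothetical stand-in for the real work: save buckets + route spec store.
async fn flush_to_disk() { /* write routing table state to the table store */ }

// Minimal periodic-flush loop; veilid uses TickTask rather than a raw loop.
async fn flush_loop(mut shutdown: tokio::sync::oneshot::Receiver<()>) {
    // Mirrors ROUTING_TABLE_FLUSH_INTERVAL_SECS = 30 from the diff below.
    let mut ticker = tokio::time::interval(Duration::from_secs(30));
    loop {
        tokio::select! {
            _ = ticker.tick() => flush_to_disk().await,
            // One last flush on shutdown, like terminate_async() below.
            _ = &mut shutdown => {
                flush_to_disk().await;
                return;
            }
        }
    }
}
```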

View file

@@ -52,6 +52,9 @@ pub const RELAY_SELECTION_PERCENTILE: f32 = 85.0;
/// How frequently we tick the private route management routine
pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1;
/// How frequently we flush the routing table and route spec store to storage
pub const ROUTING_TABLE_FLUSH_INTERVAL_SECS: u32 = 30;
// Connectionless protocols like UDP are dependent on a NAT translation timeout
// We ping relays to maintain our UDP NAT state with a RELAY_KEEPALIVE_PING_INTERVAL_SECS=10 frequency
// since 30 seconds is a typical UDP NAT state timeout.
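The comment above encodes a timing invariant: relay keepalive pings must land well inside a typical UDP NAT mapping lifetime, otherwise the mapping expires between pings. A compile-time assertion can pin that relationship down (a sketch only; `TYPICAL_UDP_NAT_TIMEOUT_SECS` is a hypothetical name introduced here for illustration):

```rust
/// Typical UDP NAT state timeout (hypothetical constant for illustration).
const TYPICAL_UDP_NAT_TIMEOUT_SECS: u32 = 30;
/// From this file: how often we ping relays to keep UDP NAT state alive.
const RELAY_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;

// Fail the build if the keepalive ever drifts past half the NAT timeout;
// at 10s vs. 30s there are up to three pings per timeout window, so a
// single lost ping does not cost us the NAT mapping.
const _: () = assert!(RELAY_KEEPALIVE_PING_INTERVAL_SECS <= TYPICAL_UDP_NAT_TIMEOUT_SECS / 2);
```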
@@ -113,6 +116,8 @@ pub(crate) struct RoutingTable {
    route_spec_store: RouteSpecStore,
    /// Buckets to kick on our next kick task
    kick_queue: Mutex<BTreeSet<BucketIndex>>,
    /// Background process for flushing the table to disk
    flush_task: TickTask<EyreReport>,
    /// Background process for computing statistics
    rolling_transfers_task: TickTask<EyreReport>,
    /// Background process for computing statistics
@@ -163,6 +168,7 @@ impl RoutingTable {
            inner,
            route_spec_store,
            kick_queue: Mutex::new(BTreeSet::default()),
            flush_task: TickTask::new("flush_task", ROUTING_TABLE_FLUSH_INTERVAL_SECS),
            rolling_transfers_task: TickTask::new(
                "rolling_transfers_task",
                ROLLING_TRANSFERS_INTERVAL_SECS,
@@ -266,22 +272,9 @@ impl RoutingTable {
    async fn terminate_async(&self) {
        veilid_log!(self debug "starting routing table terminate");

        // Save bucket entries to the table db if possible
        veilid_log!(self debug "saving routing table entries");
        if let Err(e) = self.save_buckets().await {
            error!("failed to save routing table entries: {}", e);
        }

        veilid_log!(self debug "routing table termination flush");
        self.flush().await;

        veilid_log!(self debug "saving route spec store");
        let rss = {
            let mut inner = self.inner.write();
            inner.route_spec_store.take()
        };
        if let Some(rss) = rss {
            if let Err(e) = rss.save().await {
                error!("couldn't save route spec store: {}", e);
            }
        }

        veilid_log!(self debug "shutting down routing table");
        let mut inner = self.inner.write();
@@ -290,6 +283,16 @@ impl RoutingTable {
        veilid_log!(self debug "finished routing table terminate");
    }

    pub async fn flush(&self) {
        if let Err(e) = self.save_buckets().await {
            error!("failed to save routing table entries: {}", e);
        }

        if let Err(e) = self.route_spec_store().save().await {
            error!("couldn't save route spec store: {}", e);
        }
    }

    ///////////////////////////////////////////////////////////////////

    pub fn node_id(&self, kind: CryptoKind) -> TypedKey {

View file

@@ -56,8 +56,6 @@ pub struct RoutingTableInner {
    pub(super) self_transfer_stats: TransferStatsDownUp,
    /// Peers we have recently communicated with
    pub(super) recent_peers: LruCache<TypedKey, RecentPeersEntry>,
    /// Storage for private/safety RouteSpecs
    pub(super) route_spec_store: Option<RouteSpecStore>,
    /// Async tagged critical sections table
    /// Tag: "tick" -> in ticker
    pub(super) critical_sections: AsyncTagLockTable<&'static str>,
@@ -82,7 +80,6 @@ impl RoutingTableInner {
            self_transfer_stats_accounting: TransferStatsAccounting::new(),
            self_transfer_stats: TransferStatsDownUp::default(),
            recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
            route_spec_store: None,
            critical_sections: AsyncTagLockTable::new(),
            opt_active_watch_keepalive_ts: None,
        }

View file

@@ -160,6 +160,8 @@ impl RoutingTable {
            }
        }

        self.flush().await;

        Ok(())
    }
}

View file

@@ -0,0 +1,17 @@
use super::*;

impl RoutingTable {
    // Save routing table to disk
    #[instrument(level = "trace", skip(self), err)]
    pub async fn flush_task_routine(
        &self,
        _stop_token: StopToken,
        last_ts: Timestamp,
        cur_ts: Timestamp,
    ) -> EyreResult<()> {
        // Simple task, just writes everything to the tablestore
        self.flush().await;
        Ok(())
    }
}
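`TickTask` is a veilid-internal helper: the routine above only runs when its owning task is ticked and the configured interval has elapsed (the wiring is in the next file). As a rough mental model only, with illustrative names and signatures that are not veilid's actual API:

```rust
use std::future::Future;
use std::time::{Duration, Instant};

// Illustrative stand-in for the TickTask pattern: tick() runs the routine
// at most once per interval; the owner calls tick() on its own cadence.
struct PeriodicTask {
    interval: Duration,
    last_run: Instant,
}

impl PeriodicTask {
    fn new(interval_secs: u64) -> Self {
        Self {
            interval: Duration::from_secs(interval_secs),
            last_run: Instant::now(),
        }
    }

    async fn tick<F, Fut>(&mut self, routine: F)
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = ()>,
    {
        if self.last_run.elapsed() >= self.interval {
            self.last_run = Instant::now();
            routine().await;
        }
    }
}
```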

View file

@@ -1,5 +1,6 @@
pub mod bootstrap;
pub mod closest_peers_refresh;
pub mod flush;
pub mod kick_buckets;
pub mod peer_minimum_refresh;
pub mod ping_validator;
@@ -13,6 +14,9 @@ impl_veilid_log_facility!("rtab");

impl RoutingTable {
    pub fn setup_tasks(&self) {
        // Set flush tick task
        impl_setup_task!(self, Self, flush_task, flush_task_routine);

        // Set rolling transfers tick task
        impl_setup_task!(
            self,
@@ -121,6 +125,9 @@ impl RoutingTable {
            return Ok(());
        };

        // Do flush every ROUTING_TABLE_FLUSH_INTERVAL_SECS secs
        self.flush_task.tick().await?;

        // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
        self.rolling_transfers_task.tick().await?;
@@ -225,6 +232,10 @@ impl RoutingTable {
    pub async fn cancel_tasks(&self) {
        // Cancel all tasks being ticked
        veilid_log!(self debug "stopping flush task");
        if let Err(e) = self.flush_task.stop().await {
            veilid_log!(self warn "flush_task not stopped: {}", e);
        }

        veilid_log!(self debug "stopping rolling transfers task");
        if let Err(e) = self.rolling_transfers_task.stop().await {
            veilid_log!(self warn "rolling_transfers_task not stopped: {}", e);