From 43e6be29126e5e322f2dcb8b064405d7c05095aa Mon Sep 17 00:00:01 2001 From: Christien Rioux <chris@veilid.org> Date: Sun, 21 Apr 2024 20:24:15 -0400 Subject: [PATCH] add ping validator every 10 seconds for active watch nodes --- .../src/routing_table/routing_table_inner.rs | 3 + .../src/routing_table/tasks/ping_validator.rs | 61 +++++++++++++++++-- veilid-core/src/storage_manager/mod.rs | 10 +++ 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 80c6ad1d..25ea2659 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -33,6 +33,8 @@ pub(crate) struct RoutingTableInner { /// Async tagged critical sections table /// Tag: "tick" -> in ticker pub(super) critical_sections: AsyncTagLockTable<&'static str>, + /// Last time we ping-checked the active watches + pub(super) opt_active_watch_keepalive_ts: Option<Timestamp>, } impl RoutingTableInner { @@ -50,6 +52,7 @@ impl RoutingTableInner { recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), route_spec_store: None, critical_sections: AsyncTagLockTable::new(), + opt_active_watch_keepalive_ts: None, } } diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index 65e37cbd..3cd3d269 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -2,7 +2,10 @@ use super::*; /// Keepalive pings are done occasionally to ensure holepunched public dialinfo /// remains valid, as well as to make sure we remain in any relay node's routing table -const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10; +const RELAY_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10; + +/// Keepalive pings are done for active watch nodes to make sure they are still there +const ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10; /// Ping queue processing depth const 
MAX_PARALLEL_PINGS: usize = 16; @@ -15,8 +18,7 @@ type PingValidatorFuture = SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>; impl RoutingTable { - // Ping each node in the routing table if they need to be pinged - // to determine their reliability + // Ping the relay to keep it alive, over every protocol it is relaying for us #[instrument(level = "trace", skip(self, futurequeue), err)] async fn relay_keepalive_public_internet( &self, cur_ts: Timestamp, @@ -35,7 +37,7 @@ impl RoutingTable { let relay_needs_keepalive = opt_relay_keepalive_ts .map(|kts| { cur_ts.saturating_sub(kts).as_u64() - >= (KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64) + >= (RELAY_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64) }) .unwrap_or(true); @@ -118,6 +120,53 @@ impl RoutingTable { } Ok(()) } + + // Ping the active watch nodes to ensure they are still there + #[instrument(level = "trace", skip(self, futurequeue), err)] + async fn active_watches_keepalive_public_internet( + &self, + cur_ts: Timestamp, + futurequeue: &mut VecDeque<PingValidatorFuture>, + ) -> EyreResult<()> { + let rpc = self.rpc_processor(); + + let watches_need_keepalive = { + let mut inner = self.inner.write(); + let need = inner + .opt_active_watch_keepalive_ts + .map(|kts| { + cur_ts.saturating_sub(kts).as_u64() + >= (ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64) + }) + .unwrap_or(true); + if need { + inner.opt_active_watch_keepalive_ts = Some(cur_ts); + } + need + }; + + if !watches_need_keepalive { + return Ok(()); + } + + // Get all the active watches from the storage manager + let storage_manager = self.unlocked_inner.network_manager.storage_manager(); + let watch_node_refs = storage_manager.get_active_watch_nodes().await; + + for watch_nr in watch_node_refs { + let rpc = rpc.clone(); + + log_rtab!("--> Watch ping to {:?}", watch_nr); + + futurequeue.push_back( + async move { rpc.rpc_call_status(Destination::direct(watch_nr)).await } + .instrument(Span::current()) + .boxed(), + ); + } + Ok(()) + } + // Ping each node in the routing table if 
they need to be pinged // to determine their reliability #[instrument(level = "trace", skip(self, futurequeue), err)] @@ -140,6 +189,10 @@ impl RoutingTable { .await?; } + // Check active watch keepalives + self.active_watches_keepalive_public_internet(cur_ts, futurequeue) + .await?; + // Just do a single ping with the best protocol for all the other nodes to check for liveness for nr in node_refs { let rpc = rpc.clone(); diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index e701d4bd..e7986b64 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -201,6 +201,16 @@ impl StorageManager { Ok(!inner.offline_subkey_writes.is_empty()) } + /// Get the set of nodes in our active watches + pub async fn get_active_watch_nodes(&self) -> Vec<NodeRef> { + let inner = self.inner.lock().await; + inner + .opened_records + .values() + .filter_map(|v| v.active_watch().map(|aw| aw.watch_node)) + .collect() + } + /// Create a local record from scratch with a new owner key, open it, and return the opened descriptor pub async fn create_record( &self,