diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index 7a3070d0..fdd54087 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -4,6 +4,9 @@ use super::*; /// remains valid, as well as to make sure we remain in any relay node's routing table const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10; +/// Ping queue processing depth +const MAX_PARALLEL_PINGS: usize = 16; + use futures_util::stream::{FuturesUnordered, StreamExt}; use futures_util::FutureExt; use stop_token::future::FutureExt as StopFutureExt; @@ -14,12 +17,12 @@ type PingValidatorFuture = impl RoutingTable { // Ping each node in the routing table if they need to be pinged // to determine their reliability - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", skip(self, futurequeue), err)] async fn relay_keepalive_public_internet( &self, cur_ts: Timestamp, relay_nr: NodeRef, - unord: &mut FuturesUnordered, + futurequeue: &mut VecDeque, ) -> EyreResult<()> { let rpc = self.rpc_processor(); // Get our publicinternet dial info @@ -107,7 +110,7 @@ impl RoutingTable { #[cfg(not(feature = "network-result-extra"))] log_rtab!("--> Keepalive ping to {:?}", relay_nr_filtered); - unord.push( + futurequeue.push_back( async move { rpc.rpc_call_status(Destination::direct(relay_nr_filtered), true) .await @@ -120,11 +123,11 @@ impl RoutingTable { } // Ping each node in the routing table if they need to be pinged // to determine their reliability - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", skip(self, futurequeue), err)] async fn ping_validator_public_internet( &self, cur_ts: Timestamp, - unord: &mut FuturesUnordered, + futurequeue: &mut VecDeque, ) -> EyreResult<()> { let rpc = self.rpc_processor(); @@ -136,7 +139,7 @@ impl RoutingTable { // If this is our relay, let's check for NAT keepalives if let Some(relay_nr) = 
opt_relay_nr { - self.relay_keepalive_public_internet(cur_ts, relay_nr, unord) + self.relay_keepalive_public_internet(cur_ts, relay_nr, futurequeue) .await?; } @@ -144,7 +147,7 @@ impl RoutingTable { for nr in node_refs { let rpc = rpc.clone(); log_rtab!("--> Validator ping to {:?}", nr); - unord.push( + futurequeue.push_back( async move { rpc.rpc_call_status(Destination::direct(nr), false).await } .instrument(Span::current()) .boxed(), @@ -156,11 +159,11 @@ impl RoutingTable { // Ping each node in the LocalNetwork routing domain if they // need to be pinged to determine their reliability - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", skip(self, futurequeue), err)] async fn ping_validator_local_network( &self, cur_ts: Timestamp, - unord: &mut FuturesUnordered, + futurequeue: &mut VecDeque, ) -> EyreResult<()> { let rpc = self.rpc_processor(); @@ -172,7 +175,7 @@ impl RoutingTable { let rpc = rpc.clone(); // Just do a single ping with the best protocol for all the nodes - unord.push( + futurequeue.push_back( async move { rpc.rpc_call_status(Destination::direct(nr), false).await } .instrument(Span::current()) .boxed(), @@ -191,18 +194,44 @@ impl RoutingTable { _last_ts: Timestamp, cur_ts: Timestamp, ) -> EyreResult<()> { - let mut unord = FuturesUnordered::new(); + let mut futurequeue: VecDeque = VecDeque::new(); // PublicInternet - self.ping_validator_public_internet(cur_ts, &mut unord) + self.ping_validator_public_internet(cur_ts, &mut futurequeue) .await?; // LocalNetwork - self.ping_validator_local_network(cur_ts, &mut unord) + self.ping_validator_local_network(cur_ts, &mut futurequeue) .await?; // Wait for ping futures to complete in parallel - while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + let mut unord = FuturesUnordered::new(); + + while !unord.is_empty() || !futurequeue.is_empty() { + #[cfg(feature = "verbose-tracing")] + log_rtab!(debug "Ping validation queue: {} remaining, {} in progress", 
futurequeue.len(), unord.len()); + // Process one unordered future if we have some + match unord.next().timeout_at(stop_token.clone()).await { + Ok(Some(_)) => { + // Some ping completed + } + Ok(None) => { + // We're empty + } + Err(_) => { + // Timeout means we drop the rest because we were asked to stop + break; + } + } + + // Fill unord up to max parallelism + while unord.len() < MAX_PARALLEL_PINGS { + let Some(fq) = futurequeue.pop_front() else { + break; + }; + unord.push(fq); + } + } Ok(()) }