staggered pings

This commit is contained in:
Christien Rioux 2023-10-31 15:39:00 -04:00
parent d750b7c5c3
commit f47d6402c3

View File

@ -4,6 +4,9 @@ use super::*;
/// remains valid, as well as to make sure we remain in any relay node's routing table
const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
/// Ping queue processing depth
const MAX_PARALLEL_PINGS: usize = 16;
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::FutureExt;
use stop_token::future::FutureExt as StopFutureExt;
@ -14,12 +17,12 @@ type PingValidatorFuture =
impl RoutingTable {
// Queue keepalive pings to our PublicInternet relay node (NAT keepalive),
// which also helps make sure we remain in the relay node's routing table
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", skip(self, futurequeue), err)]
async fn relay_keepalive_public_internet(
&self,
cur_ts: Timestamp,
relay_nr: NodeRef,
unord: &mut FuturesUnordered<PingValidatorFuture>,
futurequeue: &mut VecDeque<PingValidatorFuture>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
// Get our publicinternet dial info
@ -107,7 +110,7 @@ impl RoutingTable {
#[cfg(not(feature = "network-result-extra"))]
log_rtab!("--> Keepalive ping to {:?}", relay_nr_filtered);
unord.push(
futurequeue.push_back(
async move {
rpc.rpc_call_status(Destination::direct(relay_nr_filtered), true)
.await
@ -120,11 +123,11 @@ impl RoutingTable {
}
// Ping each node in the routing table if they need to be pinged
// to determine their reliability
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", skip(self, futurequeue), err)]
async fn ping_validator_public_internet(
&self,
cur_ts: Timestamp,
unord: &mut FuturesUnordered<PingValidatorFuture>,
futurequeue: &mut VecDeque<PingValidatorFuture>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
@ -136,7 +139,7 @@ impl RoutingTable {
// If this is our relay, let's check for NAT keepalives
if let Some(relay_nr) = opt_relay_nr {
self.relay_keepalive_public_internet(cur_ts, relay_nr, unord)
self.relay_keepalive_public_internet(cur_ts, relay_nr, futurequeue)
.await?;
}
@ -144,7 +147,7 @@ impl RoutingTable {
for nr in node_refs {
let rpc = rpc.clone();
log_rtab!("--> Validator ping to {:?}", nr);
unord.push(
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr), false).await }
.instrument(Span::current())
.boxed(),
@ -156,11 +159,11 @@ impl RoutingTable {
// Ping each node in the LocalNetwork routing domain if they
// need to be pinged to determine their reliability
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", skip(self, futurequeue), err)]
async fn ping_validator_local_network(
&self,
cur_ts: Timestamp,
unord: &mut FuturesUnordered<PingValidatorFuture>,
futurequeue: &mut VecDeque<PingValidatorFuture>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
@ -172,7 +175,7 @@ impl RoutingTable {
let rpc = rpc.clone();
// Just do a single ping with the best protocol for all the nodes
unord.push(
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr), false).await }
.instrument(Span::current())
.boxed(),
@ -191,18 +194,44 @@ impl RoutingTable {
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let mut unord = FuturesUnordered::new();
let mut futurequeue: VecDeque<PingValidatorFuture> = VecDeque::new();
// PublicInternet
self.ping_validator_public_internet(cur_ts, &mut unord)
self.ping_validator_public_internet(cur_ts, &mut futurequeue)
.await?;
// LocalNetwork
self.ping_validator_local_network(cur_ts, &mut unord)
self.ping_validator_local_network(cur_ts, &mut futurequeue)
.await?;
// Wait for ping futures to complete in parallel
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
let mut unord = FuturesUnordered::new();
while !unord.is_empty() || !futurequeue.is_empty() {
#[cfg(feature = "verbose-tracing")]
log_rtab!(debug "Ping validation queue: {} remaining, {} in progress", futurequeue.len(), unord.len());
// Process one unordered future if we have any in progress
match unord.next().timeout_at(stop_token.clone()).await {
Ok(Some(_)) => {
// Some ping completed
}
Ok(None) => {
// We're empty
}
Err(_) => {
// Timeout means we drop the rest because we were asked to stop
break;
}
}
// Fill unord up to max parallelism
while unord.len() < MAX_PARALLEL_PINGS {
let Some(fq) = futurequeue.pop_front() else {
break;
};
unord.push(fq);
}
}
Ok(())
}