add ping validator every 10 seconds for active watch nodes

This commit is contained in:
Christien Rioux 2024-04-21 20:24:15 -04:00
parent 80e2007fff
commit 43e6be2912
3 changed files with 70 additions and 4 deletions

View File

@ -33,6 +33,8 @@ pub(crate) struct RoutingTableInner {
/// Async tagged critical sections table
/// Tag: "tick" -> in ticker
pub(super) critical_sections: AsyncTagLockTable<&'static str>,
/// Last time we pinged checked the active watches
pub(super) opt_active_watch_keepalive_ts: Option<Timestamp>,
}
impl RoutingTableInner {
@ -50,6 +52,7 @@ impl RoutingTableInner {
recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
route_spec_store: None,
critical_sections: AsyncTagLockTable::new(),
opt_active_watch_keepalive_ts: None,
}
}

View File

@ -2,7 +2,10 @@ use super::*;
/// Keepalive pings are done occasionally to ensure holepunched public dialinfo
/// remains valid, as well as to make sure we remain in any relay node's routing table
const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
const RELAY_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
/// Keepalive pings are done for active watch nodes to make sure they are still there
const ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
/// Ping queue processing depth
const MAX_PARALLEL_PINGS: usize = 16;
@ -15,8 +18,7 @@ type PingValidatorFuture =
SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>;
impl RoutingTable {
// Ping each node in the routing table if they need to be pinged
// to determine their reliability
// Ping the relay to keep it alive, over every protocol it is relaying for us
#[instrument(level = "trace", skip(self, futurequeue), err)]
async fn relay_keepalive_public_internet(
&self,
@ -35,7 +37,7 @@ impl RoutingTable {
let relay_needs_keepalive = opt_relay_keepalive_ts
.map(|kts| {
cur_ts.saturating_sub(kts).as_u64()
>= (KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
>= (RELAY_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
})
.unwrap_or(true);
@ -118,6 +120,53 @@ impl RoutingTable {
}
Ok(())
}
// Ping the active watch nodes to ensure they are still there
#[instrument(level = "trace", skip(self, futurequeue), err)]
async fn active_watches_keepalive_public_internet(
&self,
cur_ts: Timestamp,
futurequeue: &mut VecDeque<PingValidatorFuture>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
let watches_need_keepalive = {
let mut inner = self.inner.write();
let need = inner
.opt_active_watch_keepalive_ts
.map(|kts| {
cur_ts.saturating_sub(kts).as_u64()
>= (ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
})
.unwrap_or(true);
if need {
inner.opt_active_watch_keepalive_ts = Some(cur_ts);
}
need
};
if !watches_need_keepalive {
return Ok(());
}
// Get all the active watches from the storage manager
let storage_manager = self.unlocked_inner.network_manager.storage_manager();
let watch_node_refs = storage_manager.get_active_watch_nodes().await;
for watch_nr in watch_node_refs {
let rpc = rpc.clone();
log_rtab!("--> Watch ping to {:?}", watch_nr);
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(watch_nr)).await }
.instrument(Span::current())
.boxed(),
);
}
Ok(())
}
// Ping each node in the routing table if they need to be pinged
// to determine their reliability
#[instrument(level = "trace", skip(self, futurequeue), err)]
@ -140,6 +189,10 @@ impl RoutingTable {
.await?;
}
// Check active watch keepalives
self.active_watches_keepalive_public_internet(cur_ts, futurequeue)
.await?;
// Just do a single ping with the best protocol for all the other nodes to check for liveness
for nr in node_refs {
let rpc = rpc.clone();

View File

@ -201,6 +201,16 @@ impl StorageManager {
Ok(!inner.offline_subkey_writes.is_empty())
}
/// Get the set of nodes in our active watches
pub async fn get_active_watch_nodes(&self) -> Vec<NodeRef> {
let inner = self.inner.lock().await;
inner
.opened_records
.values()
.filter_map(|v| v.active_watch().map(|aw| aw.watch_node))
.collect()
}
/// Create a local record from scratch with a new owner key, open it, and return the opened descriptor
pub async fn create_record(
&self,