implement answer and state statistics

This commit is contained in:
Christien Rioux 2024-09-28 21:20:38 -04:00
parent 545b646d8f
commit dc944fe920
28 changed files with 1367 additions and 292 deletions

View File

@ -3,10 +3,10 @@
use super::*;
/// Number of 'existing dialinfo inconsistent' results in the cache during inbound-capable to trigger detection
pub const ADDRESS_INCONSISTENCY_DETECTION_COUNT: usize = 3;
pub const ADDRESS_INCONSISTENCY_DETECTION_COUNT: usize = 5;
/// Number of consistent results in the cache during outbound-only to trigger detection
pub const ADDRESS_CONSISTENCY_DETECTION_COUNT: usize = 3;
pub const ADDRESS_CONSISTENCY_DETECTION_COUNT: usize = 5;
/// Length of consistent/inconsistent result cache for detection
pub const ADDRESS_CHECK_CACHE_SIZE: usize = 10;

View File

@ -609,7 +609,7 @@ impl ConnectionManager {
// Inform the processor of the event
if let Some(conn) = conn {
// If the connection closed while it was protected, report it on the node the connection was established on
// In-use connections will already get reported because they will cause a 'question_lost' stat on the remote node
// In-use connections will already get reported because they will cause a 'lost_answer' stat on the remote node
if let Some(protect_nr) = conn.protected_node_ref() {
// Find the protected address and increase our drop count
if let Some(inner) = self.arc.inner.lock().as_mut() {

View File

@ -11,12 +11,22 @@ use super::*;
/// established connection is always from a real address to another real address.
///
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[derive(Copy, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Flow {
remote: PeerAddress,
local: Option<SocketAddress>,
}
impl fmt::Display for Flow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(local) = &self.local {
write!(f, "{} -> {}", local, self.remote)
} else {
write!(f, "{}", self.remote)
}
}
}
impl Flow {
pub fn new(remote: PeerAddress, local: SocketAddress) -> Self {
assert!(!remote.protocol_type().is_ordered() || !local.address().is_unspecified());

View File

@ -1,3 +1,5 @@
use indent::indent_by;
use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
@ -27,7 +29,7 @@ const NEVER_SEEN_PING_COUNT: u32 = 3;
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BucketEntryDeadReason {
FailedToSend,
CanNotSend,
TooManyLostAnswers,
NoPingResponse,
}
@ -94,7 +96,7 @@ pub(crate) struct LastFlowKey(pub ProtocolType, pub AddressType);
pub(crate) struct LastSenderInfoKey(pub RoutingDomain, pub ProtocolType, pub AddressType);
/// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug, Serialize, Deserialize)]
#[derive(Serialize, Deserialize)]
pub(crate) struct BucketEntryPublicInternet {
/// The PublicInternet node infoe
signed_node_info: Option<Box<SignedNodeInfo>>,
@ -104,8 +106,37 @@ pub(crate) struct BucketEntryPublicInternet {
node_status: Option<NodeStatus>,
}
impl fmt::Debug for BucketEntryPublicInternet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if !f.alternate() {
if let Some(sni) = &self.signed_node_info {
writeln!(f, "signed_node_info:")?;
write!(f, " {}", indent_by(4, format!("{:?}", sni)))?;
} else {
writeln!(f, "signed_node_info: None")?;
}
writeln!(
f,
"last_seen_our_node_info_ts: {}",
self.last_seen_our_node_info_ts
)?;
writeln!(f, "node_status: {:?}", self.node_status)?;
Ok(())
} else {
f.debug_struct("BucketEntryPublicInternet")
.field("signed_node_info", &self.signed_node_info)
.field(
"last_seen_our_node_info_ts",
&self.last_seen_our_node_info_ts,
)
.field("node_status", &self.node_status)
.finish()
}
}
}
/// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug, Serialize, Deserialize)]
#[derive(Serialize, Deserialize)]
pub(crate) struct BucketEntryLocalNetwork {
/// The LocalNetwork node info
signed_node_info: Option<Box<SignedNodeInfo>>,
@ -115,8 +146,37 @@ pub(crate) struct BucketEntryLocalNetwork {
node_status: Option<NodeStatus>,
}
impl fmt::Debug for BucketEntryLocalNetwork {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if !f.alternate() {
if let Some(sni) = &self.signed_node_info {
writeln!(f, "signed_node_info:")?;
write!(f, " {}", indent_by(4, format!("{:?}", sni)))?;
} else {
writeln!(f, "signed_node_info: None")?;
}
writeln!(
f,
"last_seen_our_node_info_ts: {}",
self.last_seen_our_node_info_ts
)?;
writeln!(f, "node_status: {:?}", self.node_status)?;
Ok(())
} else {
f.debug_struct("BucketEntryLocalNetwork")
.field("signed_node_info", &self.signed_node_info)
.field(
"last_seen_our_node_info_ts",
&self.last_seen_our_node_info_ts,
)
.field("node_status", &self.node_status)
.finish()
}
}
}
/// The data associated with each bucket entry
#[derive(Debug, Serialize, Deserialize)]
#[derive(Serialize, Deserialize)]
pub(crate) struct BucketEntryInner {
/// The node ids matching this bucket entry, with the cryptography versions supported by this node as the 'kind' field
validated_node_ids: TypedKeyGroup,
@ -148,6 +208,12 @@ pub(crate) struct BucketEntryInner {
/// The accounting for the transfer statistics
#[serde(skip)]
transfer_stats_accounting: TransferStatsAccounting,
/// The account for the state and reason statistics
#[serde(skip)]
state_stats_accounting: Mutex<StateStatsAccounting>,
/// RPC answer stats accounting
#[serde(skip)]
answer_stats_accounting: AnswerStatsAccounting,
/// If the entry is being punished and should be considered dead
#[serde(skip)]
punishment: Option<PunishmentReason>,
@ -161,6 +227,87 @@ pub(crate) struct BucketEntryInner {
node_ref_tracks: HashMap<usize, backtrace::Backtrace>,
}
impl fmt::Debug for BucketEntryInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if !f.alternate() {
writeln!(f, "validated_node_ids: {}", self.validated_node_ids)?;
writeln!(f, "unsupported_node_ids: {}", self.unsupported_node_ids)?;
writeln!(f, "envelope_support: {:?}", self.envelope_support)?;
writeln!(
f,
"updated_since_last_network_change: {:?}",
self.updated_since_last_network_change
)?;
writeln!(f, "last_flows:")?;
for lf in &self.last_flows {
writeln!(
f,
" {:?}/{:?}: {} @ {}",
lf.0 .0, lf.0 .1, lf.1 .0, lf.1 .1
)?;
}
writeln!(f, "last_sender_info:")?;
for lsi in &self.last_sender_info {
writeln!(
f,
" {:?}/{:?}/{:?}: {}",
lsi.0 .0, lsi.0 .1, lsi.0 .2, lsi.1.socket_address
)?;
}
writeln!(f, "public_internet:")?;
writeln!(
f,
" {}",
indent_by(4, format!("{:?}", self.public_internet))
)?;
writeln!(f, "local_network:")?;
writeln!(
f,
" {}",
indent_by(4, format!("{:?}", self.local_network))
)?;
writeln!(f, "peer_stats:")?;
writeln!(f, " {}", indent_by(4, format!("{:?}", self.peer_stats)))?;
writeln!(
f,
"punishment: {}",
if let Some(punishment) = self.punishment {
format!("{:?}", punishment)
} else {
"None".to_owned()
}
)?;
Ok(())
} else {
let mut out = f.debug_struct("BucketEntryInner");
out.field("validated_node_ids", &self.validated_node_ids)
.field("unsupported_node_ids", &self.unsupported_node_ids)
.field("envelope_support", &self.envelope_support)
.field(
"updated_since_last_network_change",
&self.updated_since_last_network_change,
)
.field("last_flows", &self.last_flows)
.field("last_sender_info", &self.last_sender_info)
.field("public_internet", &self.public_internet)
.field("local_network", &self.local_network)
.field("peer_stats", &self.peer_stats)
.field("latency_stats_accounting", &self.latency_stats_accounting)
.field("transfer_stats_accounting", &self.transfer_stats_accounting)
.field("state_stats_accounting", &self.state_stats_accounting)
.field("answer_stats_accounting", &self.answer_stats_accounting)
.field("punishment", &self.punishment);
#[cfg(feature = "tracking")]
{
out = out.field("next_track_id", &self.next_track_id);
out = out.field("node_ref_tracks", &self.node_ref_tracks);
}
out.finish()
}
}
}
impl BucketEntryInner {
#[cfg(feature = "tracking")]
pub fn track(&mut self) -> usize {
@ -599,7 +746,7 @@ impl BucketEntryInner {
}
pub fn state_reason(&self, cur_ts: Timestamp) -> BucketEntryStateReason {
if let Some(punished_reason) = self.punishment {
let reason = if let Some(punished_reason) = self.punishment {
BucketEntryStateReason::Punished(punished_reason)
} else if let Some(dead_reason) = self.check_dead(cur_ts) {
BucketEntryStateReason::Dead(dead_reason)
@ -607,7 +754,14 @@ impl BucketEntryInner {
BucketEntryStateReason::Unreliable(unreliable_reason)
} else {
BucketEntryStateReason::Reliable
}
};
// record this reason
self.state_stats_accounting
.lock()
.record_state_reason(cur_ts, reason);
reason
}
pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
@ -687,6 +841,18 @@ impl BucketEntryInner {
self.peer_stats.latency = Some(self.latency_stats_accounting.record_latency(latency));
}
// Called every UPDATE_STATE_STATS_SECS seconds
pub(super) fn update_state_stats(&mut self) {
if let Some(state_stats) = self.state_stats_accounting.lock().take_stats() {
self.peer_stats.state = state_stats;
}
}
// called every ROLLING_ANSWERS_INTERVAL_SECS seconds
pub(super) fn roll_answer_stats(&mut self, cur_ts: Timestamp) {
self.peer_stats.rpc_stats.answer = self.answer_stats_accounting.roll_answers(cur_ts);
}
///// state machine handling
pub(super) fn check_unreliable(
&self,
@ -720,7 +886,7 @@ impl BucketEntryInner {
pub(super) fn check_dead(&self, cur_ts: Timestamp) -> Option<BucketEntryDeadReason> {
// If we have failed to send NEVER_REACHED_PING_COUNT times in a row, the node is dead
if self.peer_stats.rpc_stats.failed_to_send >= NEVER_SEEN_PING_COUNT {
return Some(BucketEntryDeadReason::FailedToSend);
return Some(BucketEntryDeadReason::CanNotSend);
}
match self.peer_stats.rpc_stats.last_seen_ts {
@ -879,6 +1045,7 @@ impl BucketEntryInner {
pub(super) fn question_sent(&mut self, ts: Timestamp, bytes: ByteCount, expects_answer: bool) {
self.transfer_stats_accounting.add_up(bytes);
self.answer_stats_accounting.record_question(ts);
self.peer_stats.rpc_stats.messages_sent += 1;
self.peer_stats.rpc_stats.failed_to_send = 0;
if expects_answer {
@ -898,13 +1065,16 @@ impl BucketEntryInner {
}
pub(super) fn answer_rcvd(&mut self, send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount) {
self.transfer_stats_accounting.add_down(bytes);
self.answer_stats_accounting.record_answer(recv_ts);
self.peer_stats.rpc_stats.messages_rcvd += 1;
self.peer_stats.rpc_stats.questions_in_flight -= 1;
self.record_latency(recv_ts.saturating_sub(send_ts));
self.touch_last_seen(recv_ts);
self.peer_stats.rpc_stats.recent_lost_answers = 0;
}
pub(super) fn question_lost(&mut self) {
pub(super) fn lost_answer(&mut self) {
let cur_ts = Timestamp::now();
self.answer_stats_accounting.record_lost_answer(cur_ts);
self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
self.peer_stats.rpc_stats.questions_in_flight -= 1;
self.peer_stats.rpc_stats.recent_lost_answers += 1;
@ -965,9 +1135,12 @@ impl BucketEntry {
rpc_stats: RPCStats::default(),
latency: None,
transfer: TransferStatsDownUp::default(),
state: StateStats::default(),
},
latency_stats_accounting: LatencyStatsAccounting::new(),
transfer_stats_accounting: TransferStatsAccounting::new(),
state_stats_accounting: Mutex::new(StateStatsAccounting::new()),
answer_stats_accounting: AnswerStatsAccounting::new(),
punishment: None,
#[cfg(feature = "tracking")]
next_track_id: 0,

View File

@ -1,4 +1,5 @@
use super::*;
use indent::indent_by;
use routing_table::tasks::bootstrap::BOOTSTRAP_TXT_VERSION_0;
impl RoutingTable {
@ -110,15 +111,21 @@ impl RoutingTable {
let mut out = String::new();
if published {
out += &format!(
"{:?} Published PeerInfo:\n {:#?}\n",
"{:?} Published PeerInfo:\n {}\n",
routing_domain,
self.get_published_peer_info(routing_domain)
indent_by(
4,
format!("{:?}", self.get_published_peer_info(routing_domain))
)
);
} else {
out += &format!(
"{:?} Current PeerInfo:\n {:#?}\n",
"{:?} Current PeerInfo:\n {}\n",
routing_domain,
self.get_current_peer_info(routing_domain)
indent_by(
4,
format!("{:?}", self.get_current_peer_info(routing_domain))
)
);
}
out
@ -138,7 +145,7 @@ impl RoutingTable {
//
},
BucketEntryStateReason::Dead(d) => match d {
BucketEntryDeadReason::FailedToSend => "DFSEND",
BucketEntryDeadReason::CanNotSend => "DFSEND",
BucketEntryDeadReason::TooManyLostAnswers => "DALOST",
BucketEntryDeadReason::NoPingResponse => "DNOPNG",
},
@ -295,9 +302,9 @@ impl RoutingTable {
out += &node_ref.operate(|_rti, e| {
let state_reason = e.state_reason(cur_ts);
format!(
"state: {}\n{:#?}\n",
"{:?}\nstate: {}\n",
e,
Self::format_state_reason(state_reason),
e
)
});
out

View File

@ -100,6 +100,10 @@ pub(crate) struct RoutingTableUnlockedInner {
kick_queue: Mutex<BTreeSet<BucketIndex>>,
/// Background process for computing statistics
rolling_transfers_task: TickTask<EyreReport>,
/// Background process for computing statistics
update_state_stats_task: TickTask<EyreReport>,
/// Background process for computing statistics
rolling_answers_task: TickTask<EyreReport>,
/// Background process to purge dead routing table entries when necessary
kick_buckets_task: TickTask<EyreReport>,
/// Background process to get our initial routing table
@ -108,8 +112,14 @@ pub(crate) struct RoutingTableUnlockedInner {
peer_minimum_refresh_task: TickTask<EyreReport>,
/// Background process to ensure we have enough nodes close to our own in our routing table
closest_peers_refresh_task: TickTask<EyreReport>,
/// Background process to check nodes to see if they are still alive and for reliability
ping_validator_task: TickTask<EyreReport>,
/// Background process to check PublicInternet nodes to see if they are still alive and for reliability
ping_validator_public_internet_task: TickTask<EyreReport>,
/// Background process to check LocalNetwork nodes to see if they are still alive and for reliability
ping_validator_local_network_task: TickTask<EyreReport>,
/// Background process to check PublicInternet relay nodes to see if they are still alive and for reliability
ping_validator_public_internet_relay_task: TickTask<EyreReport>,
/// Background process to check Active Watch nodes to see if they are still alive and for reliability
ping_validator_active_watch_task: TickTask<EyreReport>,
/// Background process to keep relays up
relay_management_task: TickTask<EyreReport>,
/// Background process to keep private routes up
@ -216,6 +226,14 @@ impl RoutingTable {
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
update_state_stats_task: TickTask::new(
"update_state_stats_task",
UPDATE_STATE_STATS_INTERVAL_SECS,
),
rolling_answers_task: TickTask::new(
"rolling_answers_task",
ROLLING_ANSWER_INTERVAL_SECS,
),
kick_buckets_task: TickTask::new("kick_buckets_task", 1),
bootstrap_task: TickTask::new("bootstrap_task", 1),
peer_minimum_refresh_task: TickTask::new("peer_minimum_refresh_task", 1),
@ -223,7 +241,19 @@ impl RoutingTable {
"closest_peers_refresh_task",
c.network.dht.min_peer_refresh_time_ms,
),
ping_validator_task: TickTask::new("ping_validator_task", 1),
ping_validator_public_internet_task: TickTask::new(
"ping_validator_public_internet_task",
1,
),
ping_validator_local_network_task: TickTask::new(
"ping_validator_local_network_task",
1,
),
ping_validator_public_internet_relay_task: TickTask::new(
"ping_validator_public_internet_relay_task",
1,
),
ping_validator_active_watch_task: TickTask::new("ping_validator_active_watch_task", 1),
relay_management_task: TickTask::new(
"relay_management_task",
RELAY_MANAGEMENT_INTERVAL_SECS,

View File

@ -91,12 +91,16 @@ pub trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait {
.unwrap_or(0u64.into())
})
}
fn has_seen_our_node_info_ts(
&self,
routing_domain: RoutingDomain,
our_node_info_ts: Timestamp,
) -> bool {
self.operate(|_rti, e| e.has_seen_our_node_info_ts(routing_domain, our_node_info_ts))
fn has_seen_our_node_info_ts(&self, routing_domain: RoutingDomain) -> bool {
self.operate(|rti, e| {
let Some(our_node_info_ts) = rti
.get_published_peer_info(routing_domain)
.map(|pi| pi.signed_node_info().timestamp())
else {
return false;
};
e.has_seen_our_node_info_ts(routing_domain, our_node_info_ts)
})
}
fn set_seen_our_node_info_ts(&self, routing_domain: RoutingDomain, seen_ts: Timestamp) {
self.operate_mut(|_rti, e| e.set_seen_our_node_info_ts(routing_domain, seen_ts));
@ -287,9 +291,9 @@ pub trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait {
e.answer_rcvd(send_ts, recv_ts, bytes);
})
}
fn stats_question_lost(&self) {
fn stats_lost_answer(&self) {
self.operate_mut(|_rti, e| {
e.question_lost();
e.lost_answer();
})
}
fn stats_failed_to_send(&self, ts: Timestamp, expects_answer: bool) {

View File

@ -1122,8 +1122,7 @@ impl RouteSpecStore {
// We can optimize the peer info in this safety route if it has been successfully
// communicated over either via an outbound test, or used as a private route inbound
// and we are replying over the same route as our safety route outbound
let optimize = safety_rssd.get_stats().last_tested_ts.is_some()
|| safety_rssd.get_stats().last_received_ts.is_some();
let optimize = safety_rssd.get_stats().last_known_valid_ts.is_some();
// Get the first hop noderef of the safety route
let first_hop = safety_rssd.hop_node_ref(0).unwrap();
@ -1492,10 +1491,7 @@ impl RouteSpecStore {
// See if we can optimize this compilation yet
// We don't want to include full nodeinfo if we don't have to
let optimized = optimized.unwrap_or(
rssd.get_stats().last_tested_ts.is_some()
|| rssd.get_stats().last_received_ts.is_some(),
);
let optimized = optimized.unwrap_or(rssd.get_stats().last_known_valid_ts.is_some());
let rsd = rssd
.get_route_by_key(key)
@ -1519,10 +1515,7 @@ impl RouteSpecStore {
// See if we can optimize this compilation yet
// We don't want to include full nodeinfo if we don't have to
let optimized = optimized.unwrap_or(
rssd.get_stats().last_tested_ts.is_some()
|| rssd.get_stats().last_received_ts.is_some(),
);
let optimized = optimized.unwrap_or(rssd.get_stats().last_known_valid_ts.is_some());
let mut out = Vec::new();
for (key, rsd) in rssd.iter_route_set() {
@ -1761,6 +1754,17 @@ impl RouteSpecStore {
inner.cache.roll_transfers(last_ts, cur_ts);
}
/// Process answer statistics
pub fn roll_answers(&self, cur_ts: Timestamp) {
let inner = &mut *self.inner.lock();
// Roll transfers for locally allocated routes
inner.content.roll_answers(cur_ts);
// Roll transfers for remote private routes
inner.cache.roll_answers(cur_ts);
}
/// Convert private route list to binary blob
pub fn private_routes_to_blob(private_routes: &[PrivateRoute]) -> VeilidAPIResult<Vec<u8>> {
let mut buffer = vec![];

View File

@ -365,6 +365,12 @@ impl RouteSpecStoreCache {
v.get_stats_mut().roll_transfers(last_ts, cur_ts);
}
}
/// Roll answer statistics
pub fn roll_answers(&mut self, cur_ts: Timestamp) {
for (_k, v) in self.remote_private_route_set_cache.iter_mut() {
v.get_stats_mut().roll_answers(cur_ts);
}
}
}
impl Default for RouteSpecStoreCache {

View File

@ -122,4 +122,10 @@ impl RouteSpecStoreContent {
rssd.get_stats_mut().roll_transfers(last_ts, cur_ts);
}
}
/// Roll answer statistics
pub fn roll_answers(&mut self, cur_ts: Timestamp) {
for rssd in self.details.values_mut() {
rssd.get_stats_mut().roll_answers(cur_ts);
}
}
}

View File

@ -7,28 +7,36 @@ pub(crate) struct RouteStats {
pub failed_to_send: u32,
/// Questions lost
#[serde(skip)]
pub questions_lost: u32,
pub lost_answers: u32,
/// Timestamp of when the route was created
pub created_ts: Timestamp,
/// Timestamp of when the route was last checked for validity
/// Timestamp of when the route was last checked for validity or received traffic
#[serde(skip)]
pub last_tested_ts: Option<Timestamp>,
pub last_known_valid_ts: Option<Timestamp>,
/// Timestamp of when the route was last sent to
#[serde(skip)]
pub last_sent_ts: Option<Timestamp>,
/// Timestamp of when the route was last received over
/// Timestamp of when the route last received a question or statement
#[serde(skip)]
pub last_received_ts: Option<Timestamp>,
pub last_rcvd_question_ts: Option<Timestamp>,
/// Timestamp of when the route last received an answer
#[serde(skip)]
pub last_rcvd_answer_ts: Option<Timestamp>,
/// Transfers up and down
pub transfer_stats_down_up: TransferStatsDownUp,
/// Latency stats
pub latency_stats: LatencyStats,
/// Answer stats
pub answer_stats: AnswerStats,
/// Accounting mechanism for this route's RPC latency
#[serde(skip)]
latency_stats_accounting: LatencyStatsAccounting,
/// Accounting mechanism for the bandwidth across this route
#[serde(skip)]
transfer_stats_accounting: TransferStatsAccounting,
/// Accounting mechanism for this route's RPC answers
#[serde(skip)]
answer_stats_accounting: AnswerStatsAccounting,
}
impl RouteStats {
@ -44,16 +52,28 @@ impl RouteStats {
self.failed_to_send += 1;
}
/// Mark a route as having lost a question
pub fn record_question_lost(&mut self) {
self.questions_lost += 1;
/// Mark a route as having lost an answer
pub fn record_lost_answer(&mut self) {
let cur_ts = Timestamp::now();
self.lost_answers += 1;
self.answer_stats_accounting.record_lost_answer(cur_ts);
}
/// Mark a route as having received something
pub fn record_received(&mut self, cur_ts: Timestamp, bytes: ByteCount) {
self.last_received_ts = Some(cur_ts);
self.last_tested_ts = Some(cur_ts);
/// Mark a route as having received a question or statement
pub fn record_question_received(&mut self, cur_ts: Timestamp, bytes: ByteCount) {
self.last_rcvd_question_ts = Some(cur_ts);
self.last_known_valid_ts = Some(cur_ts);
self.transfer_stats_accounting.add_down(bytes);
self.answer_stats_accounting.record_question(cur_ts);
}
/// Mark a route as having received an answer
pub fn record_answer_received(&mut self, cur_ts: Timestamp, bytes: ByteCount) {
self.last_rcvd_answer_ts = Some(cur_ts);
self.last_known_valid_ts = Some(cur_ts);
self.lost_answers = 0;
self.transfer_stats_accounting.add_down(bytes);
self.answer_stats_accounting.record_answer(cur_ts);
}
/// Mark a route as having been sent to
@ -70,22 +90,16 @@ impl RouteStats {
self.latency_stats = self.latency_stats_accounting.record_latency(latency);
}
/// Mark a route as having been tested
pub fn record_tested(&mut self, cur_ts: Timestamp) {
self.last_tested_ts = Some(cur_ts);
// Reset question_lost and failed_to_send if we test clean
self.failed_to_send = 0;
self.questions_lost = 0;
}
/// Roll transfers for these route stats
pub fn roll_transfers(&mut self, last_ts: Timestamp, cur_ts: Timestamp) {
self.transfer_stats_accounting.roll_transfers(
last_ts,
cur_ts,
&mut self.transfer_stats_down_up,
)
);
}
pub fn roll_answers(&mut self, cur_ts: Timestamp) {
self.answer_stats = self.answer_stats_accounting.roll_answers(cur_ts);
}
/// Get the latency stats
@ -101,24 +115,25 @@ impl RouteStats {
/// Reset stats when network restarts
pub fn reset(&mut self) {
self.last_tested_ts = None;
self.last_known_valid_ts = None;
self.last_sent_ts = None;
self.last_received_ts = None;
self.last_rcvd_question_ts = None;
self.last_rcvd_answer_ts = None;
self.failed_to_send = 0;
self.questions_lost = 0;
self.lost_answers = 0;
}
/// Check if a route needs testing
pub fn needs_testing(&self, cur_ts: Timestamp) -> bool {
// Has the route had any failures lately?
if self.questions_lost > 0 || self.failed_to_send > 0 {
if self.lost_answers > 0 || self.failed_to_send > 0 {
// If so, always test
return true;
}
// Has the route been tested within the idle time we'd want to check things?
// (also if we've received successfully over the route, this will get set)
if let Some(last_tested_ts) = self.last_tested_ts {
if let Some(last_tested_ts) = self.last_known_valid_ts {
if cur_ts.saturating_sub(last_tested_ts)
> TimestampDuration::new(ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64)
{

View File

@ -477,6 +477,7 @@ impl RoutingTableInner {
None
}
// Collect all entries that are 'needs_ping' and have some node info making them reachable somehow
pub(super) fn get_nodes_needing_ping(
&self,
outer_self: RoutingTable,
@ -487,24 +488,36 @@ impl RoutingTableInner {
.get_published_peer_info(routing_domain)
.map(|pi| pi.signed_node_info().timestamp());
// Collect all entries that are 'needs_ping' and have some node info making them reachable somehow
let mut node_refs = Vec::<FilteredNodeRef>::with_capacity(self.bucket_entry_count());
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
let entry_needs_ping = |e: &BucketEntryInner| {
let mut filters = VecDeque::new();
// Remove our own node from the results
let filter_self =
Box::new(move |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| v.is_some())
as RoutingTableEntryFilter;
filters.push_back(filter_self);
let filter_ping = Box::new(
move |rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
let entry = v.unwrap();
entry.with_inner(|e| {
// If this entry isn't in the routing domain we are checking, don't include it
if !e.exists_in_routing_domain(rti, routing_domain) {
return false;
}
// If we don't have node status for this node, then we should ping it to get some node status
if e.has_node_info(routing_domain.into()) && e.node_status(routing_domain).is_none()
if e.has_node_info(routing_domain.into())
&& e.node_status(routing_domain).is_none()
{
return true;
}
// If this entry needs a ping because this node hasn't seen our latest node info, then do it
if opt_own_node_info_ts.is_some()
&& !e.has_seen_our_node_info_ts(routing_domain, opt_own_node_info_ts.unwrap())
&& !e.has_seen_our_node_info_ts(
routing_domain,
opt_own_node_info_ts.unwrap(),
)
{
return true;
}
@ -515,19 +528,67 @@ impl RoutingTableInner {
}
false
})
},
) as RoutingTableEntryFilter;
filters.push_back(filter_ping);
// Sort by least recently contacted
let compare = |_rti: &RoutingTableInner,
a_entry: &Option<Arc<BucketEntry>>,
b_entry: &Option<Arc<BucketEntry>>| {
// same nodes are always the same
if let Some(a_entry) = a_entry {
if let Some(b_entry) = b_entry {
if Arc::ptr_eq(a_entry, b_entry) {
return core::cmp::Ordering::Equal;
}
}
} else if b_entry.is_none() {
return core::cmp::Ordering::Equal;
}
// our own node always comes last (should not happen, here for completeness)
if a_entry.is_none() {
return core::cmp::Ordering::Greater;
}
if b_entry.is_none() {
return core::cmp::Ordering::Less;
}
// Sort by least recently contacted regardless of reliability
// If something needs a ping it should get it in the order of need
let ae = a_entry.as_ref().unwrap();
let be = b_entry.as_ref().unwrap();
ae.with_inner(|ae| {
be.with_inner(|be| {
let ca = ae
.peer_stats()
.rpc_stats
.last_question_ts
.unwrap_or(Timestamp::new(0))
.as_u64();
let cb = be
.peer_stats()
.rpc_stats
.last_question_ts
.unwrap_or(Timestamp::new(0))
.as_u64();
ca.cmp(&cb)
})
})
};
if entry.with_inner(entry_needs_ping) {
node_refs.push(FilteredNodeRef::new(
let transform = |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
FilteredNodeRef::new(
outer_self.clone(),
entry,
v.unwrap().clone(),
NodeRefFilter::new().with_routing_domain(routing_domain),
Sequencing::default(),
));
}
Option::<()>::None
});
node_refs
)
};
self.find_peers_with_sort_and_filter(usize::MAX, cur_ts, filters, compare, transform)
}
#[expect(dead_code)]
@ -1017,7 +1078,7 @@ impl RoutingTableInner {
&'b Option<Arc<BucketEntry>>,
&'b Option<Arc<BucketEntry>>,
) -> core::cmp::Ordering,
T: for<'r, 't> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O,
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O,
{
// collect all the nodes for sorting
let mut nodes =

View File

@ -1,5 +1,4 @@
use crate::*;
use alloc::collections::VecDeque;
use super::*;
// Latency entry is per round-trip packet (ping or data)
// - Size is number of entries
@ -11,6 +10,17 @@ const ROLLING_LATENCIES_SIZE: usize = 10;
const ROLLING_TRANSFERS_SIZE: usize = 10;
pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 1;
// State entry is per state reason change
// - Size is number of entries
const ROLLING_STATE_REASON_SPAN_SIZE: usize = 32;
pub const UPDATE_STATE_STATS_INTERVAL_SECS: u32 = 1;
// Answer entries are in counts per interval
// - Size is number of entries
// - Interval is number of seconds in each entry
const ROLLING_ANSWERS_SIZE: usize = 10;
pub const ROLLING_ANSWER_INTERVAL_SECS: u32 = 60;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TransferCount {
down: ByteCount,
@ -73,10 +83,12 @@ impl TransferStatsAccounting {
transfer_stats.up.average += bpsu;
}
let len = self.rolling_transfers.len() as u64;
if len > 0 {
transfer_stats.down.average /= len;
transfer_stats.up.average /= len;
}
}
}
#[derive(Debug, Clone, Default)]
pub struct LatencyStatsAccounting {
@ -90,7 +102,7 @@ impl LatencyStatsAccounting {
}
}
pub fn record_latency(&mut self, latency: TimestampDuration) -> veilid_api::LatencyStats {
pub fn record_latency(&mut self, latency: TimestampDuration) -> LatencyStats {
while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE {
self.rolling_latencies.pop_front();
}
@ -107,8 +119,274 @@ impl LatencyStatsAccounting {
ls.average += *rl;
}
let len = self.rolling_latencies.len() as u64;
if len > 0 {
ls.average /= len;
}
ls
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StateReasonSpan {
state_reason: BucketEntryStateReason,
enter_ts: Timestamp,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StateSpan {
state: BucketEntryState,
enter_ts: Timestamp,
}
#[derive(Debug, Clone, Default)]
pub struct StateStatsAccounting {
rolling_state_reason_spans: VecDeque<StateReasonSpan>,
last_stats: Option<StateStats>,
}
impl StateStatsAccounting {
pub fn new() -> Self {
Self {
rolling_state_reason_spans: VecDeque::new(),
last_stats: None,
}
}
fn make_stats(&self, cur_ts: Timestamp) -> StateStats {
let mut ss = StateStats::default();
let srs = &mut ss.reason;
let mut last_ts = cur_ts;
for rss in self.rolling_state_reason_spans.iter().rev() {
let span_dur = last_ts.saturating_sub(rss.enter_ts);
match BucketEntryState::from(rss.state_reason) {
BucketEntryState::Punished => ss.punished += span_dur,
BucketEntryState::Dead => ss.dead += span_dur,
BucketEntryState::Unreliable => ss.unreliable += span_dur,
BucketEntryState::Reliable => ss.reliable += span_dur,
}
match rss.state_reason {
BucketEntryStateReason::Punished(_) => {
// Ignore punished nodes for now
}
BucketEntryStateReason::Dead(bucket_entry_dead_reason) => {
match bucket_entry_dead_reason {
BucketEntryDeadReason::CanNotSend => srs.can_not_send += span_dur,
BucketEntryDeadReason::TooManyLostAnswers => {
srs.too_many_lost_answers += span_dur
}
BucketEntryDeadReason::NoPingResponse => srs.no_ping_response += span_dur,
}
}
BucketEntryStateReason::Unreliable(bucket_entry_unreliable_reason) => {
match bucket_entry_unreliable_reason {
BucketEntryUnreliableReason::FailedToSend => srs.failed_to_send += span_dur,
BucketEntryUnreliableReason::LostAnswers => srs.lost_answers += span_dur,
BucketEntryUnreliableReason::NotSeenConsecutively => {
srs.not_seen_consecutively += span_dur
}
BucketEntryUnreliableReason::InUnreliablePingSpan => {
srs.in_unreliable_ping_span += span_dur
}
}
}
BucketEntryStateReason::Reliable => {
// Reliable nodes don't have a reason other than lack of unreliability
}
}
last_ts = rss.enter_ts;
}
ss.span = cur_ts.saturating_sub(last_ts);
ss
}
pub fn take_stats(&mut self) -> Option<StateStats> {
self.last_stats.take()
}
pub fn record_state_reason(&mut self, cur_ts: Timestamp, state_reason: BucketEntryStateReason) {
let new_span = if let Some(cur_span) = self.rolling_state_reason_spans.back() {
if state_reason != cur_span.state_reason {
while self.rolling_state_reason_spans.len() >= ROLLING_STATE_REASON_SPAN_SIZE {
self.rolling_state_reason_spans.pop_front();
}
true
} else {
false
}
} else {
true
};
if new_span {
self.last_stats = Some(self.make_stats(cur_ts));
self.rolling_state_reason_spans.push_back(StateReasonSpan {
state_reason,
enter_ts: cur_ts,
});
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AnswerSpan {
enter_ts: Timestamp,
questions: u32,
answers: u32,
lost_answers: u32,
current_consecutive_answers: u32,
current_consecutive_lost_answers: u32,
consecutive_answers_maximum: u32,
consecutive_answers_total: u32,
consecutive_answers_count: u32,
consecutive_answers_minimum: u32,
consecutive_lost_answers_maximum: u32,
consecutive_lost_answers_total: u32,
consecutive_lost_answers_count: u32,
consecutive_lost_answers_minimum: u32,
}
impl AnswerSpan {
pub fn new(cur_ts: Timestamp) -> Self {
AnswerSpan {
enter_ts: cur_ts,
questions: 0,
answers: 0,
lost_answers: 0,
current_consecutive_answers: 0,
current_consecutive_lost_answers: 0,
consecutive_answers_maximum: 0,
consecutive_answers_total: 0,
consecutive_answers_count: 0,
consecutive_answers_minimum: 0,
consecutive_lost_answers_maximum: 0,
consecutive_lost_answers_total: 0,
consecutive_lost_answers_count: 0,
consecutive_lost_answers_minimum: 0,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct AnswerStatsAccounting {
rolling_answer_spans: VecDeque<AnswerSpan>,
}
impl AnswerStatsAccounting {
pub fn new() -> Self {
Self {
rolling_answer_spans: VecDeque::new(),
}
}
fn current_span(&mut self, cur_ts: Timestamp) -> &mut AnswerSpan {
if self.rolling_answer_spans.is_empty() {
self.rolling_answer_spans.push_back(AnswerSpan::new(cur_ts));
}
self.rolling_answer_spans.front_mut().unwrap()
}
fn make_stats(&self, cur_ts: Timestamp) -> AnswerStats {
let mut questions = 0u32;
let mut answers = 0u32;
let mut lost_answers = 0u32;
let mut consecutive_answers_maximum = 0u32;
let mut consecutive_answers_average = 0u32;
let mut consecutive_answers_minimum = u32::MAX;
let mut consecutive_lost_answers_maximum = 0u32;
let mut consecutive_lost_answers_average = 0u32;
let mut consecutive_lost_answers_minimum = u32::MAX;
let mut last_ts = cur_ts;
for ras in self.rolling_answer_spans.iter().rev() {
questions += ras.questions;
answers += ras.answers;
lost_answers += ras.lost_answers;
consecutive_answers_maximum.max_assign(ras.consecutive_answers_maximum);
consecutive_answers_minimum.min_assign(ras.consecutive_answers_minimum);
consecutive_answers_average += if ras.consecutive_answers_total > 0 {
ras.consecutive_answers_count / ras.consecutive_answers_total
} else {
0
};
consecutive_lost_answers_maximum.max_assign(ras.consecutive_lost_answers_maximum);
consecutive_lost_answers_minimum.min_assign(ras.consecutive_lost_answers_minimum);
consecutive_lost_answers_average += if ras.consecutive_lost_answers_total > 0 {
ras.consecutive_lost_answers_count / ras.consecutive_lost_answers_total
} else {
0
};
last_ts = ras.enter_ts;
}
let len = self.rolling_answer_spans.len() as u32;
if len > 0 {
consecutive_answers_average /= len;
consecutive_lost_answers_average /= len;
}
let span = cur_ts.saturating_sub(last_ts);
AnswerStats {
span,
questions,
answers,
lost_answers,
consecutive_answers_maximum,
consecutive_answers_average,
consecutive_answers_minimum,
consecutive_lost_answers_maximum,
consecutive_lost_answers_average,
consecutive_lost_answers_minimum,
}
}
pub fn roll_answers(&mut self, cur_ts: Timestamp) -> AnswerStats {
let stats = self.make_stats(cur_ts);
while self.rolling_answer_spans.len() >= ROLLING_ANSWERS_SIZE {
self.rolling_answer_spans.pop_front();
}
self.rolling_answer_spans.push_back(AnswerSpan::new(cur_ts));
stats
}
pub fn record_question(&mut self, cur_ts: Timestamp) {
let cas = self.current_span(cur_ts);
cas.questions += 1;
}
pub fn record_answer(&mut self, cur_ts: Timestamp) {
let cas = self.current_span(cur_ts);
cas.answers += 1;
if cas.current_consecutive_lost_answers > 0 {
cas.consecutive_lost_answers_maximum
.max_assign(cas.current_consecutive_lost_answers);
cas.consecutive_lost_answers_minimum
.min_assign(cas.current_consecutive_lost_answers);
cas.consecutive_lost_answers_total += cas.current_consecutive_lost_answers;
cas.consecutive_lost_answers_count += 1;
cas.current_consecutive_lost_answers = 0;
}
cas.current_consecutive_answers = 1;
}
pub fn record_lost_answer(&mut self, cur_ts: Timestamp) {
let cas = self.current_span(cur_ts);
cas.lost_answers += 1;
if cas.current_consecutive_answers > 0 {
cas.consecutive_answers_maximum
.max_assign(cas.current_consecutive_answers);
cas.consecutive_answers_minimum
.min_assign(cas.current_consecutive_answers);
cas.consecutive_answers_total += cas.current_consecutive_answers;
cas.consecutive_answers_count += 1;
cas.current_consecutive_answers = 0;
}
cas.current_consecutive_lost_answers = 1;
}
}

View File

@ -5,7 +5,7 @@ pub mod peer_minimum_refresh;
pub mod ping_validator;
pub mod private_route_management;
pub mod relay_management;
pub mod rolling_transfers;
pub mod update_statistics;
use super::*;
@ -25,6 +25,34 @@ impl RoutingTable {
});
}
// Set update state stats tick task
{
let this = self.clone();
self.unlocked_inner
.update_state_stats_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().update_state_stats_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
// Set rolling answers tick task
{
let this = self.clone();
self.unlocked_inner
.rolling_answers_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().rolling_answers_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
// Set kick buckets tick task
{
let this = self.clone();
@ -67,13 +95,58 @@ impl RoutingTable {
});
}
// Set ping validator tick task
// Set ping validator PublicInternet tick task
{
let this = self.clone();
self.unlocked_inner
.ping_validator_task
.ping_validator_public_internet_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().ping_validator_task_routine(
Box::pin(this.clone().ping_validator_public_internet_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
// Set ping validator LocalNetwork tick task
{
let this = self.clone();
self.unlocked_inner
.ping_validator_local_network_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().ping_validator_local_network_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
// Set ping validator PublicInternet Relay tick task
{
let this = self.clone();
self.unlocked_inner
.ping_validator_public_internet_relay_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.ping_validator_public_internet_relay_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
),
)
});
}
// Set ping validator Active Watch tick task
{
let this = self.clone();
self.unlocked_inner
.ping_validator_active_watch_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().ping_validator_active_watch_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
@ -126,6 +199,12 @@ impl RoutingTable {
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
self.unlocked_inner.rolling_transfers_task.tick().await?;
// Do state stats update every UPDATE_STATE_STATS_INTERVAL_SECS secs
self.unlocked_inner.update_state_stats_task.tick().await?;
// Do rolling answers every ROLLING_ANSWER_INTERVAL_SECS secs
self.unlocked_inner.rolling_answers_task.tick().await?;
// Kick buckets task
let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len();
if kick_bucket_queue_count > 0 {
@ -165,7 +244,22 @@ impl RoutingTable {
}
// Ping validate some nodes to groom the table
self.unlocked_inner.ping_validator_task.tick().await?;
self.unlocked_inner
.ping_validator_public_internet_task
.tick()
.await?;
self.unlocked_inner
.ping_validator_local_network_task
.tick()
.await?;
self.unlocked_inner
.ping_validator_public_internet_relay_task
.tick()
.await?;
self.unlocked_inner
.ping_validator_active_watch_task
.tick()
.await?;
// Run the relay management task
self.unlocked_inner.relay_management_task.tick().await?;
@ -212,6 +306,14 @@ impl RoutingTable {
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
error!("rolling_transfers_task not stopped: {}", e);
}
log_rtab!(debug "stopping update state stats task");
if let Err(e) = self.unlocked_inner.update_state_stats_task.stop().await {
error!("update_state_stats_task not stopped: {}", e);
}
log_rtab!(debug "stopping rolling answers task");
if let Err(e) = self.unlocked_inner.rolling_answers_task.stop().await {
error!("rolling_answers_task not stopped: {}", e);
}
log_rtab!(debug "stopping kick buckets task");
if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await {
error!("kick_buckets_task not stopped: {}", e);
@ -224,10 +326,44 @@ impl RoutingTable {
if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await {
error!("peer_minimum_refresh_task not stopped: {}", e);
}
log_rtab!(debug "stopping ping_validator task");
if let Err(e) = self.unlocked_inner.ping_validator_task.stop().await {
error!("ping_validator_task not stopped: {}", e);
log_rtab!(debug "stopping ping_validator tasks");
if let Err(e) = self
.unlocked_inner
.ping_validator_public_internet_task
.stop()
.await
{
error!("ping_validator_public_internet_task not stopped: {}", e);
}
if let Err(e) = self
.unlocked_inner
.ping_validator_local_network_task
.stop()
.await
{
error!("ping_validator_local_network_task not stopped: {}", e);
}
if let Err(e) = self
.unlocked_inner
.ping_validator_public_internet_relay_task
.stop()
.await
{
error!(
"ping_validator_public_internet_relay_task not stopped: {}",
e
);
}
if let Err(e) = self
.unlocked_inner
.ping_validator_active_watch_task
.stop()
.await
{
error!("ping_validator_active_watch_task not stopped: {}", e);
}
log_rtab!(debug "stopping relay management task");
if let Err(e) = self.unlocked_inner.relay_management_task.stop().await {
warn!("relay_management_task not stopped: {}", e);

View File

@ -7,16 +7,92 @@ const RELAY_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
/// Keepalive pings are done for active watch nodes to make sure they are still there
const ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
/// Ping queue processing depth
/// Ping queue processing depth per validator
const MAX_PARALLEL_PINGS: usize = 16;
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::FutureExt;
use stop_token::future::FutureExt as StopFutureExt;
type PingValidatorFuture = SendPinBoxFuture<Result<(), RPCError>>;
impl RoutingTable {
// Task routine for PublicInternet status pings
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_public_internet_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let mut future_queue: VecDeque<PingValidatorFuture> = VecDeque::new();
self.ping_validator_public_internet(cur_ts, &mut future_queue)
.await?;
self.process_ping_validation_queue("PublicInternet", stop_token, cur_ts, future_queue)
.await;
Ok(())
}
// Task routine for LocalNetwork status pings
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_local_network_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let mut future_queue: VecDeque<PingValidatorFuture> = VecDeque::new();
self.ping_validator_local_network(cur_ts, &mut future_queue)
.await?;
self.process_ping_validation_queue("LocalNetwork", stop_token, cur_ts, future_queue)
.await;
Ok(())
}
// Task routine for PublicInternet relay keepalive pings
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_public_internet_relay_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let mut future_queue: VecDeque<PingValidatorFuture> = VecDeque::new();
self.relay_keepalive_public_internet(cur_ts, &mut future_queue)
.await?;
self.process_ping_validation_queue("RelayKeepalive", stop_token, cur_ts, future_queue)
.await;
Ok(())
}
// Task routine for active watch keepalive pings
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_active_watch_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let mut future_queue: VecDeque<PingValidatorFuture> = VecDeque::new();
self.active_watches_keepalive_public_internet(cur_ts, &mut future_queue)
.await?;
self.process_ping_validation_queue("WatchKeepalive", stop_token, cur_ts, future_queue)
.await;
Ok(())
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Ping the relay to keep it alive, over every protocol it is relaying for us
#[instrument(level = "trace", skip(self, futurequeue), err)]
async fn relay_keepalive_public_internet(
@ -106,11 +182,9 @@ impl RoutingTable {
for relay_nr_filtered in relay_noderefs {
let rpc = rpc.clone();
log_rtab!("--> Keepalive ping to {:?}", relay_nr_filtered);
futurequeue.push_back(
async move {
log_rtab!("--> PublicInternet Relay ping to {:?}", relay_nr_filtered);
let _ = rpc
.rpc_call_status(Destination::direct(relay_nr_filtered))
.await?;
@ -156,11 +230,9 @@ impl RoutingTable {
for watch_nr in watch_node_refs {
let rpc = rpc.clone();
log_rtab!("--> Watch ping to {:?}", watch_nr);
futurequeue.push_back(
async move {
log_rtab!("--> Watch Keepalive ping to {:?}", watch_nr);
let _ = rpc
.rpc_call_status(Destination::direct(watch_nr.default_filtered()))
.await?;
@ -185,20 +257,12 @@ impl RoutingTable {
// Get all nodes needing pings in the PublicInternet routing domain
let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts);
// If we have a relay, let's ping for NAT keepalives and check for address changes
self.relay_keepalive_public_internet(cur_ts, futurequeue)
.await?;
// Check active watch keepalives
self.active_watches_keepalive_public_internet(cur_ts, futurequeue)
.await?;
// Just do a single ping with the best protocol for all the other nodes to check for liveness
for nr in node_refs {
let rpc = rpc.clone();
log_rtab!("--> Validator ping to {:?}", nr);
futurequeue.push_back(
async move {
log_rtab!(debug "--> PublicInternet Validator ping to {:?}", nr);
let _ = rpc.rpc_call_status(Destination::direct(nr)).await?;
Ok(())
}
@ -229,6 +293,7 @@ impl RoutingTable {
// Just do a single ping with the best protocol for all the nodes
futurequeue.push_back(
async move {
log_rtab!("--> LocalNetwork Validator ping to {:?}", nr);
let _ = rpc.rpc_call_status(Destination::direct(nr)).await?;
Ok(())
}
@ -239,69 +304,37 @@ impl RoutingTable {
Ok(())
}
// Ping each node in the routing table if they need to be pinged
// to determine their reliability
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_task_routine(
self,
// Common handler for running ping validations in a batch
async fn process_ping_validation_queue(
&self,
name: &str,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let mut futurequeue: VecDeque<PingValidatorFuture> = VecDeque::new();
future_queue: VecDeque<PingValidatorFuture>,
) {
let count = future_queue.len();
if count == 0 {
return;
}
log_rtab!(debug "[{}] Ping validation queue: {} remaining", name, count);
// PublicInternet
self.ping_validator_public_internet(cur_ts, &mut futurequeue)
.await?;
// LocalNetwork
self.ping_validator_local_network(cur_ts, &mut futurequeue)
.await?;
// Wait for ping futures to complete in parallel
let mut unord = FuturesUnordered::new();
while !unord.is_empty() || !futurequeue.is_empty() {
log_rtab!(
"Ping validation queue: {} remaining, {} in progress",
futurequeue.len(),
unord.len()
let atomic_count = AtomicUsize::new(count);
process_batched_future_queue(future_queue, MAX_PARALLEL_PINGS, stop_token, |res| async {
if let Err(e) = res {
log_rtab!(error "[{}] Error performing status ping: {}", name, e);
}
let remaining = atomic_count.fetch_sub(1, Ordering::AcqRel) - 1;
if remaining > 0 {
log_rtab!(debug "[{}] Ping validation queue: {} remaining", name, remaining);
}
})
.await;
let done_ts = Timestamp::now();
log_rtab!(debug
"[{}] Ping validation queue finished {} in {:?}",
name,
count,
done_ts - cur_ts
);
// Process one unordered futures if we have some
match unord
.next()
.timeout_at(stop_token.clone())
.in_current_span()
.await
{
Ok(Some(res)) => {
// Some ping completed
match res {
Ok(()) => {}
Err(e) => {
log_rtab!(error "Error performing status ping: {}", e);
}
}
}
Ok(None) => {
// We're empty
}
Err(_) => {
// Timeout means we drop the rest because we were asked to stop
break;
}
}
// Fill unord up to max parallelism
while unord.len() < MAX_PARALLEL_PINGS {
let Some(fq) = futurequeue.pop_front() else {
break;
};
unord.push(fq);
}
}
Ok(())
}
}

View File

@ -59,7 +59,7 @@ impl RoutingTable {
}
// If this has been published, always test if we need it
// Also if the route has never been tested, test it at least once
if v.is_published() || stats.last_tested_ts.is_none() {
if v.is_published() || stats.last_known_valid_ts.is_none() {
must_test_routes.push(*k);
}
// If this is a default route hop length, include it in routes to keep alive

View File

@ -1,36 +0,0 @@
use super::*;
impl RoutingTable {
// Compute transfer statistics to determine how 'fast' a node is
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn rolling_transfers_task_routine(
self,
_stop_token: StopToken,
last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
// log_rtab!("--- rolling_transfers task");
{
let inner = &mut *self.inner.write();
// Roll our own node's transfers
inner.self_transfer_stats_accounting.roll_transfers(
last_ts,
cur_ts,
&mut inner.self_transfer_stats,
);
// Roll all bucket entry transfers
let all_entries: Vec<Arc<BucketEntry>> = inner.all_entries.iter().collect();
for entry in all_entries {
entry.with_mut(inner, |_rti, e| e.roll_transfers(last_ts, cur_ts));
}
}
// Roll all route transfers
let rss = self.route_spec_store();
rss.roll_transfers(last_ts, cur_ts);
Ok(())
}
}

View File

@ -0,0 +1,81 @@
use super::*;
impl RoutingTable {
// Compute transfer statistics to determine how 'fast' a node is
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn rolling_transfers_task_routine(
self,
_stop_token: StopToken,
last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
{
let inner = &mut *self.inner.write();
// Roll our own node's transfers
inner.self_transfer_stats_accounting.roll_transfers(
last_ts,
cur_ts,
&mut inner.self_transfer_stats,
);
// Roll all bucket entry transfers
let all_entries: Vec<Arc<BucketEntry>> = inner.all_entries.iter().collect();
for entry in all_entries {
entry.with_mut(inner, |_rti, e| e.roll_transfers(last_ts, cur_ts));
}
}
// Roll all route transfers
let rss = self.route_spec_store();
rss.roll_transfers(last_ts, cur_ts);
Ok(())
}
// Update state statistics in PeerStats
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn update_state_stats_task_routine(
self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
{
let inner = &mut *self.inner.write();
// Roll all bucket entry transfers
let all_entries: Vec<Arc<BucketEntry>> = inner.all_entries.iter().collect();
for entry in all_entries {
entry.with_mut(inner, |_rti, e| e.update_state_stats());
}
}
Ok(())
}
// Update rolling answers in PeerStats
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn rolling_answers_task_routine(
self,
_stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
{
let inner = &mut *self.inner.write();
// Roll all bucket entry answers stats
let all_entries: Vec<Arc<BucketEntry>> = inner.all_entries.iter().collect();
for entry in all_entries {
entry.with_mut(inner, |_rti, e| e.roll_answer_stats(cur_ts));
}
}
// Roll all route answers
let rss = self.route_spec_store();
rss.roll_answers(cur_ts);
Ok(())
}
}

View File

@ -28,21 +28,31 @@ pub struct NodeInfo {
impl fmt::Debug for NodeInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// xxx: use field_with once that is on stable. trying to make this structure use fewer log lines when pretty printed
if !f.alternate() {
writeln!(f, "network_class: {:?}", self.network_class)?;
writeln!(f, "outbound_protocols: {:?}", self.outbound_protocols)?;
writeln!(f, "address_types: {:?}", self.address_types)?;
writeln!(f, "envelope_support: {:?}", self.envelope_support)?;
writeln!(f, "crypto_support: {:?}", self.crypto_support)?;
writeln!(f, "capabilities: {:?}", self.capabilities)?;
writeln!(f, "dial_info_detail_list:")?;
for did in &self.dial_info_detail_list {
writeln!(f, " {:?}", did)?;
}
Ok(())
} else {
f.debug_struct("NodeInfo")
.field("network_class", &self.network_class)
.field(
"outbound_protocols",
&format!("{:?}", &self.outbound_protocols),
)
.field("address_types", &format!("{:?}", &self.address_types))
.field("envelope_support", &format!("{:?}", &self.envelope_support))
.field("crypto_support", &format!("{:?}", &self.crypto_support))
.field("capabilities", &format!("{:?}", &self.capabilities))
.field("outbound_protocols", &self.outbound_protocols)
.field("address_types", &self.address_types)
.field("envelope_support", &self.envelope_support)
.field("crypto_support", &self.crypto_support)
.field("capabilities", &self.capabilities)
.field("dial_info_detail_list", &self.dial_info_detail_list)
.finish()
}
}
}
impl NodeInfo {
pub fn new(

View File

@ -1,12 +1,36 @@
use indent::indent_by;
use super::*;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PeerInfo {
routing_domain: RoutingDomain,
node_ids: TypedKeyGroup,
signed_node_info: SignedNodeInfo,
}
impl fmt::Debug for PeerInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if !f.alternate() {
writeln!(f, "routing_domain: {:?}", self.routing_domain)?;
writeln!(f, "node_ids: {}", self.node_ids)?;
writeln!(f, "signed_node_info:")?;
write!(
f,
" {}",
indent_by(4, format!("{:?}", self.signed_node_info))
)?;
Ok(())
} else {
f.debug_struct("PeerInfo")
.field("routing_domain", &self.routing_domain)
.field("node_ids", &self.node_ids)
.field("signed_node_info", &self.signed_node_info)
.finish()
}
}
}
impl PeerInfo {
pub fn new(
routing_domain: RoutingDomain,

View File

@ -1,12 +1,36 @@
use indent::indent_by;
use super::*;
/// Signed NodeInfo that can be passed around amongst peers and verifiable
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SignedDirectNodeInfo {
node_info: NodeInfo,
timestamp: Timestamp,
signatures: Vec<TypedSignature>,
}
impl fmt::Debug for SignedDirectNodeInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if !f.alternate() {
writeln!(f, "node_info:")?;
write!(f, " {}", indent_by(4, format!("{:?}", self.node_info)))?;
writeln!(f, "timestamp: {}", self.timestamp)?;
writeln!(f, "signatures:")?;
for sig in &self.signatures {
writeln!(f, " {}", sig)?;
}
Ok(())
} else {
f.debug_struct("SignedDirectNodeInfo")
.field("node_info", &self.node_info)
.field("timestamp", &self.timestamp)
.field("signatures", &self.signatures)
.finish()
}
}
}
impl SignedDirectNodeInfo {
/// Returns a new SignedDirectNodeInfo that has its signatures validated.
/// On success, this will modify the node_ids set to only include node_ids whose signatures validate.

View File

@ -1,11 +1,37 @@
use indent::indent_by;
use super::*;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SignedNodeInfo {
Direct(SignedDirectNodeInfo),
Relayed(SignedRelayedNodeInfo),
}
impl fmt::Debug for SignedNodeInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if !f.alternate() {
match self {
Self::Direct(arg0) => {
writeln!(f, "direct:")?;
write!(f, " {}", indent_by(4, format!("{:?}", arg0)))?;
Ok(())
}
Self::Relayed(arg0) => {
writeln!(f, "relayed:")?;
write!(f, " {}", indent_by(4, format!("{:?}", arg0)))?;
Ok(())
}
}
} else {
match self {
Self::Direct(arg0) => f.debug_tuple("Direct").field(arg0).finish(),
Self::Relayed(arg0) => f.debug_tuple("Relayed").field(arg0).finish(),
}
}
}
}
impl SignedNodeInfo {
pub fn validate(
&self,

View File

@ -1,7 +1,9 @@
use indent::indent_by;
use super::*;
/// Signed NodeInfo with a relay that can be passed around amongst peers and verifiable
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SignedRelayedNodeInfo {
node_info: NodeInfo,
relay_ids: TypedKeyGroup,
@ -10,6 +12,32 @@ pub struct SignedRelayedNodeInfo {
signatures: Vec<TypedSignature>,
}
impl fmt::Debug for SignedRelayedNodeInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if !f.alternate() {
writeln!(f, "node_info:")?;
write!(f, " {}", indent_by(4, format!("{:?}", self.node_info)))?;
writeln!(f, "relay_ids: {}", self.relay_ids)?;
writeln!(f, "relay_info:")?;
write!(f, " {}", indent_by(4, format!("{:?}", self.relay_info)))?;
writeln!(f, "timestamp: {}", self.timestamp)?;
writeln!(f, "signatures:")?;
for sig in &self.signatures {
writeln!(f, " {}", sig)?;
}
Ok(())
} else {
f.debug_struct("SignedDirectNodeInfo")
.field("node_info", &self.node_info)
.field("relay_ids", &self.relay_ids)
.field("relay_info", &self.relay_info)
.field("timestamp", &self.timestamp)
.field("signatures", &self.signatures)
.finish()
}
}
}
impl SignedRelayedNodeInfo {
/// Returns a new SignedRelayedNodeInfo that has its signatures validated.
/// On success, this will modify the node_ids set to only include node_ids whose signatures validate.

View File

@ -692,7 +692,7 @@ impl RPCProcessor {
match &out {
Err(e) => {
log_rpc!(debug "RPC Lost (id={} {}): {}", id, debug_string, e);
self.record_question_lost(
self.record_lost_answer(
waitable_reply.send_ts,
waitable_reply.node_ref.clone(),
waitable_reply.safety_route,
@ -702,7 +702,7 @@ impl RPCProcessor {
}
Ok(TimeoutOr::Timeout) => {
log_rpc!(debug "RPC Lost (id={} {}): Timeout", id, debug_string);
self.record_question_lost(
self.record_lost_answer(
waitable_reply.send_ts,
waitable_reply.node_ref.clone(),
waitable_reply.safety_route,
@ -1008,11 +1008,8 @@ impl RPCProcessor {
let routing_table = self.routing_table();
if let Some(published_peer_info) = routing_table.get_published_peer_info(routing_domain) {
// Get our node info timestamp
let our_node_info_ts = published_peer_info.signed_node_info().timestamp();
// If the target has not yet seen our published peer info, send it along if we have it
if !node.has_seen_our_node_info_ts(routing_domain, our_node_info_ts) {
if !node.has_seen_our_node_info_ts(routing_domain) {
return SenderPeerInfo::new(published_peer_info, target_node_info_ts);
}
}
@ -1056,7 +1053,7 @@ impl RPCProcessor {
/// Record question lost to node or route
#[instrument(level = "trace", target = "rpc", skip_all)]
fn record_question_lost(
fn record_lost_answer(
&self,
send_ts: Timestamp,
node_ref: NodeRef,
@ -1066,7 +1063,7 @@ impl RPCProcessor {
) {
// Record for node if this was not sent via a route
if safety_route.is_none() && remote_private_route.is_none() {
node_ref.stats_question_lost();
node_ref.stats_lost_answer();
// Also clear the last_connections for the entry so we make a new connection next time
node_ref.clear_last_flows();
@ -1080,19 +1077,19 @@ impl RPCProcessor {
if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| {
s.record_question_lost();
s.record_lost_answer();
});
}
// If remote private route was used, record question lost there
if let Some(rpr_pubkey) = &remote_private_route {
rss.with_route_stats_mut(send_ts, rpr_pubkey, |s| {
s.record_question_lost();
s.record_lost_answer();
});
}
// If private route was used, record question lost there
if let Some(pr_pubkey) = &private_route {
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| {
s.record_question_lost();
s.record_lost_answer();
});
}
}
@ -1169,8 +1166,8 @@ impl RPCProcessor {
// If safety route was used, record route there
if let Some(sr_pubkey) = &safety_route {
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| {
// If we received an answer, the safety route we sent over can be considered tested
s.record_tested(recv_ts);
// Record received bytes
s.record_answer_received(recv_ts, bytes);
// If we used a safety route to send, use our last tested latency
total_local_latency += s.latency_stats().average
@ -1181,7 +1178,7 @@ impl RPCProcessor {
if let Some(pr_pubkey) = &reply_private_route {
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| {
// Record received bytes
s.record_received(recv_ts, bytes);
s.record_answer_received(recv_ts, bytes);
// If we used a private route to receive, use our last tested latency
total_local_latency += s.latency_stats().average
@ -1192,7 +1189,7 @@ impl RPCProcessor {
if let Some(rpr_pubkey) = &remote_private_route {
rss.with_route_stats_mut(send_ts, rpr_pubkey, |s| {
// Record received bytes
s.record_received(recv_ts, bytes);
s.record_answer_received(recv_ts, bytes);
// The remote route latency is recorded using the total latency minus the total local latency
let remote_latency = total_latency.saturating_sub(total_local_latency);
@ -1248,7 +1245,7 @@ impl RPCProcessor {
// This may record nothing if the remote safety route is not also
// a remote private route that been imported, but that's okay
rss.with_route_stats_mut(recv_ts, &d.remote_safety_route, |s| {
s.record_received(recv_ts, bytes);
s.record_question_received(recv_ts, bytes);
});
}
// Process messages that arrived to our private route
@ -1260,12 +1257,12 @@ impl RPCProcessor {
// it could also be a node id if no remote safety route was used
// in which case this also will do nothing
rss.with_route_stats_mut(recv_ts, &d.remote_safety_route, |s| {
s.record_received(recv_ts, bytes);
s.record_question_received(recv_ts, bytes);
});
// Record for our local private route we received over
rss.with_route_stats_mut(recv_ts, &d.private_route, |s| {
s.record_received(recv_ts, bytes);
s.record_question_received(recv_ts, bytes);
});
}
}
@ -1748,7 +1745,7 @@ impl RPCProcessor {
log_rpc!(debug "Could not complete rpc operation: id = {}: {}", op_id, e);
}
RPCError::Ignore(_) => {
log_rpc!("Answer late: id = {}", op_id);
log_rpc!(debug "Answer late: id = {}", op_id);
}
};
// Don't throw an error here because it's okay if the original operation timed out

View File

@ -26,6 +26,21 @@ pub fn fix_transferstatsdownup() -> TransferStatsDownUp {
}
}
pub fn fix_answerstats() -> AnswerStats {
AnswerStats {
span: TimestampDuration::new_secs(10),
questions: 10,
answers: 8,
lost_answers: 0,
consecutive_answers_maximum: 1,
consecutive_answers_average: 2,
consecutive_answers_minimum: 3,
consecutive_lost_answers_maximum: 4,
consecutive_lost_answers_average: 5,
consecutive_lost_answers_minimum: 6,
}
}
pub fn fix_rpcstats() -> RPCStats {
RPCStats {
messages_sent: 1_000_000,
@ -36,6 +51,26 @@ pub fn fix_rpcstats() -> RPCStats {
first_consecutive_seen_ts: Some(Timestamp::from(1685569111851)),
recent_lost_answers: 5,
failed_to_send: 3,
answer: fix_answerstats(),
}
}
pub fn fix_statestats() -> StateStats {
StateStats {
span: TimestampDuration::new_secs(10),
reliable: TimestampDuration::new_secs(5),
unreliable: TimestampDuration::new_secs(5),
dead: TimestampDuration::new_secs(0),
punished: TimestampDuration::new_secs(0),
reason: StateReasonStats {
can_not_send: TimestampDuration::new_secs(1),
too_many_lost_answers: TimestampDuration::new_secs(2),
no_ping_response: TimestampDuration::new_secs(3),
failed_to_send: TimestampDuration::new_secs(4),
lost_answers: TimestampDuration::new_secs(5),
not_seen_consecutively: TimestampDuration::new_secs(6),
in_unreliable_ping_span: TimestampDuration::new_secs(7),
},
}
}
@ -45,6 +80,7 @@ pub fn fix_peerstats() -> PeerStats {
rpc_stats: fix_rpcstats(),
latency: Some(fix_latencystats()),
transfer: fix_transferstatsdownup(),
state: fix_statestats(),
}
}

View File

@ -1,22 +1,32 @@
use super::*;
/// Measurement of communications latency to this node over all RPC questions
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct LatencyStats {
pub fastest: TimestampDuration, // fastest latency in the ROLLING_LATENCIES_SIZE last latencies
pub average: TimestampDuration, // average latency over the ROLLING_LATENCIES_SIZE last latencies
pub slowest: TimestampDuration, // slowest latency in the ROLLING_LATENCIES_SIZE last latencies
/// fastest latency in the ROLLING_LATENCIES_SIZE last latencies
pub fastest: TimestampDuration,
/// average latency over the ROLLING_LATENCIES_SIZE last latencies
pub average: TimestampDuration,
/// slowest latency in the ROLLING_LATENCIES_SIZE last latencies
pub slowest: TimestampDuration,
}
/// Measurement of how much data has transferred to or from this node over a time span
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct TransferStats {
pub total: ByteCount, // total amount transferred ever
pub maximum: ByteCount, // maximum rate over the ROLLING_TRANSFERS_SIZE last amounts
pub average: ByteCount, // average rate over the ROLLING_TRANSFERS_SIZE last amounts
pub minimum: ByteCount, // minimum rate over the ROLLING_TRANSFERS_SIZE last amounts
/// total amount transferred ever
pub total: ByteCount,
/// maximum rate over the ROLLING_TRANSFERS_SIZE last amounts
pub maximum: ByteCount,
/// average rate over the ROLLING_TRANSFERS_SIZE last amounts
pub average: ByteCount,
/// minimum rate over the ROLLING_TRANSFERS_SIZE last amounts
pub minimum: ByteCount,
}
/// Transfer statistics from a node to our own (down) and
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct TransferStatsDownUp {
@ -24,24 +34,112 @@ pub struct TransferStatsDownUp {
pub up: TransferStats,
}
/// Measurement of what states the node has been in over a time span
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct StateStats {
/// total amount of time measured
pub span: TimestampDuration,
/// amount of time spent in a reliable state
pub reliable: TimestampDuration,
/// amount of time spent in an unreliable state
pub unreliable: TimestampDuration,
/// amount of time spent in a dead state
pub dead: TimestampDuration,
/// amount of time spent in a punished state
pub punished: TimestampDuration,
/// state reason stats for this peer
#[serde(default)]
pub reason: StateReasonStats,
}
/// Measurement of what state reasons the node has been in over a time span
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct StateReasonStats {
/// time spent dead due to being unable to send
pub can_not_send: TimestampDuration,
/// time spent dead because of too many lost answers
pub too_many_lost_answers: TimestampDuration,
/// time spent dead because of no ping response
pub no_ping_response: TimestampDuration,
/// time spent unreliable because of failures to send
pub failed_to_send: TimestampDuration,
/// time spent unreliable because of lost answers
pub lost_answers: TimestampDuration,
/// time spent unreliable because of not being seen consecutively
pub not_seen_consecutively: TimestampDuration,
/// time spent unreliable because we are in the unreliable ping span
pub in_unreliable_ping_span: TimestampDuration,
}
/// Measurement of round-trip RPC question/answer performance
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct AnswerStats {
/// total amount of time measured
pub span: TimestampDuration,
/// number of questions sent in this span
pub questions: u32,
/// number of answers received in this span
pub answers: u32,
/// number of lost answers in this span
pub lost_answers: u32,
/// maximum number of received answers before a lost answer in this span
pub consecutive_answers_maximum: u32,
/// average number of received answers before a lost answer in this span
pub consecutive_answers_average: u32,
/// minimum number of received answers before a lost answer in this span
pub consecutive_answers_minimum: u32,
/// maximum number of timeouts before a received answer in this span
pub consecutive_lost_answers_maximum: u32,
/// average number of timeouts before a received answer in this span
pub consecutive_lost_answers_average: u32,
/// minimum number of timeouts before a received answer in this span
pub consecutive_lost_answers_minimum: u32,
}
/// Statistics for RPC operations performed on a node
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct RPCStats {
pub messages_sent: u32, // number of rpcs that have been sent in the total_time range
pub messages_rcvd: u32, // number of rpcs that have been received in the total_time range
pub questions_in_flight: u32, // number of questions issued that have yet to be answered
pub last_question_ts: Option<Timestamp>, // when the peer was last questioned (either successfully or not) and we wanted an answer
pub last_seen_ts: Option<Timestamp>, // when the peer was last seen for any reason, including when we first attempted to reach out to it
pub first_consecutive_seen_ts: Option<Timestamp>, // the timestamp of the first consecutive proof-of-life for this node (an answer or received question)
pub recent_lost_answers: u32, // number of answers that have been lost since we lost reliability
pub failed_to_send: u32, // number of messages that have failed to send or connections dropped since we last successfully sent one
/// number of rpcs that have been sent in the total entry time range
pub messages_sent: u32,
/// number of rpcs that have been received in the total entry time range
pub messages_rcvd: u32,
/// number of questions issued that have yet to be answered
pub questions_in_flight: u32,
/// when the peer was last questioned (either successfully or not) and we wanted an answer
pub last_question_ts: Option<Timestamp>,
/// when the peer was last seen for any reason, including when we first attempted to reach out to it
pub last_seen_ts: Option<Timestamp>,
/// the timestamp of the first consecutive proof-of-life for this node (an answer or received question)
pub first_consecutive_seen_ts: Option<Timestamp>,
/// number of answers that have been lost consecutively
pub recent_lost_answers: u32,
/// number of messages that have failed to send or connections dropped since we last successfully sent one
pub failed_to_send: u32,
/// rpc answer stats for this peer
#[serde(default)]
pub answer: AnswerStats,
}
/// Statistics for a peer in the routing table
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct PeerStats {
pub time_added: Timestamp, // when the peer was added to the routing table
pub rpc_stats: RPCStats, // information about RPCs
pub latency: Option<LatencyStats>, // latencies for communications with the peer
pub transfer: TransferStatsDownUp, // Stats for communications with the peer
/// when the peer was added to the routing table
pub time_added: Timestamp,
#[serde(default)]
/// information about RPCs
pub rpc_stats: RPCStats,
#[serde(default)]
/// latency stats for this peer
pub latency: Option<LatencyStats>,
/// transfer stats for this peer
#[serde(default)]
pub transfer: TransferStatsDownUp,
/// state stats for this peer
#[serde(default)]
pub state: StateStats,
}

View File

@ -0,0 +1,21 @@
use super::*;
use futures_util::StreamExt as _;
use stop_token::future::FutureExt as _;
pub async fn process_batched_future_queue<I, C, F, R>(
future_queue: I,
batch_size: usize,
stop_token: StopToken,
result_callback: C,
) where
I: IntoIterator,
C: Fn(R) -> F,
F: Future<Output = ()>,
<I as std::iter::IntoIterator>::Item: core::future::Future<Output = R>,
{
let mut buffered_futures =
futures_util::stream::iter(future_queue).buffer_unordered(batch_size);
while let Ok(Some(res)) = buffered_futures.next().timeout_at(stop_token.clone()).await {
result_callback(res).await;
}
}

View File

@ -34,6 +34,7 @@ pub mod eventual;
pub mod eventual_base;
pub mod eventual_value;
pub mod eventual_value_clone;
pub mod future_queue;
pub mod interval;
pub mod ip_addr_port;
pub mod ip_extra;
@ -201,6 +202,8 @@ pub use eventual_value::*;
#[doc(inline)]
pub use eventual_value_clone::*;
#[doc(inline)]
pub use future_queue::*;
#[doc(inline)]
pub use interval::*;
#[doc(inline)]
pub use ip_addr_port::*;