From a80178da54be935048b91921f9f281704fa42b70 Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 26 Nov 2021 09:54:38 -0500 Subject: [PATCH] cleanup and stats accounting organization --- .gitlab-ci.yml | 3 + veilid-core/src/lib.rs | 1 + veilid-core/src/network_manager.rs | 42 ++---- veilid-core/src/routing_table/bucket_entry.rs | 127 ++++------------ veilid-core/src/routing_table/mod.rs | 127 ++++++++++++---- .../src/routing_table/stats_accounting.rs | 104 +++++++++++++ veilid-core/src/rpc_processor/mod.rs | 137 +++++++++--------- veilid-core/src/veilid_api.rs | 10 +- veilid-core/src/xx/tools.rs | 21 +++ 9 files changed, 341 insertions(+), 231 deletions(-) create mode 100644 veilid-core/src/routing_table/stats_accounting.rs diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 47c43a7d..11e55c87 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,3 +1,6 @@ +variables: + GIT_SUBMODULE_STRATEGY: recursive + stages: - clippy - test diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index 55eafb40..30340249 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -1,3 +1,4 @@ +#![warn(clippy::all)] #![cfg_attr(target_arch = "wasm32", no_std)] #[macro_use] diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index f5523955..1213f246 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -12,7 +12,6 @@ use xx::*; //////////////////////////////////////////////////////////////////////////////////////// -const BANDWIDTH_TABLE_SIZE: usize = 10usize; const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; @@ -32,14 +31,14 @@ pub enum NetworkClass { impl NetworkClass { pub fn inbound_capable(&self) -> bool { - match self { + matches!( + self, Self::Server - | Self::Mapped - | Self::FullNAT - | Self::AddressRestrictedNAT - | Self::PortRestrictedNAT => true, - _ => false, - } + | Self::Mapped + | Self::FullNAT + | Self::AddressRestrictedNAT + | Self::PortRestrictedNAT + ) } } @@ -59,12 +58,6 @@ pub struct NetworkManagerInner { routing_table: Option, components: Option, network_class: Option, - incoming_avg_bandwidth: f32, - incoming_max_bandwidth: f32, - incoming_bandwidth_table: Vec, - outgoing_avg_bandwidth: f32, - outgoing_max_bandwidth: f32, - outgoing_bandwidth_table: Vec, connection_processor_jh: Option>, connection_add_channel_tx: Option>>, } @@ -79,33 +72,20 @@ pub struct NetworkManager { impl NetworkManager { fn new_inner() -> NetworkManagerInner { - let mut inner = NetworkManagerInner { + NetworkManagerInner { routing_table: None, components: None, network_class: None, - incoming_avg_bandwidth: 0.0f32, - incoming_max_bandwidth: 0.0f32, - incoming_bandwidth_table: Vec::new(), - outgoing_avg_bandwidth: 0.0f32, - outgoing_max_bandwidth: 0.0f32, - outgoing_bandwidth_table: Vec::new(), connection_processor_jh: None, connection_add_channel_tx: None, - }; - inner - .incoming_bandwidth_table - .resize(BANDWIDTH_TABLE_SIZE, 0.0f32); - inner - .outgoing_bandwidth_table - .resize(BANDWIDTH_TABLE_SIZE, 0.0f32); - inner + } } pub fn new(config: VeilidConfig, table_store: TableStore, crypto: Crypto) -> Self { Self { config: config.clone(), - table_store: table_store, - crypto: crypto, + table_store, + crypto, inner: Arc::new(Mutex::new(Self::new_inner())), } } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index ca5edabd..052e79f0 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -1,15 +1,5 @@ use super::*; -// Latency entry is per round-trip packet (ping or data) -// - Size is number of entries -const ROLLING_LATENCIES_SIZE: usize = 10; - -// Transfers entries are in bytes total for the interval -// - Size is number of entries -// - Interval is number of seconds in each entry -const ROLLING_TRANSFERS_SIZE: usize = 10; -pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 10; - // Reliable pings are done with increased spacing between pings // - Start secs is the number of seconds between the first two pings // - Max secs is the maximum number of seconds between consecutive pings @@ -42,9 +32,7 @@ pub struct BucketEntry { min_max_version: Option<(u8, u8)>, last_connection: Option<(ConnectionDescriptor, u64)>, dial_info_entries: VecDeque, - rolling_latencies: VecDeque, - rolling_transfers: VecDeque<(u64, u64)>, - current_transfer: (u64, u64), + stats_accounting: StatsAccounting, peer_stats: PeerStats, } @@ -55,15 +43,13 @@ impl BucketEntry { min_max_version: None, last_connection: None, dial_info_entries: VecDeque::new(), - rolling_latencies: VecDeque::new(), - rolling_transfers: VecDeque::new(), - current_transfer: (0, 0), + stats_accounting: StatsAccounting::new(), peer_stats: PeerStats { time_added: get_timestamp(), last_seen: None, ping_stats: PingStats::default(), latency: None, - transfer: (TransferStats::default(), TransferStats::default()), + transfer: TransferStatsDownUp::default(), node_info: None, }, } @@ -213,10 +199,7 @@ impl BucketEntry { } pub fn last_connection(&self) -> Option { - match self.last_connection.as_ref() { - Some(x) => Some(x.0.clone()), - None => None, - } + self.last_connection.as_ref().map(|x| x.0.clone()) } pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) { @@ -248,71 +231,13 @@ impl BucketEntry { ///// stats methods // called every ROLLING_TRANSFERS_INTERVAL_SECS seconds pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { - let dur_ms = (cur_ts - last_ts) / 1000u64; - while self.rolling_transfers.len() >= ROLLING_TRANSFERS_SIZE { - self.rolling_transfers.pop_front(); - } - self.rolling_transfers.push_back(self.current_transfer); - self.current_transfer = (0, 0); - - let xd = &mut self.peer_stats.transfer.0; - let xu = &mut self.peer_stats.transfer.1; - - xd.maximum = 0; - xu.maximum = 0; - xd.minimum = u64::MAX; - xu.minimum = u64::MAX; - xd.average = 0; - xu.average = 0; - for (rtd, rtu) in &self.rolling_transfers { - let bpsd = rtd * 1000u64 / dur_ms; - let bpsu = rtu * 1000u64 / dur_ms; - if bpsd > xd.maximum { - xd.maximum = bpsd; - } - if bpsu > xu.maximum { - xu.maximum = bpsu; - } - if bpsd < xd.minimum { - xd.minimum = bpsd; - } - if bpsu < xu.minimum { - xu.minimum = bpsu; - } - xd.average += bpsd; - xu.average += bpsu; - } - let len = self.rolling_transfers.len() as u64; - xd.average /= len; - xu.average /= len; - // total remains unchanged + self.stats_accounting + .roll_transfers(last_ts, cur_ts, &mut self.peer_stats.transfer); } // Called for every round trip packet we receive fn record_latency(&mut self, latency: u64) { - while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE { - self.rolling_latencies.pop_front(); - } - self.rolling_latencies.push_back(latency); - - let mut ls = LatencyStats { - fastest: 0, - average: 0, - slowest: 0, - }; - for rl in &self.rolling_latencies { - if *rl < ls.fastest { - ls.fastest = *rl; - } - if *rl > ls.slowest { - ls.slowest = *rl; - } - ls.average += *rl; - } - let len = self.rolling_latencies.len() as u64; - ls.average /= len; - - self.peer_stats.latency = Some(ls); + self.peer_stats.latency = Some(self.stats_accounting.record_latency(latency)); } ///// state machine handling @@ -381,23 +306,23 @@ impl BucketEntry { } //////////////////////////////////////////////////////////////// - /// Called by RPC processor as events happen + /// Called when rpc processor things happen - pub fn ping_sent(&mut self, ts: u64, bytes: u64) { + pub(super) fn ping_sent(&mut self, ts: u64, bytes: u64) { self.peer_stats.ping_stats.total_sent += 1; - self.current_transfer.1 += bytes; + self.stats_accounting.add_up(bytes); self.peer_stats.ping_stats.in_flight += 1; self.peer_stats.ping_stats.last_pinged = Some(ts); } - pub fn ping_rcvd(&mut self, ts: u64, bytes: u64) { - self.current_transfer.0 += bytes; + pub(super) fn ping_rcvd(&mut self, ts: u64, bytes: u64) { + self.stats_accounting.add_down(bytes); self.touch_last_seen(ts); } - pub fn pong_sent(&mut self, _ts: u64, bytes: u64) { - self.current_transfer.1 += bytes; + pub(super) fn pong_sent(&mut self, _ts: u64, bytes: u64) { + self.stats_accounting.add_up(bytes); } - pub fn pong_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { - self.current_transfer.0 += bytes; + pub(super) fn pong_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { + self.stats_accounting.add_down(bytes); self.peer_stats.ping_stats.in_flight -= 1; self.peer_stats.ping_stats.total_returned += 1; self.peer_stats.ping_stats.consecutive_pongs += 1; @@ -412,28 +337,28 @@ impl BucketEntry { self.record_latency(recv_ts - send_ts); self.touch_last_seen(recv_ts); } - pub fn ping_lost(&mut self, _ts: u64) { + pub(super) fn ping_lost(&mut self, _ts: u64) { self.peer_stats.ping_stats.in_flight -= 1; self.peer_stats.ping_stats.recent_lost_pings += 1; self.peer_stats.ping_stats.consecutive_pongs = 0; self.peer_stats.ping_stats.first_consecutive_pong_time = None; } - pub fn question_sent(&mut self, _ts: u64, bytes: u64) { - self.current_transfer.1 += bytes; + pub(super) fn question_sent(&mut self, _ts: u64, bytes: u64) { + self.stats_accounting.add_up(bytes); } - pub fn question_rcvd(&mut self, ts: u64, bytes: u64) { - self.current_transfer.0 += bytes; + pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) { + self.stats_accounting.add_down(bytes); self.touch_last_seen(ts); } - pub fn answer_sent(&mut self, _ts: u64, bytes: u64) { - self.current_transfer.1 += bytes; + pub(super) fn answer_sent(&mut self, _ts: u64, bytes: u64) { + self.stats_accounting.add_up(bytes); } - pub fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { - self.current_transfer.0 += bytes; + pub(super) fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { + self.stats_accounting.add_down(bytes); self.record_latency(recv_ts - send_ts); self.touch_last_seen(recv_ts); } - pub fn question_lost(&mut self, _ts: u64) { + pub(super) fn question_lost(&mut self, _ts: u64) { self.peer_stats.ping_stats.consecutive_pongs = 0; self.peer_stats.ping_stats.first_consecutive_pong_time = None; } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index f6260a4c..20b5d372 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -3,12 +3,7 @@ mod bucket_entry; mod dial_info_entry; mod find_nodes; mod node_ref; - -use bucket::*; -pub use bucket_entry::*; -pub use dial_info_entry::*; -pub use find_nodes::*; -pub use node_ref::*; +mod stats_accounting; use crate::dht::*; use crate::intf::*; @@ -18,7 +13,13 @@ use crate::xx::*; use crate::*; use alloc::collections::VecDeque; use alloc::str::FromStr; +use bucket::*; +pub use bucket_entry::*; +pub use dial_info_entry::*; +pub use find_nodes::*; use futures_util::stream::{FuturesUnordered, StreamExt}; +pub use node_ref::*; +pub use stats_accounting::*; ////////////////////////////////////////////////////////////////////////// @@ -42,16 +43,15 @@ struct RoutingTableInner { node_id: DHTKey, node_id_secret: DHTKeySecret, buckets: Vec, - //recent_nodes: VecDeque, - //closest_reliable_nodes: Vec, - //fastest_reliable_nodes: Vec, - //closest_nodes: Vec, - //fastest_nodes: Vec, local_dial_info: Vec, public_dial_info: Vec, bucket_entry_count: usize, // Waiters eventual_changed_dial_info: Eventual, + // Transfer stats for this node + stats_accounting: StatsAccounting, + // latency: Option, + transfer_stats: TransferStatsDownUp, } struct RoutingTableUnlockedInner { @@ -72,25 +72,22 @@ pub struct RoutingTable { impl RoutingTable { fn new_inner(network_manager: NetworkManager) -> RoutingTableInner { RoutingTableInner { - network_manager: network_manager, + network_manager, node_id: DHTKey::default(), node_id_secret: DHTKeySecret::default(), buckets: Vec::new(), - //recent_nodes: VecDeque::new(), - //closest_reliable_nodes: Vec::new(), - //fastest_reliable_nodes: Vec::new(), - //closest_nodes: Vec::new(), - //fastest_nodes: Vec::new(), local_dial_info: Vec::new(), public_dial_info: Vec::new(), bucket_entry_count: 0, eventual_changed_dial_info: Eventual::new(), + stats_accounting: StatsAccounting::new(), + transfer_stats: TransferStatsDownUp::default(), } } fn new_unlocked_inner(config: VeilidConfig) -> RoutingTableUnlockedInner { let c = config.get(); RoutingTableUnlockedInner { - rolling_transfers_task: TickTask::new(bucket_entry::ROLLING_TRANSFERS_INTERVAL_SECS), + rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), bootstrap_task: TickTask::new(1), peer_minimum_refresh_task: TickTask::new_us(c.network.dht.min_peer_refresh_time), ping_validator_task: TickTask::new(1), @@ -155,7 +152,7 @@ impl RoutingTable { pub fn has_local_dial_info(&self) -> bool { let inner = self.inner.lock(); - inner.local_dial_info.len() > 0 + !inner.local_dial_info.is_empty() } pub fn local_dial_info(&self) -> Vec { @@ -197,14 +194,14 @@ impl RoutingTable { } pub fn register_local_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) { - let ts = get_timestamp(); + let timestamp = get_timestamp(); let mut inner = self.inner.lock(); inner.local_dial_info.push(DialInfoDetail { dial_info: dial_info.clone(), - origin: origin, + origin, network_class: None, - timestamp: ts, + timestamp, }); info!( @@ -224,7 +221,7 @@ impl RoutingTable { pub fn has_public_dial_info(&self) -> bool { let inner = self.inner.lock(); - inner.public_dial_info.len() > 0 + !inner.public_dial_info.is_empty() } pub fn public_dial_info(&self) -> Vec { @@ -278,8 +275,8 @@ impl RoutingTable { inner.public_dial_info.push(DialInfoDetail { dial_info: dial_info.clone(), - origin: origin, - network_class: network_class, + origin, + network_class, timestamp: ts, }); @@ -466,10 +463,9 @@ impl RoutingTable { let mut inner = self.inner.lock(); let idx = Self::find_bucket_index(&*inner, node_id); let bucket = &mut inner.buckets[idx]; - match bucket.entry_mut(&node_id) { - None => None, - Some(e) => Some(NodeRef::new(self.clone(), node_id, e)), - } + bucket + .entry_mut(&node_id) + .map(|e| NodeRef::new(self.clone(), node_id, e)) } // Shortcut function to add a node to our routing table if it doesn't exist @@ -629,7 +625,7 @@ impl RoutingTable { let node_id = ndis.node_id.key; bsmap .entry(node_id) - .or_insert(Vec::new()) + .or_insert_with(Vec::new) .push(ndis.dial_info); } @@ -697,7 +693,14 @@ impl RoutingTable { // Compute transfer statistics to determine how 'fast' a node is async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { - let mut inner = self.inner.lock(); + let inner = &mut *self.inner.lock(); + + // Roll our own node's transfers + inner + .stats_accounting + .roll_transfers(last_ts, cur_ts, &mut inner.transfer_stats); + + // Roll all bucket entry transfers for b in &mut inner.buckets { b.roll_transfers(last_ts, cur_ts); } @@ -728,4 +731,66 @@ impl RoutingTable { Ok(()) } + + ////////////////////////////////////////////////////////////////////// + // Stats Accounting + + pub fn ping_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + self.inner.lock().stats_accounting.add_up(bytes); + node_ref.operate(|e| { + e.ping_sent(ts, bytes); + }) + } + pub fn ping_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + self.inner.lock().stats_accounting.add_down(bytes); + node_ref.operate(|e| { + e.ping_rcvd(ts, bytes); + }) + } + pub fn pong_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + self.inner.lock().stats_accounting.add_up(bytes); + node_ref.operate(|e| { + e.pong_sent(ts, bytes); + }) + } + pub fn pong_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { + self.inner.lock().stats_accounting.add_down(bytes); + node_ref.operate(|e| { + e.pong_rcvd(send_ts, recv_ts, bytes); + }) + } + pub fn ping_lost(&self, node_ref: NodeRef, ts: u64) { + node_ref.operate(|e| { + e.ping_lost(ts); + }) + } + pub fn question_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + self.inner.lock().stats_accounting.add_up(bytes); + node_ref.operate(|e| { + e.question_sent(ts, bytes); + }) + } + pub fn question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + self.inner.lock().stats_accounting.add_down(bytes); + node_ref.operate(|e| { + e.question_rcvd(ts, bytes); + }) + } + pub fn answer_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + self.inner.lock().stats_accounting.add_up(bytes); + node_ref.operate(|e| { + e.answer_sent(ts, bytes); + }) + } + pub fn answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { + self.inner.lock().stats_accounting.add_down(bytes); + node_ref.operate(|e| { + e.answer_rcvd(send_ts, recv_ts, bytes); + }) + } + pub fn question_lost(&self, node_ref: NodeRef, ts: u64) { + node_ref.operate(|e| { + e.question_lost(ts); + }) + } } diff --git a/veilid-core/src/routing_table/stats_accounting.rs b/veilid-core/src/routing_table/stats_accounting.rs new file mode 100644 index 00000000..e63ffcd1 --- /dev/null +++ b/veilid-core/src/routing_table/stats_accounting.rs @@ -0,0 +1,104 @@ +use crate::xx::*; +use crate::*; +use alloc::collections::VecDeque; + +// Latency entry is per round-trip packet (ping or data) +// - Size is number of entries +const ROLLING_LATENCIES_SIZE: usize = 10; + +// Transfers entries are in bytes total for the interval +// - Size is number of entries +// - Interval is number of seconds in each entry +const ROLLING_TRANSFERS_SIZE: usize = 10; +pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 10; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct TransferCount { + down: u64, + up: u64, +} + +#[derive(Debug, Clone, Default)] +pub struct StatsAccounting { + rolling_latencies: VecDeque, + rolling_transfers: VecDeque, + current_transfer: TransferCount, +} + +impl StatsAccounting { + pub fn new() -> Self { + Self { + rolling_latencies: VecDeque::new(), + rolling_transfers: VecDeque::new(), + current_transfer: TransferCount::default(), + } + } + + pub fn add_down(&mut self, bytes: u64) { + self.current_transfer.down += bytes; + } + + pub fn add_up(&mut self, bytes: u64) { + self.current_transfer.up += bytes; + } + + pub fn roll_transfers( + &mut self, + last_ts: u64, + cur_ts: u64, + transfer_stats: &mut TransferStatsDownUp, + ) { + let dur_ms = (cur_ts - last_ts) / 1000u64; + while self.rolling_transfers.len() >= ROLLING_TRANSFERS_SIZE { + self.rolling_transfers.pop_front(); + } + self.rolling_transfers.push_back(self.current_transfer); + + transfer_stats.down.total += self.current_transfer.down; + transfer_stats.up.total += self.current_transfer.up; + + self.current_transfer = TransferCount::default(); + + transfer_stats.down.maximum = 0; + transfer_stats.up.maximum = 0; + transfer_stats.down.minimum = u64::MAX; + transfer_stats.up.minimum = u64::MAX; + transfer_stats.down.average = 0; + transfer_stats.up.average = 0; + for xfer in &self.rolling_transfers { + let bpsd = xfer.down * 1000u64 / dur_ms; + let bpsu = xfer.up * 1000u64 / dur_ms; + transfer_stats.down.maximum.max_assign(bpsd); + transfer_stats.up.maximum.max_assign(bpsu); + transfer_stats.down.minimum.min_assign(bpsd); + transfer_stats.up.minimum.min_assign(bpsu); + transfer_stats.down.average += bpsd; + transfer_stats.down.average += bpsu; + } + let len = self.rolling_transfers.len() as u64; + transfer_stats.down.average /= len; + transfer_stats.up.average /= len; + } + + pub fn record_latency(&mut self, latency: u64) -> veilid_api::LatencyStats { + while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE { + self.rolling_latencies.pop_front(); + } + self.rolling_latencies.push_back(latency); + + let mut ls = LatencyStats { + fastest: 0, + average: 0, + slowest: 0, + }; + for rl in &self.rolling_latencies { + ls.fastest.min_assign(*rl); + ls.slowest.max_assign(*rl); + ls.average += *rl; + } + let len = self.rolling_latencies.len() as u64; + ls.average /= len; + + ls + } +} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index eab537cc..ec56b10a 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -575,24 +575,32 @@ impl RPCProcessor { match &out { Err(_) => { self.cancel_op_id_waiter(waitable_reply.op_id); - waitable_reply.node_ref.operate(|e| { - if waitable_reply.is_ping { - e.ping_lost(waitable_reply.send_ts); - } else { - e.question_lost(waitable_reply.send_ts); - } - }); + if waitable_reply.is_ping { + self.routing_table() + .ping_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); + } else { + self.routing_table() + .question_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); + } } Ok((rpcreader, _)) => { // Reply received let recv_ts = get_timestamp(); - waitable_reply.node_ref.operate(|e| { - if waitable_reply.is_ping { - e.pong_rcvd(waitable_reply.send_ts, recv_ts, rpcreader.header.body_len); - } else { - e.answer_rcvd(waitable_reply.send_ts, recv_ts, rpcreader.header.body_len); - } - }); + if waitable_reply.is_ping { + self.routing_table().pong_rcvd( + waitable_reply.node_ref, + waitable_reply.send_ts, + recv_ts, + rpcreader.header.body_len, + ) + } else { + self.routing_table().answer_rcvd( + waitable_reply.node_ref, + waitable_reply.send_ts, + recv_ts, + rpcreader.header.body_len, + ) + } } }; @@ -749,13 +757,13 @@ impl RPCProcessor { // Successfully sent let send_ts = get_timestamp(); - node_ref.operate(|e| { - if is_ping { - e.ping_sent(send_ts, bytes); - } else { - e.question_sent(send_ts, bytes); - } - }); + if is_ping { + self.routing_table() + .ping_sent(node_ref.clone(), send_ts, bytes); + } else { + self.routing_table() + .question_sent(node_ref.clone(), send_ts, bytes); + } // Pass back waitable reply completion match eventual { @@ -763,13 +771,13 @@ impl RPCProcessor { // if we don't want an answer, don't wait for one Ok(None) } - Some(e) => Ok(Some(WaitableReply { - op_id: op_id, - eventual: e, - timeout: timeout, - node_ref: node_ref, - send_ts: send_ts, - is_ping: is_ping, + Some(eventual) => Ok(Some(WaitableReply { + op_id, + eventual, + timeout, + node_ref, + send_ts, + is_ping, })), } } @@ -916,13 +924,11 @@ impl RPCProcessor { // Reply successfully sent let send_ts = get_timestamp(); - node_ref.operate(|e| { - if is_pong { - e.pong_sent(send_ts, bytes); - } else { - e.answer_sent(send_ts, bytes); - } - }); + if is_pong { + self.routing_table().pong_sent(node_ref, send_ts, bytes); + } else { + self.routing_table().answer_sent(node_ref, send_ts, bytes); + } Ok(()) } @@ -985,16 +991,16 @@ impl RPCProcessor { let will_validate_dial_info = self.will_validate_dial_info(); NodeInfo { - can_route: can_route, - will_route: will_route, - can_tunnel: can_tunnel, - will_tunnel: will_tunnel, - can_signal_lease: can_signal_lease, - will_signal_lease: will_signal_lease, - can_relay_lease: can_relay_lease, - will_relay_lease: will_relay_lease, - can_validate_dial_info: can_validate_dial_info, - will_validate_dial_info: will_validate_dial_info, + can_route, + will_route, + can_tunnel, + will_tunnel, + can_signal_lease, + will_signal_lease, + can_relay_lease, + will_relay_lease, + can_validate_dial_info, + will_validate_dial_info, } } @@ -1007,9 +1013,7 @@ impl RPCProcessor { None => None, Some(c) => c.remote.to_socket_addr().ok(), }); - SenderInfo { - socket_address: socket_address, - } + SenderInfo { socket_address } } async fn process_info_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { @@ -1329,7 +1333,7 @@ impl RPCProcessor { let reader = capnp::message::Reader::new(msg.data, Default::default()); let rpcreader = RPCMessageReader { header: msg.header, - reader: reader, + reader, }; let (which, is_q) = { @@ -1379,13 +1383,17 @@ impl RPCProcessor { .lookup_node_ref(rpcreader.header.envelope.get_sender_id()) { if which == 0u32 { - sender_nr.operate(|e| { - e.ping_rcvd(rpcreader.header.timestamp, rpcreader.header.body_len); - }); + self.routing_table().ping_rcvd( + sender_nr, + rpcreader.header.timestamp, + rpcreader.header.body_len, + ); } else { - sender_nr.operate(|e| { - e.question_rcvd(rpcreader.header.timestamp, rpcreader.header.body_len); - }); + self.routing_table().question_rcvd( + sender_nr, + rpcreader.header.timestamp, + rpcreader.header.body_len, + ); } } }; @@ -1508,9 +1516,9 @@ impl RPCProcessor { let msg = RPCMessage { header: RPCMessageHeader { timestamp: get_timestamp(), - envelope: envelope, + envelope, body_len: body.len() as u64, - peer_noderef: peer_noderef, + peer_noderef, }, data: RPCMessageData { contents: body }, }; @@ -1586,9 +1594,9 @@ impl RPCProcessor { // Return the answer for anyone who may care let out = InfoAnswer { - latency: latency, - node_info: node_info, - sender_info: sender_info, + latency, + node_info, + sender_info, }; Ok(out) @@ -1713,7 +1721,7 @@ impl RPCProcessor { let peers_reader = find_node_a .get_peers() .map_err(map_error_internal!("Missing peers"))?; - let mut peers_vec = Vec::::with_capacity( + let mut peers = Vec::::with_capacity( peers_reader .len() .try_into() @@ -1732,13 +1740,10 @@ impl RPCProcessor { } } - peers_vec.push(peer_info); + peers.push(peer_info); } - let out = FindNodeAnswer { - latency: latency, - peers: peers_vec, - }; + let out = FindNodeAnswer { latency, peers }; Ok(out) } diff --git a/veilid-core/src/veilid_api.rs b/veilid-core/src/veilid_api.rs index 5d87330c..06313529 100644 --- a/veilid-core/src/veilid_api.rs +++ b/veilid-core/src/veilid_api.rs @@ -671,6 +671,12 @@ pub struct LatencyStats { pub slowest: u64, // slowest latency in the ROLLING_LATENCIES_SIZE last latencies } +#[derive(Clone, Debug, Default)] +pub struct TransferStatsDownUp { + pub down: TransferStats, + pub up: TransferStats, +} + #[derive(Clone, Debug, Default)] pub struct TransferStats { pub total: u64, // total amount transferred ever @@ -696,8 +702,8 @@ pub struct PeerStats { pub last_seen: Option, // when the peer was last seen for any reason pub ping_stats: PingStats, // information about pings pub latency: Option, // latencies for communications with the peer - pub transfer: (TransferStats, TransferStats), // (download, upload) stats for communications with the peer - pub node_info: Option, // last known node info + pub transfer: TransferStatsDownUp, // Stats for communications with the peer + pub node_info: Option, // Last known node info } cfg_if! { diff --git a/veilid-core/src/xx/tools.rs b/veilid-core/src/xx/tools.rs index bb64df75..26d8819a 100644 --- a/veilid-core/src/xx/tools.rs +++ b/veilid-core/src/xx/tools.rs @@ -99,3 +99,24 @@ where } None } + +pub trait CmpAssign { + fn min_assign(&mut self, other: Self); + fn max_assign(&mut self, other: Self); +} + +impl CmpAssign for T +where + T: core::cmp::Ord, +{ + fn min_assign(&mut self, other: Self) { + if &other < self { + *self = other; + } + } + fn max_assign(&mut self, other: Self) { + if &other > self { + *self = other; + } + } +}