add bandwidth tracking

This commit is contained in:
John Smith 2022-03-20 10:52:03 -04:00
parent 3888a832a0
commit ac0280e0b6
13 changed files with 203 additions and 107 deletions

22
Cargo.lock generated
View File

@ -1782,6 +1782,14 @@ dependencies = [
"hashbrown 0.11.2", "hashbrown 0.11.2",
] ]
[[package]]
name = "hashlink"
version = "0.8.0"
dependencies = [
"hashbrown 0.12.0",
"serde 1.0.136",
]
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.1.19" version = "0.1.19"
@ -3293,7 +3301,7 @@ dependencies = [
"bitflags", "bitflags",
"fallible-iterator", "fallible-iterator",
"fallible-streaming-iterator", "fallible-streaming-iterator",
"hashlink", "hashlink 0.7.0",
"libsqlite3-sys", "libsqlite3-sys",
"memchr", "memchr",
"smallvec", "smallvec",
@ -4042,15 +4050,6 @@ dependencies = [
"static_assertions", "static_assertions",
] ]
[[package]]
name = "uluru"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "794a32261a1f5eb6a4462c81b59cec87b5c27d5deea7dd1ac8fc781c41d226db"
dependencies = [
"arrayvec 0.7.2",
]
[[package]] [[package]]
name = "unicode-bidi" name = "unicode-bidi"
version = "0.3.7" version = "0.3.7"
@ -4201,7 +4200,7 @@ dependencies = [
"generic-array 0.14.5", "generic-array 0.14.5",
"getrandom 0.2.5", "getrandom 0.2.5",
"hashbrown 0.12.0", "hashbrown 0.12.0",
"hashlink", "hashlink 0.8.0",
"hex", "hex",
"ifstructs", "ifstructs",
"jni", "jni",
@ -4239,7 +4238,6 @@ dependencies = [
"socket2", "socket2",
"static_assertions", "static_assertions",
"thiserror", "thiserror",
"uluru",
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-futures", "wasm-bindgen-futures",
"wasm-bindgen-test", "wasm-bindgen-test",

View File

@ -8,7 +8,7 @@ members = [
"veilid-wasm", "veilid-wasm",
] ]
exclude = [ "./external/keyring-rs", "./external/netlink", "./external/cursive" ] exclude = [ "./external/keyring-rs", "./external/netlink", "./external/cursive", "./external/hashlink" ]
[patch.crates-io] [patch.crates-io]
cursive = { path = "./external/cursive/cursive" } cursive = { path = "./external/cursive/cursive" }

2
external/hashlink vendored

@ -1 +1 @@
Subproject commit f5846ec3ff06865a204114bd710253e7e67e4498 Subproject commit 9841db4f9a8b4d6bd46af78927cd88e70befd3f4

View File

@ -25,8 +25,7 @@ hex = "^0"
generic-array = "^0" generic-array = "^0"
secrecy = "^0" secrecy = "^0"
chacha20poly1305 = "^0" chacha20poly1305 = "^0"
uluru = "^3" hashlink = { path = "../external/hashlink", features = ["serde_impl"] }
hashlink = "^0"
serde-big-array = "^0" serde-big-array = "^0"
futures-util = { version = "^0", default_features = false, features = ["alloc"] } futures-util = { version = "^0", default_features = false, features = ["alloc"] }
parking_lot = "^0" parking_lot = "^0"

View File

@ -7,8 +7,9 @@ use chacha20poly1305::aead::{AeadInPlace, NewAead};
use core::convert::TryInto; use core::convert::TryInto;
use curve25519_dalek as cd; use curve25519_dalek as cd;
use ed25519_dalek as ed; use ed25519_dalek as ed;
use hashlink::linked_hash_map::Entry;
use hashlink::LruCache;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uluru;
use x25519_dalek as xd; use x25519_dalek as xd;
pub type SharedSecret = [u8; 32]; pub type SharedSecret = [u8; 32];
@ -17,22 +18,25 @@ pub type Nonce = [u8; 24];
const DH_CACHE_SIZE: usize = 1024; const DH_CACHE_SIZE: usize = 1024;
pub const ENCRYPTION_OVERHEAD: usize = 16; pub const ENCRYPTION_OVERHEAD: usize = 16;
type DHCache = uluru::LRUCache<DHCacheEntry, DH_CACHE_SIZE>; #[derive(Serialize, Deserialize, PartialEq, Eq, Hash)]
struct DHCacheKey {
#[derive(Serialize, Deserialize)]
struct DHCacheEntry {
key: DHTKey, key: DHTKey,
secret: DHTKeySecret, secret: DHTKeySecret,
}
#[derive(Serialize, Deserialize)]
struct DHCacheValue {
shared_secret: SharedSecret, shared_secret: SharedSecret,
} }
type DHCache = LruCache<DHCacheKey, DHCacheValue>;
fn cache_to_bytes(cache: &DHCache) -> Vec<u8> { fn cache_to_bytes(cache: &DHCache) -> Vec<u8> {
let cnt: usize = cache.len(); let cnt: usize = cache.len();
let mut out: Vec<u8> = Vec::with_capacity(cnt * (32 + 32 + 32)); let mut out: Vec<u8> = Vec::with_capacity(cnt * (32 + 32 + 32));
for e in cache.iter() { for e in cache.iter() {
out.extend(&e.key.bytes); out.extend(&e.0.key.bytes);
out.extend(&e.secret.bytes); out.extend(&e.0.secret.bytes);
out.extend(&e.shared_secret); out.extend(&e.1.shared_secret);
} }
let mut rev: Vec<u8> = Vec::with_capacity(out.len()); let mut rev: Vec<u8> = Vec::with_capacity(out.len());
for d in out.chunks(32 + 32 + 32).rev() { for d in out.chunks(32 + 32 + 32).rev() {
@ -43,12 +47,14 @@ fn cache_to_bytes(cache: &DHCache) -> Vec<u8> {
fn bytes_to_cache(bytes: &[u8], cache: &mut DHCache) { fn bytes_to_cache(bytes: &[u8], cache: &mut DHCache) {
for d in bytes.chunks(32 + 32 + 32) { for d in bytes.chunks(32 + 32 + 32) {
let e = DHCacheEntry { let k = DHCacheKey {
key: DHTKey::new(d[0..32].try_into().expect("asdf")), key: DHTKey::new(d[0..32].try_into().expect("asdf")),
secret: DHTKeySecret::new(d[32..64].try_into().expect("asdf")), secret: DHTKeySecret::new(d[32..64].try_into().expect("asdf")),
};
let v = DHCacheValue {
shared_secret: d[64..96].try_into().expect("asdf"), shared_secret: d[64..96].try_into().expect("asdf"),
}; };
cache.insert(e); cache.insert(k, v);
} }
} }
@ -72,7 +78,7 @@ impl Crypto {
table_store, table_store,
node_id: Default::default(), node_id: Default::default(),
node_id_secret: Default::default(), node_id_secret: Default::default(),
dh_cache: DHCache::default(), dh_cache: DHCache::new(DH_CACHE_SIZE),
flush_future: None, flush_future: None,
} }
} }
@ -176,22 +182,19 @@ impl Crypto {
} }
pub fn cached_dh(&self, key: &DHTKey, secret: &DHTKeySecret) -> Result<SharedSecret, String> { pub fn cached_dh(&self, key: &DHTKey, secret: &DHTKeySecret) -> Result<SharedSecret, String> {
if let Some(c) = self Ok(
.inner match self.inner.lock().dh_cache.entry(DHCacheKey {
.lock() key: *key,
.dh_cache secret: *secret,
.find(|entry| entry.key == *key && entry.secret == *secret) }) {
{ Entry::Occupied(e) => e.get().shared_secret,
return Ok(c.shared_secret); Entry::Vacant(e) => {
} let shared_secret = Self::compute_dh(key, secret)?;
e.insert(DHCacheValue { shared_secret });
let shared_secret = Self::compute_dh(key, secret)?; shared_secret
self.inner.lock().dh_cache.insert(DHCacheEntry { }
key: *key, },
secret: *secret, )
shared_secret,
});
Ok(shared_secret)
} }
/////////// ///////////

View File

@ -2,6 +2,7 @@ use crate::xx::*;
use core::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd}; use core::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd};
use core::convert::{TryFrom, TryInto}; use core::convert::{TryFrom, TryInto};
use core::fmt; use core::fmt;
use core::hash::{Hash, Hasher};
use hex; use hex;
use crate::veilid_rng::*; use crate::veilid_rng::*;
@ -203,6 +204,14 @@ macro_rules! byte_array_type {
} }
} }
impl Eq for $name {} impl Eq for $name {}
impl Hash for $name {
fn hash<H: Hasher>(&self, state: &mut H) {
self.valid.hash(state);
if self.valid {
self.bytes.hash(state);
}
}
}
impl Default for $name { impl Default for $name {
fn default() -> Self { fn default() -> Self {
let mut this = $name::new([0u8; $size]); let mut this = $name::new([0u8; $size]);

View File

@ -261,7 +261,8 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> Result<(), String> { ) -> Result<(), String> {
match dial_info.protocol_type() { let data_len = data.len();
let res = match dial_info.protocol_type() {
ProtocolType::UDP => { ProtocolType::UDP => {
let peer_socket_addr = dial_info.to_socket_addr(); let peer_socket_addr = dial_info.to_socket_addr();
RawUdpProtocolHandler::send_unbound_message(peer_socket_addr, data) RawUdpProtocolHandler::send_unbound_message(peer_socket_addr, data)
@ -275,11 +276,17 @@ impl Network {
.map_err(logthru_net!()) .map_err(logthru_net!())
} }
ProtocolType::WS | ProtocolType::WSS => { ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::send_unbound_message(dial_info, data) WebsocketProtocolHandler::send_unbound_message(dial_info.clone(), data)
.await .await
.map_err(logthru_net!()) .map_err(logthru_net!())
} }
};
if res.is_ok() {
// Network accounting
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), data_len as u64);
} }
res
} }
async fn send_data_to_existing_connection( async fn send_data_to_existing_connection(
@ -287,6 +294,8 @@ impl Network {
descriptor: ConnectionDescriptor, descriptor: ConnectionDescriptor,
data: Vec<u8>, data: Vec<u8>,
) -> Result<Option<Vec<u8>>, String> { ) -> Result<Option<Vec<u8>>, String> {
let data_len = data.len();
// Handle connectionless protocol // Handle connectionless protocol
if descriptor.protocol_type() == ProtocolType::UDP { if descriptor.protocol_type() == ProtocolType::UDP {
// send over the best udp socket we have bound since UDP is not connection oriented // send over the best udp socket we have bound since UDP is not connection oriented
@ -299,6 +308,11 @@ impl Network {
.send_message(data, peer_socket_addr) .send_message(data, peer_socket_addr)
.await .await
.map_err(logthru_net!())?; .map_err(logthru_net!())?;
// Network accounting
self.network_manager()
.stats_packet_sent(peer_socket_addr.ip(), data_len as u64);
// Data was consumed // Data was consumed
return Ok(None); return Ok(None);
} }
@ -311,6 +325,10 @@ impl Network {
// connection exists, send over it // connection exists, send over it
conn.send(data).await.map_err(logthru_net!())?; conn.send(data).await.map_err(logthru_net!())?;
// Network accounting
self.network_manager()
.stats_packet_sent(descriptor.remote.to_socket_addr().ip(), data_len as u64);
// Data was consumed // Data was consumed
Ok(None) Ok(None)
} else { } else {
@ -326,14 +344,21 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> Result<(), String> { ) -> Result<(), String> {
let data_len = data.len();
// Handle connectionless protocol // Handle connectionless protocol
if dial_info.protocol_type() == ProtocolType::UDP { if dial_info.protocol_type() == ProtocolType::UDP {
let peer_socket_addr = dial_info.to_socket_addr(); let peer_socket_addr = dial_info.to_socket_addr();
if let Some(ph) = self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { if let Some(ph) = self.find_best_udp_protocol_handler(&peer_socket_addr, &None) {
return ph let res = ph
.send_message(data, peer_socket_addr) .send_message(data, peer_socket_addr)
.await .await
.map_err(logthru_net!()); .map_err(logthru_net!());
if res.is_ok() {
// Network accounting
self.network_manager()
.stats_packet_sent(peer_socket_addr.ip(), data_len as u64);
}
return res;
} }
return Err("no appropriate UDP protocol handler for dial_info".to_owned()) return Err("no appropriate UDP protocol handler for dial_info".to_owned())
.map_err(logthru_net!(error)); .map_err(logthru_net!(error));
@ -343,10 +368,16 @@ impl Network {
let local_addr = self.get_preferred_local_address(&dial_info); let local_addr = self.get_preferred_local_address(&dial_info);
let conn = self let conn = self
.connection_manager() .connection_manager()
.get_or_create_connection(Some(local_addr), dial_info) .get_or_create_connection(Some(local_addr), dial_info.clone())
.await?; .await?;
conn.send(data).await.map_err(logthru_net!(error)) let res = conn.send(data).await.map_err(logthru_net!(error));
if res.is_ok() {
// Network accounting
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), data_len as u64);
}
res
} }
// Send data to node // Send data to node

View File

@ -54,6 +54,13 @@ impl Network {
// XXX: Limit the number of packets from the same IP address? // XXX: Limit the number of packets from the same IP address?
log_net!("UDP packet: {:?}", descriptor); log_net!("UDP packet: {:?}", descriptor);
// Network accounting
network_manager.stats_packet_rcvd(
descriptor.remote.to_socket_addr().ip(),
size as u64,
);
// Pass it up for processing
if let Err(e) = network_manager if let Err(e) = network_manager
.on_recv_envelope(&data[..size], descriptor) .on_recv_envelope(&data[..size], descriptor)
.await .await

View File

@ -49,6 +49,32 @@ impl Network {
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
pub async fn send_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
data: Vec<u8>,
) -> Result<(), String> {
let res = match dial_info.protocol_type() {
ProtocolType::UDP => {
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
}
ProtocolType::TCP => {
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
}
ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::send_unbound_message(dial_info, data)
.await
.map_err(logthru_net!())
}
};
if res.is_ok() {
// Network accounting
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), data_len as u64);
}
res
}
async fn send_data_to_existing_connection( async fn send_data_to_existing_connection(
&self, &self,
descriptor: ConnectionDescriptor, descriptor: ConnectionDescriptor,
@ -71,6 +97,10 @@ impl Network {
// connection exists, send over it // connection exists, send over it
conn.send(data).await.map_err(logthru_net!())?; conn.send(data).await.map_err(logthru_net!())?;
// Network accounting
self.network_manager()
.stats_packet_sent(descriptor.remote.to_socket_addr().ip(), data_len as u64);
// Data was consumed // Data was consumed
Ok(None) Ok(None)
} else { } else {
@ -80,26 +110,6 @@ impl Network {
} }
} }
pub async fn send_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
data: Vec<u8>,
) -> Result<(), String> {
match dial_info.protocol_type() {
ProtocolType::UDP => {
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
}
ProtocolType::TCP => {
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
}
ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::send_unbound_message(dial_info, data)
.await
.map_err(logthru_net!())
}
}
}
pub async fn send_data_to_dial_info( pub async fn send_data_to_dial_info(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
@ -118,7 +128,13 @@ impl Network {
.get_or_create_connection(None, dial_info) .get_or_create_connection(None, dial_info)
.await?; .await?;
conn.send(data).await.map_err(logthru_net!(error)) let res = conn.send(data).await.map_err(logthru_net!(error));
if res.is_ok() {
// Network accounting
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), data_len as u64);
}
res
} }
pub async fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> Result<(), String> { pub async fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> Result<(), String> {

View File

@ -1,6 +1,7 @@
use crate::*; use crate::*;
use connection_manager::*; use connection_manager::*;
use dht::*; use dht::*;
use hashlink::LruCache;
use intf::*; use intf::*;
use lease_manager::*; use lease_manager::*;
use receipt_manager::*; use receipt_manager::*;
@ -89,14 +90,30 @@ pub struct PerAddressStats {
transfer_stats: TransferStatsDownUp, transfer_stats: TransferStatsDownUp,
} }
// Statistics about the low-level network #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
#[derive(Clone, Default)] pub struct PerAddressStatsKey(IpAddr);
pub struct NetworkManagerStats {
self_stats: PerAddressStats, impl Default for PerAddressStatsKey {
per_address_stats: HashMap<IpAddr, PerAddressStats>, fn default() -> Self {
recent_addresses: VecDeque<IpAddr>, Self(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
}
} }
// Statistics about the low-level network
#[derive(Clone)]
pub struct NetworkManagerStats {
self_stats: PerAddressStats,
per_address_stats: LruCache<PerAddressStatsKey, PerAddressStats>,
}
impl Default for NetworkManagerStats {
fn default() -> Self {
Self {
self_stats: PerAddressStats::default(),
per_address_stats: LruCache::new(IPADDR_TABLE_SIZE),
}
}
}
// The mutable state of the network manager // The mutable state of the network manager
struct NetworkManagerInner { struct NetworkManagerInner {
routing_table: Option<RoutingTable>, routing_table: Option<RoutingTable>,
@ -487,6 +504,10 @@ impl NetworkManager {
data.len(), data.len(),
descriptor descriptor
); );
// Network accounting
self.stats_packet_rcvd(descriptor.remote.to_socket_addr().ip(), data.len() as u64);
// Is this an out-of-band receipt instead of an envelope? // Is this an out-of-band receipt instead of an envelope?
if data[0..4] == *RECEIPT_MAGIC { if data[0..4] == *RECEIPT_MAGIC {
self.process_receipt(data).await?; self.process_receipt(data).await?;
@ -603,7 +624,7 @@ impl NetworkManager {
.roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats); .roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats);
// Roll all per-address transfers // Roll all per-address transfers
let mut dead_addrs: HashSet<IpAddr> = HashSet::new(); let mut dead_addrs: HashSet<PerAddressStatsKey> = HashSet::new();
for (addr, stats) in &mut inner.stats.per_address_stats { for (addr, stats) in &mut inner.stats.per_address_stats {
stats.transfer_stats_accounting.roll_transfers( stats.transfer_stats_accounting.roll_transfers(
last_ts, last_ts,
@ -622,15 +643,11 @@ impl NetworkManager {
for da in &dead_addrs { for da in &dead_addrs {
inner.stats.per_address_stats.remove(da); inner.stats.per_address_stats.remove(da);
} }
inner
.stats
.recent_addresses
.retain(|a| !dead_addrs.contains(a));
Ok(()) Ok(())
} }
// Callbacks from low level network for statistics gathering // Callbacks from low level network for statistics gathering
fn packet_sent(&self, addr: IpAddr, bytes: u64) { pub fn stats_packet_sent(&self, addr: IpAddr, bytes: u64) {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
inner inner
.stats .stats
@ -640,13 +657,13 @@ impl NetworkManager {
inner inner
.stats .stats
.per_address_stats .per_address_stats
.entry(addr) .entry(PerAddressStatsKey(addr))
.or_default() .or_insert(PerAddressStats::default())
.transfer_stats_accounting .transfer_stats_accounting
.add_up(bytes); .add_up(bytes);
} }
fn packet_rcvd(&self, addr: IpAddr, bytes: u64) { pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: u64) {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
inner inner
.stats .stats
@ -656,8 +673,8 @@ impl NetworkManager {
inner inner
.stats .stats
.per_address_stats .per_address_stats
.entry(addr) .entry(PerAddressStatsKey(addr))
.or_default() .or_insert(PerAddressStats::default())
.transfer_stats_accounting .transfer_stats_accounting
.add_down(bytes); .add_down(bytes);
} }

View File

@ -7,6 +7,10 @@ impl RoutingTable {
out += "Routing Table Info:\n"; out += "Routing Table Info:\n";
out += &format!(" Node Id: {}\n", inner.node_id.encode()); out += &format!(" Node Id: {}\n", inner.node_id.encode());
out += &format!(
" Self Latency Stats Accounting: {:#?}\n\n",
inner.self_latency_stats_accounting
);
out += &format!( out += &format!(
" Self Transfer Stats Accounting: {:#?}\n\n", " Self Transfer Stats Accounting: {:#?}\n\n",
inner.self_transfer_stats_accounting inner.self_transfer_stats_accounting

View File

@ -653,7 +653,7 @@ impl RoutingTable {
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
// Stats Accounting // Stats Accounting
pub fn ping_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { pub fn stats_ping_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner self.inner
.lock() .lock()
.self_transfer_stats_accounting .self_transfer_stats_accounting
@ -662,7 +662,7 @@ impl RoutingTable {
e.ping_sent(ts, bytes); e.ping_sent(ts, bytes);
}) })
} }
pub fn ping_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { pub fn stats_ping_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner self.inner
.lock() .lock()
.self_transfer_stats_accounting .self_transfer_stats_accounting
@ -671,7 +671,7 @@ impl RoutingTable {
e.ping_rcvd(ts, bytes); e.ping_rcvd(ts, bytes);
}) })
} }
pub fn pong_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { pub fn stats_pong_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner self.inner
.lock() .lock()
.self_transfer_stats_accounting .self_transfer_stats_accounting
@ -680,21 +680,25 @@ impl RoutingTable {
e.pong_sent(ts, bytes); e.pong_sent(ts, bytes);
}) })
} }
pub fn pong_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { pub fn stats_pong_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) {
self.inner self.inner
.lock() .lock()
.self_transfer_stats_accounting .self_transfer_stats_accounting
.add_down(bytes); .add_down(bytes);
self.inner
.lock()
.self_latency_stats_accounting
.record_latency(recv_ts - send_ts);
node_ref.operate(|e| { node_ref.operate(|e| {
e.pong_rcvd(send_ts, recv_ts, bytes); e.pong_rcvd(send_ts, recv_ts, bytes);
}) })
} }
pub fn ping_lost(&self, node_ref: NodeRef, ts: u64) { pub fn stats_ping_lost(&self, node_ref: NodeRef, ts: u64) {
node_ref.operate(|e| { node_ref.operate(|e| {
e.ping_lost(ts); e.ping_lost(ts);
}) })
} }
pub fn question_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { pub fn stats_question_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner self.inner
.lock() .lock()
.self_transfer_stats_accounting .self_transfer_stats_accounting
@ -703,7 +707,7 @@ impl RoutingTable {
e.question_sent(ts, bytes); e.question_sent(ts, bytes);
}) })
} }
pub fn question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { pub fn stats_question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner self.inner
.lock() .lock()
.self_transfer_stats_accounting .self_transfer_stats_accounting
@ -712,7 +716,7 @@ impl RoutingTable {
e.question_rcvd(ts, bytes); e.question_rcvd(ts, bytes);
}) })
} }
pub fn answer_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { pub fn stats_answer_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner self.inner
.lock() .lock()
.self_transfer_stats_accounting .self_transfer_stats_accounting
@ -721,16 +725,20 @@ impl RoutingTable {
e.answer_sent(ts, bytes); e.answer_sent(ts, bytes);
}) })
} }
pub fn answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { pub fn stats_answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) {
self.inner self.inner
.lock() .lock()
.self_transfer_stats_accounting .self_transfer_stats_accounting
.add_down(bytes); .add_down(bytes);
self.inner
.lock()
.self_latency_stats_accounting
.record_latency(recv_ts - send_ts);
node_ref.operate(|e| { node_ref.operate(|e| {
e.answer_rcvd(send_ts, recv_ts, bytes); e.answer_rcvd(send_ts, recv_ts, bytes);
}) })
} }
pub fn question_lost(&self, node_ref: NodeRef, ts: u64) { pub fn stats_question_lost(&self, node_ref: NodeRef, ts: u64) {
node_ref.operate(|e| { node_ref.operate(|e| {
e.question_lost(ts); e.question_lost(ts);
}) })

View File

@ -348,24 +348,26 @@ impl RPCProcessor {
self.cancel_op_id_waiter(waitable_reply.op_id); self.cancel_op_id_waiter(waitable_reply.op_id);
if waitable_reply.is_ping { if waitable_reply.is_ping {
self.routing_table() self.routing_table()
.ping_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); .stats_ping_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts);
} else { } else {
self.routing_table() self.routing_table().stats_question_lost(
.question_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); waitable_reply.node_ref.clone(),
waitable_reply.send_ts,
);
} }
} }
Ok((rpcreader, _)) => { Ok((rpcreader, _)) => {
// Reply received // Reply received
let recv_ts = get_timestamp(); let recv_ts = get_timestamp();
if waitable_reply.is_ping { if waitable_reply.is_ping {
self.routing_table().pong_rcvd( self.routing_table().stats_pong_rcvd(
waitable_reply.node_ref, waitable_reply.node_ref,
waitable_reply.send_ts, waitable_reply.send_ts,
recv_ts, recv_ts,
rpcreader.header.body_len, rpcreader.header.body_len,
) )
} else { } else {
self.routing_table().answer_rcvd( self.routing_table().stats_answer_rcvd(
waitable_reply.node_ref, waitable_reply.node_ref,
waitable_reply.send_ts, waitable_reply.send_ts,
recv_ts, recv_ts,
@ -538,10 +540,10 @@ impl RPCProcessor {
let send_ts = get_timestamp(); let send_ts = get_timestamp();
if is_ping { if is_ping {
self.routing_table() self.routing_table()
.ping_sent(node_ref.clone(), send_ts, bytes); .stats_ping_sent(node_ref.clone(), send_ts, bytes);
} else { } else {
self.routing_table() self.routing_table()
.question_sent(node_ref.clone(), send_ts, bytes); .stats_question_sent(node_ref.clone(), send_ts, bytes);
} }
// Pass back waitable reply completion // Pass back waitable reply completion
@ -718,9 +720,11 @@ impl RPCProcessor {
let send_ts = get_timestamp(); let send_ts = get_timestamp();
if is_pong { if is_pong {
self.routing_table().pong_sent(node_ref, send_ts, bytes); self.routing_table()
.stats_pong_sent(node_ref, send_ts, bytes);
} else { } else {
self.routing_table().answer_sent(node_ref, send_ts, bytes); self.routing_table()
.stats_answer_sent(node_ref, send_ts, bytes);
} }
Ok(()) Ok(())
@ -1190,13 +1194,13 @@ impl RPCProcessor {
.lookup_node_ref(rpcreader.header.envelope.get_sender_id()) .lookup_node_ref(rpcreader.header.envelope.get_sender_id())
{ {
if which == 0u32 { if which == 0u32 {
self.routing_table().ping_rcvd( self.routing_table().stats_ping_rcvd(
sender_nr, sender_nr,
rpcreader.header.timestamp, rpcreader.header.timestamp,
rpcreader.header.body_len, rpcreader.header.body_len,
); );
} else { } else {
self.routing_table().question_rcvd( self.routing_table().stats_question_rcvd(
sender_nr, sender_nr,
rpcreader.header.timestamp, rpcreader.header.timestamp,
rpcreader.header.body_len, rpcreader.header.body_len,