Merge branch 'testing-work' into 'main'

Improved latency and reliability testing

See merge request veilid/veilid!357
This commit is contained in:
Christien Rioux 2025-03-01 00:16:18 +00:00
commit d29a23f341
46 changed files with 995 additions and 314 deletions

Cargo.lock generated
View File

@ -6701,6 +6701,17 @@ dependencies = [
"ws_stream_wasm",
]
[[package]]
name = "veilid-tracing-wasm"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b24035e775ff3d7f0c9f552f48296f8992dbc9e0347c5888891c9e96709ea7f"
dependencies = [
"tracing",
"tracing-subscriber",
"wasm-bindgen",
]
[[package]]
name = "veilid-wasm"
version = "0.4.3"
@ -6720,9 +6731,9 @@ dependencies = [
"serde_json",
"tracing",
"tracing-subscriber",
"tracing-wasm",
"tsify",
"veilid-core",
"veilid-tracing-wasm",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-bindgen-test",

View File

@ -73,6 +73,7 @@ unused_async = "deny"
ptr_cast_constness = "deny"
comparison_chain = "allow"
upper_case_acronyms = "allow"
needless_range_loop = "allow"
[workspace.lints.rust]
unused_must_use = "deny"

View File

@ -76,6 +76,10 @@ deps-rust:
# Linux
x86_64-unknown-linux-gnu \
aarch64-unknown-linux-gnu \
# Windows
x86_64-pc-windows-gnu \
# MacOS
aarch64-apple-darwin \
# Android
aarch64-linux-android \
armv7-linux-androideabi \
@ -146,6 +150,8 @@ build-linux-cache:
RUN cargo chef cook --profile=test --tests --target $DEFAULT_CARGO_TARGET --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core
RUN cargo chef cook --zigbuild --release --target x86_64-unknown-linux-gnu --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core
RUN cargo chef cook --zigbuild --release --target aarch64-unknown-linux-gnu --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core
# RUN cargo chef cook --zigbuild --release --target x86_64-pc-windows-gnu --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core
# RUN cargo chef cook --zigbuild --release --target aarch64-apple-darwin --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core
RUN veilid-wasm/wasm_remap_paths.sh cargo chef cook --zigbuild --release --target wasm32-unknown-unknown --recipe-path recipe.json -p veilid-wasm
ARG CI_REGISTRY_IMAGE=registry.gitlab.com/veilid/veilid
SAVE IMAGE --push $CI_REGISTRY_IMAGE/build-cache:latest
@ -176,6 +182,8 @@ code-android:
clippy:
FROM +code-linux
RUN cargo clippy --target x86_64-unknown-linux-gnu
RUN cargo clippy --target x86_64-pc-windows-gnu
RUN cargo clippy --target aarch64-apple-darwin
RUN cargo clippy --manifest-path=veilid-wasm/Cargo.toml --target wasm32-unknown-unknown
# Build
@ -191,9 +199,35 @@ build-linux-amd64:
build-linux-arm64:
FROM +code-linux
# Ensure we have enough memory
IF [ $(free -wmt | grep Total | awk '{print $2}') -lt 7500 ]
RUN echo "not enough container memory to build. increase build host memory."
RUN false
END
RUN cargo zigbuild --target aarch64-unknown-linux-gnu --release -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core
SAVE ARTIFACT ./target/aarch64-unknown-linux-gnu AS LOCAL ./target/artifacts/aarch64-unknown-linux-gnu
# build-windows-amd64:
# FROM +code-linux
# # Ensure we have enough memory
# IF [ $(free -wmt | grep Total | awk '{print $2}') -lt 7500 ]
# RUN echo "not enough container memory to build. increase build host memory."
# RUN false
# END
# RUN cargo zigbuild --target x86_64-pc-windows-gnu --release -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core
# SAVE ARTIFACT ./target/x86_64-pc-windows-gnu AS LOCAL ./target/artifacts/x86_64-pc-windows-gnu
# build-macos-arm64:
# FROM +code-linux
# # Ensure we have enough memory
# IF [ $(free -wmt | grep Total | awk '{print $2}') -lt 7500 ]
# RUN echo "not enough container memory to build. increase build host memory."
# RUN false
# END
# RUN cargo zigbuild --target aarch64-apple-darwin --release -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core
# SAVE ARTIFACT ./target/aarch64-apple-darwin AS LOCAL ./target/artifacts/aarch64-apple-darwin
build-android:
FROM +code-android
WORKDIR /veilid/veilid-core
@ -217,6 +251,14 @@ unit-tests-clippy-wasm-linux:
FROM +code-linux
RUN cargo clippy --manifest-path=veilid-wasm/Cargo.toml --target wasm32-unknown-unknown
unit-tests-clippy-windows-linux:
FROM +code-linux
RUN cargo-zigbuild clippy --target x86_64-pc-windows-gnu
unit-tests-clippy-macos-linux:
FROM +code-linux
RUN cargo-zigbuild clippy --target aarch64-apple-darwin
unit-tests-docs-linux:
FROM +code-linux
RUN ./build_docs.sh
@ -238,6 +280,12 @@ unit-tests-linux:
WAIT
BUILD +unit-tests-clippy-wasm-linux
END
WAIT
BUILD +unit-tests-clippy-windows-linux
END
WAIT
BUILD +unit-tests-clippy-macos-linux
END
WAIT
BUILD +unit-tests-docs-linux
END

View File

@ -84,7 +84,7 @@ cfg_if! {
}
}
#[expect(clippy::unused_async)]
#[allow(clippy::unused_async)]
pub async fn txt_lookup<S: AsRef<str>>(host: S) -> EyreResult<Vec<String>> {
cfg_if! {
if #[cfg(target_os = "windows")] {
@ -157,7 +157,7 @@ pub async fn txt_lookup<S: AsRef<str>>(host: S) -> EyreResult<Vec<String>> {
}
}
#[expect(clippy::unused_async)]
#[allow(clippy::unused_async)]
pub async fn ptr_lookup(ip_addr: IpAddr) -> EyreResult<String> {
cfg_if! {
if #[cfg(target_os = "windows")] {

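A note on the `#[expect]` → `#[allow]` swap above: `#[expect(lint)]` is itself checked, and rustc warns via `unfulfilled_lint_expectations` when the expected lint does not fire. Since these lookup functions only await on some `cfg` branches, the expectation is unfulfilled exactly on the targets that do await, so `#[allow]` is the portable choice. A minimal sketch (hypothetical function, not the Veilid resolver):

```rust
// Hypothetical illustration: on targets where the cfg branch awaits,
// clippy::unused_async does not fire, and #[expect] would itself warn
// via unfulfilled_lint_expectations. #[allow] is silent in both cases.
#[allow(clippy::unused_async)]
pub async fn txt_lookup_sketch(host: &str) -> Vec<String> {
    #[cfg(target_os = "windows")]
    {
        // synchronous resolver path: no .await, the lint would fire here
        vec![host.to_owned()]
    }
    #[cfg(not(target_os = "windows"))]
    {
        // asynchronous resolver path: .await is used, the lint stays quiet
        std::future::ready(vec![host.to_owned()]).await
    }
}
```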
View File

@ -298,6 +298,8 @@ impl tracing::field::Visit for VeilidKeyedStringRecorder {
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if field.name() == VEILID_LOG_KEY_FIELD {
self.log_key = value.to_owned();
} else {
self.record_debug(field, &value)
}
}
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn core::fmt::Debug) {

View File

@ -214,6 +214,8 @@ impl tracing::field::Visit for LogKeyFilterVisitor {
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if field.name() == VEILID_LOG_KEY_FIELD {
self.enabled = Some((self.filter)(value));
} else {
self.record_debug(field, &value)
}
}
fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn fmt::Debug) {}
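Both visitor fixes above restore the same behavior: `tracing`'s default `Visit::record_str` forwards to `record_debug`, so an override that intercepts only the log-key field must delegate everything else explicitly, or ordinary string fields are silently dropped. A minimal sketch of the pattern (struct and field name are illustrative):

```rust
use std::fmt;
use tracing::field::{Field, Visit};

const LOG_KEY_FIELD: &str = "__VEILID_LOG_KEY"; // assumed name, for illustration

struct KeyedVisitor {
    log_key: String,
    other_fields: String,
}

impl Visit for KeyedVisitor {
    fn record_str(&mut self, field: &Field, value: &str) {
        if field.name() == LOG_KEY_FIELD {
            self.log_key = value.to_owned();
        } else {
            // Without this delegation, non-key string fields vanish.
            self.record_debug(field, &value)
        }
    }
    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
        self.other_fields
            .push_str(&format!("{}={:?} ", field.name(), value));
    }
}
```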

View File

@ -93,6 +93,9 @@ impl SendDataResult {
Some(ncm) if ncm.is_direct()
)
}
pub fn is_ordered(&self) -> bool {
self.unique_flow.flow.protocol_type().is_ordered()
}
pub fn unique_flow(&self) -> UniqueFlow {
self.unique_flow

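The new `is_ordered()` reports whether the flow's protocol guarantees in-order delivery, which the per-ordering reliability tracking below keys on. A rough sketch of the mapping, assuming Veilid's usual protocol set where UDP is the only unordered transport and TCP/WS/WSS are stream-based:

```rust
// Sketch only: Veilid's real ProtocolType lives in veilid-core.
#[derive(Clone, Copy, PartialEq, Eq)]
enum ProtocolType {
    UDP,
    TCP,
    WS,
    WSS,
}

impl ProtocolType {
    fn is_ordered(self) -> bool {
        // stream transports deliver in order; datagrams may not arrive at all
        !matches!(self, ProtocolType::UDP)
    }
}
```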
View File

@ -170,24 +170,26 @@ impl NetworkManager {
ncm_kind: NodeContactMethodKind::OutboundRelay(relay_nr),
}) => {
// Relay loop or multiple relays
bail!(
veilid_log!(self debug
"Outbound relay loop or multiple relays detected: destination {} resolved to target {} via extraneous relay {}",
destination_node_ref,
target_node_ref,
relay_nr,
relay_nr
);
return Ok(NetworkResult::no_connection_other("outbound relay loop"));
}
Some(NodeContactMethod {
ncm_key: _,
ncm_kind: NodeContactMethodKind::InboundRelay(relay_nr),
}) => {
// Relay loop or multiple relays
bail!(
veilid_log!(self debug
"Inbound relay loop or multiple relays detected: destination {} resolved to target {} via extraneous relay {}",
destination_node_ref,
target_node_ref,
relay_nr,
relay_nr
);
return Ok(NetworkResult::no_connection_other("inbound relay loop"));
}
Some(NodeContactMethod {
ncm_key: _,

View File

@ -10,7 +10,7 @@ pub(crate) enum PunishmentReason {
// Node-level punishments
FailedToDecodeOperation,
WrongSenderPeerInfo,
// FailedToVerifySenderPeerInfo,
FailedToVerifySenderPeerInfo,
FailedToRegisterSenderPeerInfo,
// Route-level punishments
// FailedToDecodeRoutedMessage,

View File

@ -2,7 +2,7 @@ use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
/// Reliable pings are done with increased spacing between pings
///
/// - Start secs is the number of seconds between the first two pings
const RELIABLE_PING_INTERVAL_START_SECS: u32 = 10;
/// - Max secs is the maximum number of seconds between consecutive pings
@ -21,7 +21,15 @@ const RELIABLE_PING_INTERVAL_MULTIPLIER: f64 = 2.0;
const UNRELIABLE_PING_SPAN_SECS: u32 = 60;
/// - Interval is the number of seconds between each ping
const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
/// - Number of consecutive lost answers on an unordered protocol we will
/// tolerate before we call something unreliable
const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 1;
/// - Number of consecutive lost answers on an ordered protocol we will
/// tolerate before we call something unreliable
const UNRELIABLE_LOST_ANSWERS_ORDERED: u32 = 0;
/// Dead nodes are unreachable nodes, not 'never reached' nodes
///
/// How many times do we try to ping a never-reached node before we call it dead
const NEVER_SEEN_PING_COUNT: u32 = 3;
@ -194,9 +202,12 @@ pub(crate) struct BucketEntryInner {
/// The account for the state and reason statistics
#[serde(skip)]
state_stats_accounting: Mutex<StateStatsAccounting>,
/// RPC answer stats accounting
/// RPC answer stats accounting for unordered protocols
#[serde(skip)]
answer_stats_accounting: AnswerStatsAccounting,
answer_stats_accounting_unordered: AnswerStatsAccounting,
/// RPC answer stats accounting for ordered protocols
#[serde(skip)]
answer_stats_accounting_ordered: AnswerStatsAccounting,
/// If the entry is being punished and should be considered dead
#[serde(skip)]
punishment: Option<PunishmentReason>,
@ -375,11 +386,15 @@ impl BucketEntryInner {
}
// Less is faster
pub fn cmp_fastest(e1: &Self, e2: &Self) -> std::cmp::Ordering {
pub fn cmp_fastest(
e1: &Self,
e2: &Self,
metric: impl Fn(&LatencyStats) -> TimestampDuration,
) -> std::cmp::Ordering {
// Lower latency to the front
if let Some(e1_latency) = &e1.peer_stats.latency {
if let Some(e2_latency) = &e2.peer_stats.latency {
e1_latency.average.cmp(&e2_latency.average)
metric(e1_latency).cmp(&metric(e2_latency))
} else {
std::cmp::Ordering::Less
}
@ -391,7 +406,12 @@ impl BucketEntryInner {
}
// Less is more reliable, then faster
pub fn cmp_fastest_reliable(cur_ts: Timestamp, e1: &Self, e2: &Self) -> std::cmp::Ordering {
pub fn cmp_fastest_reliable(
cur_ts: Timestamp,
e1: &Self,
e2: &Self,
metric: impl Fn(&LatencyStats) -> TimestampDuration,
) -> std::cmp::Ordering {
// Reverse compare so most reliable is at front
let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts));
if ret != std::cmp::Ordering::Equal {
@ -399,7 +419,7 @@ impl BucketEntryInner {
}
// Lower latency to the front
Self::cmp_fastest(e1, e2)
Self::cmp_fastest(e1, e2, metric)
}
// Less is more reliable, then older
@ -426,13 +446,6 @@ impl BucketEntryInner {
}
}
#[expect(dead_code)]
pub fn sort_fastest_reliable_fn(
cur_ts: Timestamp,
) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering {
move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2)
}
pub fn update_signed_node_info(
&mut self,
routing_domain: RoutingDomain,
@ -877,7 +890,10 @@ impl BucketEntryInner {
// called every ROLLING_ANSWERS_INTERVAL_SECS seconds
pub(super) fn roll_answer_stats(&mut self, cur_ts: Timestamp) {
self.peer_stats.rpc_stats.answer = self.answer_stats_accounting.roll_answers(cur_ts);
self.peer_stats.rpc_stats.answer_unordered =
self.answer_stats_accounting_unordered.roll_answers(cur_ts);
self.peer_stats.rpc_stats.answer_ordered =
self.answer_stats_accounting_ordered.roll_answers(cur_ts);
}
///// state machine handling
@ -890,8 +906,14 @@ impl BucketEntryInner {
return Some(BucketEntryUnreliableReason::FailedToSend);
}
// If we have had any lost answers recently, this is not reliable
if self.peer_stats.rpc_stats.recent_lost_answers > 0 {
// If we have had more than UNRELIABLE_LOST_ANSWERS_UNORDERED lost answers recently on an unordered protocol, this is not reliable
if self.peer_stats.rpc_stats.recent_lost_answers_unordered
> UNRELIABLE_LOST_ANSWERS_UNORDERED
{
return Some(BucketEntryUnreliableReason::LostAnswers);
}
// If we have had more than UNRELIABLE_LOST_ANSWERS_ORDERED lost answers recently on an ordered protocol, this is not reliable
if self.peer_stats.rpc_stats.recent_lost_answers_ordered > UNRELIABLE_LOST_ANSWERS_ORDERED {
return Some(BucketEntryUnreliableReason::LostAnswers);
}
@ -920,8 +942,9 @@ impl BucketEntryInner {
// a node is not dead if we haven't heard from it yet,
// but we give it NEVER_SEEN_PING_COUNT chances to ping before we say it's dead
None => {
let no_answers =
self.peer_stats.rpc_stats.recent_lost_answers >= NEVER_SEEN_PING_COUNT;
let no_answers = self.peer_stats.rpc_stats.recent_lost_answers_unordered
+ self.peer_stats.rpc_stats.recent_lost_answers_ordered
>= NEVER_SEEN_PING_COUNT;
if no_answers {
return Some(BucketEntryDeadReason::TooManyLostAnswers);
}
@ -932,7 +955,8 @@ impl BucketEntryInner {
Some(ts) => {
let not_seen = cur_ts.saturating_sub(ts)
>= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1_000_000u64);
let no_answers = self.peer_stats.rpc_stats.recent_lost_answers
let no_answers = self.peer_stats.rpc_stats.recent_lost_answers_unordered
+ self.peer_stats.rpc_stats.recent_lost_answers_ordered
>= (UNRELIABLE_PING_SPAN_SECS / UNRELIABLE_PING_INTERVAL_SECS);
if not_seen && no_answers {
return Some(BucketEntryDeadReason::NoPingResponse);
@ -1046,7 +1070,8 @@ impl BucketEntryInner {
pub(super) fn make_not_dead(&mut self, cur_ts: Timestamp) {
self.peer_stats.rpc_stats.last_seen_ts = None;
self.peer_stats.rpc_stats.failed_to_send = 0;
self.peer_stats.rpc_stats.recent_lost_answers = 0;
self.peer_stats.rpc_stats.recent_lost_answers_unordered = 0;
self.peer_stats.rpc_stats.recent_lost_answers_ordered = 0;
assert!(self.check_dead(cur_ts).is_none());
}
@ -1081,9 +1106,19 @@ impl BucketEntryInner {
////////////////////////////////////////////////////////////////
/// Called when RPC processor events happen
pub(super) fn question_sent(&mut self, ts: Timestamp, bytes: ByteCount, expects_answer: bool) {
pub(super) fn question_sent(
&mut self,
ts: Timestamp,
bytes: ByteCount,
expects_answer: bool,
ordered: bool,
) {
self.transfer_stats_accounting.add_up(bytes);
self.answer_stats_accounting.record_question(ts);
if ordered {
self.answer_stats_accounting_ordered.record_question(ts);
} else {
self.answer_stats_accounting_unordered.record_question(ts);
}
self.peer_stats.rpc_stats.messages_sent += 1;
self.peer_stats.rpc_stats.failed_to_send = 0;
if expects_answer {
@ -1101,21 +1136,40 @@ impl BucketEntryInner {
self.peer_stats.rpc_stats.messages_sent += 1;
self.peer_stats.rpc_stats.failed_to_send = 0;
}
pub(super) fn answer_rcvd(&mut self, send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount) {
pub(super) fn answer_rcvd(
&mut self,
send_ts: Timestamp,
recv_ts: Timestamp,
bytes: ByteCount,
ordered: bool,
) {
self.transfer_stats_accounting.add_down(bytes);
self.answer_stats_accounting.record_answer(recv_ts);
if ordered {
self.answer_stats_accounting_ordered.record_answer(recv_ts);
self.peer_stats.rpc_stats.recent_lost_answers_ordered = 0;
} else {
self.answer_stats_accounting_unordered
.record_answer(recv_ts);
self.peer_stats.rpc_stats.recent_lost_answers_unordered = 0;
}
self.peer_stats.rpc_stats.messages_rcvd += 1;
self.peer_stats.rpc_stats.questions_in_flight -= 1;
self.record_latency(recv_ts.saturating_sub(send_ts));
self.touch_last_seen(recv_ts);
self.peer_stats.rpc_stats.recent_lost_answers = 0;
}
pub(super) fn lost_answer(&mut self) {
pub(super) fn lost_answer(&mut self, ordered: bool) {
let cur_ts = Timestamp::now();
self.answer_stats_accounting.record_lost_answer(cur_ts);
if ordered {
self.answer_stats_accounting_ordered
.record_lost_answer(cur_ts);
self.peer_stats.rpc_stats.recent_lost_answers_ordered += 1;
} else {
self.answer_stats_accounting_unordered
.record_lost_answer(cur_ts);
self.peer_stats.rpc_stats.recent_lost_answers_unordered += 1;
}
self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
self.peer_stats.rpc_stats.questions_in_flight -= 1;
self.peer_stats.rpc_stats.recent_lost_answers += 1;
}
pub(super) fn failed_to_send(&mut self, ts: Timestamp, expects_answer: bool) {
if expects_answer {
@ -1181,7 +1235,8 @@ impl BucketEntry {
latency_stats_accounting: LatencyStatsAccounting::new(),
transfer_stats_accounting: TransferStatsAccounting::new(),
state_stats_accounting: Mutex::new(StateStatsAccounting::new()),
answer_stats_accounting: AnswerStatsAccounting::new(),
answer_stats_accounting_ordered: AnswerStatsAccounting::new(),
answer_stats_accounting_unordered: AnswerStatsAccounting::new(),
punishment: None,
#[cfg(feature = "tracking")]
next_track_id: 0,

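Putting the two new constants together: one lost answer over an unordered protocol is tolerated before a node is marked unreliable, while any lost answer over an ordered transport marks it immediately, since stream protocols should not silently drop replies. A self-contained sketch of the rule:

```rust
// Mirrors the check_unreliable() logic above with plain integers.
const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 1;
const UNRELIABLE_LOST_ANSWERS_ORDERED: u32 = 0;

fn lost_answers_make_unreliable(recent_unordered: u32, recent_ordered: u32) -> bool {
    recent_unordered > UNRELIABLE_LOST_ANSWERS_UNORDERED
        || recent_ordered > UNRELIABLE_LOST_ANSWERS_ORDERED
}

fn main() {
    assert!(!lost_answers_make_unreliable(1, 0)); // one lost UDP answer: still fine
    assert!(lost_answers_make_unreliable(2, 0)); // two in a row: unreliable
    assert!(lost_answers_make_unreliable(0, 1)); // any lost TCP/WS answer: unreliable
}
```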
View File

@ -117,7 +117,7 @@ impl RoutingTable {
PunishmentReason::InvalidFraming => "PFRAME",
PunishmentReason::FailedToDecodeOperation => "PDECOP",
PunishmentReason::WrongSenderPeerInfo => "PSPBAD",
// PunishmentReason::FailedToVerifySenderPeerInfo => "PSPVER",
PunishmentReason::FailedToVerifySenderPeerInfo => "PSPVER",
PunishmentReason::FailedToRegisterSenderPeerInfo => "PSPREG",
//
},

View File

@ -40,6 +40,10 @@ pub const MIN_PUBLIC_INTERNET_ROUTING_DOMAIN_NODE_COUNT: usize = 4;
/// How frequently we tick the relay management routine
pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1;
/// How frequently we optimize relays
pub const RELAY_OPTIMIZATION_INTERVAL_SECS: u32 = 10;
/// What percentile to keep our relays optimized to
pub const RELAY_OPTIMIZATION_PERCENTILE: f32 = 75.0;
/// How frequently we tick the private route management routine
pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1;
@ -1130,4 +1134,27 @@ impl RoutingTable {
}
}
}
#[instrument(level = "trace", skip(self, filter, metric), ret)]
pub fn find_fastest_node(
&self,
cur_ts: Timestamp,
filter: impl Fn(&BucketEntryInner) -> bool,
metric: impl Fn(&LatencyStats) -> TimestampDuration,
) -> Option<NodeRef> {
let inner = self.inner.read();
inner.find_fastest_node(cur_ts, filter, metric)
}
#[instrument(level = "trace", skip(self, filter, metric), ret)]
pub fn get_node_speed_percentile(
&self,
node_id: TypedKey,
cur_ts: Timestamp,
filter: impl Fn(&BucketEntryInner) -> bool,
metric: impl Fn(&LatencyStats) -> TimestampDuration,
) -> Option<NodeRelativePerformance> {
let inner = self.inner.read();
inner.get_node_relative_performance(node_id, cur_ts, filter, metric)
}
}
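`get_node_speed_percentile` ranks a node against every other candidate that passes the filter, using the supplied latency metric (tm90 here). The percentile formula places the fastest node at 100%; a quick worked example:

```rust
// Same formula as get_node_relative_performance() above.
fn percentile(node_index: usize, node_count: usize) -> f32 {
    100.0f32 - ((node_index * 100) as f32) / (node_count as f32)
}

fn main() {
    assert_eq!(percentile(0, 20), 100.0); // fastest of 20 candidates
    assert_eq!(percentile(5, 20), 75.0);  // right at RELAY_OPTIMIZATION_PERCENTILE
    assert_eq!(percentile(10, 20), 50.0); // would trigger relay replacement
}
```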

View File

@ -278,10 +278,16 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait
self.stats_failed_to_send(Timestamp::now(), false);
}
fn stats_question_sent(&self, ts: Timestamp, bytes: ByteCount, expects_answer: bool) {
fn stats_question_sent(
&self,
ts: Timestamp,
bytes: ByteCount,
expects_answer: bool,
ordered: bool,
) {
self.operate_mut(|rti, e| {
rti.transfer_stats_accounting().add_up(bytes);
e.question_sent(ts, bytes, expects_answer);
e.question_sent(ts, bytes, expects_answer, ordered);
})
}
fn stats_question_rcvd(&self, ts: Timestamp, bytes: ByteCount) {
@ -296,17 +302,23 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait
e.answer_sent(bytes);
})
}
fn stats_answer_rcvd(&self, send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount) {
fn stats_answer_rcvd(
&self,
send_ts: Timestamp,
recv_ts: Timestamp,
bytes: ByteCount,
ordered: bool,
) {
self.operate_mut(|rti, e| {
rti.transfer_stats_accounting().add_down(bytes);
rti.latency_stats_accounting()
.record_latency(recv_ts.saturating_sub(send_ts));
e.answer_rcvd(send_ts, recv_ts, bytes);
e.answer_rcvd(send_ts, recv_ts, bytes, ordered);
})
}
fn stats_lost_answer(&self) {
fn stats_lost_answer(&self, ordered: bool) {
self.operate_mut(|_rti, e| {
e.lost_answer();
e.lost_answer(ordered);
})
}
fn stats_failed_to_send(&self, ts: Timestamp, expects_answer: bool) {

View File

@ -466,7 +466,9 @@ impl RouteSpecStore {
// always prioritize reliable nodes, but sort by oldest or fastest
entry1.with_inner(|e1| {
entry2.with_inner(|e2| match safety_spec.stability {
Stability::LowLatency => BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2),
Stability::LowLatency => {
BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2, |ls| ls.tm90)
}
Stability::Reliable => BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2),
})
})

View File

@ -13,6 +13,14 @@ pub const RECENT_PEERS_TABLE_SIZE: usize = 64;
pub const LOCK_TAG_TICK: &str = "TICK";
pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
#[derive(Debug)]
pub struct NodeRelativePerformance {
pub percentile: f32,
pub node_index: usize,
pub node_count: usize,
}
//////////////////////////////////////////////////////////////////////////
/// RoutingTable rwlock-internal data
@ -108,6 +116,9 @@ impl RoutingTableInner {
pub fn relay_node_last_keepalive(&self, domain: RoutingDomain) -> Option<Timestamp> {
self.with_routing_domain(domain, |rdd| rdd.relay_node_last_keepalive())
}
pub fn relay_node_last_optimized(&self, domain: RoutingDomain) -> Option<Timestamp> {
self.with_routing_domain(domain, |rdd| rdd.relay_node_last_optimized())
}
pub fn set_relay_node_last_keepalive(&mut self, domain: RoutingDomain, ts: Timestamp) {
match domain {
@ -119,6 +130,16 @@ impl RoutingTableInner {
.set_relay_node_last_keepalive(Some(ts)),
};
}
pub fn set_relay_node_last_optimized(&mut self, domain: RoutingDomain, ts: Timestamp) {
match domain {
RoutingDomain::PublicInternet => self
.public_internet_routing_domain
.set_relay_node_last_optimized(Some(ts)),
RoutingDomain::LocalNetwork => self
.local_network_routing_domain
.set_relay_node_last_optimized(Some(ts)),
};
}
#[expect(dead_code)]
pub fn has_dial_info(&self, domain: RoutingDomain) -> bool {
@ -1382,6 +1403,105 @@ impl RoutingTableInner {
// Unlock noderefs
closest_nodes_locked.iter().map(|x| x.unlocked()).collect()
}
#[instrument(level = "trace", skip(self, filter, metric), ret)]
pub fn find_fastest_node(
&self,
cur_ts: Timestamp,
filter: impl Fn(&BucketEntryInner) -> bool,
metric: impl Fn(&LatencyStats) -> TimestampDuration,
) -> Option<NodeRef> {
// Go through all entries and find fastest entry that matches filter function
let mut fastest_node: Option<Arc<BucketEntry>> = None;
// Iterate all known nodes for candidates
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
let entry2 = entry.clone();
entry.with(rti, |rti, e| {
// Filter this node
if filter(e) {
// Compare against previous candidate
if let Some(fastest_node) = fastest_node.as_mut() {
// Less is faster
let better = fastest_node.with(rti, |_rti, best| {
// choose low latency stability for relays
BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best, &metric)
== std::cmp::Ordering::Less
});
// Now apply filter function and see if this node should be included
if better {
*fastest_node = entry2;
}
} else {
// Always store the first candidate
fastest_node = Some(entry2);
}
}
});
// Don't end early, iterate through all entries
Option::<()>::None
});
// Return the fastest node
fastest_node.map(|e| NodeRef::new(self.registry(), e))
}
#[instrument(level = "trace", skip(self, filter, metric), ret)]
pub fn get_node_relative_performance(
&self,
node_id: TypedKey,
cur_ts: Timestamp,
filter: impl Fn(&BucketEntryInner) -> bool,
metric: impl Fn(&LatencyStats) -> TimestampDuration,
) -> Option<NodeRelativePerformance> {
// Go through all entries and find all entries that matches filter function
let mut all_filtered_nodes: Vec<Arc<BucketEntry>> = Vec::new();
// Iterate all known nodes for candidates
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
let entry2 = entry.clone();
entry.with(rti, |_rti, e| {
// Filter this node
if filter(e) {
all_filtered_nodes.push(entry2);
}
});
// Don't end early, iterate through all entries
Option::<()>::None
});
// Sort by fastest tm90 reliable
all_filtered_nodes.sort_by(|a, b| {
a.with(self, |rti, ea| {
b.with(rti, |_rti, eb| {
BucketEntryInner::cmp_fastest_reliable(cur_ts, ea, eb, &metric)
})
})
});
// Get position in list of requested node
let node_count = all_filtered_nodes.len();
let node_index = all_filtered_nodes
.iter()
.position(|x| x.with(self, |_rti, e| e.node_ids().contains(&node_id)))?;
// Print faster node stats
#[cfg(feature = "verbose-tracing")]
for nl in 0..node_index {
let (latency, node_id) = all_filtered_nodes[nl].with(self, |_rti, e| {
(e.peer_stats().latency.clone(), e.best_node_id())
});
if let Some(latency) = latency {
veilid_log!(self debug "Better relay {}: {}: {}", nl, node_id, latency);
}
}
// Return 'percentile' position. Fastest node is 100%.
Some(NodeRelativePerformance {
percentile: 100.0f32 - ((node_index * 100) as f32) / (node_count as f32),
node_index,
node_count,
})
}
}
#[instrument(level = "trace", skip_all)]

View File

@ -89,6 +89,9 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
fn relay_node_last_keepalive(&self) -> Option<Timestamp> {
self.common.relay_node_last_keepalive()
}
fn relay_node_last_optimized(&self) -> Option<Timestamp> {
self.common.relay_node_last_optimized()
}
fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
self.common.dial_info_details()
}
@ -236,4 +239,8 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
fn set_relay_node_last_keepalive(&mut self, ts: Option<Timestamp>) {
self.common.set_relay_node_last_keepalive(ts);
}
fn set_relay_node_last_optimized(&mut self, ts: Option<Timestamp>) {
self.common.set_relay_node_last_optimized(ts);
}
}

View File

@ -22,6 +22,7 @@ pub trait RoutingDomainDetail {
fn requires_relay(&self) -> Option<RelayKind>;
fn relay_node(&self) -> Option<FilteredNodeRef>;
fn relay_node_last_keepalive(&self) -> Option<Timestamp>;
fn relay_node_last_optimized(&self) -> Option<Timestamp>;
fn dial_info_details(&self) -> &Vec<DialInfoDetail>;
fn get_published_peer_info(&self) -> Option<Arc<PeerInfo>>;
fn inbound_dial_info_filter(&self) -> DialInfoFilter;
@ -55,6 +56,8 @@ pub trait RoutingDomainDetail {
// Set last relay keepalive time
fn set_relay_node_last_keepalive(&mut self, ts: Option<Timestamp>);
// Set last relay optimized time
fn set_relay_node_last_optimized(&mut self, ts: Option<Timestamp>);
}
trait RoutingDomainDetailCommonAccessors: RoutingDomainDetail {
@ -125,6 +128,7 @@ struct RoutingDomainDetailCommon {
// caches
cached_peer_info: Mutex<Option<Arc<PeerInfo>>>,
relay_node_last_keepalive: Option<Timestamp>,
relay_node_last_optimized: Option<Timestamp>,
}
impl RoutingDomainDetailCommon {
@ -140,6 +144,7 @@ impl RoutingDomainDetailCommon {
confirmed: false,
cached_peer_info: Mutex::new(Default::default()),
relay_node_last_keepalive: Default::default(),
relay_node_last_optimized: Default::default(),
}
}
@ -227,6 +232,10 @@ impl RoutingDomainDetailCommon {
self.relay_node_last_keepalive
}
pub fn relay_node_last_optimized(&self) -> Option<Timestamp> {
self.relay_node_last_optimized
}
pub fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
&self.dial_info_details
}
@ -277,6 +286,12 @@ impl RoutingDomainDetailCommon {
fn set_relay_node(&mut self, opt_relay_node: Option<NodeRef>) {
self.relay_node = opt_relay_node;
self.relay_node_last_keepalive = None;
self.relay_node_last_optimized = if self.relay_node.is_some() {
Some(Timestamp::now())
} else {
None
};
self.clear_cache();
}
@ -317,6 +332,9 @@ impl RoutingDomainDetailCommon {
fn set_relay_node_last_keepalive(&mut self, ts: Option<Timestamp>) {
self.relay_node_last_keepalive = ts;
}
fn set_relay_node_last_optimized(&mut self, ts: Option<Timestamp>) {
self.relay_node_last_optimized = ts;
}
//////////////////////////////////////////////////////////////////////////////
// Internal functions

View File

@ -70,6 +70,9 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
fn relay_node_last_keepalive(&self) -> Option<Timestamp> {
self.common.relay_node_last_keepalive()
}
fn relay_node_last_optimized(&self) -> Option<Timestamp> {
self.common.relay_node_last_optimized()
}
fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
self.common.dial_info_details()
}
@ -400,4 +403,8 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
fn set_relay_node_last_keepalive(&mut self, ts: Option<Timestamp>) {
self.common.set_relay_node_last_keepalive(ts);
}
fn set_relay_node_last_optimized(&mut self, ts: Option<Timestamp>) {
self.common.set_relay_node_last_optimized(ts);
}
}

View File

@ -2,7 +2,7 @@ use super::*;
// Latency entry is per round-trip packet (ping or data)
// - Size is number of entries
const ROLLING_LATENCIES_SIZE: usize = 10;
const ROLLING_LATENCIES_SIZE: usize = 50;
// Transfers entries are in bytes total for the interval
// - Size is number of entries
@ -102,28 +102,64 @@ impl LatencyStatsAccounting {
}
}
fn get_tm_n(sorted_latencies: &[TimestampDuration], n: usize) -> Option<TimestampDuration> {
let tmcount = sorted_latencies.len() * n / 100;
if tmcount == 0 {
None
} else {
let mut tm = TimestampDuration::new(0);
for l in &sorted_latencies[..tmcount] {
tm += *l;
}
tm /= tmcount as u64;
Some(tm)
}
}
fn get_p_n(sorted_latencies: &[TimestampDuration], n: usize) -> TimestampDuration {
let pindex = (sorted_latencies.len() * n / 100).saturating_sub(1);
sorted_latencies[pindex]
}
pub fn record_latency(&mut self, latency: TimestampDuration) -> LatencyStats {
while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE {
self.rolling_latencies.pop_front();
}
self.rolling_latencies.push_back(latency);
let mut ls = LatencyStats {
fastest: u64::MAX.into(),
average: 0.into(),
slowest: 0.into(),
};
// Calculate latency stats
let mut fastest = TimestampDuration::new(u64::MAX);
let mut slowest = TimestampDuration::new(0u64);
let mut average = TimestampDuration::new(0u64);
for rl in &self.rolling_latencies {
ls.fastest.min_assign(*rl);
ls.slowest.max_assign(*rl);
ls.average += *rl;
fastest.min_assign(*rl);
slowest.max_assign(*rl);
average += *rl;
}
let len = self.rolling_latencies.len() as u64;
if len > 0 {
ls.average /= len;
average /= len;
}
ls
let mut sorted_latencies: Vec<_> = self.rolling_latencies.iter().copied().collect();
sorted_latencies.sort();
let tm90 = Self::get_tm_n(&sorted_latencies, 90).unwrap_or(average);
let tm75 = Self::get_tm_n(&sorted_latencies, 75).unwrap_or(average);
let p90 = Self::get_p_n(&sorted_latencies, 90);
let p75 = Self::get_p_n(&sorted_latencies, 75);
LatencyStats {
fastest,
average,
slowest,
tm90,
tm75,
p90,
p75,
}
}
}
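The new tm90/tm75 statistics are trimmed means: average the fastest n% of the sorted window and discard the slow tail, so a single outlier round-trip no longer skews node ranking the way `average` can. p90/p75 are plain order statistics. A worked example with `u64` milliseconds standing in for `TimestampDuration`:

```rust
// Same math as get_tm_n()/get_p_n() above, on plain integers.
fn get_tm_n(sorted: &[u64], n: usize) -> Option<u64> {
    let tmcount = sorted.len() * n / 100;
    (tmcount != 0).then(|| sorted[..tmcount].iter().sum::<u64>() / tmcount as u64)
}

fn get_p_n(sorted: &[u64], n: usize) -> u64 {
    sorted[(sorted.len() * n / 100).saturating_sub(1)]
}

fn main() {
    // ten round-trip latencies in ms, sorted ascending; one 100ms outlier
    let sorted = [10, 11, 12, 13, 14, 15, 16, 17, 18, 100];
    assert_eq!(get_tm_n(&sorted, 90), Some(14)); // mean of fastest 9 = 126/9
    assert_eq!(get_p_n(&sorted, 90), 18);        // 9th smallest sample
    let average: u64 = sorted.iter().sum::<u64>() / 10; // 22, dragged up by the outlier
    assert!(average > get_tm_n(&sorted, 90).unwrap());
}
```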

View File

@ -317,7 +317,7 @@ impl RoutingTable {
if count == 0 {
return;
}
veilid_log!(self debug "[{}] Ping validation queue: {} remaining", name, count);
veilid_log!(self debug target:"network_result", "[{}] Ping validation queue: {} remaining", name, count);
let atomic_count = AtomicUsize::new(count);
process_batched_future_queue(future_queue, MAX_PARALLEL_PINGS, stop_token, |res| async {
@ -326,7 +326,7 @@ impl RoutingTable {
}
let remaining = atomic_count.fetch_sub(1, Ordering::AcqRel) - 1;
if remaining > 0 {
veilid_log!(self debug "[{}] Ping validation queue: {} remaining", name, remaining);
veilid_log!(self debug target:"network_result", "[{}] Ping validation queue: {} remaining", name, remaining);
}
})
.await;

View File

@ -71,7 +71,7 @@ impl RoutingTable {
false
}
// Relay node no longer can relay
else if relay_node.operate(|_rti, e| !relay_node_filter(e)) {
else if relay_node.operate(|_rti, e| !&relay_node_filter(e)) {
veilid_log!(self debug
"Relay node can no longer relay, dropping relay {}",
relay_node
@ -88,7 +88,86 @@ impl RoutingTable {
editor.set_relay_node(None);
false
} else {
true
// See if our relay was optimized last long enough ago to consider getting a new one
// if it is no longer fast enough
let mut has_relay = true;
let mut inner = self.inner.upgradable_read();
if let Some(last_optimized) =
inner.relay_node_last_optimized(RoutingDomain::PublicInternet)
{
let last_optimized_duration = cur_ts - last_optimized;
if last_optimized_duration
> TimestampDuration::new_secs(RELAY_OPTIMIZATION_INTERVAL_SECS)
{
// See what our relay's current percentile is
let relay_node_id = relay_node.best_node_id();
if let Some(relay_relative_performance) = inner
.get_node_relative_performance(
relay_node_id,
cur_ts,
&relay_node_filter,
|ls| ls.tm90,
)
{
// Get latency numbers
let latency_stats =
if let Some(latency) = relay_node.peer_stats().latency {
latency.to_string()
} else {
"[no stats]".to_owned()
};
// Get current relay reliability
let state_reason = relay_node.state_reason(cur_ts);
if relay_relative_performance.percentile
< RELAY_OPTIMIZATION_PERCENTILE
{
// Drop the current relay so we can get the best new one
veilid_log!(self debug
"Relay tm90 is ({:.2}% < {:.2}%) ({} out of {}) (latency {}, {:?}) optimizing relay {}",
relay_relative_performance.percentile,
RELAY_OPTIMIZATION_PERCENTILE,
relay_relative_performance.node_index,
relay_relative_performance.node_count,
latency_stats,
state_reason,
relay_node
);
editor.set_relay_node(None);
has_relay = false;
} else {
// Note that we tried to optimize the relay but it was good
veilid_log!(self debug
"Relay tm90 is ({:.2}% >= {:.2}%) ({} out of {}) (latency {}, {:?}) keeping {}",
relay_relative_performance.percentile,
RELAY_OPTIMIZATION_PERCENTILE,
relay_relative_performance.node_index,
relay_relative_performance.node_count,
latency_stats,
state_reason,
relay_node
);
inner.with_upgraded(|inner| {
inner.set_relay_node_last_optimized(
RoutingDomain::PublicInternet,
cur_ts,
)
});
}
} else {
// Drop the current relay because it could not be measured
veilid_log!(self debug
"Relay relative performance not found {}",
relay_node
);
editor.set_relay_node(None);
has_relay = false;
}
}
}
has_relay
}
} else {
false
@ -123,11 +202,7 @@ impl RoutingTable {
}
if !got_outbound_relay {
// Find a node in our routing table that is an acceptable inbound relay
if let Some(nr) = self.find_inbound_relay(
RoutingDomain::PublicInternet,
cur_ts,
relay_node_filter,
) {
if let Some(nr) = self.find_fastest_node(cur_ts, &relay_node_filter, |ls| ls.tm90) {
veilid_log!(self debug "Inbound relay node selected: {}", nr);
editor.set_relay_node(Some(nr));
}
@ -233,47 +308,4 @@ impl RoutingTable {
true
}
}
#[instrument(level = "trace", skip(self, relay_node_filter), ret)]
pub fn find_inbound_relay(
&self,
routing_domain: RoutingDomain,
cur_ts: Timestamp,
relay_node_filter: impl Fn(&BucketEntryInner) -> bool,
) -> Option<NodeRef> {
// Go through all entries and find fastest entry that matches filter function
let inner = self.inner.read();
let inner = &*inner;
let mut best_inbound_relay: Option<Arc<BucketEntry>> = None;
// Iterate all known nodes for candidates
inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
let entry2 = entry.clone();
entry.with(rti, |rti, e| {
// Filter this node
if relay_node_filter(e) {
// Compare against previous candidate
if let Some(best_inbound_relay) = best_inbound_relay.as_mut() {
// Less is faster
let better = best_inbound_relay.with(rti, |_rti, best| {
// choose low latency stability for relays
BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best)
== std::cmp::Ordering::Less
});
// Now apply filter function and see if this node should be included
if better {
*best_inbound_relay = entry2;
}
} else {
// Always store the first candidate
best_inbound_relay = Some(entry2);
}
}
});
// Don't end early, iterate through all entries
Option::<()>::None
});
// Return the best inbound relay noderef
best_inbound_relay.map(|e| NodeRef::new(self.registry(), e))
}
}
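The relay-management change above amounts to a periodic re-evaluation: once `RELAY_OPTIMIZATION_INTERVAL_SECS` has elapsed since the last optimization, measure the current relay's tm90 percentile among eligible candidates and drop it if it falls below `RELAY_OPTIMIZATION_PERCENTILE` (or cannot be measured), letting `find_fastest_node` pick a replacement. A condensed sketch of that decision, with types simplified:

```rust
// Condensed from the keep/drop branches above; types simplified.
const RELAY_OPTIMIZATION_INTERVAL_SECS: u64 = 10;
const RELAY_OPTIMIZATION_PERCENTILE: f32 = 75.0;

#[derive(Debug, PartialEq)]
enum RelayDecision {
    Keep,
    Drop,
}

fn optimize_relay(secs_since_last_optimized: u64, percentile: Option<f32>) -> RelayDecision {
    if secs_since_last_optimized <= RELAY_OPTIMIZATION_INTERVAL_SECS {
        return RelayDecision::Keep; // checked recently; avoid churn
    }
    match percentile {
        // still in the fastest 25% by tm90: keep and refresh last_optimized
        Some(p) if p >= RELAY_OPTIMIZATION_PERCENTILE => RelayDecision::Keep,
        // too slow, or performance could not be measured: select a new relay
        _ => RelayDecision::Drop,
    }
}

fn main() {
    assert_eq!(optimize_relay(5, Some(40.0)), RelayDecision::Keep);
    assert_eq!(optimize_relay(30, Some(80.0)), RelayDecision::Keep);
    assert_eq!(optimize_relay(30, Some(60.0)), RelayDecision::Drop);
    assert_eq!(optimize_relay(30, None), RelayDecision::Drop);
}
```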

View File

@ -184,7 +184,12 @@ impl NodeInfo {
true
}
/// Does this appear on the same network within the routing domain
/// Does this appear on the same network within the routing domain?
/// The notion of an 'ipblock' is a single external IP address for ipv4, and a fixed prefix for ipv6.
/// If a NAT is present, this detects whether two public PeerInfos would share the same router and be
/// subject to hairpin NAT (typically for ipv4). The address filter also overloads this concept
/// to rate-limit the number of nodes coming from the same ip 'block' within a given amount of time.
pub fn node_is_on_same_ipblock(&self, node_b: &NodeInfo, ip6_prefix_size: usize) -> bool {
let our_ip_blocks = self
.dial_info_detail_list()

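A hedged sketch of the 'ipblock' comparison the doc comment above describes, under its stated assumption that ipv4 compares the whole external address while ipv6 compares a fixed prefix (helper name is illustrative, not the Veilid API):

```rust
use std::net::IpAddr;

// Illustrative helper: same 'ipblock' if ipv4 addresses match exactly,
// or if the first ip6_prefix_size bits of two ipv6 addresses match.
fn same_ipblock(a: IpAddr, b: IpAddr, ip6_prefix_size: usize) -> bool {
    match (a, b) {
        (IpAddr::V4(a4), IpAddr::V4(b4)) => a4 == b4,
        (IpAddr::V6(a6), IpAddr::V6(b6)) => {
            let (a, b) = (a6.octets(), b6.octets());
            let full = ip6_prefix_size / 8;
            let rem = ip6_prefix_size % 8;
            a[..full] == b[..full]
                && (rem == 0 || (a[full] >> (8 - rem)) == (b[full] >> (8 - rem)))
        }
        _ => false, // mixed families are never the same block
    }
}
```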
View File

@ -69,16 +69,22 @@ impl_veilid_log_facility!("rpc");
#[derive(Debug)]
#[must_use]
struct WaitableReply {
handle: OperationWaitHandle<Message, Option<QuestionContext>>,
struct WaitableReplyContext {
timeout_us: TimestampDuration,
node_ref: NodeRef,
send_ts: Timestamp,
send_data_method: SendDataResult,
send_data_result: SendDataResult,
safety_route: Option<PublicKey>,
remote_private_route: Option<PublicKey>,
reply_private_route: Option<PublicKey>,
}
#[derive(Debug)]
#[must_use]
struct WaitableReply {
handle: OperationWaitHandle<Message, Option<QuestionContext>>,
_opt_connection_ref_scope: Option<ConnectionRefScope>,
context: WaitableReplyContext,
}
/////////////////////////////////////////////////////////////////////
@ -327,14 +333,19 @@ impl RPCProcessor {
// Sender PeerInfo was specified, update our routing table with it
if !self.verify_node_info(routing_domain, peer_info.signed_node_info(), &[]) {
veilid_log!(self debug target:"network_result", "Dropping invalid PeerInfo in {:?} for id {}: {:?}", routing_domain, sender_node_id, peer_info);
// Don't punish for this because in the case of hairpin NAT
// you can legally get LocalNetwork PeerInfo when you expect PublicInternet PeerInfo
//
// self.network_manager().address_filter().punish_node_id(
// sender_node_id,
// PunishmentReason::FailedToVerifySenderPeerInfo,
// );
veilid_log!(self debug target:"network_result", "Punishing invalid PeerInfo in {:?} for id {}: {:?}", routing_domain, sender_node_id, peer_info);
// Punish nodes that send peer info for the wrong routing domain
// Hairpin NAT situations where routing domain appears to be LocalNetwork
// should not happen. These nodes should be using InboundRelay now to communicate
// due to the 'node_is_on_same_ipblock' check in PublicInternetRoutingDomainDetail::get_contact_method.
// Nodes that are actually sending LocalNetwork ip addresses over the PublicInternet domain
// or vice-versa need to be punished.
self.network_manager().address_filter().punish_node_id(
sender_node_id,
PunishmentReason::FailedToVerifySenderPeerInfo,
);
return Ok(NetworkResult::value(None));
}
let sender_nr = match self
@ -512,28 +523,16 @@ impl RPCProcessor {
let id = waitable_reply.handle.id();
let out = self
.waiting_rpc_table
.wait_for_op(waitable_reply.handle, waitable_reply.timeout_us)
.wait_for_op(waitable_reply.handle, waitable_reply.context.timeout_us)
.await;
match &out {
Err(e) => {
veilid_log!(self debug "RPC Lost (id={} {}): {}", id, debug_string, e);
self.record_lost_answer(
waitable_reply.send_ts,
waitable_reply.node_ref.clone(),
waitable_reply.safety_route,
waitable_reply.remote_private_route,
waitable_reply.reply_private_route,
);
self.record_lost_answer(&waitable_reply.context);
}
Ok(TimeoutOr::Timeout) => {
veilid_log!(self debug "RPC Lost (id={} {}): Timeout", id, debug_string);
self.record_lost_answer(
waitable_reply.send_ts,
waitable_reply.node_ref.clone(),
waitable_reply.safety_route,
waitable_reply.remote_private_route,
waitable_reply.reply_private_route,
);
self.record_lost_answer(&waitable_reply.context);
}
Ok(TimeoutOr::Value((rpcreader, _))) => {
// Reply received
@ -541,17 +540,13 @@ impl RPCProcessor {
// Record answer received
self.record_answer_received(
waitable_reply.send_ts,
recv_ts,
rpcreader.header.body_len,
waitable_reply.node_ref.clone(),
waitable_reply.safety_route,
waitable_reply.remote_private_route,
waitable_reply.reply_private_route,
&waitable_reply.context,
);
// Ensure the reply comes over the private route that was requested
if let Some(reply_private_route) = waitable_reply.reply_private_route {
if let Some(reply_private_route) = waitable_reply.context.reply_private_route {
match &rpcreader.header.detail {
RPCMessageHeaderDetail::Direct(_) => {
return Err(RPCError::protocol(
@ -882,20 +877,15 @@ impl RPCProcessor {
/// Record question lost to node or route
#[instrument(level = "trace", target = "rpc", skip_all)]
fn record_lost_answer(
&self,
send_ts: Timestamp,
node_ref: NodeRef,
safety_route: Option<PublicKey>,
remote_private_route: Option<PublicKey>,
private_route: Option<PublicKey>,
) {
fn record_lost_answer(&self, context: &WaitableReplyContext) {
// Record for node if this was not sent via a route
if safety_route.is_none() && remote_private_route.is_none() {
node_ref.stats_lost_answer();
if context.safety_route.is_none() && context.remote_private_route.is_none() {
context
.node_ref
.stats_lost_answer(context.send_data_result.is_ordered());
// Also clear the last_connections for the entry so we make a new connection next time
node_ref.clear_last_flows();
context.node_ref.clear_last_flows();
return;
}
@ -904,20 +894,20 @@ impl RPCProcessor {
let rss = routing_table.route_spec_store();
// If safety route was used, record question lost there
if let Some(sr_pubkey) = &safety_route {
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| {
if let Some(sr_pubkey) = &context.safety_route {
rss.with_route_stats_mut(context.send_ts, sr_pubkey, |s| {
s.record_lost_answer();
});
}
// If remote private route was used, record question lost there
if let Some(rpr_pubkey) = &remote_private_route {
rss.with_route_stats_mut(send_ts, rpr_pubkey, |s| {
if let Some(rpr_pubkey) = &context.remote_private_route {
rss.with_route_stats_mut(context.send_ts, rpr_pubkey, |s| {
s.record_lost_answer();
});
}
// If private route was used, record question lost there
if let Some(pr_pubkey) = &private_route {
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| {
// If reply private route was used, record question lost there
if let Some(pr_pubkey) = &context.reply_private_route {
rss.with_route_stats_mut(context.send_ts, pr_pubkey, |s| {
s.record_lost_answer();
});
}
@ -925,6 +915,7 @@ impl RPCProcessor {
/// Record success sending to node or route
#[instrument(level = "trace", target = "rpc", skip_all)]
#[expect(clippy::too_many_arguments)]
fn record_send_success(
&self,
rpc_kind: RPCKind,
@ -933,6 +924,7 @@ impl RPCProcessor {
node_ref: NodeRef,
safety_route: Option<PublicKey>,
remote_private_route: Option<PublicKey>,
ordered: bool,
) {
// Record for node if this was not sent via a route
if safety_route.is_none() && remote_private_route.is_none() {
@ -942,7 +934,7 @@ impl RPCProcessor {
if is_answer {
node_ref.stats_answer_sent(bytes);
} else {
node_ref.stats_question_sent(send_ts, bytes, wants_answer);
node_ref.stats_question_sent(send_ts, bytes, wants_answer, ordered);
}
return;
}
@ -971,18 +963,21 @@ impl RPCProcessor {
#[instrument(level = "trace", target = "rpc", skip_all)]
fn record_answer_received(
&self,
send_ts: Timestamp,
recv_ts: Timestamp,
bytes: ByteCount,
node_ref: NodeRef,
safety_route: Option<PublicKey>,
remote_private_route: Option<PublicKey>,
reply_private_route: Option<PublicKey>,
context: &WaitableReplyContext,
) {
// Record stats for remote node if this was direct
if safety_route.is_none() && remote_private_route.is_none() && reply_private_route.is_none()
if context.safety_route.is_none()
&& context.remote_private_route.is_none()
&& context.reply_private_route.is_none()
{
node_ref.stats_answer_rcvd(send_ts, recv_ts, bytes);
context.node_ref.stats_answer_rcvd(
context.send_ts,
recv_ts,
bytes,
context.send_data_result.is_ordered(),
);
return;
}
// Get route spec store
@ -991,11 +986,11 @@ impl RPCProcessor {
// Get latency for all local routes
let mut total_local_latency = TimestampDuration::new(0u64);
let total_latency: TimestampDuration = recv_ts.saturating_sub(send_ts);
let total_latency: TimestampDuration = recv_ts.saturating_sub(context.send_ts);
// If safety route was used, record route there
if let Some(sr_pubkey) = &safety_route {
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| {
if let Some(sr_pubkey) = &context.safety_route {
rss.with_route_stats_mut(context.send_ts, sr_pubkey, |s| {
// Record received bytes
s.record_answer_received(recv_ts, bytes);
@ -1005,8 +1000,8 @@ impl RPCProcessor {
}
// If local private route was used, record route there
if let Some(pr_pubkey) = &reply_private_route {
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| {
if let Some(pr_pubkey) = &context.reply_private_route {
rss.with_route_stats_mut(context.send_ts, pr_pubkey, |s| {
// Record received bytes
s.record_answer_received(recv_ts, bytes);
@ -1016,8 +1011,8 @@ impl RPCProcessor {
}
// If remote private route was used, record there
if let Some(rpr_pubkey) = &remote_private_route {
rss.with_route_stats_mut(send_ts, rpr_pubkey, |s| {
if let Some(rpr_pubkey) = &context.remote_private_route {
rss.with_route_stats_mut(context.send_ts, rpr_pubkey, |s| {
// Record received bytes
s.record_answer_received(recv_ts, bytes);
@ -1040,13 +1035,13 @@ impl RPCProcessor {
// If no remote private route was used, then record half the total latency on our local routes
// This is fine because if we sent with a local safety route,
// then we must have received with a local private route too, per the design rules
if let Some(sr_pubkey) = &safety_route {
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| {
if let Some(sr_pubkey) = &context.safety_route {
rss.with_route_stats_mut(context.send_ts, sr_pubkey, |s| {
s.record_latency(total_latency / 2u64);
});
}
if let Some(pr_pubkey) = &reply_private_route {
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| {
if let Some(pr_pubkey) = &context.reply_private_route {
rss.with_route_stats_mut(context.send_ts, pr_pubkey, |s| {
s.record_latency(total_latency / 2u64);
});
}
@ -1157,7 +1152,7 @@ impl RPCProcessor {
);
RPCError::network(e)
})?;
let send_data_method = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Question, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);
network_result_raise!(res);
@ -1172,11 +1167,12 @@ impl RPCProcessor {
node_ref.unfiltered(),
safety_route,
remote_private_route,
send_data_result.is_ordered(),
);
// Ref the connection so it doesn't go away until we're done with the waitable reply
let opt_connection_ref_scope =
send_data_method.unique_flow().connection_id.and_then(|id| {
send_data_result.unique_flow().connection_id.and_then(|id| {
self.network_manager()
.connection_manager()
.try_connection_ref_scope(id)
@ -1185,14 +1181,16 @@ impl RPCProcessor {
// Pass back waitable reply completion
Ok(NetworkResult::value(WaitableReply {
handle,
_opt_connection_ref_scope: opt_connection_ref_scope,
context: WaitableReplyContext {
timeout_us,
node_ref: node_ref.unfiltered(),
send_ts,
send_data_method,
send_data_result,
safety_route,
remote_private_route,
reply_private_route,
_opt_connection_ref_scope: opt_connection_ref_scope,
},
}))
}
@ -1243,7 +1241,7 @@ impl RPCProcessor {
);
RPCError::network(e)
})?;
let _send_data_method = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Statement, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);
network_result_raise!(res);
@ -1258,6 +1256,7 @@ impl RPCProcessor {
node_ref.unfiltered(),
safety_route,
remote_private_route,
send_data_result.is_ordered(),
);
Ok(NetworkResult::value(()))
@ -1313,7 +1312,7 @@ impl RPCProcessor {
);
RPCError::network(e)
})?;
let _send_data_kind = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Answer, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);
network_result_raise!(res);
@ -1328,6 +1327,7 @@ impl RPCProcessor {
node_ref.unfiltered(),
safety_route,
remote_private_route,
send_data_result.is_ordered(),
);
Ok(NetworkResult::value(()))

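The net effect of the `WaitableReplyContext` refactor plus the new `ordered` flags is that a question's transport ordering, captured once from the send result, travels with the waitable reply all the way into the per-node stats. A reduced sketch of that flow (field and type names trimmed from the diff):

```rust
// Reduced from the diff: ordering captured at send time, consumed at
// answer/lost-answer time so counters land in the right bucket.
struct SendDataResult {
    ordered: bool,
}
impl SendDataResult {
    fn is_ordered(&self) -> bool {
        self.ordered
    }
}

#[derive(Default)]
struct RpcStats {
    recent_lost_answers_unordered: u32,
    recent_lost_answers_ordered: u32,
}

struct WaitableReplyContext {
    send_data_result: SendDataResult,
}

fn record_lost_answer(context: &WaitableReplyContext, stats: &mut RpcStats) {
    if context.send_data_result.is_ordered() {
        stats.recent_lost_answers_ordered += 1;
    } else {
        stats.recent_lost_answers_unordered += 1;
    }
}
```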
View File

@ -27,7 +27,7 @@ impl RPCProcessor {
let waitable_reply = network_result_try!(self.question(dest, question, None).await?);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route;
let reply_private_route = waitable_reply.context.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -47,7 +47,7 @@ impl RPCProcessor {
let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route;
let reply_private_route = waitable_reply.context.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -86,7 +86,7 @@ impl RPCProcessor {
);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route;
let reply_private_route = waitable_reply.context.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -89,7 +89,7 @@ impl RPCProcessor {
);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route;
let reply_private_route = waitable_reply.context.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -98,7 +98,7 @@ impl RPCProcessor {
);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route;
let reply_private_route = waitable_reply.context.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -73,10 +73,10 @@ impl RPCProcessor {
network_result_try!(self.question(dest.clone(), question, None).await?);
// Note what kind of ping this was and to what peer scope
let send_data_method = waitable_reply.send_data_method.clone();
let send_data_method = waitable_reply.context.send_data_result.clone();
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route;
let reply_private_route = waitable_reply.context.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -91,7 +91,7 @@ impl RPCProcessor {
network_result_try!(self.question(dest.clone(), question, None).await?);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route;
let reply_private_route = waitable_reply.context.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -7,6 +7,10 @@ pub fn fix_latencystats() -> LatencyStats {
fastest: TimestampDuration::from(1234),
average: TimestampDuration::from(2345),
slowest: TimestampDuration::from(3456),
tm90: TimestampDuration::from(4567),
tm75: TimestampDuration::from(5678),
p90: TimestampDuration::from(6789),
p75: TimestampDuration::from(7890),
}
}
@ -49,9 +53,11 @@ pub fn fix_rpcstats() -> RPCStats {
last_question_ts: Some(Timestamp::from(1685569084280)),
last_seen_ts: Some(Timestamp::from(1685569101256)),
first_consecutive_seen_ts: Some(Timestamp::from(1685569111851)),
recent_lost_answers: 5,
recent_lost_answers_unordered: 5,
recent_lost_answers_ordered: 6,
failed_to_send: 3,
answer: fix_answerstats(),
answer_unordered: fix_answerstats(),
answer_ordered: fix_answerstats(),
}
}

View File

@ -10,14 +10,26 @@ pub struct LatencyStats {
pub average: TimestampDuration,
/// slowest latency in the ROLLING_LATENCIES_SIZE last latencies
pub slowest: TimestampDuration,
/// trimmed mean of the lowest 90% of the last ROLLING_LATENCIES_SIZE latencies
#[serde(default)]
pub tm90: TimestampDuration,
/// trimmed mean of the lowest 75% of the last ROLLING_LATENCIES_SIZE latencies
#[serde(default)]
pub tm75: TimestampDuration,
/// p90 latency over the last ROLLING_LATENCIES_SIZE latencies
#[serde(default)]
pub p90: TimestampDuration,
/// p75 latency over the last ROLLING_LATENCIES_SIZE latencies
#[serde(default)]
pub p75: TimestampDuration,
}
impl fmt::Display for LatencyStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{} slow / {} avg / {} fast",
self.slowest, self.average, self.fastest
"slow={} / avg={} / fast={} / tm90={} / tm75={} / p90={} / p75={}",
self.slowest, self.average, self.fastest, self.tm90, self.tm75, self.p90, self.p75
)?;
Ok(())
}
@ -206,13 +218,20 @@ pub struct RPCStats {
pub last_seen_ts: Option<Timestamp>,
/// the timestamp of the first consecutive proof-of-life for this node (an answer or received question)
pub first_consecutive_seen_ts: Option<Timestamp>,
/// number of answers that have been lost consecutively
pub recent_lost_answers: u32,
/// number of messages that have failed to send or connections dropped since we last successfully sent one
pub failed_to_send: u32,
/// rpc answer stats for this peer
/// number of answers that have been lost consecutively over an unordered channel
#[serde(default)]
pub answer: AnswerStats,
pub recent_lost_answers_unordered: u32,
/// number of answers that have been lost consecutively over an ordered channel
#[serde(default)]
pub recent_lost_answers_ordered: u32,
/// unordered rpc answer stats for this peer
#[serde(default)]
pub answer_unordered: AnswerStats,
/// ordered rpc answer stats for this peer
#[serde(default)]
pub answer_ordered: AnswerStats,
}
impl fmt::Display for RPCStats {
@ -224,9 +243,10 @@ impl fmt::Display for RPCStats {
)?;
writeln!(
f,
"# recently-lost/failed-to-send: {} / {}",
self.recent_lost_answers, self.failed_to_send
"# recently-lost unordered/ordered: {} / {}",
self.recent_lost_answers_unordered, self.recent_lost_answers_ordered,
)?;
writeln!(f, "# failed-to-send: {}", self.failed_to_send)?;
writeln!(
f,
"last_question: {}",
@ -255,7 +275,16 @@ impl fmt::Display for RPCStats {
}
)?;
write!(f, "answers:\n{}", indent_all_string(&self.answer))?;
write!(
f,
"unreliable answers:\n{}",
indent_all_string(&self.answer_unordered)
)?;
write!(
f,
"reliable answers:\n{}",
indent_all_string(&self.answer_ordered)
)?;
Ok(())
}
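Note why every field added to these stats structs carries `#[serde(default)]`: serialized stats written before this change (by older peers or persisted routing tables) lack the new fields, and `default` lets them deserialize as zero instead of erroring. A minimal sketch of the compatibility behavior (assuming serde_json; struct name is illustrative):

```rust
// Illustration only: stands in for LatencyStats/RPCStats wire compatibility.
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct LatencyStatsSketch {
    fastest: u64,
    average: u64,
    slowest: u64,
    #[serde(default)]
    tm90: u64,
    #[serde(default)]
    tm75: u64,
}

fn main() {
    // JSON emitted by a peer running the previous schema, without tm90/tm75
    let old = r#"{"fastest":1,"average":2,"slowest":3}"#;
    let v: LatencyStatsSketch = serde_json::from_str(old).unwrap();
    assert_eq!((v.tm90, v.tm75), (0, 0)); // defaults fill the gap
}
```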

View File

@ -19,11 +19,11 @@ pub fn setup() -> () {
SETUP_ONCE.call_once(|| {
console_error_panic_hook::set_once();
let mut builder = tracing_wasm::WASMLayerConfigBuilder::new();
builder.set_report_logs_in_timings(false);
builder.set_max_level(Level::TRACE);
builder.set_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
tracing_wasm::set_as_global_default_with_config(builder.build());
let config = veilid_tracing_wasm::WASMLayerConfig::new()
.with_report_logs_in_timings(false)
.with_max_level(Level::TRACE)
.with_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
tracing_wasm::set_as_global_default_with_config(config);
});
}

View File

@ -1,8 +1,8 @@
// ignore_for_file: prefer_single_quotes
import 'dart:io' show Platform;
import 'package:ansicolor/ansicolor.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'package:loggy/loggy.dart';
import 'package:veilid/veilid.dart';
@ -84,7 +84,14 @@ class CallbackPrinter extends LoggyPrinter {
@override
void onLog(LogRecord record) {
debugPrint(record.pretty());
final out = record.pretty().replaceAll('\uFFFD', '');
if (!kIsWeb && Platform.isAndroid) {
debugPrint(out);
} else {
debugPrintSynchronously(out);
}
callback?.call(record);
}

View File

@ -27,8 +27,9 @@ class _LogTerminalState extends State<LogTerminal> {
void initState() {
super.initState();
_terminal.setLineFeedMode(true);
globalTerminalPrinter.setCallback((log) {
_terminal.write('${log.pretty()}\n');
globalTerminalPrinter.setCallback((record) {
final out = record.pretty().replaceAll('\uFFFD', '');
_terminal.write('$out\n');
});
}

View File

@ -51,6 +51,10 @@ class LatencyStats with _$LatencyStats {
required TimestampDuration fastest,
required TimestampDuration average,
required TimestampDuration slowest,
required TimestampDuration tm90,
required TimestampDuration tm75,
required TimestampDuration p90,
required TimestampDuration p75,
}) = _LatencyStats;
factory LatencyStats.fromJson(dynamic json) =>
@ -152,9 +156,11 @@ class RPCStats with _$RPCStats {
required Timestamp? lastQuestionTs,
required Timestamp? lastSeenTs,
required Timestamp? firstConsecutiveSeenTs,
required int recentLostAnswers,
required int recentLostAnswersUnordered,
required int recentLostAnswersOrdered,
required int failedToSend,
required AnswerStats answer,
required AnswerStats answerUnordered,
required AnswerStats answerOrdered,
}) = _RPCStats;
factory RPCStats.fromJson(dynamic json) =>

View File

@ -23,6 +23,10 @@ mixin _$LatencyStats {
TimestampDuration get fastest => throw _privateConstructorUsedError;
TimestampDuration get average => throw _privateConstructorUsedError;
TimestampDuration get slowest => throw _privateConstructorUsedError;
TimestampDuration get tm90 => throw _privateConstructorUsedError;
TimestampDuration get tm75 => throw _privateConstructorUsedError;
TimestampDuration get p90 => throw _privateConstructorUsedError;
TimestampDuration get p75 => throw _privateConstructorUsedError;
/// Serializes this LatencyStats to a JSON map.
Map<String, dynamic> toJson() => throw _privateConstructorUsedError;
@ -43,7 +47,11 @@ abstract class $LatencyStatsCopyWith<$Res> {
$Res call(
{TimestampDuration fastest,
TimestampDuration average,
TimestampDuration slowest});
TimestampDuration slowest,
TimestampDuration tm90,
TimestampDuration tm75,
TimestampDuration p90,
TimestampDuration p75});
}
/// @nodoc
@ -64,6 +72,10 @@ class _$LatencyStatsCopyWithImpl<$Res, $Val extends LatencyStats>
Object? fastest = null,
Object? average = null,
Object? slowest = null,
Object? tm90 = null,
Object? tm75 = null,
Object? p90 = null,
Object? p75 = null,
}) {
return _then(_value.copyWith(
fastest: null == fastest
@ -78,6 +90,22 @@ class _$LatencyStatsCopyWithImpl<$Res, $Val extends LatencyStats>
? _value.slowest
: slowest // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
tm90: null == tm90
? _value.tm90
: tm90 // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
tm75: null == tm75
? _value.tm75
: tm75 // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
p90: null == p90
? _value.p90
: p90 // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
p75: null == p75
? _value.p75
: p75 // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
) as $Val);
}
}
@ -93,7 +121,11 @@ abstract class _$$LatencyStatsImplCopyWith<$Res>
$Res call(
{TimestampDuration fastest,
TimestampDuration average,
TimestampDuration slowest});
TimestampDuration slowest,
TimestampDuration tm90,
TimestampDuration tm75,
TimestampDuration p90,
TimestampDuration p75});
}
/// @nodoc
@ -112,6 +144,10 @@ class __$$LatencyStatsImplCopyWithImpl<$Res>
Object? fastest = null,
Object? average = null,
Object? slowest = null,
Object? tm90 = null,
Object? tm75 = null,
Object? p90 = null,
Object? p75 = null,
}) {
return _then(_$LatencyStatsImpl(
fastest: null == fastest
@ -126,6 +162,22 @@ class __$$LatencyStatsImplCopyWithImpl<$Res>
? _value.slowest
: slowest // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
tm90: null == tm90
? _value.tm90
: tm90 // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
tm75: null == tm75
? _value.tm75
: tm75 // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
p90: null == p90
? _value.p90
: p90 // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
p75: null == p75
? _value.p75
: p75 // ignore: cast_nullable_to_non_nullable
as TimestampDuration,
));
}
}
@ -134,7 +186,13 @@ class __$$LatencyStatsImplCopyWithImpl<$Res>
@JsonSerializable()
class _$LatencyStatsImpl implements _LatencyStats {
const _$LatencyStatsImpl(
{required this.fastest, required this.average, required this.slowest});
{required this.fastest,
required this.average,
required this.slowest,
required this.tm90,
required this.tm75,
required this.p90,
required this.p75});
factory _$LatencyStatsImpl.fromJson(Map<String, dynamic> json) =>
_$$LatencyStatsImplFromJson(json);
@ -145,10 +203,18 @@ class _$LatencyStatsImpl implements _LatencyStats {
final TimestampDuration average;
@override
final TimestampDuration slowest;
@override
final TimestampDuration tm90;
@override
final TimestampDuration tm75;
@override
final TimestampDuration p90;
@override
final TimestampDuration p75;
@override
String toString() {
return 'LatencyStats(fastest: $fastest, average: $average, slowest: $slowest)';
return 'LatencyStats(fastest: $fastest, average: $average, slowest: $slowest, tm90: $tm90, tm75: $tm75, p90: $p90, p75: $p75)';
}
@override
@ -158,12 +224,17 @@ class _$LatencyStatsImpl implements _LatencyStats {
other is _$LatencyStatsImpl &&
(identical(other.fastest, fastest) || other.fastest == fastest) &&
(identical(other.average, average) || other.average == average) &&
(identical(other.slowest, slowest) || other.slowest == slowest));
(identical(other.slowest, slowest) || other.slowest == slowest) &&
(identical(other.tm90, tm90) || other.tm90 == tm90) &&
(identical(other.tm75, tm75) || other.tm75 == tm75) &&
(identical(other.p90, p90) || other.p90 == p90) &&
(identical(other.p75, p75) || other.p75 == p75));
}
@JsonKey(includeFromJson: false, includeToJson: false)
@override
int get hashCode => Object.hash(runtimeType, fastest, average, slowest);
int get hashCode =>
Object.hash(runtimeType, fastest, average, slowest, tm90, tm75, p90, p75);
/// Create a copy of LatencyStats
/// with the given fields replaced by the non-null parameter values.
@ -185,7 +256,11 @@ abstract class _LatencyStats implements LatencyStats {
const factory _LatencyStats(
{required final TimestampDuration fastest,
required final TimestampDuration average,
required final TimestampDuration slowest}) = _$LatencyStatsImpl;
required final TimestampDuration slowest,
required final TimestampDuration tm90,
required final TimestampDuration tm75,
required final TimestampDuration p90,
required final TimestampDuration p75}) = _$LatencyStatsImpl;
factory _LatencyStats.fromJson(Map<String, dynamic> json) =
_$LatencyStatsImpl.fromJson;
@ -196,6 +271,14 @@ abstract class _LatencyStats implements LatencyStats {
TimestampDuration get average;
@override
TimestampDuration get slowest;
@override
TimestampDuration get tm90;
@override
TimestampDuration get tm75;
@override
TimestampDuration get p90;
@override
TimestampDuration get p75;
/// Create a copy of LatencyStats
/// with the given fields replaced by the non-null parameter values.
@ -1545,9 +1628,11 @@ mixin _$RPCStats {
Timestamp? get lastQuestionTs => throw _privateConstructorUsedError;
Timestamp? get lastSeenTs => throw _privateConstructorUsedError;
Timestamp? get firstConsecutiveSeenTs => throw _privateConstructorUsedError;
int get recentLostAnswers => throw _privateConstructorUsedError;
int get recentLostAnswersUnordered => throw _privateConstructorUsedError;
int get recentLostAnswersOrdered => throw _privateConstructorUsedError;
int get failedToSend => throw _privateConstructorUsedError;
AnswerStats get answer => throw _privateConstructorUsedError;
AnswerStats get answerUnordered => throw _privateConstructorUsedError;
AnswerStats get answerOrdered => throw _privateConstructorUsedError;
/// Serializes this RPCStats to a JSON map.
Map<String, dynamic> toJson() => throw _privateConstructorUsedError;
@ -1571,11 +1656,14 @@ abstract class $RPCStatsCopyWith<$Res> {
Timestamp? lastQuestionTs,
Timestamp? lastSeenTs,
Timestamp? firstConsecutiveSeenTs,
int recentLostAnswers,
int recentLostAnswersUnordered,
int recentLostAnswersOrdered,
int failedToSend,
AnswerStats answer});
AnswerStats answerUnordered,
AnswerStats answerOrdered});
$AnswerStatsCopyWith<$Res> get answer;
$AnswerStatsCopyWith<$Res> get answerUnordered;
$AnswerStatsCopyWith<$Res> get answerOrdered;
}
/// @nodoc
@ -1599,9 +1687,11 @@ class _$RPCStatsCopyWithImpl<$Res, $Val extends RPCStats>
Object? lastQuestionTs = freezed,
Object? lastSeenTs = freezed,
Object? firstConsecutiveSeenTs = freezed,
Object? recentLostAnswers = null,
Object? recentLostAnswersUnordered = null,
Object? recentLostAnswersOrdered = null,
Object? failedToSend = null,
Object? answer = null,
Object? answerUnordered = null,
Object? answerOrdered = null,
}) {
return _then(_value.copyWith(
messagesSent: null == messagesSent
@ -1628,17 +1718,25 @@ class _$RPCStatsCopyWithImpl<$Res, $Val extends RPCStats>
? _value.firstConsecutiveSeenTs
: firstConsecutiveSeenTs // ignore: cast_nullable_to_non_nullable
as Timestamp?,
recentLostAnswers: null == recentLostAnswers
? _value.recentLostAnswers
: recentLostAnswers // ignore: cast_nullable_to_non_nullable
recentLostAnswersUnordered: null == recentLostAnswersUnordered
? _value.recentLostAnswersUnordered
: recentLostAnswersUnordered // ignore: cast_nullable_to_non_nullable
as int,
recentLostAnswersOrdered: null == recentLostAnswersOrdered
? _value.recentLostAnswersOrdered
: recentLostAnswersOrdered // ignore: cast_nullable_to_non_nullable
as int,
failedToSend: null == failedToSend
? _value.failedToSend
: failedToSend // ignore: cast_nullable_to_non_nullable
as int,
answer: null == answer
? _value.answer
: answer // ignore: cast_nullable_to_non_nullable
answerUnordered: null == answerUnordered
? _value.answerUnordered
: answerUnordered // ignore: cast_nullable_to_non_nullable
as AnswerStats,
answerOrdered: null == answerOrdered
? _value.answerOrdered
: answerOrdered // ignore: cast_nullable_to_non_nullable
as AnswerStats,
) as $Val);
}
@ -1647,9 +1745,19 @@ class _$RPCStatsCopyWithImpl<$Res, $Val extends RPCStats>
/// with the given fields replaced by the non-null parameter values.
@override
@pragma('vm:prefer-inline')
$AnswerStatsCopyWith<$Res> get answer {
return $AnswerStatsCopyWith<$Res>(_value.answer, (value) {
return _then(_value.copyWith(answer: value) as $Val);
$AnswerStatsCopyWith<$Res> get answerUnordered {
return $AnswerStatsCopyWith<$Res>(_value.answerUnordered, (value) {
return _then(_value.copyWith(answerUnordered: value) as $Val);
});
}
/// Create a copy of RPCStats
/// with the given fields replaced by the non-null parameter values.
@override
@pragma('vm:prefer-inline')
$AnswerStatsCopyWith<$Res> get answerOrdered {
return $AnswerStatsCopyWith<$Res>(_value.answerOrdered, (value) {
return _then(_value.copyWith(answerOrdered: value) as $Val);
});
}
}
@ -1669,12 +1777,16 @@ abstract class _$$RPCStatsImplCopyWith<$Res>
Timestamp? lastQuestionTs,
Timestamp? lastSeenTs,
Timestamp? firstConsecutiveSeenTs,
int recentLostAnswers,
int recentLostAnswersUnordered,
int recentLostAnswersOrdered,
int failedToSend,
AnswerStats answer});
AnswerStats answerUnordered,
AnswerStats answerOrdered});
@override
$AnswerStatsCopyWith<$Res> get answer;
$AnswerStatsCopyWith<$Res> get answerUnordered;
@override
$AnswerStatsCopyWith<$Res> get answerOrdered;
}
/// @nodoc
@ -1696,9 +1808,11 @@ class __$$RPCStatsImplCopyWithImpl<$Res>
Object? lastQuestionTs = freezed,
Object? lastSeenTs = freezed,
Object? firstConsecutiveSeenTs = freezed,
Object? recentLostAnswers = null,
Object? recentLostAnswersUnordered = null,
Object? recentLostAnswersOrdered = null,
Object? failedToSend = null,
Object? answer = null,
Object? answerUnordered = null,
Object? answerOrdered = null,
}) {
return _then(_$RPCStatsImpl(
messagesSent: null == messagesSent
@ -1725,17 +1839,25 @@ class __$$RPCStatsImplCopyWithImpl<$Res>
? _value.firstConsecutiveSeenTs
: firstConsecutiveSeenTs // ignore: cast_nullable_to_non_nullable
as Timestamp?,
recentLostAnswers: null == recentLostAnswers
? _value.recentLostAnswers
: recentLostAnswers // ignore: cast_nullable_to_non_nullable
recentLostAnswersUnordered: null == recentLostAnswersUnordered
? _value.recentLostAnswersUnordered
: recentLostAnswersUnordered // ignore: cast_nullable_to_non_nullable
as int,
recentLostAnswersOrdered: null == recentLostAnswersOrdered
? _value.recentLostAnswersOrdered
: recentLostAnswersOrdered // ignore: cast_nullable_to_non_nullable
as int,
failedToSend: null == failedToSend
? _value.failedToSend
: failedToSend // ignore: cast_nullable_to_non_nullable
as int,
answer: null == answer
? _value.answer
: answer // ignore: cast_nullable_to_non_nullable
answerUnordered: null == answerUnordered
? _value.answerUnordered
: answerUnordered // ignore: cast_nullable_to_non_nullable
as AnswerStats,
answerOrdered: null == answerOrdered
? _value.answerOrdered
: answerOrdered // ignore: cast_nullable_to_non_nullable
as AnswerStats,
));
}
@ -1751,9 +1873,11 @@ class _$RPCStatsImpl implements _RPCStats {
required this.lastQuestionTs,
required this.lastSeenTs,
required this.firstConsecutiveSeenTs,
required this.recentLostAnswers,
required this.recentLostAnswersUnordered,
required this.recentLostAnswersOrdered,
required this.failedToSend,
required this.answer});
required this.answerUnordered,
required this.answerOrdered});
factory _$RPCStatsImpl.fromJson(Map<String, dynamic> json) =>
_$$RPCStatsImplFromJson(json);
@ -1771,15 +1895,19 @@ class _$RPCStatsImpl implements _RPCStats {
@override
final Timestamp? firstConsecutiveSeenTs;
@override
final int recentLostAnswers;
final int recentLostAnswersUnordered;
@override
final int recentLostAnswersOrdered;
@override
final int failedToSend;
@override
final AnswerStats answer;
final AnswerStats answerUnordered;
@override
final AnswerStats answerOrdered;
@override
String toString() {
return 'RPCStats(messagesSent: $messagesSent, messagesRcvd: $messagesRcvd, questionsInFlight: $questionsInFlight, lastQuestionTs: $lastQuestionTs, lastSeenTs: $lastSeenTs, firstConsecutiveSeenTs: $firstConsecutiveSeenTs, recentLostAnswers: $recentLostAnswers, failedToSend: $failedToSend, answer: $answer)';
return 'RPCStats(messagesSent: $messagesSent, messagesRcvd: $messagesRcvd, questionsInFlight: $questionsInFlight, lastQuestionTs: $lastQuestionTs, lastSeenTs: $lastSeenTs, firstConsecutiveSeenTs: $firstConsecutiveSeenTs, recentLostAnswersUnordered: $recentLostAnswersUnordered, recentLostAnswersOrdered: $recentLostAnswersOrdered, failedToSend: $failedToSend, answerUnordered: $answerUnordered, answerOrdered: $answerOrdered)';
}
@override
@ -1799,11 +1927,19 @@ class _$RPCStatsImpl implements _RPCStats {
other.lastSeenTs == lastSeenTs) &&
(identical(other.firstConsecutiveSeenTs, firstConsecutiveSeenTs) ||
other.firstConsecutiveSeenTs == firstConsecutiveSeenTs) &&
(identical(other.recentLostAnswers, recentLostAnswers) ||
other.recentLostAnswers == recentLostAnswers) &&
(identical(other.recentLostAnswersUnordered,
recentLostAnswersUnordered) ||
other.recentLostAnswersUnordered ==
recentLostAnswersUnordered) &&
(identical(
other.recentLostAnswersOrdered, recentLostAnswersOrdered) ||
other.recentLostAnswersOrdered == recentLostAnswersOrdered) &&
(identical(other.failedToSend, failedToSend) ||
other.failedToSend == failedToSend) &&
(identical(other.answer, answer) || other.answer == answer));
(identical(other.answerUnordered, answerUnordered) ||
other.answerUnordered == answerUnordered) &&
(identical(other.answerOrdered, answerOrdered) ||
other.answerOrdered == answerOrdered));
}
@JsonKey(includeFromJson: false, includeToJson: false)
@ -1816,9 +1952,11 @@ class _$RPCStatsImpl implements _RPCStats {
lastQuestionTs,
lastSeenTs,
firstConsecutiveSeenTs,
recentLostAnswers,
recentLostAnswersUnordered,
recentLostAnswersOrdered,
failedToSend,
answer);
answerUnordered,
answerOrdered);
/// Create a copy of RPCStats
/// with the given fields replaced by the non-null parameter values.
@ -1844,9 +1982,11 @@ abstract class _RPCStats implements RPCStats {
required final Timestamp? lastQuestionTs,
required final Timestamp? lastSeenTs,
required final Timestamp? firstConsecutiveSeenTs,
required final int recentLostAnswers,
required final int recentLostAnswersUnordered,
required final int recentLostAnswersOrdered,
required final int failedToSend,
required final AnswerStats answer}) = _$RPCStatsImpl;
required final AnswerStats answerUnordered,
required final AnswerStats answerOrdered}) = _$RPCStatsImpl;
factory _RPCStats.fromJson(Map<String, dynamic> json) =
_$RPCStatsImpl.fromJson;
@ -1864,11 +2004,15 @@ abstract class _RPCStats implements RPCStats {
@override
Timestamp? get firstConsecutiveSeenTs;
@override
int get recentLostAnswers;
int get recentLostAnswersUnordered;
@override
int get recentLostAnswersOrdered;
@override
int get failedToSend;
@override
AnswerStats get answer;
AnswerStats get answerUnordered;
@override
AnswerStats get answerOrdered;
/// Create a copy of RPCStats
/// with the given fields replaced by the non-null parameter values.

View File

@ -11,6 +11,10 @@ _$LatencyStatsImpl _$$LatencyStatsImplFromJson(Map<String, dynamic> json) =>
fastest: TimestampDuration.fromJson(json['fastest']),
average: TimestampDuration.fromJson(json['average']),
slowest: TimestampDuration.fromJson(json['slowest']),
tm90: TimestampDuration.fromJson(json['tm90']),
tm75: TimestampDuration.fromJson(json['tm75']),
p90: TimestampDuration.fromJson(json['p90']),
p75: TimestampDuration.fromJson(json['p75']),
);
Map<String, dynamic> _$$LatencyStatsImplToJson(_$LatencyStatsImpl instance) =>
@ -18,6 +22,10 @@ Map<String, dynamic> _$$LatencyStatsImplToJson(_$LatencyStatsImpl instance) =>
'fastest': instance.fastest.toJson(),
'average': instance.average.toJson(),
'slowest': instance.slowest.toJson(),
'tm90': instance.tm90.toJson(),
'tm75': instance.tm75.toJson(),
'p90': instance.p90.toJson(),
'p75': instance.p75.toJson(),
};
_$TransferStatsImpl _$$TransferStatsImplFromJson(Map<String, dynamic> json) =>
@ -148,9 +156,13 @@ _$RPCStatsImpl _$$RPCStatsImplFromJson(Map<String, dynamic> json) =>
firstConsecutiveSeenTs: json['first_consecutive_seen_ts'] == null
? null
: Timestamp.fromJson(json['first_consecutive_seen_ts']),
recentLostAnswers: (json['recent_lost_answers'] as num).toInt(),
recentLostAnswersUnordered:
(json['recent_lost_answers_unordered'] as num).toInt(),
recentLostAnswersOrdered:
(json['recent_lost_answers_ordered'] as num).toInt(),
failedToSend: (json['failed_to_send'] as num).toInt(),
answer: AnswerStats.fromJson(json['answer']),
answerUnordered: AnswerStats.fromJson(json['answer_unordered']),
answerOrdered: AnswerStats.fromJson(json['answer_ordered']),
);
Map<String, dynamic> _$$RPCStatsImplToJson(_$RPCStatsImpl instance) =>
@ -161,9 +173,11 @@ Map<String, dynamic> _$$RPCStatsImplToJson(_$RPCStatsImpl instance) =>
'last_question_ts': instance.lastQuestionTs?.toJson(),
'last_seen_ts': instance.lastSeenTs?.toJson(),
'first_consecutive_seen_ts': instance.firstConsecutiveSeenTs?.toJson(),
'recent_lost_answers': instance.recentLostAnswers,
'recent_lost_answers_unordered': instance.recentLostAnswersUnordered,
'recent_lost_answers_ordered': instance.recentLostAnswersOrdered,
'failed_to_send': instance.failedToSend,
'answer': instance.answer.toJson(),
'answer_unordered': instance.answerUnordered.toJson(),
'answer_ordered': instance.answerOrdered.toJson(),
};
_$PeerStatsImpl _$$PeerStatsImplFromJson(Map<String, dynamic> json) =>

View File

@ -3110,9 +3110,29 @@
"description": "fastest latency in the ROLLING_LATENCIES_SIZE last latencies",
"type": "string"
},
"p75": {
"description": "p75 latency in the ROLLING_LATENCIES_SIZE",
"default": "0",
"type": "string"
},
"p90": {
"description": "p90 latency in the ROLLING_LATENCIES_SIZE",
"default": "0",
"type": "string"
},
"slowest": {
"description": "slowest latency in the ROLLING_LATENCIES_SIZE last latencies",
"type": "string"
},
"tm75": {
"description": "trimmed mean with lowest 75% latency in the ROLLING_LATENCIES_SIZE",
"default": "0",
"type": "string"
},
"tm90": {
"description": "trimmed mean with lowest 90% latency in the ROLLING_LATENCIES_SIZE",
"default": "0",
"type": "string"
}
}
},
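
For reference, the four new aggregates are a nearest-rank percentile pair (p90/p75) and a trimmed-mean pair (tm90/tm75) computed over the rolling latency window. A minimal sketch of the math with illustrative helper names; this is not the veilid-core implementation, and the rounding choices are assumptions:

fn percentile(sorted: &[u64], pct: f64) -> u64 {
    // Nearest-rank percentile over an ascending-sorted, non-empty window.
    let idx = ((pct / 100.0) * sorted.len() as f64).ceil() as usize;
    sorted[idx.saturating_sub(1).min(sorted.len() - 1)]
}

fn trimmed_mean(sorted: &[u64], keep_pct: f64) -> u64 {
    // Mean of (roughly) the lowest keep_pct% of samples; the slow tail is discarded.
    let keep = (((keep_pct / 100.0) * sorted.len() as f64).floor() as usize)
        .clamp(1, sorted.len());
    sorted[..keep].iter().sum::<u64>() / keep as u64
}

fn main() {
    // Rolling window of round-trip latencies, in microseconds.
    let mut window = vec![900u64, 1_200, 800, 15_000, 1_100, 950, 1_050, 2_000];
    window.sort_unstable();
    println!("p90  = {}", percentile(&window, 90.0));   // 15000 (dominated by the slow outlier)
    println!("p75  = {}", percentile(&window, 75.0));   // 1100
    println!("tm90 = {}", trimmed_mean(&window, 90.0)); // 1142
    println!("tm75 = {}", trimmed_mean(&window, 75.0)); // 1000 (outlier-resistant)
}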
@ -3153,7 +3173,19 @@
"rpc_stats": {
"description": "information about RPCs",
"default": {
"answer": {
"answer_ordered": {
"answers": 0,
"consecutive_answers_average": 0,
"consecutive_answers_maximum": 0,
"consecutive_answers_minimum": 0,
"consecutive_lost_answers_average": 0,
"consecutive_lost_answers_maximum": 0,
"consecutive_lost_answers_minimum": 0,
"lost_answers": 0,
"questions": 0,
"span": "0"
},
"answer_unordered": {
"answers": 0,
"consecutive_answers_average": 0,
"consecutive_answers_maximum": 0,
@ -3172,7 +3204,8 @@
"messages_rcvd": 0,
"messages_sent": 0,
"questions_in_flight": 0,
"recent_lost_answers": 0
"recent_lost_answers_ordered": 0,
"recent_lost_answers_unordered": 0
},
"allOf": [
{
@ -3269,12 +3302,31 @@
"failed_to_send",
"messages_rcvd",
"messages_sent",
"questions_in_flight",
"recent_lost_answers"
"questions_in_flight"
],
"properties": {
"answer": {
"description": "rpc answer stats for this peer",
"answer_ordered": {
"description": "ordered rpc answer stats for this peer",
"default": {
"answers": 0,
"consecutive_answers_average": 0,
"consecutive_answers_maximum": 0,
"consecutive_answers_minimum": 0,
"consecutive_lost_answers_average": 0,
"consecutive_lost_answers_maximum": 0,
"consecutive_lost_answers_minimum": 0,
"lost_answers": 0,
"questions": 0,
"span": "0"
},
"allOf": [
{
"$ref": "#/definitions/AnswerStats"
}
]
},
"answer_unordered": {
"description": "unordered rpc answer stats for this peer",
"default": {
"answers": 0,
"consecutive_answers_average": 0,
@ -3338,8 +3390,16 @@
"format": "uint32",
"minimum": 0.0
},
"recent_lost_answers": {
"description": "number of answers that have been lost consecutively",
"recent_lost_answers_ordered": {
"description": "number of answers that have been lost consecutively over an ordered channel",
"default": 0,
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
"recent_lost_answers_unordered": {
"description": "number of answers that have been lost consecutively over an unordered channel",
"default": 0,
"type": "integer",
"format": "uint32",
"minimum": 0.0

View File

@ -121,9 +121,11 @@ class RPCStats:
last_question_ts: Optional[Timestamp]
last_seen_ts: Optional[Timestamp]
first_consecutive_seen_ts: Optional[Timestamp]
recent_lost_answers: int
recent_lost_answers_unordered: int
recent_lost_answers_ordered: int
failed_to_send: int
answer: AnswerStats
answer_unordered: AnswerStats
answer_ordered: AnswerStats
def __init__(
self,
@ -133,9 +135,11 @@ class RPCStats:
last_question_ts: Optional[Timestamp],
last_seen_ts: Optional[Timestamp],
first_consecutive_seen_ts: Optional[Timestamp],
recent_lost_answers: int,
recent_lost_answers_unordered: int,
recent_lost_answers_ordered: int,
failed_to_send: int,
answer: AnswerStats,
answer_unordered: AnswerStats,
answer_ordered: AnswerStats,
):
self.messages_sent = messages_sent
self.messages_rcvd = messages_rcvd
@ -143,9 +147,11 @@ class RPCStats:
self.last_question_ts = last_question_ts
self.last_seen_ts = last_seen_ts
self.first_consecutive_seen_ts = first_consecutive_seen_ts
self.recent_lost_answers = recent_lost_answers
self.recent_lost_answers_unordered = recent_lost_answers_unordered
self.recent_lost_answers_ordered = recent_lost_answers_ordered
self.failed_to_send = failed_to_send
self.answer = answer
self.answer_unordered = answer_unordered
self.answer_ordered = answer_ordered
@classmethod
def from_json(cls, j: dict) -> Self:
@ -159,9 +165,11 @@ class RPCStats:
None
if j["first_consecutive_seen_ts"] is None
else Timestamp(j["first_consecutive_seen_ts"]),
j["recent_lost_answers"],
j["recent_lost_answers_unordered"],
j["recent_lost_answers_ordered"],
j["failed_to_send"],
AnswerStats.from_json(j["answer"]),
AnswerStats.from_json(j["answer_unordered"]),
AnswerStats.from_json(j["answer_ordered"]),
)
@ -169,16 +177,28 @@ class LatencyStats:
fastest: TimestampDuration
average: TimestampDuration
slowest: TimestampDuration
tm90: TimestampDuration
tm75: TimestampDuration
p90: TimestampDuration
p75: TimestampDuration
def __init__(
self,
fastest: TimestampDuration,
average: TimestampDuration,
slowest: TimestampDuration,
tm90: TimestampDuration,
tm75: TimestampDuration,
p90: TimestampDuration,
p75: TimestampDuration,
):
self.fastest = fastest
self.average = average
self.slowest = slowest
self.tm90 = tm90
self.tm75 = tm75
self.p90 = p90
self.p75 = p75
@classmethod
def from_json(cls, j: dict) -> Self:
@ -187,6 +207,10 @@ class LatencyStats:
TimestampDuration(j["fastest"]),
TimestampDuration(j["average"]),
TimestampDuration(j["slowest"]),
TimestampDuration(j["tm90"]),
TimestampDuration(j["tm75"]),
TimestampDuration(j["p90"]),
TimestampDuration(j["p75"]),
)

View File

@ -242,7 +242,7 @@ pub async fn run_veilid_server(
// Run all subnodes
let mut all_subnodes_jh = vec![];
for subnode in subnode_index..(subnode_index + subnode_count) {
for subnode in subnode_index..=(subnode_index + subnode_count - 1) {
debug!("Spawning subnode {}", subnode);
let jh = spawn(
&format!("subnode{}", subnode),
@ -254,7 +254,7 @@ pub async fn run_veilid_server(
// Wait for all subnodes to complete
for (sn, jh) in all_subnodes_jh.into_iter().enumerate() {
jh.await?;
debug!("Subnode {} exited", sn);
debug!("Subnode {} exited", (sn as u16) + subnode_index);
}
// Finally, drop logs
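
A standalone check of the loop rewrite above: for subnode_count >= 1 the exclusive and inclusive forms enumerate the same subnode indices; the inclusive form assumes a nonzero count, since subnode_index + subnode_count - 1 underflows at zero.

fn main() {
    let (start, count): (u16, u16) = (4, 3);
    let exclusive: Vec<u16> = (start..start + count).collect();
    let inclusive: Vec<u16> = (start..=(start + count - 1)).collect();
    assert_eq!(exclusive, inclusive); // both yield [4, 5, 6]
}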

View File

@ -19,11 +19,11 @@ pub fn setup() -> () {
console_error_panic_hook::set_once();
cfg_if! {
if #[cfg(feature = "tracing")] {
let mut builder = tracing_wasm::WASMLayerConfigBuilder::new();
builder.set_report_logs_in_timings(false);
builder.set_max_level(Level::TRACE);
builder.set_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
tracing_wasm::set_as_global_default_with_config(builder.build());
let config = veilid_tracing_wasm::WASMLayerConfig::new()
.with_report_logs_in_timings(false)
.with_max_level(Level::TRACE)
.with_console_config(veilid_tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
veilid_tracing_wasm::set_as_global_default_with_config(config);
} else {
wasm_logger::init(wasm_logger::Config::default());
}

View File

@ -23,7 +23,7 @@ crypto-test = ["veilid-core/crypto-test"]
veilid-core = { version = "0.4.3", path = "../veilid-core", default-features = false }
tracing = { version = "^0", features = ["log", "attributes"] }
tracing-wasm = "^0"
veilid-tracing-wasm = "^0"
tracing-subscriber = "^0"
wasm-bindgen = { version = "^0", features = ["serde-serialize"] }

View File

@ -20,10 +20,10 @@ use send_wrapper::*;
use serde::*;
use tracing_subscriber::prelude::*;
use tracing_subscriber::*;
use tracing_wasm::{WASMLayerConfigBuilder, *};
use tsify::*;
use veilid_core::*;
use veilid_core::{tools::*, VeilidAPIError};
use veilid_tracing_wasm::*;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::*;
@ -220,14 +220,14 @@ pub fn initialize_veilid_core(platform_config: String) {
None,
);
let layer = WASMLayer::new(
WASMLayerConfigBuilder::new()
.set_report_logs_in_timings(platform_config.logging.performance.logs_in_timings)
.set_console_config(if platform_config.logging.performance.logs_in_console {
WASMLayerConfig::new()
.with_report_logs_in_timings(platform_config.logging.performance.logs_in_timings)
.with_console_config(if platform_config.logging.performance.logs_in_console {
ConsoleConfig::ReportWithConsoleColor
} else {
ConsoleConfig::NoReporting
})
.build(),
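// Report all event fields except the internal VEILID_LOG_KEY_FIELD.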
.with_field_filter(Some(Arc::new(|k| k != veilid_core::VEILID_LOG_KEY_FIELD))),
)
.with_filter(filter.clone());
filters.insert("performance", filter);

View File

@ -49,14 +49,14 @@ impl VeilidClient {
None,
);
let layer = WASMLayer::new(
WASMLayerConfigBuilder::new()
.set_report_logs_in_timings(platformConfig.logging.performance.logs_in_timings)
.set_console_config(if platformConfig.logging.performance.logs_in_console {
WASMLayerConfig::new()
.with_report_logs_in_timings(platformConfig.logging.performance.logs_in_timings)
.with_console_config(if platformConfig.logging.performance.logs_in_console {
ConsoleConfig::ReportWithConsoleColor
} else {
ConsoleConfig::NoReporting
})
.build(),
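// Report all event fields except the internal VEILID_LOG_KEY_FIELD.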
.with_field_filter(Some(Arc::new(|k| k != veilid_core::VEILID_LOG_KEY_FIELD))),
)
.with_filter(filter.clone());
filters.insert("performance", filter);

View File

@ -21,7 +21,7 @@
},
"../pkg": {
"name": "veilid-wasm",
"version": "0.4.1",
"version": "0.4.3",
"dev": true,
"license": "MPL-2.0"
},