From 1a594d0d6feacf3241bcabca4d0223c586af9a60 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 1 Mar 2025 00:16:18 +0000 Subject: [PATCH] Improved latency and reliability testing --- Cargo.lock | 13 +- Cargo.toml | 1 + Earthfile | 48 ++++ veilid-core/src/intf/native/system.rs | 4 +- veilid-core/src/logging/api_tracing_layer.rs | 2 + .../src/logging/veilid_layer_filter.rs | 2 + veilid-core/src/network_manager/mod.rs | 3 + veilid-core/src/network_manager/send_data.rs | 26 +- .../src/network_manager/types/punishment.rs | 2 +- veilid-core/src/routing_table/bucket_entry.rs | 115 ++++++--- veilid-core/src/routing_table/debug.rs | 2 +- veilid-core/src/routing_table/mod.rs | 27 ++ .../src/routing_table/node_ref/traits.rs | 24 +- .../src/routing_table/route_spec_store/mod.rs | 4 +- .../routing_table/routing_table_inner/mod.rs | 120 +++++++++ .../routing_domains/local_network/mod.rs | 7 + .../routing_domains/mod.rs | 18 ++ .../routing_domains/public_internet/mod.rs | 7 + .../src/routing_table/stats_accounting.rs | 58 ++++- .../src/routing_table/tasks/ping_validator.rs | 4 +- .../routing_table/tasks/relay_management.rs | 132 ++++++---- .../src/routing_table/types/node_info.rs | 7 +- veilid-core/src/rpc_processor/mod.rs | 160 ++++++------ veilid-core/src/rpc_processor/rpc_app_call.rs | 2 +- .../src/rpc_processor/rpc_find_node.rs | 2 +- .../src/rpc_processor/rpc_get_value.rs | 2 +- .../src/rpc_processor/rpc_inspect_value.rs | 2 +- .../src/rpc_processor/rpc_set_value.rs | 2 +- veilid-core/src/rpc_processor/rpc_status.rs | 4 +- .../src/rpc_processor/rpc_watch_value.rs | 2 +- veilid-core/src/veilid_api/tests/fixtures.rs | 10 +- veilid-core/src/veilid_api/types/stats.rs | 47 +++- veilid-core/tests/web.rs | 10 +- veilid-flutter/example/lib/log.dart | 11 +- veilid-flutter/example/lib/log_terminal.dart | 5 +- veilid-flutter/lib/veilid_state.dart | 10 +- veilid-flutter/lib/veilid_state.freezed.dart | 240 ++++++++++++++---- veilid-flutter/lib/veilid_state.g.dart | 22 +- veilid-python/veilid/schema/RecvMessage.json | 76 +++++- veilid-python/veilid/state.py | 40 ++- veilid-server/src/server.rs | 4 +- veilid-tools/tests/web.rs | 10 +- veilid-wasm/Cargo.toml | 2 +- veilid-wasm/src/lib.rs | 10 +- veilid-wasm/src/veilid_client_js.rs | 8 +- veilid-wasm/tests/package-lock.json | 2 +- 46 files changed, 995 insertions(+), 314 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14080a88..b246c049 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6701,6 +6701,17 @@ dependencies = [ "ws_stream_wasm", ] +[[package]] +name = "veilid-tracing-wasm" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b24035e775ff3d7f0c9f552f48296f8992dbc9e0347c5888891c9e96709ea7f" +dependencies = [ + "tracing", + "tracing-subscriber", + "wasm-bindgen", +] + [[package]] name = "veilid-wasm" version = "0.4.3" @@ -6720,9 +6731,9 @@ dependencies = [ "serde_json", "tracing", "tracing-subscriber", - "tracing-wasm", "tsify", "veilid-core", + "veilid-tracing-wasm", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", diff --git a/Cargo.toml b/Cargo.toml index 04a37bf9..d342aeb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ unused_async = "deny" ptr_cast_constness = "deny" comparison_chain = "allow" upper_case_acronyms = "allow" +needless_range_loop = "allow" [workspace.lints.rust] unused_must_use = "deny" diff --git a/Earthfile b/Earthfile index 907277a2..0e4d5652 100644 --- a/Earthfile +++ b/Earthfile @@ -76,6 +76,10 @@ deps-rust: # Linux x86_64-unknown-linux-gnu \ 
aarch64-unknown-linux-gnu \ + # Windows + x86_64-pc-windows-gnu \ + # MacOS + aarch64-apple-darwin \ # Android aarch64-linux-android \ armv7-linux-androideabi \ @@ -146,6 +150,8 @@ build-linux-cache: RUN cargo chef cook --profile=test --tests --target $DEFAULT_CARGO_TARGET --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core RUN cargo chef cook --zigbuild --release --target x86_64-unknown-linux-gnu --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core RUN cargo chef cook --zigbuild --release --target aarch64-unknown-linux-gnu --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core + # RUN cargo chef cook --zigbuild --release --target x86_64-pc-windows-gnu --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core + # RUN cargo chef cook --zigbuild --release --target aarch64-apple-darwin --recipe-path recipe.json -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core RUN veilid-wasm/wasm_remap_paths.sh cargo chef cook --zigbuild --release --target wasm32-unknown-unknown --recipe-path recipe.json -p veilid-wasm ARG CI_REGISTRY_IMAGE=registry.gitlab.com/veilid/veilid SAVE IMAGE --push $CI_REGISTRY_IMAGE/build-cache:latest @@ -176,6 +182,8 @@ code-android: clippy: FROM +code-linux RUN cargo clippy --target x86_64-unknown-linux-gnu + RUN cargo clippy --target x86_64-pc-windows-gnu + RUN cargo clippy --target aarch64-apple-darwin RUN cargo clippy --manifest-path=veilid-wasm/Cargo.toml --target wasm32-unknown-unknown # Build @@ -191,9 +199,35 @@ build-linux-amd64: build-linux-arm64: FROM +code-linux + # Ensure we have enough memory + IF [ $(free -wmt | grep Total | awk '{print $2}') -lt 7500 ] + RUN echo "not enough container memory to build. increase build host memory." + RUN false + END RUN cargo zigbuild --target aarch64-unknown-linux-gnu --release -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core SAVE ARTIFACT ./target/aarch64-unknown-linux-gnu AS LOCAL ./target/artifacts/aarch64-unknown-linux-gnu +# build-windows-amd64: +# FROM +code-linux +# # Ensure we have enough memory +# IF [ $(free -wmt | grep Total | awk '{print $2}') -lt 7500 ] +# RUN echo "not enough container memory to build. increase build host memory." +# RUN false +# END +# RUN cargo zigbuild --target x86_64-pc-windows-gnu --release -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core +# SAVE ARTIFACT ./target/x86_64-pc-windows-gnu AS LOCAL ./target/artifacts/x86_64-pc-windows-gnu + +# build-macos-arm64: +# FROM +code-linux +# # Ensure we have enough memory +# IF [ $(free -wmt | grep Total | awk '{print $2}') -lt 7500 ] +# RUN echo "not enough container memory to build. increase build host memory." 
+# RUN false +# END +# RUN cargo zigbuild --target aarch64-apple-darwin --release -p veilid-server -p veilid-cli -p veilid-tools -p veilid-core +# SAVE ARTIFACT ./target/aarch64-apple-darwin AS LOCAL ./target/artifacts/aarch64-apple-darwin + + build-android: FROM +code-android WORKDIR /veilid/veilid-core @@ -217,6 +251,14 @@ unit-tests-clippy-wasm-linux: FROM +code-linux RUN cargo clippy --manifest-path=veilid-wasm/Cargo.toml --target wasm32-unknown-unknown +unit-tests-clippy-windows-linux: + FROM +code-linux + RUN cargo-zigbuild clippy --target x86_64-pc-windows-gnu + +unit-tests-clippy-macos-linux: + FROM +code-linux + RUN cargo-zigbuild clippy --target aarch64-apple-darwin + unit-tests-docs-linux: FROM +code-linux RUN ./build_docs.sh @@ -238,6 +280,12 @@ unit-tests-linux: WAIT BUILD +unit-tests-clippy-wasm-linux END + WAIT + BUILD +unit-tests-clippy-windows-linux + END + WAIT + BUILD +unit-tests-clippy-macos-linux + END WAIT BUILD +unit-tests-docs-linux END diff --git a/veilid-core/src/intf/native/system.rs b/veilid-core/src/intf/native/system.rs index 8a81969c..57e68367 100644 --- a/veilid-core/src/intf/native/system.rs +++ b/veilid-core/src/intf/native/system.rs @@ -84,7 +84,7 @@ cfg_if! { } } -#[expect(clippy::unused_async)] +#[allow(clippy::unused_async)] pub async fn txt_lookup>(host: S) -> EyreResult> { cfg_if! { if #[cfg(target_os = "windows")] { @@ -157,7 +157,7 @@ pub async fn txt_lookup>(host: S) -> EyreResult> { } } -#[expect(clippy::unused_async)] +#[allow(clippy::unused_async)] pub async fn ptr_lookup(ip_addr: IpAddr) -> EyreResult { cfg_if! { if #[cfg(target_os = "windows")] { diff --git a/veilid-core/src/logging/api_tracing_layer.rs b/veilid-core/src/logging/api_tracing_layer.rs index 8647ca20..812e8d08 100644 --- a/veilid-core/src/logging/api_tracing_layer.rs +++ b/veilid-core/src/logging/api_tracing_layer.rs @@ -298,6 +298,8 @@ impl tracing::field::Visit for VeilidKeyedStringRecorder { fn record_str(&mut self, field: &tracing::field::Field, value: &str) { if field.name() == VEILID_LOG_KEY_FIELD { self.log_key = value.to_owned(); + } else { + self.record_debug(field, &value) } } fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn core::fmt::Debug) { diff --git a/veilid-core/src/logging/veilid_layer_filter.rs b/veilid-core/src/logging/veilid_layer_filter.rs index dbd9026f..e563bb0a 100644 --- a/veilid-core/src/logging/veilid_layer_filter.rs +++ b/veilid-core/src/logging/veilid_layer_filter.rs @@ -214,6 +214,8 @@ impl tracing::field::Visit for LogKeyFilterVisitor { fn record_str(&mut self, field: &tracing::field::Field, value: &str) { if field.name() == VEILID_LOG_KEY_FIELD { self.enabled = Some((self.filter)(value)); + } else { + self.record_debug(field, &value) } } fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn fmt::Debug) {} diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 1deba764..1eb2beda 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -93,6 +93,9 @@ impl SendDataResult { Some(ncm) if ncm.is_direct() ) } + pub fn is_ordered(&self) -> bool { + self.unique_flow.flow.protocol_type().is_ordered() + } pub fn unique_flow(&self) -> UniqueFlow { self.unique_flow diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 808633b5..51d179b6 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -170,24 +170,26 @@ impl 
NetworkManager { ncm_kind: NodeContactMethodKind::OutboundRelay(relay_nr), }) => { // Relay loop or multiple relays - bail!( - "Outbound relay loop or multiple relays detected: destination {} resolved to target {} via extraneous relay {}", - destination_node_ref, - target_node_ref, - relay_nr, - ); + veilid_log!(self debug + "Outbound relay loop or multiple relays detected: destination {} resolved to target {} via extraneous relay {}", + destination_node_ref, + target_node_ref, + relay_nr + ); + return Ok(NetworkResult::no_connection_other("outbound relay loop")); } Some(NodeContactMethod { ncm_key: _, ncm_kind: NodeContactMethodKind::InboundRelay(relay_nr), }) => { // Relay loop or multiple relays - bail!( - "Inbound relay loop or multiple relays detected: destination {} resolved to target {} via extraneous relay {}", - destination_node_ref, - target_node_ref, - relay_nr, - ); + veilid_log!(self debug + "Inbound relay loop or multiple relays detected: destination {} resolved to target {} via extraneous relay {}", + destination_node_ref, + target_node_ref, + relay_nr + ); + return Ok(NetworkResult::no_connection_other("inbound relay loop")); } Some(NodeContactMethod { ncm_key: _, diff --git a/veilid-core/src/network_manager/types/punishment.rs b/veilid-core/src/network_manager/types/punishment.rs index 8caf34c8..541d4c43 100644 --- a/veilid-core/src/network_manager/types/punishment.rs +++ b/veilid-core/src/network_manager/types/punishment.rs @@ -10,7 +10,7 @@ pub(crate) enum PunishmentReason { // Node-level punishments FailedToDecodeOperation, WrongSenderPeerInfo, - // FailedToVerifySenderPeerInfo, + FailedToVerifySenderPeerInfo, FailedToRegisterSenderPeerInfo, // Route-level punishments // FailedToDecodeRoutedMessage, diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 1efd82dc..56af514f 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -2,7 +2,7 @@ use super::*; use core::sync::atomic::{AtomicU32, Ordering}; /// Reliable pings are done with increased spacing between pings - +/// /// - Start secs is the number of seconds between the first two pings const RELIABLE_PING_INTERVAL_START_SECS: u32 = 10; /// - Max secs is the maximum number of seconds between consecutive pings @@ -21,7 +21,15 @@ const RELIABLE_PING_INTERVAL_MULTIPLIER: f64 = 2.0; const UNRELIABLE_PING_SPAN_SECS: u32 = 60; /// - Interval is the number of seconds between each ping const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5; +/// - Number of consecutive lost answers on an unordered protocol we will +/// tolerate before we call something unreliable +const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 1; +/// - Number of consecutive lost answers on an ordered protocol we will +/// tolerate before we call something unreliable +const UNRELIABLE_LOST_ANSWERS_ORDERED: u32 = 0; +/// Dead nodes are unreachable nodes, not 'never reached' nodes +/// /// How many times do we try to ping a never-reached node before we call it dead const NEVER_SEEN_PING_COUNT: u32 = 3; @@ -194,9 +202,12 @@ pub(crate) struct BucketEntryInner { /// The account for the state and reason statistics #[serde(skip)] state_stats_accounting: Mutex, - /// RPC answer stats accounting + /// RPC answer stats accounting for unordered protocols #[serde(skip)] - answer_stats_accounting: AnswerStatsAccounting, + answer_stats_accounting_unordered: AnswerStatsAccounting, + /// RPC answer stats accounting for ordered protocols + #[serde(skip)] + 
answer_stats_accounting_ordered: AnswerStatsAccounting, /// If the entry is being punished and should be considered dead #[serde(skip)] punishment: Option, @@ -375,11 +386,15 @@ impl BucketEntryInner { } // Less is faster - pub fn cmp_fastest(e1: &Self, e2: &Self) -> std::cmp::Ordering { + pub fn cmp_fastest( + e1: &Self, + e2: &Self, + metric: impl Fn(&LatencyStats) -> TimestampDuration, + ) -> std::cmp::Ordering { // Lower latency to the front if let Some(e1_latency) = &e1.peer_stats.latency { if let Some(e2_latency) = &e2.peer_stats.latency { - e1_latency.average.cmp(&e2_latency.average) + metric(e1_latency).cmp(&metric(e2_latency)) } else { std::cmp::Ordering::Less } @@ -391,7 +406,12 @@ impl BucketEntryInner { } // Less is more reliable then faster - pub fn cmp_fastest_reliable(cur_ts: Timestamp, e1: &Self, e2: &Self) -> std::cmp::Ordering { + pub fn cmp_fastest_reliable( + cur_ts: Timestamp, + e1: &Self, + e2: &Self, + metric: impl Fn(&LatencyStats) -> TimestampDuration, + ) -> std::cmp::Ordering { // Reverse compare so most reliable is at front let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts)); if ret != std::cmp::Ordering::Equal { @@ -399,7 +419,7 @@ impl BucketEntryInner { } // Lower latency to the front - Self::cmp_fastest(e1, e2) + Self::cmp_fastest(e1, e2, metric) } // Less is more reliable then older @@ -426,13 +446,6 @@ impl BucketEntryInner { } } - #[expect(dead_code)] - pub fn sort_fastest_reliable_fn( - cur_ts: Timestamp, - ) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering { - move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2) - } - pub fn update_signed_node_info( &mut self, routing_domain: RoutingDomain, @@ -877,7 +890,10 @@ impl BucketEntryInner { // called every ROLLING_ANSWERS_INTERVAL_SECS seconds pub(super) fn roll_answer_stats(&mut self, cur_ts: Timestamp) { - self.peer_stats.rpc_stats.answer = self.answer_stats_accounting.roll_answers(cur_ts); + self.peer_stats.rpc_stats.answer_unordered = + self.answer_stats_accounting_unordered.roll_answers(cur_ts); + self.peer_stats.rpc_stats.answer_ordered = + self.answer_stats_accounting_ordered.roll_answers(cur_ts); } ///// state machine handling @@ -890,8 +906,14 @@ impl BucketEntryInner { return Some(BucketEntryUnreliableReason::FailedToSend); } - // If we have had any lost answers recently, this is not reliable - if self.peer_stats.rpc_stats.recent_lost_answers > 0 { + // If we have had more than UNRELIABLE_LOST_ANSWERS_UNORDERED lost answers recently on an unordered protocol, this is not reliable + if self.peer_stats.rpc_stats.recent_lost_answers_unordered + > UNRELIABLE_LOST_ANSWERS_UNORDERED + { + return Some(BucketEntryUnreliableReason::LostAnswers); + } + // If we have had more than UNRELIABLE_LOST_ANSWERS_ORDERED lost answers recently on an ordered protocol, this is not reliable + if self.peer_stats.rpc_stats.recent_lost_answers_ordered > UNRELIABLE_LOST_ANSWERS_ORDERED { return Some(BucketEntryUnreliableReason::LostAnswers); } @@ -920,8 +942,9 @@ impl BucketEntryInner { // a node is not dead if we haven't heard from it yet, // but we give it NEVER_REACHED_PING_COUNT chances to ping before we say it's dead None => { - let no_answers = - self.peer_stats.rpc_stats.recent_lost_answers >= NEVER_SEEN_PING_COUNT; + let no_answers = self.peer_stats.rpc_stats.recent_lost_answers_unordered + + self.peer_stats.rpc_stats.recent_lost_answers_ordered + >= NEVER_SEEN_PING_COUNT; if no_answers { return Some(BucketEntryDeadReason::TooManyLostAnswers); } @@ -932,7 +955,8 @@ impl BucketEntryInner { Some(ts) => { let not_seen = 
cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1_000_000u64); - let no_answers = self.peer_stats.rpc_stats.recent_lost_answers + let no_answers = self.peer_stats.rpc_stats.recent_lost_answers_unordered + + self.peer_stats.rpc_stats.recent_lost_answers_ordered >= (UNRELIABLE_PING_SPAN_SECS / UNRELIABLE_PING_INTERVAL_SECS); if not_seen && no_answers { return Some(BucketEntryDeadReason::NoPingResponse); @@ -1046,7 +1070,8 @@ impl BucketEntryInner { pub(super) fn make_not_dead(&mut self, cur_ts: Timestamp) { self.peer_stats.rpc_stats.last_seen_ts = None; self.peer_stats.rpc_stats.failed_to_send = 0; - self.peer_stats.rpc_stats.recent_lost_answers = 0; + self.peer_stats.rpc_stats.recent_lost_answers_unordered = 0; + self.peer_stats.rpc_stats.recent_lost_answers_ordered = 0; assert!(self.check_dead(cur_ts).is_none()); } @@ -1081,9 +1106,19 @@ impl BucketEntryInner { //////////////////////////////////////////////////////////////// /// Called when rpc processor things happen - pub(super) fn question_sent(&mut self, ts: Timestamp, bytes: ByteCount, expects_answer: bool) { + pub(super) fn question_sent( + &mut self, + ts: Timestamp, + bytes: ByteCount, + expects_answer: bool, + ordered: bool, + ) { self.transfer_stats_accounting.add_up(bytes); - self.answer_stats_accounting.record_question(ts); + if ordered { + self.answer_stats_accounting_ordered.record_question(ts); + } else { + self.answer_stats_accounting_unordered.record_question(ts); + } self.peer_stats.rpc_stats.messages_sent += 1; self.peer_stats.rpc_stats.failed_to_send = 0; if expects_answer { @@ -1101,21 +1136,40 @@ impl BucketEntryInner { self.peer_stats.rpc_stats.messages_sent += 1; self.peer_stats.rpc_stats.failed_to_send = 0; } - pub(super) fn answer_rcvd(&mut self, send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount) { + pub(super) fn answer_rcvd( + &mut self, + send_ts: Timestamp, + recv_ts: Timestamp, + bytes: ByteCount, + ordered: bool, + ) { self.transfer_stats_accounting.add_down(bytes); - self.answer_stats_accounting.record_answer(recv_ts); + if ordered { + self.answer_stats_accounting_ordered.record_answer(recv_ts); + self.peer_stats.rpc_stats.recent_lost_answers_ordered = 0; + } else { + self.answer_stats_accounting_unordered + .record_answer(recv_ts); + self.peer_stats.rpc_stats.recent_lost_answers_unordered = 0; + } self.peer_stats.rpc_stats.messages_rcvd += 1; self.peer_stats.rpc_stats.questions_in_flight -= 1; self.record_latency(recv_ts.saturating_sub(send_ts)); self.touch_last_seen(recv_ts); - self.peer_stats.rpc_stats.recent_lost_answers = 0; } - pub(super) fn lost_answer(&mut self) { + pub(super) fn lost_answer(&mut self, ordered: bool) { let cur_ts = Timestamp::now(); - self.answer_stats_accounting.record_lost_answer(cur_ts); + if ordered { + self.answer_stats_accounting_ordered + .record_lost_answer(cur_ts); + self.peer_stats.rpc_stats.recent_lost_answers_ordered += 1; + } else { + self.answer_stats_accounting_unordered + .record_lost_answer(cur_ts); + self.peer_stats.rpc_stats.recent_lost_answers_unordered += 1; + } self.peer_stats.rpc_stats.first_consecutive_seen_ts = None; self.peer_stats.rpc_stats.questions_in_flight -= 1; - self.peer_stats.rpc_stats.recent_lost_answers += 1; } pub(super) fn failed_to_send(&mut self, ts: Timestamp, expects_answer: bool) { if expects_answer { @@ -1181,7 +1235,8 @@ impl BucketEntry { latency_stats_accounting: LatencyStatsAccounting::new(), transfer_stats_accounting: TransferStatsAccounting::new(), state_stats_accounting: 
Mutex::new(StateStatsAccounting::new()), - answer_stats_accounting: AnswerStatsAccounting::new(), + answer_stats_accounting_ordered: AnswerStatsAccounting::new(), + answer_stats_accounting_unordered: AnswerStatsAccounting::new(), punishment: None, #[cfg(feature = "tracking")] next_track_id: 0, diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 9bb02b95..c9b73a1e 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -117,7 +117,7 @@ impl RoutingTable { PunishmentReason::InvalidFraming => "PFRAME", PunishmentReason::FailedToDecodeOperation => "PDECOP", PunishmentReason::WrongSenderPeerInfo => "PSPBAD", - // PunishmentReason::FailedToVerifySenderPeerInfo => "PSPVER", + PunishmentReason::FailedToVerifySenderPeerInfo => "PSPVER", PunishmentReason::FailedToRegisterSenderPeerInfo => "PSPREG", // }, diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 43c58db7..d70bb20b 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -40,6 +40,10 @@ pub const MIN_PUBLIC_INTERNET_ROUTING_DOMAIN_NODE_COUNT: usize = 4; /// How frequently we tick the relay management routine pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; +/// How frequently we optimize relays +pub const RELAY_OPTIMIZATION_INTERVAL_SECS: u32 = 10; +/// What percentile to keep our relays optimized to +pub const RELAY_OPTIMIZATION_PERCENTILE: f32 = 75.0; /// How frequently we tick the private route management routine pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1; @@ -1130,4 +1134,27 @@ impl RoutingTable { } } } + + #[instrument(level = "trace", skip(self, filter, metric), ret)] + pub fn find_fastest_node( + &self, + cur_ts: Timestamp, + filter: impl Fn(&BucketEntryInner) -> bool, + metric: impl Fn(&LatencyStats) -> TimestampDuration, + ) -> Option { + let inner = self.inner.read(); + inner.find_fastest_node(cur_ts, filter, metric) + } + + #[instrument(level = "trace", skip(self, filter, metric), ret)] + pub fn get_node_speed_percentile( + &self, + node_id: TypedKey, + cur_ts: Timestamp, + filter: impl Fn(&BucketEntryInner) -> bool, + metric: impl Fn(&LatencyStats) -> TimestampDuration, + ) -> Option { + let inner = self.inner.read(); + inner.get_node_relative_performance(node_id, cur_ts, filter, metric) + } } diff --git a/veilid-core/src/routing_table/node_ref/traits.rs b/veilid-core/src/routing_table/node_ref/traits.rs index 22fceac9..02649e1e 100644 --- a/veilid-core/src/routing_table/node_ref/traits.rs +++ b/veilid-core/src/routing_table/node_ref/traits.rs @@ -278,10 +278,16 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait self.stats_failed_to_send(Timestamp::now(), false); } - fn stats_question_sent(&self, ts: Timestamp, bytes: ByteCount, expects_answer: bool) { + fn stats_question_sent( + &self, + ts: Timestamp, + bytes: ByteCount, + expects_answer: bool, + ordered: bool, + ) { self.operate_mut(|rti, e| { rti.transfer_stats_accounting().add_up(bytes); - e.question_sent(ts, bytes, expects_answer); + e.question_sent(ts, bytes, expects_answer, ordered); }) } fn stats_question_rcvd(&self, ts: Timestamp, bytes: ByteCount) { @@ -296,17 +302,23 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait e.answer_sent(bytes); }) } - fn stats_answer_rcvd(&self, send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount) { + fn stats_answer_rcvd( + &self, + send_ts: Timestamp, + recv_ts: Timestamp, + bytes: 
ByteCount, + ordered: bool, + ) { self.operate_mut(|rti, e| { rti.transfer_stats_accounting().add_down(bytes); rti.latency_stats_accounting() .record_latency(recv_ts.saturating_sub(send_ts)); - e.answer_rcvd(send_ts, recv_ts, bytes); + e.answer_rcvd(send_ts, recv_ts, bytes, ordered); }) } - fn stats_lost_answer(&self) { + fn stats_lost_answer(&self, ordered: bool) { self.operate_mut(|_rti, e| { - e.lost_answer(); + e.lost_answer(ordered); }) } fn stats_failed_to_send(&self, ts: Timestamp, expects_answer: bool) { diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs index 51be2aca..45623c11 100644 --- a/veilid-core/src/routing_table/route_spec_store/mod.rs +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -466,7 +466,9 @@ impl RouteSpecStore { // always prioritize reliable nodes, but sort by oldest or fastest entry1.with_inner(|e1| { entry2.with_inner(|e2| match safety_spec.stability { - Stability::LowLatency => BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2), + Stability::LowLatency => { + BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2, |ls| ls.tm90) + } Stability::Reliable => BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2), }) }) diff --git a/veilid-core/src/routing_table/routing_table_inner/mod.rs b/veilid-core/src/routing_table/routing_table_inner/mod.rs index 22505f21..e4a314f8 100644 --- a/veilid-core/src/routing_table/routing_table_inner/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/mod.rs @@ -13,6 +13,14 @@ pub const RECENT_PEERS_TABLE_SIZE: usize = 64; pub const LOCK_TAG_TICK: &str = "TICK"; pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>; + +#[derive(Debug)] +pub struct NodeRelativePerformance { + pub percentile: f32, + pub node_index: usize, + pub node_count: usize, +} + ////////////////////////////////////////////////////////////////////////// /// RoutingTable rwlock-internal data @@ -108,6 +116,9 @@ impl RoutingTableInner { pub fn relay_node_last_keepalive(&self, domain: RoutingDomain) -> Option { self.with_routing_domain(domain, |rdd| rdd.relay_node_last_keepalive()) } + pub fn relay_node_last_optimized(&self, domain: RoutingDomain) -> Option { + self.with_routing_domain(domain, |rdd| rdd.relay_node_last_optimized()) + } pub fn set_relay_node_last_keepalive(&mut self, domain: RoutingDomain, ts: Timestamp) { match domain { @@ -119,6 +130,16 @@ impl RoutingTableInner { .set_relay_node_last_keepalive(Some(ts)), }; } + pub fn set_relay_node_last_optimized(&mut self, domain: RoutingDomain, ts: Timestamp) { + match domain { + RoutingDomain::PublicInternet => self + .public_internet_routing_domain + .set_relay_node_last_optimized(Some(ts)), + RoutingDomain::LocalNetwork => self + .local_network_routing_domain + .set_relay_node_last_optimized(Some(ts)), + }; + } #[expect(dead_code)] pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { @@ -1382,6 +1403,105 @@ impl RoutingTableInner { // Unlock noderefs closest_nodes_locked.iter().map(|x| x.unlocked()).collect() } + + #[instrument(level = "trace", skip(self, filter, metric), ret)] + pub fn find_fastest_node( + &self, + cur_ts: Timestamp, + filter: impl Fn(&BucketEntryInner) -> bool, + metric: impl Fn(&LatencyStats) -> TimestampDuration, + ) -> Option { + // Go through all entries and find fastest entry that matches filter function + let mut fastest_node: Option> = None; + + // Iterate all known nodes for candidates + self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { + let 
entry2 = entry.clone(); + entry.with(rti, |rti, e| { + // Filter this node + if filter(e) { + // Compare against previous candidate + if let Some(fastest_node) = fastest_node.as_mut() { + // Less is faster + let better = fastest_node.with(rti, |_rti, best| { + // prefer reliable entries, then lower latency + BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best, &metric) + == std::cmp::Ordering::Less + }); + // Replace the previous candidate if this entry is faster + if better { + *fastest_node = entry2; + } + } else { + // Always store the first candidate + fastest_node = Some(entry2); + } + } + }); + // Don't end early, iterate through all entries + Option::<()>::None + }); + // Return the fastest node + fastest_node.map(|e| NodeRef::new(self.registry(), e)) + } + + #[instrument(level = "trace", skip(self, filter, metric), ret)] + pub fn get_node_relative_performance( + &self, + node_id: TypedKey, + cur_ts: Timestamp, + filter: impl Fn(&BucketEntryInner) -> bool, + metric: impl Fn(&LatencyStats) -> TimestampDuration, + ) -> Option<NodeRelativePerformance> { + // Go through all entries and find all entries that match the filter function + let mut all_filtered_nodes: Vec<Arc<BucketEntry>> = Vec::new(); + + // Iterate all known nodes for candidates + self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { + let entry2 = entry.clone(); + entry.with(rti, |_rti, e| { + // Filter this node + if filter(e) { + all_filtered_nodes.push(entry2); + } + }); + // Don't end early, iterate through all entries + Option::<()>::None + }); + + // Sort by fastest reliable, using the supplied latency metric + all_filtered_nodes.sort_by(|a, b| { + a.with(self, |rti, ea| { + b.with(rti, |_rti, eb| { + BucketEntryInner::cmp_fastest_reliable(cur_ts, ea, eb, &metric) + }) + }) + }); + + // Get the position of the requested node in the list + let node_count = all_filtered_nodes.len(); + let node_index = all_filtered_nodes + .iter() + .position(|x| x.with(self, |_rti, e| e.node_ids().contains(&node_id)))?; + + // Print faster node stats + #[cfg(feature = "verbose-tracing")] + for nl in 0..node_index { + let (latency, node_id) = all_filtered_nodes[nl].with(self, |_rti, e| { + (e.peer_stats().latency.clone(), e.best_node_id()) + }); + if let Some(latency) = latency { + veilid_log!(self debug "Better relay {}: {}: {}", nl, node_id, latency); + } + } + + // Return 'percentile' position. Fastest node is 100%. 
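// Editor's illustration (hypothetical helper and values, not part of the original patch):
// the expression below maps a node's position in the sorted list to a percentile, e.g.
//   fn percentile(node_index: usize, node_count: usize) -> f32 {
//       100.0f32 - ((node_index * 100) as f32) / (node_count as f32)
//   }
//   percentile(0, 20) == 100.0 // the fastest node scores 100%
//   percentile(4, 20) == 80.0  // with four faster peers out of twenty, the 80th percentile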
+ Some(NodeRelativePerformance { + percentile: 100.0f32 - ((node_index * 100) as f32) / (node_count as f32), + node_index, + node_count, + }) + } } #[instrument(level = "trace", skip_all)] diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs index e6882631..c111ae85 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs @@ -89,6 +89,9 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { fn relay_node_last_keepalive(&self) -> Option { self.common.relay_node_last_keepalive() } + fn relay_node_last_optimized(&self) -> Option { + self.common.relay_node_last_optimized() + } fn dial_info_details(&self) -> &Vec { self.common.dial_info_details() } @@ -236,4 +239,8 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { fn set_relay_node_last_keepalive(&mut self, ts: Option) { self.common.set_relay_node_last_keepalive(ts); } + + fn set_relay_node_last_optimized(&mut self, ts: Option) { + self.common.set_relay_node_last_optimized(ts); + } } diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/mod.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/mod.rs index a8a648d8..6ec364e2 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/mod.rs @@ -22,6 +22,7 @@ pub trait RoutingDomainDetail { fn requires_relay(&self) -> Option; fn relay_node(&self) -> Option; fn relay_node_last_keepalive(&self) -> Option; + fn relay_node_last_optimized(&self) -> Option; fn dial_info_details(&self) -> &Vec; fn get_published_peer_info(&self) -> Option>; fn inbound_dial_info_filter(&self) -> DialInfoFilter; @@ -55,6 +56,8 @@ pub trait RoutingDomainDetail { // Set last relay keepalive time fn set_relay_node_last_keepalive(&mut self, ts: Option); + // Set last relay optimized time + fn set_relay_node_last_optimized(&mut self, ts: Option); } trait RoutingDomainDetailCommonAccessors: RoutingDomainDetail { @@ -125,6 +128,7 @@ struct RoutingDomainDetailCommon { // caches cached_peer_info: Mutex>>, relay_node_last_keepalive: Option, + relay_node_last_optimized: Option, } impl RoutingDomainDetailCommon { @@ -140,6 +144,7 @@ impl RoutingDomainDetailCommon { confirmed: false, cached_peer_info: Mutex::new(Default::default()), relay_node_last_keepalive: Default::default(), + relay_node_last_optimized: Default::default(), } } @@ -227,6 +232,10 @@ impl RoutingDomainDetailCommon { self.relay_node_last_keepalive } + pub fn relay_node_last_optimized(&self) -> Option { + self.relay_node_last_optimized + } + pub fn dial_info_details(&self) -> &Vec { &self.dial_info_details } @@ -277,6 +286,12 @@ impl RoutingDomainDetailCommon { fn set_relay_node(&mut self, opt_relay_node: Option) { self.relay_node = opt_relay_node; self.relay_node_last_keepalive = None; + self.relay_node_last_optimized = if self.relay_node.is_some() { + Some(Timestamp::now()) + } else { + None + }; + self.clear_cache(); } @@ -317,6 +332,9 @@ impl RoutingDomainDetailCommon { fn set_relay_node_last_keepalive(&mut self, ts: Option) { self.relay_node_last_keepalive = ts; } + fn set_relay_node_last_optimized(&mut self, ts: Option) { + self.relay_node_last_optimized = ts; + } 
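// Editor's note (a sketch of how these fields interact, not part of the original patch):
// set_relay_node() above stamps relay_node_last_optimized at relay selection time, so the
// relay management task defers its first optimization pass on a fresh relay until
// RELAY_OPTIMIZATION_INTERVAL_SECS (10s) have elapsed, roughly:
//
//   if cur_ts - last_optimized > TimestampDuration::new_secs(RELAY_OPTIMIZATION_INTERVAL_SECS) {
//       // re-check the relay's tm90 percentile against RELAY_OPTIMIZATION_PERCENTILE (75.0)
//   }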
////////////////////////////////////////////////////////////////////////////// // Internal functions diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs index c63d811a..eaee529b 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs @@ -70,6 +70,9 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { fn relay_node_last_keepalive(&self) -> Option { self.common.relay_node_last_keepalive() } + fn relay_node_last_optimized(&self) -> Option { + self.common.relay_node_last_optimized() + } fn dial_info_details(&self) -> &Vec { self.common.dial_info_details() } @@ -400,4 +403,8 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { fn set_relay_node_last_keepalive(&mut self, ts: Option) { self.common.set_relay_node_last_keepalive(ts); } + + fn set_relay_node_last_optimized(&mut self, ts: Option) { + self.common.set_relay_node_last_optimized(ts); + } } diff --git a/veilid-core/src/routing_table/stats_accounting.rs b/veilid-core/src/routing_table/stats_accounting.rs index 540a6a03..a10b6a61 100644 --- a/veilid-core/src/routing_table/stats_accounting.rs +++ b/veilid-core/src/routing_table/stats_accounting.rs @@ -2,7 +2,7 @@ use super::*; // Latency entry is per round-trip packet (ping or data) // - Size is number of entries -const ROLLING_LATENCIES_SIZE: usize = 10; +const ROLLING_LATENCIES_SIZE: usize = 50; // Transfers entries are in bytes total for the interval // - Size is number of entries @@ -102,28 +102,64 @@ impl LatencyStatsAccounting { } } + fn get_tm_n(sorted_latencies: &[TimestampDuration], n: usize) -> Option { + let tmcount = sorted_latencies.len() * n / 100; + if tmcount == 0 { + None + } else { + let mut tm = TimestampDuration::new(0); + for l in &sorted_latencies[..tmcount] { + tm += *l; + } + tm /= tmcount as u64; + Some(tm) + } + } + + fn get_p_n(sorted_latencies: &[TimestampDuration], n: usize) -> TimestampDuration { + let pindex = (sorted_latencies.len() * n / 100).saturating_sub(1); + sorted_latencies[pindex] + } + pub fn record_latency(&mut self, latency: TimestampDuration) -> LatencyStats { while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE { self.rolling_latencies.pop_front(); } self.rolling_latencies.push_back(latency); - let mut ls = LatencyStats { - fastest: u64::MAX.into(), - average: 0.into(), - slowest: 0.into(), - }; + // Calculate latency stats + + let mut fastest = TimestampDuration::new(u64::MAX); + let mut slowest = TimestampDuration::new(0u64); + let mut average = TimestampDuration::new(0u64); + for rl in &self.rolling_latencies { - ls.fastest.min_assign(*rl); - ls.slowest.max_assign(*rl); - ls.average += *rl; + fastest.min_assign(*rl); + slowest.max_assign(*rl); + average += *rl; } let len = self.rolling_latencies.len() as u64; if len > 0 { - ls.average /= len; + average /= len; } - ls + let mut sorted_latencies: Vec<_> = self.rolling_latencies.iter().copied().collect(); + sorted_latencies.sort(); + + let tm90 = Self::get_tm_n(&sorted_latencies, 90).unwrap_or(average); + let tm75 = Self::get_tm_n(&sorted_latencies, 75).unwrap_or(average); + let p90 = Self::get_p_n(&sorted_latencies, 90); + let p75 = Self::get_p_n(&sorted_latencies, 75); + + LatencyStats { + fastest, + average, + slowest, + tm90, + tm75, + p90, + p75, + } } } diff --git 
a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index f880dd37..d71be651 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -317,7 +317,7 @@ impl RoutingTable { if count == 0 { return; } - veilid_log!(self debug "[{}] Ping validation queue: {} remaining", name, count); + veilid_log!(self debug target:"network_result", "[{}] Ping validation queue: {} remaining", name, count); let atomic_count = AtomicUsize::new(count); process_batched_future_queue(future_queue, MAX_PARALLEL_PINGS, stop_token, |res| async { @@ -326,7 +326,7 @@ } let remaining = atomic_count.fetch_sub(1, Ordering::AcqRel) - 1; if remaining > 0 { - veilid_log!(self debug "[{}] Ping validation queue: {} remaining", name, remaining); + veilid_log!(self debug target:"network_result", "[{}] Ping validation queue: {} remaining", name, remaining); } }) .await; diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs index dc234396..b3d6c61d 100644 --- a/veilid-core/src/routing_table/tasks/relay_management.rs +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -71,7 +71,7 @@ impl RoutingTable { false } // Relay node no longer can relay - else if relay_node.operate(|_rti, e| !&relay_node_filter(e)) { veilid_log!(self debug "Relay node can no longer relay, dropping relay {}", relay_node @@ -88,7 +88,86 @@ impl RoutingTable { editor.set_relay_node(None); false } else { - true + // See if enough time has passed since our relay was last optimized to consider + // replacing it if it is no longer fast enough + let mut has_relay = true; + let mut inner = self.inner.upgradable_read(); + if let Some(last_optimized) = + inner.relay_node_last_optimized(RoutingDomain::PublicInternet) + { + let last_optimized_duration = cur_ts - last_optimized; + if last_optimized_duration + > TimestampDuration::new_secs(RELAY_OPTIMIZATION_INTERVAL_SECS) + { + // See what our relay's current percentile is + let relay_node_id = relay_node.best_node_id(); + if let Some(relay_relative_performance) = inner + .get_node_relative_performance( + relay_node_id, + cur_ts, + &relay_node_filter, + |ls| ls.tm90, + ) + { + // Get latency numbers + let latency_stats = + if let Some(latency) = relay_node.peer_stats().latency { + latency.to_string() + } else { + "[no stats]".to_owned() + }; + + // Get current relay reliability + let state_reason = relay_node.state_reason(cur_ts); + + if relay_relative_performance.percentile + < RELAY_OPTIMIZATION_PERCENTILE + { + // Drop the current relay so we can get the best new one + veilid_log!(self debug + "Relay tm90 is ({:.2}% < {:.2}%) ({} out of {}) (latency {}, {:?}) optimizing relay {}", + relay_relative_performance.percentile, + RELAY_OPTIMIZATION_PERCENTILE, + relay_relative_performance.node_index, + relay_relative_performance.node_count, + latency_stats, + state_reason, + relay_node + ); + editor.set_relay_node(None); + has_relay = false; + } else { + // Note that we tried to optimize the relay but it was good + veilid_log!(self debug + "Relay tm90 is ({:.2}% >= {:.2}%) ({} out of {}) (latency {}, {:?}) keeping {}", + relay_relative_performance.percentile, + RELAY_OPTIMIZATION_PERCENTILE, + relay_relative_performance.node_index, + relay_relative_performance.node_count, + latency_stats, + state_reason, + relay_node + ); 
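// Editor's note (not part of the original patch): the upgradable read taken above is
// promoted to a write lock only for the timestamp update below, so the percentile
// measurement itself never holds the routing table write lock.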
inner.with_upgraded(|inner| { + inner.set_relay_node_last_optimized( + RoutingDomain::PublicInternet, + cur_ts, + ) + }); + } + } else { + // Drop the current relay because it could not be measured + veilid_log!(self debug + "Relay relative performance not found {}", + relay_node + ); + editor.set_relay_node(None); + has_relay = false; + } + } + } + + has_relay } } else { false @@ -123,11 +202,7 @@ impl RoutingTable { } if !got_outbound_relay { // Find a node in our routing table that is an acceptable inbound relay - if let Some(nr) = self.find_inbound_relay( - RoutingDomain::PublicInternet, - cur_ts, - relay_node_filter, - ) { + if let Some(nr) = self.find_fastest_node(cur_ts, &relay_node_filter, |ls| ls.tm90) { veilid_log!(self debug "Inbound relay node selected: {}", nr); editor.set_relay_node(Some(nr)); } @@ -233,47 +308,4 @@ impl RoutingTable { true } } - - #[instrument(level = "trace", skip(self, relay_node_filter), ret)] - pub fn find_inbound_relay( - &self, - routing_domain: RoutingDomain, - cur_ts: Timestamp, - relay_node_filter: impl Fn(&BucketEntryInner) -> bool, - ) -> Option { - // Go through all entries and find fastest entry that matches filter function - let inner = self.inner.read(); - let inner = &*inner; - let mut best_inbound_relay: Option> = None; - - // Iterate all known nodes for candidates - inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { - let entry2 = entry.clone(); - entry.with(rti, |rti, e| { - // Filter this node - if relay_node_filter(e) { - // Compare against previous candidate - if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { - // Less is faster - let better = best_inbound_relay.with(rti, |_rti, best| { - // choose low latency stability for relays - BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) - == std::cmp::Ordering::Less - }); - // Now apply filter function and see if this node should be included - if better { - *best_inbound_relay = entry2; - } - } else { - // Always store the first candidate - best_inbound_relay = Some(entry2); - } - } - }); - // Don't end early, iterate through all entries - Option::<()>::None - }); - // Return the best inbound relay noderef - best_inbound_relay.map(|e| NodeRef::new(self.registry(), e)) - } } diff --git a/veilid-core/src/routing_table/types/node_info.rs b/veilid-core/src/routing_table/types/node_info.rs index 842ef87e..63ca4b30 100644 --- a/veilid-core/src/routing_table/types/node_info.rs +++ b/veilid-core/src/routing_table/types/node_info.rs @@ -184,7 +184,12 @@ impl NodeInfo { true } - /// Does this appear on the same network within the routing domain + /// Does this appear on the same network within the routing domain? + /// The notion of 'ipblock' is a single external IP address for ipv4, and a fixed prefix for ipv6. + /// If a NAT is present, this detects if two public peerinfo would share the same router and be + /// subject to hairpin NAT (for ipv4 typically). This is also overloaded for the concept + /// of rate-limiting the number of nodes coming from the same ip 'block' within a specific amount of + /// time for the address filter. 
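/// Illustrative example (values are the editor's, not from the original patch): two peers
/// that both advertise dial info on 203.0.113.7 share the same ipv4 ipblock regardless of
/// port, and with an ip6_prefix_size of 56, 2001:db8:aa:bb00::1 and 2001:db8:aa:bbff::2
/// fall within the same ipv6 block.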
pub fn node_is_on_same_ipblock(&self, node_b: &NodeInfo, ip6_prefix_size: usize) -> bool { let our_ip_blocks = self .dial_info_detail_list() diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 8b1b0906..300f35dc 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -69,16 +69,22 @@ impl_veilid_log_facility!("rpc"); #[derive(Debug)] #[must_use] -struct WaitableReply { - handle: OperationWaitHandle>, +struct WaitableReplyContext { timeout_us: TimestampDuration, node_ref: NodeRef, send_ts: Timestamp, - send_data_method: SendDataResult, + send_data_result: SendDataResult, safety_route: Option, remote_private_route: Option, reply_private_route: Option, +} + +#[derive(Debug)] +#[must_use] +struct WaitableReply { + handle: OperationWaitHandle>, _opt_connection_ref_scope: Option, + context: WaitableReplyContext, } ///////////////////////////////////////////////////////////////////// @@ -327,14 +333,19 @@ impl RPCProcessor { // Sender PeerInfo was specified, update our routing table with it if !self.verify_node_info(routing_domain, peer_info.signed_node_info(), &[]) { - veilid_log!(self debug target:"network_result", "Dropping invalid PeerInfo in {:?} for id {}: {:?}", routing_domain, sender_node_id, peer_info); - // Don't punish for this because in the case of hairpin NAT - // you can legally get LocalNetwork PeerInfo when you expect PublicInternet PeerInfo - // - // self.network_manager().address_filter().punish_node_id( - // sender_node_id, - // PunishmentReason::FailedToVerifySenderPeerInfo, - // ); + veilid_log!(self debug target:"network_result", "Punishing invalid PeerInfo in {:?} for id {}: {:?}", routing_domain, sender_node_id, peer_info); + + // Punish nodes that send peer info for the wrong routing domain + // Hairpin NAT situations where routing domain appears to be LocalNetwork + // should not happen. These nodes should be using InboundRelay now to communicate + // due to the 'node_is_on_same_ipblock' check in PublicInternetRoutingDomainDetail::get_contact_method. + // Nodes that are actually sending LocalNetwork ip addresses over the PublicInternet domain + // or vice-versa need to be punished. 
+ + self.network_manager().address_filter().punish_node_id( + sender_node_id, + PunishmentReason::FailedToVerifySenderPeerInfo, + ); return Ok(NetworkResult::value(None)); } let sender_nr = match self @@ -512,28 +523,16 @@ impl RPCProcessor { let id = waitable_reply.handle.id(); let out = self .waiting_rpc_table - .wait_for_op(waitable_reply.handle, waitable_reply.timeout_us) + .wait_for_op(waitable_reply.handle, waitable_reply.context.timeout_us) .await; match &out { Err(e) => { veilid_log!(self debug "RPC Lost (id={} {}): {}", id, debug_string, e); - self.record_lost_answer( - waitable_reply.send_ts, - waitable_reply.node_ref.clone(), - waitable_reply.safety_route, - waitable_reply.remote_private_route, - waitable_reply.reply_private_route, - ); + self.record_lost_answer(&waitable_reply.context); } Ok(TimeoutOr::Timeout) => { veilid_log!(self debug "RPC Lost (id={} {}): Timeout", id, debug_string); - self.record_lost_answer( - waitable_reply.send_ts, - waitable_reply.node_ref.clone(), - waitable_reply.safety_route, - waitable_reply.remote_private_route, - waitable_reply.reply_private_route, - ); + self.record_lost_answer(&waitable_reply.context); } Ok(TimeoutOr::Value((rpcreader, _))) => { // Reply received @@ -541,17 +540,13 @@ impl RPCProcessor { // Record answer received self.record_answer_received( - waitable_reply.send_ts, recv_ts, rpcreader.header.body_len, - waitable_reply.node_ref.clone(), - waitable_reply.safety_route, - waitable_reply.remote_private_route, - waitable_reply.reply_private_route, + &waitable_reply.context, ); // Ensure the reply comes over the private route that was requested - if let Some(reply_private_route) = waitable_reply.reply_private_route { + if let Some(reply_private_route) = waitable_reply.context.reply_private_route { match &rpcreader.header.detail { RPCMessageHeaderDetail::Direct(_) => { return Err(RPCError::protocol( @@ -882,20 +877,15 @@ impl RPCProcessor { /// Record question lost to node or route #[instrument(level = "trace", target = "rpc", skip_all)] - fn record_lost_answer( - &self, - send_ts: Timestamp, - node_ref: NodeRef, - safety_route: Option, - remote_private_route: Option, - private_route: Option, - ) { + fn record_lost_answer(&self, context: &WaitableReplyContext) { // Record for node if this was not sent via a route - if safety_route.is_none() && remote_private_route.is_none() { - node_ref.stats_lost_answer(); + if context.safety_route.is_none() && context.remote_private_route.is_none() { + context + .node_ref + .stats_lost_answer(context.send_data_result.is_ordered()); // Also clear the last_connections for the entry so we make a new connection next time - node_ref.clear_last_flows(); + context.node_ref.clear_last_flows(); return; } @@ -904,20 +894,20 @@ impl RPCProcessor { let rss = routing_table.route_spec_store(); // If safety route was used, record question lost there - if let Some(sr_pubkey) = &safety_route { - rss.with_route_stats_mut(send_ts, sr_pubkey, |s| { + if let Some(sr_pubkey) = &context.safety_route { + rss.with_route_stats_mut(context.send_ts, sr_pubkey, |s| { s.record_lost_answer(); }); } // If remote private route was used, record question lost there - if let Some(rpr_pubkey) = &remote_private_route { - rss.with_route_stats_mut(send_ts, rpr_pubkey, |s| { + if let Some(rpr_pubkey) = &context.remote_private_route { + rss.with_route_stats_mut(context.send_ts, rpr_pubkey, |s| { s.record_lost_answer(); }); } - // If private route was used, record question lost there - if let Some(pr_pubkey) = &private_route { - 
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| { + // If reply private route was used, record question lost there + if let Some(pr_pubkey) = &context.reply_private_route { + rss.with_route_stats_mut(context.send_ts, pr_pubkey, |s| { s.record_lost_answer(); }); } @@ -925,6 +915,7 @@ impl RPCProcessor { /// Record success sending to node or route #[instrument(level = "trace", target = "rpc", skip_all)] + #[expect(clippy::too_many_arguments)] fn record_send_success( &self, rpc_kind: RPCKind, @@ -933,6 +924,7 @@ impl RPCProcessor { node_ref: NodeRef, safety_route: Option, remote_private_route: Option, + ordered: bool, ) { // Record for node if this was not sent via a route if safety_route.is_none() && remote_private_route.is_none() { @@ -942,7 +934,7 @@ impl RPCProcessor { if is_answer { node_ref.stats_answer_sent(bytes); } else { - node_ref.stats_question_sent(send_ts, bytes, wants_answer); + node_ref.stats_question_sent(send_ts, bytes, wants_answer, ordered); } return; } @@ -971,18 +963,21 @@ impl RPCProcessor { #[instrument(level = "trace", target = "rpc", skip_all)] fn record_answer_received( &self, - send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount, - node_ref: NodeRef, - safety_route: Option, - remote_private_route: Option, - reply_private_route: Option, + context: &WaitableReplyContext, ) { // Record stats for remote node if this was direct - if safety_route.is_none() && remote_private_route.is_none() && reply_private_route.is_none() + if context.safety_route.is_none() + && context.remote_private_route.is_none() + && context.reply_private_route.is_none() { - node_ref.stats_answer_rcvd(send_ts, recv_ts, bytes); + context.node_ref.stats_answer_rcvd( + context.send_ts, + recv_ts, + bytes, + context.send_data_result.is_ordered(), + ); return; } // Get route spec store @@ -991,11 +986,11 @@ impl RPCProcessor { // Get latency for all local routes let mut total_local_latency = TimestampDuration::new(0u64); - let total_latency: TimestampDuration = recv_ts.saturating_sub(send_ts); + let total_latency: TimestampDuration = recv_ts.saturating_sub(context.send_ts); // If safety route was used, record route there - if let Some(sr_pubkey) = &safety_route { - rss.with_route_stats_mut(send_ts, sr_pubkey, |s| { + if let Some(sr_pubkey) = &context.safety_route { + rss.with_route_stats_mut(context.send_ts, sr_pubkey, |s| { // Record received bytes s.record_answer_received(recv_ts, bytes); @@ -1005,8 +1000,8 @@ impl RPCProcessor { } // If local private route was used, record route there - if let Some(pr_pubkey) = &reply_private_route { - rss.with_route_stats_mut(send_ts, pr_pubkey, |s| { + if let Some(pr_pubkey) = &context.reply_private_route { + rss.with_route_stats_mut(context.send_ts, pr_pubkey, |s| { // Record received bytes s.record_answer_received(recv_ts, bytes); @@ -1016,8 +1011,8 @@ impl RPCProcessor { } // If remote private route was used, record there - if let Some(rpr_pubkey) = &remote_private_route { - rss.with_route_stats_mut(send_ts, rpr_pubkey, |s| { + if let Some(rpr_pubkey) = &context.remote_private_route { + rss.with_route_stats_mut(context.send_ts, rpr_pubkey, |s| { // Record received bytes s.record_answer_received(recv_ts, bytes); @@ -1040,13 +1035,13 @@ impl RPCProcessor { // If no remote private route was used, then record half the total latency on our local routes // This is fine because if we sent with a local safety route, // then we must have received with a local private route too, per the design rules - if let Some(sr_pubkey) = &safety_route { - 
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| { + if let Some(sr_pubkey) = &context.safety_route { + rss.with_route_stats_mut(context.send_ts, sr_pubkey, |s| { s.record_latency(total_latency / 2u64); }); } - if let Some(pr_pubkey) = &reply_private_route { - rss.with_route_stats_mut(send_ts, pr_pubkey, |s| { + if let Some(pr_pubkey) = &context.reply_private_route { + rss.with_route_stats_mut(context.send_ts, pr_pubkey, |s| { s.record_latency(total_latency / 2u64); }); } @@ -1157,7 +1152,7 @@ impl RPCProcessor { ); RPCError::network(e) })?; - let send_data_method = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { + let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { // If we couldn't send we're still cleaning up self.record_send_failure(RPCKind::Question, send_ts, node_ref.unfiltered(), safety_route, remote_private_route); network_result_raise!(res); @@ -1172,11 +1167,12 @@ impl RPCProcessor { node_ref.unfiltered(), safety_route, remote_private_route, + send_data_result.is_ordered(), ); // Ref the connection so it doesn't go away until we're done with the waitable reply let opt_connection_ref_scope = - send_data_method.unique_flow().connection_id.and_then(|id| { + send_data_result.unique_flow().connection_id.and_then(|id| { self.network_manager() .connection_manager() .try_connection_ref_scope(id) @@ -1185,14 +1181,16 @@ impl RPCProcessor { // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { handle, - timeout_us, - node_ref: node_ref.unfiltered(), - send_ts, - send_data_method, - safety_route, - remote_private_route, - reply_private_route, _opt_connection_ref_scope: opt_connection_ref_scope, + context: WaitableReplyContext { + timeout_us, + node_ref: node_ref.unfiltered(), + send_ts, + send_data_result, + safety_route, + remote_private_route, + reply_private_route, + }, })) } @@ -1243,7 +1241,7 @@ impl RPCProcessor { ); RPCError::network(e) })?; - let _send_data_method = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { + let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { // If we couldn't send we're still cleaning up self.record_send_failure(RPCKind::Statement, send_ts, node_ref.unfiltered(), safety_route, remote_private_route); network_result_raise!(res); @@ -1258,6 +1256,7 @@ impl RPCProcessor { node_ref.unfiltered(), safety_route, remote_private_route, + send_data_result.is_ordered(), ); Ok(NetworkResult::value(())) @@ -1313,7 +1312,7 @@ impl RPCProcessor { ); RPCError::network(e) })?; - let _send_data_kind = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { + let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { // If we couldn't send we're still cleaning up self.record_send_failure(RPCKind::Answer, send_ts, node_ref.unfiltered(), safety_route, remote_private_route); network_result_raise!(res); @@ -1328,6 +1327,7 @@ impl RPCProcessor { node_ref.unfiltered(), 
safety_route, remote_private_route, + send_data_result.is_ordered(), ); Ok(NetworkResult::value(())) diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index f676541b..4a89ebbc 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -27,7 +27,7 @@ impl RPCProcessor { let waitable_reply = network_result_try!(self.question(dest, question, None).await?); // Keep the reply private route that was used to return with the answer - let reply_private_route = waitable_reply.reply_private_route; + let reply_private_route = waitable_reply.context.reply_private_route; // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 31bf3ee5..88ee0aed 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -47,7 +47,7 @@ impl RPCProcessor { let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?); // Keep the reply private route that was used to return with the answer - let reply_private_route = waitable_reply.reply_private_route; + let reply_private_route = waitable_reply.context.reply_private_route; // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 3d49d848..68717c9b 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -86,7 +86,7 @@ impl RPCProcessor { ); // Keep the reply private route that was used to return with the answer - let reply_private_route = waitable_reply.reply_private_route; + let reply_private_route = waitable_reply.context.reply_private_route; // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/rpc_processor/rpc_inspect_value.rs b/veilid-core/src/rpc_processor/rpc_inspect_value.rs index ed373521..3126ad61 100644 --- a/veilid-core/src/rpc_processor/rpc_inspect_value.rs +++ b/veilid-core/src/rpc_processor/rpc_inspect_value.rs @@ -89,7 +89,7 @@ impl RPCProcessor { ); // Keep the reply private route that was used to return with the answer - let reply_private_route = waitable_reply.reply_private_route; + let reply_private_route = waitable_reply.context.reply_private_route; // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index fe05c681..36a2a018 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -98,7 +98,7 @@ impl RPCProcessor { ); // Keep the reply private route that was used to return with the answer - let reply_private_route = waitable_reply.reply_private_route; + let reply_private_route = waitable_reply.context.reply_private_route; // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? 
{ diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index cdf6208f..cdf1ec0d 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -73,10 +73,10 @@ impl RPCProcessor { network_result_try!(self.question(dest.clone(), question, None).await?); // Note what kind of ping this was and to what peer scope - let send_data_method = waitable_reply.send_data_method.clone(); + let send_data_method = waitable_reply.context.send_data_result.clone(); // Keep the reply private route that was used to return with the answer - let reply_private_route = waitable_reply.reply_private_route; + let reply_private_route = waitable_reply.context.reply_private_route; // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index a3196db4..77a45da9 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -91,7 +91,7 @@ impl RPCProcessor { network_result_try!(self.question(dest.clone(), question, None).await?); // Keep the reply private route that was used to return with the answer - let reply_private_route = waitable_reply.reply_private_route; + let reply_private_route = waitable_reply.context.reply_private_route; // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/veilid_api/tests/fixtures.rs b/veilid-core/src/veilid_api/tests/fixtures.rs index d2e1e454..d94e70ab 100644 --- a/veilid-core/src/veilid_api/tests/fixtures.rs +++ b/veilid-core/src/veilid_api/tests/fixtures.rs @@ -7,6 +7,10 @@ pub fn fix_latencystats() -> LatencyStats { fastest: TimestampDuration::from(1234), average: TimestampDuration::from(2345), slowest: TimestampDuration::from(3456), + tm90: TimestampDuration::from(4567), + tm75: TimestampDuration::from(5678), + p90: TimestampDuration::from(6789), + p75: TimestampDuration::from(7890), } } @@ -49,9 +53,11 @@ pub fn fix_rpcstats() -> RPCStats { last_question_ts: Some(Timestamp::from(1685569084280)), last_seen_ts: Some(Timestamp::from(1685569101256)), first_consecutive_seen_ts: Some(Timestamp::from(1685569111851)), - recent_lost_answers: 5, + recent_lost_answers_unordered: 5, + recent_lost_answers_ordered: 6, failed_to_send: 3, - answer: fix_answerstats(), + answer_unordered: fix_answerstats(), + answer_ordered: fix_answerstats(), } } diff --git a/veilid-core/src/veilid_api/types/stats.rs b/veilid-core/src/veilid_api/types/stats.rs index 5ec81501..590b0356 100644 --- a/veilid-core/src/veilid_api/types/stats.rs +++ b/veilid-core/src/veilid_api/types/stats.rs @@ -10,14 +10,26 @@ pub struct LatencyStats { pub average: TimestampDuration, /// slowest latency in the ROLLING_LATENCIES_SIZE last latencies pub slowest: TimestampDuration, + /// trimmed mean with lowest 90% latency in the ROLLING_LATENCIES_SIZE + #[serde(default)] + pub tm90: TimestampDuration, + /// trimmed mean with lowest 75% latency in the ROLLING_LATENCIES_SIZE + #[serde(default)] + pub tm75: TimestampDuration, + /// p90 latency in the ROLLING_LATENCIES_SIZE + #[serde(default)] + pub p90: TimestampDuration, + /// p75 latency in the ROLLING_LATENCIES_SIZE + #[serde(default)] + pub p75: TimestampDuration, } impl fmt::Display for LatencyStats { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "{} slow / {} avg / {} fast", 
-            self.slowest, self.average, self.fastest
+            "slow={} / avg={} / fast={} / tm90={} / tm75={} / p90={} / p75={}",
+            self.slowest, self.average, self.fastest, self.tm90, self.tm75, self.p90, self.p75
         )?;
         Ok(())
     }
@@ -206,13 +218,20 @@ pub struct RPCStats {
     pub last_seen_ts: Option<Timestamp>,
     /// the timestamp of the first consecutive proof-of-life for this node (an answer or received question)
     pub first_consecutive_seen_ts: Option<Timestamp>,
-    /// number of answers that have been lost consecutively
-    pub recent_lost_answers: u32,
     /// number of messages that have failed to send or connections dropped since we last successfully sent one
     pub failed_to_send: u32,
-    /// rpc answer stats for this peer
+    /// number of answers that have been lost consecutively over an unordered channel
     #[serde(default)]
-    pub answer: AnswerStats,
+    pub recent_lost_answers_unordered: u32,
+    /// number of answers that have been lost consecutively over an ordered channel
+    #[serde(default)]
+    pub recent_lost_answers_ordered: u32,
+    /// unordered rpc answer stats for this peer
+    #[serde(default)]
+    pub answer_unordered: AnswerStats,
+    /// ordered rpc answer stats for this peer
+    #[serde(default)]
+    pub answer_ordered: AnswerStats,
 }

 impl fmt::Display for RPCStats {
@@ -224,9 +243,10 @@ impl fmt::Display for RPCStats {
         )?;
         writeln!(
             f,
-            "# recently-lost/failed-to-send: {} / {}",
-            self.recent_lost_answers, self.failed_to_send
+            "# recently-lost unordered/ordered: {} / {}",
+            self.recent_lost_answers_unordered, self.recent_lost_answers_ordered,
         )?;
+        writeln!(f, "# failed-to-send: {}", self.failed_to_send)?;
         writeln!(
             f,
             "last_question: {}",
@@ -255,7 +275,16 @@ impl fmt::Display for RPCStats {
             }
         )?;

-        write!(f, "answers:\n{}", indent_all_string(&self.answer))?;
+        write!(
+            f,
+            "unreliable answers:\n{}",
+            indent_all_string(&self.answer_unordered)
+        )?;
+        write!(
+            f,
+            "reliable answers:\n{}",
+            indent_all_string(&self.answer_ordered)
+        )?;

         Ok(())
     }
diff --git a/veilid-core/tests/web.rs b/veilid-core/tests/web.rs
index 361421e6..1fe84138 100644
--- a/veilid-core/tests/web.rs
+++ b/veilid-core/tests/web.rs
@@ -19,11 +19,11 @@ pub fn setup() -> () {
     SETUP_ONCE.call_once(|| {
         console_error_panic_hook::set_once();

-        let mut builder = tracing_wasm::WASMLayerConfigBuilder::new();
-        builder.set_report_logs_in_timings(false);
-        builder.set_max_level(Level::TRACE);
-        builder.set_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
-        tracing_wasm::set_as_global_default_with_config(builder.build());
+        let config = veilid_tracing_wasm::WASMLayerConfig::new()
+            .with_report_logs_in_timings(false)
+            .with_max_level(Level::TRACE)
+            .with_console_config(veilid_tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
+        veilid_tracing_wasm::set_as_global_default_with_config(config);
     });
 }
diff --git a/veilid-flutter/example/lib/log.dart b/veilid-flutter/example/lib/log.dart
index 7c8bdf6e..625606b8 100644
--- a/veilid-flutter/example/lib/log.dart
+++ b/veilid-flutter/example/lib/log.dart
@@ -1,8 +1,8 @@
 // ignore_for_file: prefer_single_quotes

+import 'dart:io' show Platform;
 import 'package:ansicolor/ansicolor.dart';
 import 'package:flutter/foundation.dart';
-import 'package:flutter/material.dart';
 import 'package:loggy/loggy.dart';
 import 'package:veilid/veilid.dart';

@@ -84,7 +84,14 @@ class CallbackPrinter extends LoggyPrinter {

   @override
   void onLog(LogRecord record) {
-    debugPrint(record.pretty());
+    final out = record.pretty().replaceAll('\uFFFD', '');
+
+    if (!kIsWeb && Platform.isAndroid) {
+      debugPrint(out);
+    } else {
+
debugPrintSynchronously(out); + } + callback?.call(record); } diff --git a/veilid-flutter/example/lib/log_terminal.dart b/veilid-flutter/example/lib/log_terminal.dart index 72377da6..a5ef7a42 100644 --- a/veilid-flutter/example/lib/log_terminal.dart +++ b/veilid-flutter/example/lib/log_terminal.dart @@ -27,8 +27,9 @@ class _LogTerminalState extends State { void initState() { super.initState(); _terminal.setLineFeedMode(true); - globalTerminalPrinter.setCallback((log) { - _terminal.write('${log.pretty()}\n'); + globalTerminalPrinter.setCallback((record) { + final out = record.pretty().replaceAll('\uFFFD', ''); + _terminal.write('$out\n'); }); } diff --git a/veilid-flutter/lib/veilid_state.dart b/veilid-flutter/lib/veilid_state.dart index 7e2f683f..73223617 100644 --- a/veilid-flutter/lib/veilid_state.dart +++ b/veilid-flutter/lib/veilid_state.dart @@ -51,6 +51,10 @@ class LatencyStats with _$LatencyStats { required TimestampDuration fastest, required TimestampDuration average, required TimestampDuration slowest, + required TimestampDuration tm90, + required TimestampDuration tm75, + required TimestampDuration p90, + required TimestampDuration p75, }) = _LatencyStats; factory LatencyStats.fromJson(dynamic json) => @@ -152,9 +156,11 @@ class RPCStats with _$RPCStats { required Timestamp? lastQuestionTs, required Timestamp? lastSeenTs, required Timestamp? firstConsecutiveSeenTs, - required int recentLostAnswers, + required int recentLostAnswersUnordered, + required int recentLostAnswersOrdered, required int failedToSend, - required AnswerStats answer, + required AnswerStats answerUnordered, + required AnswerStats answerOrdered, }) = _RPCStats; factory RPCStats.fromJson(dynamic json) => diff --git a/veilid-flutter/lib/veilid_state.freezed.dart b/veilid-flutter/lib/veilid_state.freezed.dart index c1bcfbcb..dc920883 100644 --- a/veilid-flutter/lib/veilid_state.freezed.dart +++ b/veilid-flutter/lib/veilid_state.freezed.dart @@ -23,6 +23,10 @@ mixin _$LatencyStats { TimestampDuration get fastest => throw _privateConstructorUsedError; TimestampDuration get average => throw _privateConstructorUsedError; TimestampDuration get slowest => throw _privateConstructorUsedError; + TimestampDuration get tm90 => throw _privateConstructorUsedError; + TimestampDuration get tm75 => throw _privateConstructorUsedError; + TimestampDuration get p90 => throw _privateConstructorUsedError; + TimestampDuration get p75 => throw _privateConstructorUsedError; /// Serializes this LatencyStats to a JSON map. Map toJson() => throw _privateConstructorUsedError; @@ -43,7 +47,11 @@ abstract class $LatencyStatsCopyWith<$Res> { $Res call( {TimestampDuration fastest, TimestampDuration average, - TimestampDuration slowest}); + TimestampDuration slowest, + TimestampDuration tm90, + TimestampDuration tm75, + TimestampDuration p90, + TimestampDuration p75}); } /// @nodoc @@ -64,6 +72,10 @@ class _$LatencyStatsCopyWithImpl<$Res, $Val extends LatencyStats> Object? fastest = null, Object? average = null, Object? slowest = null, + Object? tm90 = null, + Object? tm75 = null, + Object? p90 = null, + Object? p75 = null, }) { return _then(_value.copyWith( fastest: null == fastest @@ -78,6 +90,22 @@ class _$LatencyStatsCopyWithImpl<$Res, $Val extends LatencyStats> ? _value.slowest : slowest // ignore: cast_nullable_to_non_nullable as TimestampDuration, + tm90: null == tm90 + ? _value.tm90 + : tm90 // ignore: cast_nullable_to_non_nullable + as TimestampDuration, + tm75: null == tm75 + ? 
_value.tm75 + : tm75 // ignore: cast_nullable_to_non_nullable + as TimestampDuration, + p90: null == p90 + ? _value.p90 + : p90 // ignore: cast_nullable_to_non_nullable + as TimestampDuration, + p75: null == p75 + ? _value.p75 + : p75 // ignore: cast_nullable_to_non_nullable + as TimestampDuration, ) as $Val); } } @@ -93,7 +121,11 @@ abstract class _$$LatencyStatsImplCopyWith<$Res> $Res call( {TimestampDuration fastest, TimestampDuration average, - TimestampDuration slowest}); + TimestampDuration slowest, + TimestampDuration tm90, + TimestampDuration tm75, + TimestampDuration p90, + TimestampDuration p75}); } /// @nodoc @@ -112,6 +144,10 @@ class __$$LatencyStatsImplCopyWithImpl<$Res> Object? fastest = null, Object? average = null, Object? slowest = null, + Object? tm90 = null, + Object? tm75 = null, + Object? p90 = null, + Object? p75 = null, }) { return _then(_$LatencyStatsImpl( fastest: null == fastest @@ -126,6 +162,22 @@ class __$$LatencyStatsImplCopyWithImpl<$Res> ? _value.slowest : slowest // ignore: cast_nullable_to_non_nullable as TimestampDuration, + tm90: null == tm90 + ? _value.tm90 + : tm90 // ignore: cast_nullable_to_non_nullable + as TimestampDuration, + tm75: null == tm75 + ? _value.tm75 + : tm75 // ignore: cast_nullable_to_non_nullable + as TimestampDuration, + p90: null == p90 + ? _value.p90 + : p90 // ignore: cast_nullable_to_non_nullable + as TimestampDuration, + p75: null == p75 + ? _value.p75 + : p75 // ignore: cast_nullable_to_non_nullable + as TimestampDuration, )); } } @@ -134,7 +186,13 @@ class __$$LatencyStatsImplCopyWithImpl<$Res> @JsonSerializable() class _$LatencyStatsImpl implements _LatencyStats { const _$LatencyStatsImpl( - {required this.fastest, required this.average, required this.slowest}); + {required this.fastest, + required this.average, + required this.slowest, + required this.tm90, + required this.tm75, + required this.p90, + required this.p75}); factory _$LatencyStatsImpl.fromJson(Map json) => _$$LatencyStatsImplFromJson(json); @@ -145,10 +203,18 @@ class _$LatencyStatsImpl implements _LatencyStats { final TimestampDuration average; @override final TimestampDuration slowest; + @override + final TimestampDuration tm90; + @override + final TimestampDuration tm75; + @override + final TimestampDuration p90; + @override + final TimestampDuration p75; @override String toString() { - return 'LatencyStats(fastest: $fastest, average: $average, slowest: $slowest)'; + return 'LatencyStats(fastest: $fastest, average: $average, slowest: $slowest, tm90: $tm90, tm75: $tm75, p90: $p90, p75: $p75)'; } @override @@ -158,12 +224,17 @@ class _$LatencyStatsImpl implements _LatencyStats { other is _$LatencyStatsImpl && (identical(other.fastest, fastest) || other.fastest == fastest) && (identical(other.average, average) || other.average == average) && - (identical(other.slowest, slowest) || other.slowest == slowest)); + (identical(other.slowest, slowest) || other.slowest == slowest) && + (identical(other.tm90, tm90) || other.tm90 == tm90) && + (identical(other.tm75, tm75) || other.tm75 == tm75) && + (identical(other.p90, p90) || other.p90 == p90) && + (identical(other.p75, p75) || other.p75 == p75)); } @JsonKey(includeFromJson: false, includeToJson: false) @override - int get hashCode => Object.hash(runtimeType, fastest, average, slowest); + int get hashCode => + Object.hash(runtimeType, fastest, average, slowest, tm90, tm75, p90, p75); /// Create a copy of LatencyStats /// with the given fields replaced by the non-null parameter values. 
@@ -185,7 +256,11 @@ abstract class _LatencyStats implements LatencyStats { const factory _LatencyStats( {required final TimestampDuration fastest, required final TimestampDuration average, - required final TimestampDuration slowest}) = _$LatencyStatsImpl; + required final TimestampDuration slowest, + required final TimestampDuration tm90, + required final TimestampDuration tm75, + required final TimestampDuration p90, + required final TimestampDuration p75}) = _$LatencyStatsImpl; factory _LatencyStats.fromJson(Map json) = _$LatencyStatsImpl.fromJson; @@ -196,6 +271,14 @@ abstract class _LatencyStats implements LatencyStats { TimestampDuration get average; @override TimestampDuration get slowest; + @override + TimestampDuration get tm90; + @override + TimestampDuration get tm75; + @override + TimestampDuration get p90; + @override + TimestampDuration get p75; /// Create a copy of LatencyStats /// with the given fields replaced by the non-null parameter values. @@ -1545,9 +1628,11 @@ mixin _$RPCStats { Timestamp? get lastQuestionTs => throw _privateConstructorUsedError; Timestamp? get lastSeenTs => throw _privateConstructorUsedError; Timestamp? get firstConsecutiveSeenTs => throw _privateConstructorUsedError; - int get recentLostAnswers => throw _privateConstructorUsedError; + int get recentLostAnswersUnordered => throw _privateConstructorUsedError; + int get recentLostAnswersOrdered => throw _privateConstructorUsedError; int get failedToSend => throw _privateConstructorUsedError; - AnswerStats get answer => throw _privateConstructorUsedError; + AnswerStats get answerUnordered => throw _privateConstructorUsedError; + AnswerStats get answerOrdered => throw _privateConstructorUsedError; /// Serializes this RPCStats to a JSON map. Map toJson() => throw _privateConstructorUsedError; @@ -1571,11 +1656,14 @@ abstract class $RPCStatsCopyWith<$Res> { Timestamp? lastQuestionTs, Timestamp? lastSeenTs, Timestamp? firstConsecutiveSeenTs, - int recentLostAnswers, + int recentLostAnswersUnordered, + int recentLostAnswersOrdered, int failedToSend, - AnswerStats answer}); + AnswerStats answerUnordered, + AnswerStats answerOrdered}); - $AnswerStatsCopyWith<$Res> get answer; + $AnswerStatsCopyWith<$Res> get answerUnordered; + $AnswerStatsCopyWith<$Res> get answerOrdered; } /// @nodoc @@ -1599,9 +1687,11 @@ class _$RPCStatsCopyWithImpl<$Res, $Val extends RPCStats> Object? lastQuestionTs = freezed, Object? lastSeenTs = freezed, Object? firstConsecutiveSeenTs = freezed, - Object? recentLostAnswers = null, + Object? recentLostAnswersUnordered = null, + Object? recentLostAnswersOrdered = null, Object? failedToSend = null, - Object? answer = null, + Object? answerUnordered = null, + Object? answerOrdered = null, }) { return _then(_value.copyWith( messagesSent: null == messagesSent @@ -1628,17 +1718,25 @@ class _$RPCStatsCopyWithImpl<$Res, $Val extends RPCStats> ? _value.firstConsecutiveSeenTs : firstConsecutiveSeenTs // ignore: cast_nullable_to_non_nullable as Timestamp?, - recentLostAnswers: null == recentLostAnswers - ? _value.recentLostAnswers - : recentLostAnswers // ignore: cast_nullable_to_non_nullable + recentLostAnswersUnordered: null == recentLostAnswersUnordered + ? _value.recentLostAnswersUnordered + : recentLostAnswersUnordered // ignore: cast_nullable_to_non_nullable + as int, + recentLostAnswersOrdered: null == recentLostAnswersOrdered + ? _value.recentLostAnswersOrdered + : recentLostAnswersOrdered // ignore: cast_nullable_to_non_nullable as int, failedToSend: null == failedToSend ? 
_value.failedToSend : failedToSend // ignore: cast_nullable_to_non_nullable as int, - answer: null == answer - ? _value.answer - : answer // ignore: cast_nullable_to_non_nullable + answerUnordered: null == answerUnordered + ? _value.answerUnordered + : answerUnordered // ignore: cast_nullable_to_non_nullable + as AnswerStats, + answerOrdered: null == answerOrdered + ? _value.answerOrdered + : answerOrdered // ignore: cast_nullable_to_non_nullable as AnswerStats, ) as $Val); } @@ -1647,9 +1745,19 @@ class _$RPCStatsCopyWithImpl<$Res, $Val extends RPCStats> /// with the given fields replaced by the non-null parameter values. @override @pragma('vm:prefer-inline') - $AnswerStatsCopyWith<$Res> get answer { - return $AnswerStatsCopyWith<$Res>(_value.answer, (value) { - return _then(_value.copyWith(answer: value) as $Val); + $AnswerStatsCopyWith<$Res> get answerUnordered { + return $AnswerStatsCopyWith<$Res>(_value.answerUnordered, (value) { + return _then(_value.copyWith(answerUnordered: value) as $Val); + }); + } + + /// Create a copy of RPCStats + /// with the given fields replaced by the non-null parameter values. + @override + @pragma('vm:prefer-inline') + $AnswerStatsCopyWith<$Res> get answerOrdered { + return $AnswerStatsCopyWith<$Res>(_value.answerOrdered, (value) { + return _then(_value.copyWith(answerOrdered: value) as $Val); }); } } @@ -1669,12 +1777,16 @@ abstract class _$$RPCStatsImplCopyWith<$Res> Timestamp? lastQuestionTs, Timestamp? lastSeenTs, Timestamp? firstConsecutiveSeenTs, - int recentLostAnswers, + int recentLostAnswersUnordered, + int recentLostAnswersOrdered, int failedToSend, - AnswerStats answer}); + AnswerStats answerUnordered, + AnswerStats answerOrdered}); @override - $AnswerStatsCopyWith<$Res> get answer; + $AnswerStatsCopyWith<$Res> get answerUnordered; + @override + $AnswerStatsCopyWith<$Res> get answerOrdered; } /// @nodoc @@ -1696,9 +1808,11 @@ class __$$RPCStatsImplCopyWithImpl<$Res> Object? lastQuestionTs = freezed, Object? lastSeenTs = freezed, Object? firstConsecutiveSeenTs = freezed, - Object? recentLostAnswers = null, + Object? recentLostAnswersUnordered = null, + Object? recentLostAnswersOrdered = null, Object? failedToSend = null, - Object? answer = null, + Object? answerUnordered = null, + Object? answerOrdered = null, }) { return _then(_$RPCStatsImpl( messagesSent: null == messagesSent @@ -1725,17 +1839,25 @@ class __$$RPCStatsImplCopyWithImpl<$Res> ? _value.firstConsecutiveSeenTs : firstConsecutiveSeenTs // ignore: cast_nullable_to_non_nullable as Timestamp?, - recentLostAnswers: null == recentLostAnswers - ? _value.recentLostAnswers - : recentLostAnswers // ignore: cast_nullable_to_non_nullable + recentLostAnswersUnordered: null == recentLostAnswersUnordered + ? _value.recentLostAnswersUnordered + : recentLostAnswersUnordered // ignore: cast_nullable_to_non_nullable + as int, + recentLostAnswersOrdered: null == recentLostAnswersOrdered + ? _value.recentLostAnswersOrdered + : recentLostAnswersOrdered // ignore: cast_nullable_to_non_nullable as int, failedToSend: null == failedToSend ? _value.failedToSend : failedToSend // ignore: cast_nullable_to_non_nullable as int, - answer: null == answer - ? _value.answer - : answer // ignore: cast_nullable_to_non_nullable + answerUnordered: null == answerUnordered + ? _value.answerUnordered + : answerUnordered // ignore: cast_nullable_to_non_nullable + as AnswerStats, + answerOrdered: null == answerOrdered + ? 
_value.answerOrdered + : answerOrdered // ignore: cast_nullable_to_non_nullable as AnswerStats, )); } @@ -1751,9 +1873,11 @@ class _$RPCStatsImpl implements _RPCStats { required this.lastQuestionTs, required this.lastSeenTs, required this.firstConsecutiveSeenTs, - required this.recentLostAnswers, + required this.recentLostAnswersUnordered, + required this.recentLostAnswersOrdered, required this.failedToSend, - required this.answer}); + required this.answerUnordered, + required this.answerOrdered}); factory _$RPCStatsImpl.fromJson(Map json) => _$$RPCStatsImplFromJson(json); @@ -1771,15 +1895,19 @@ class _$RPCStatsImpl implements _RPCStats { @override final Timestamp? firstConsecutiveSeenTs; @override - final int recentLostAnswers; + final int recentLostAnswersUnordered; + @override + final int recentLostAnswersOrdered; @override final int failedToSend; @override - final AnswerStats answer; + final AnswerStats answerUnordered; + @override + final AnswerStats answerOrdered; @override String toString() { - return 'RPCStats(messagesSent: $messagesSent, messagesRcvd: $messagesRcvd, questionsInFlight: $questionsInFlight, lastQuestionTs: $lastQuestionTs, lastSeenTs: $lastSeenTs, firstConsecutiveSeenTs: $firstConsecutiveSeenTs, recentLostAnswers: $recentLostAnswers, failedToSend: $failedToSend, answer: $answer)'; + return 'RPCStats(messagesSent: $messagesSent, messagesRcvd: $messagesRcvd, questionsInFlight: $questionsInFlight, lastQuestionTs: $lastQuestionTs, lastSeenTs: $lastSeenTs, firstConsecutiveSeenTs: $firstConsecutiveSeenTs, recentLostAnswersUnordered: $recentLostAnswersUnordered, recentLostAnswersOrdered: $recentLostAnswersOrdered, failedToSend: $failedToSend, answerUnordered: $answerUnordered, answerOrdered: $answerOrdered)'; } @override @@ -1799,11 +1927,19 @@ class _$RPCStatsImpl implements _RPCStats { other.lastSeenTs == lastSeenTs) && (identical(other.firstConsecutiveSeenTs, firstConsecutiveSeenTs) || other.firstConsecutiveSeenTs == firstConsecutiveSeenTs) && - (identical(other.recentLostAnswers, recentLostAnswers) || - other.recentLostAnswers == recentLostAnswers) && + (identical(other.recentLostAnswersUnordered, + recentLostAnswersUnordered) || + other.recentLostAnswersUnordered == + recentLostAnswersUnordered) && + (identical( + other.recentLostAnswersOrdered, recentLostAnswersOrdered) || + other.recentLostAnswersOrdered == recentLostAnswersOrdered) && (identical(other.failedToSend, failedToSend) || other.failedToSend == failedToSend) && - (identical(other.answer, answer) || other.answer == answer)); + (identical(other.answerUnordered, answerUnordered) || + other.answerUnordered == answerUnordered) && + (identical(other.answerOrdered, answerOrdered) || + other.answerOrdered == answerOrdered)); } @JsonKey(includeFromJson: false, includeToJson: false) @@ -1816,9 +1952,11 @@ class _$RPCStatsImpl implements _RPCStats { lastQuestionTs, lastSeenTs, firstConsecutiveSeenTs, - recentLostAnswers, + recentLostAnswersUnordered, + recentLostAnswersOrdered, failedToSend, - answer); + answerUnordered, + answerOrdered); /// Create a copy of RPCStats /// with the given fields replaced by the non-null parameter values. @@ -1844,9 +1982,11 @@ abstract class _RPCStats implements RPCStats { required final Timestamp? lastQuestionTs, required final Timestamp? lastSeenTs, required final Timestamp? 
firstConsecutiveSeenTs, - required final int recentLostAnswers, + required final int recentLostAnswersUnordered, + required final int recentLostAnswersOrdered, required final int failedToSend, - required final AnswerStats answer}) = _$RPCStatsImpl; + required final AnswerStats answerUnordered, + required final AnswerStats answerOrdered}) = _$RPCStatsImpl; factory _RPCStats.fromJson(Map json) = _$RPCStatsImpl.fromJson; @@ -1864,11 +2004,15 @@ abstract class _RPCStats implements RPCStats { @override Timestamp? get firstConsecutiveSeenTs; @override - int get recentLostAnswers; + int get recentLostAnswersUnordered; + @override + int get recentLostAnswersOrdered; @override int get failedToSend; @override - AnswerStats get answer; + AnswerStats get answerUnordered; + @override + AnswerStats get answerOrdered; /// Create a copy of RPCStats /// with the given fields replaced by the non-null parameter values. diff --git a/veilid-flutter/lib/veilid_state.g.dart b/veilid-flutter/lib/veilid_state.g.dart index cf202e98..261357d0 100644 --- a/veilid-flutter/lib/veilid_state.g.dart +++ b/veilid-flutter/lib/veilid_state.g.dart @@ -11,6 +11,10 @@ _$LatencyStatsImpl _$$LatencyStatsImplFromJson(Map json) => fastest: TimestampDuration.fromJson(json['fastest']), average: TimestampDuration.fromJson(json['average']), slowest: TimestampDuration.fromJson(json['slowest']), + tm90: TimestampDuration.fromJson(json['tm90']), + tm75: TimestampDuration.fromJson(json['tm75']), + p90: TimestampDuration.fromJson(json['p90']), + p75: TimestampDuration.fromJson(json['p75']), ); Map _$$LatencyStatsImplToJson(_$LatencyStatsImpl instance) => @@ -18,6 +22,10 @@ Map _$$LatencyStatsImplToJson(_$LatencyStatsImpl instance) => 'fastest': instance.fastest.toJson(), 'average': instance.average.toJson(), 'slowest': instance.slowest.toJson(), + 'tm90': instance.tm90.toJson(), + 'tm75': instance.tm75.toJson(), + 'p90': instance.p90.toJson(), + 'p75': instance.p75.toJson(), }; _$TransferStatsImpl _$$TransferStatsImplFromJson(Map json) => @@ -148,9 +156,13 @@ _$RPCStatsImpl _$$RPCStatsImplFromJson(Map json) => firstConsecutiveSeenTs: json['first_consecutive_seen_ts'] == null ? 
null : Timestamp.fromJson(json['first_consecutive_seen_ts']), - recentLostAnswers: (json['recent_lost_answers'] as num).toInt(), + recentLostAnswersUnordered: + (json['recent_lost_answers_unordered'] as num).toInt(), + recentLostAnswersOrdered: + (json['recent_lost_answers_ordered'] as num).toInt(), failedToSend: (json['failed_to_send'] as num).toInt(), - answer: AnswerStats.fromJson(json['answer']), + answerUnordered: AnswerStats.fromJson(json['answer_unordered']), + answerOrdered: AnswerStats.fromJson(json['answer_ordered']), ); Map _$$RPCStatsImplToJson(_$RPCStatsImpl instance) => @@ -161,9 +173,11 @@ Map _$$RPCStatsImplToJson(_$RPCStatsImpl instance) => 'last_question_ts': instance.lastQuestionTs?.toJson(), 'last_seen_ts': instance.lastSeenTs?.toJson(), 'first_consecutive_seen_ts': instance.firstConsecutiveSeenTs?.toJson(), - 'recent_lost_answers': instance.recentLostAnswers, + 'recent_lost_answers_unordered': instance.recentLostAnswersUnordered, + 'recent_lost_answers_ordered': instance.recentLostAnswersOrdered, 'failed_to_send': instance.failedToSend, - 'answer': instance.answer.toJson(), + 'answer_unordered': instance.answerUnordered.toJson(), + 'answer_ordered': instance.answerOrdered.toJson(), }; _$PeerStatsImpl _$$PeerStatsImplFromJson(Map json) => diff --git a/veilid-python/veilid/schema/RecvMessage.json b/veilid-python/veilid/schema/RecvMessage.json index b0a3d5d3..47da68d6 100644 --- a/veilid-python/veilid/schema/RecvMessage.json +++ b/veilid-python/veilid/schema/RecvMessage.json @@ -3110,9 +3110,29 @@ "description": "fastest latency in the ROLLING_LATENCIES_SIZE last latencies", "type": "string" }, + "p75": { + "description": "p75 latency in the ROLLING_LATENCIES_SIZE", + "default": "0", + "type": "string" + }, + "p90": { + "description": "p90 latency in the ROLLING_LATENCIES_SIZE", + "default": "0", + "type": "string" + }, "slowest": { "description": "slowest latency in the ROLLING_LATENCIES_SIZE last latencies", "type": "string" + }, + "tm75": { + "description": "trimmed mean with lowest 75% latency in the ROLLING_LATENCIES_SIZE", + "default": "0", + "type": "string" + }, + "tm90": { + "description": "trimmed mean with lowest 90% latency in the ROLLING_LATENCIES_SIZE", + "default": "0", + "type": "string" } } }, @@ -3153,7 +3173,19 @@ "rpc_stats": { "description": "information about RPCs", "default": { - "answer": { + "answer_ordered": { + "answers": 0, + "consecutive_answers_average": 0, + "consecutive_answers_maximum": 0, + "consecutive_answers_minimum": 0, + "consecutive_lost_answers_average": 0, + "consecutive_lost_answers_maximum": 0, + "consecutive_lost_answers_minimum": 0, + "lost_answers": 0, + "questions": 0, + "span": "0" + }, + "answer_unordered": { "answers": 0, "consecutive_answers_average": 0, "consecutive_answers_maximum": 0, @@ -3172,7 +3204,8 @@ "messages_rcvd": 0, "messages_sent": 0, "questions_in_flight": 0, - "recent_lost_answers": 0 + "recent_lost_answers_ordered": 0, + "recent_lost_answers_unordered": 0 }, "allOf": [ { @@ -3269,12 +3302,31 @@ "failed_to_send", "messages_rcvd", "messages_sent", - "questions_in_flight", - "recent_lost_answers" + "questions_in_flight" ], "properties": { - "answer": { - "description": "rpc answer stats for this peer", + "answer_ordered": { + "description": "ordered rpc answer stats for this peer", + "default": { + "answers": 0, + "consecutive_answers_average": 0, + "consecutive_answers_maximum": 0, + "consecutive_answers_minimum": 0, + "consecutive_lost_answers_average": 0, + "consecutive_lost_answers_maximum": 0, + 
"consecutive_lost_answers_minimum": 0, + "lost_answers": 0, + "questions": 0, + "span": "0" + }, + "allOf": [ + { + "$ref": "#/definitions/AnswerStats" + } + ] + }, + "answer_unordered": { + "description": "unordered rpc answer stats for this peer", "default": { "answers": 0, "consecutive_answers_average": 0, @@ -3338,8 +3390,16 @@ "format": "uint32", "minimum": 0.0 }, - "recent_lost_answers": { - "description": "number of answers that have been lost consecutively", + "recent_lost_answers_ordered": { + "description": "number of answers that have been lost consecutively over an ordered channel", + "default": 0, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + }, + "recent_lost_answers_unordered": { + "description": "number of answers that have been lost consecutively over an unordered channel", + "default": 0, "type": "integer", "format": "uint32", "minimum": 0.0 diff --git a/veilid-python/veilid/state.py b/veilid-python/veilid/state.py index 4d28fa31..becfd29e 100644 --- a/veilid-python/veilid/state.py +++ b/veilid-python/veilid/state.py @@ -121,9 +121,11 @@ class RPCStats: last_question_ts: Optional[Timestamp] last_seen_ts: Optional[Timestamp] first_consecutive_seen_ts: Optional[Timestamp] - recent_lost_answers: int + recent_lost_answers_unordered: int + recent_lost_answers_ordered: int failed_to_send: int - answer: AnswerStats + answer_unordered: AnswerStats + answer_ordered: AnswerStats def __init__( self, @@ -133,9 +135,11 @@ class RPCStats: last_question_ts: Optional[Timestamp], last_seen_ts: Optional[Timestamp], first_consecutive_seen_ts: Optional[Timestamp], - recent_lost_answers: int, + recent_lost_answers_unordered: int, + recent_lost_answers_ordered: int, failed_to_send: int, - answer: AnswerStats, + answer_unordered: AnswerStats, + answer_ordered: AnswerStats, ): self.messages_sent = messages_sent self.messages_rcvd = messages_rcvd @@ -143,9 +147,11 @@ class RPCStats: self.last_question_ts = last_question_ts self.last_seen_ts = last_seen_ts self.first_consecutive_seen_ts = first_consecutive_seen_ts - self.recent_lost_answers = recent_lost_answers + self.recent_lost_answers_unordered = recent_lost_answers_unordered + self.recent_lost_answers_ordered = recent_lost_answers_ordered self.failed_to_send = failed_to_send - self.answer = answer + self.answer_unordered = answer_unordered + self.answer_ordered = answer_ordered @classmethod def from_json(cls, j: dict) -> Self: @@ -159,9 +165,11 @@ class RPCStats: None if j["first_consecutive_seen_ts"] is None else Timestamp(j["first_consecutive_seen_ts"]), - j["recent_lost_answers"], + j["recent_lost_answers_unordered"], + j["recent_lost_answers_ordered"], j["failed_to_send"], - AnswerStats.from_json(j["answer"]), + AnswerStats.from_json(j["answer_unordered"]), + AnswerStats.from_json(j["answer_ordered"]), ) @@ -169,16 +177,28 @@ class LatencyStats: fastest: TimestampDuration average: TimestampDuration slowest: TimestampDuration + tm90: TimestampDuration + tm75: TimestampDuration + p90: TimestampDuration + p75: TimestampDuration def __init__( self, fastest: TimestampDuration, average: TimestampDuration, slowest: TimestampDuration, + tm90: TimestampDuration, + tm75: TimestampDuration, + p90: TimestampDuration, + p75: TimestampDuration, ): self.fastest = fastest self.average = average self.slowest = slowest + self.tm90 = tm90 + self.tm75 = tm75 + self.p90 = p90 + self.p75 = p75 @classmethod def from_json(cls, j: dict) -> Self: @@ -187,6 +207,10 @@ class LatencyStats: TimestampDuration(j["fastest"]), 
TimestampDuration(j["average"]), TimestampDuration(j["slowest"]), + TimestampDuration(j["tm90"]), + TimestampDuration(j["tm75"]), + TimestampDuration(j["p90"]), + TimestampDuration(j["p75"]), ) diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index 9182b557..820cfb66 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -242,7 +242,7 @@ pub async fn run_veilid_server( // Run all subnodes let mut all_subnodes_jh = vec![]; - for subnode in subnode_index..(subnode_index + subnode_count) { + for subnode in subnode_index..=(subnode_index + subnode_count - 1) { debug!("Spawning subnode {}", subnode); let jh = spawn( &format!("subnode{}", subnode), @@ -254,7 +254,7 @@ pub async fn run_veilid_server( // Wait for all subnodes to complete for (sn, jh) in all_subnodes_jh.into_iter().enumerate() { jh.await?; - debug!("Subnode {} exited", sn); + debug!("Subnode {} exited", (sn as u16) + subnode_index); } // Finally, drop logs diff --git a/veilid-tools/tests/web.rs b/veilid-tools/tests/web.rs index 8d4a68ea..26de66e7 100644 --- a/veilid-tools/tests/web.rs +++ b/veilid-tools/tests/web.rs @@ -19,11 +19,11 @@ pub fn setup() -> () { console_error_panic_hook::set_once(); cfg_if! { if #[cfg(feature = "tracing")] { - let mut builder = tracing_wasm::WASMLayerConfigBuilder::new(); - builder.set_report_logs_in_timings(false); - builder.set_max_level(Level::TRACE); - builder.set_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor); - tracing_wasm::set_as_global_default_with_config(builder.build()); + let config = veilid_tracing_wasm::WASMLayerConfig::new() + .with_report_logs_in_timings(false); + .with_max_level(Level::TRACE); + .with_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor); + tracing_wasm::set_as_global_default_with_config(config); } else { wasm_logger::init(wasm_logger::Config::default()); } diff --git a/veilid-wasm/Cargo.toml b/veilid-wasm/Cargo.toml index 9a21ed4c..f032e59e 100644 --- a/veilid-wasm/Cargo.toml +++ b/veilid-wasm/Cargo.toml @@ -23,7 +23,7 @@ crypto-test = ["veilid-core/crypto-test"] veilid-core = { version = "0.4.3", path = "../veilid-core", default-features = false } tracing = { version = "^0", features = ["log", "attributes"] } -tracing-wasm = "^0" +veilid-tracing-wasm = "^0" tracing-subscriber = "^0" wasm-bindgen = { version = "^0", features = ["serde-serialize"] } diff --git a/veilid-wasm/src/lib.rs b/veilid-wasm/src/lib.rs index c7cc3c79..595bc65e 100644 --- a/veilid-wasm/src/lib.rs +++ b/veilid-wasm/src/lib.rs @@ -20,10 +20,10 @@ use send_wrapper::*; use serde::*; use tracing_subscriber::prelude::*; use tracing_subscriber::*; -use tracing_wasm::{WASMLayerConfigBuilder, *}; use tsify::*; use veilid_core::*; use veilid_core::{tools::*, VeilidAPIError}; +use veilid_tracing_wasm::*; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::*; @@ -220,14 +220,14 @@ pub fn initialize_veilid_core(platform_config: String) { None, ); let layer = WASMLayer::new( - WASMLayerConfigBuilder::new() - .set_report_logs_in_timings(platform_config.logging.performance.logs_in_timings) - .set_console_config(if platform_config.logging.performance.logs_in_console { + WASMLayerConfig::new() + .with_report_logs_in_timings(platform_config.logging.performance.logs_in_timings) + .with_console_config(if platform_config.logging.performance.logs_in_console { ConsoleConfig::ReportWithConsoleColor } else { ConsoleConfig::NoReporting }) - .build(), + .with_field_filter(Some(Arc::new(|k| k != veilid_core::VEILID_LOG_KEY_FIELD))), ) 
.with_filter(filter.clone()); filters.insert("performance", filter); diff --git a/veilid-wasm/src/veilid_client_js.rs b/veilid-wasm/src/veilid_client_js.rs index 40a62586..e2c70d47 100644 --- a/veilid-wasm/src/veilid_client_js.rs +++ b/veilid-wasm/src/veilid_client_js.rs @@ -49,14 +49,14 @@ impl VeilidClient { None, ); let layer = WASMLayer::new( - WASMLayerConfigBuilder::new() - .set_report_logs_in_timings(platformConfig.logging.performance.logs_in_timings) - .set_console_config(if platformConfig.logging.performance.logs_in_console { + WASMLayerConfig::new() + .with_report_logs_in_timings(platformConfig.logging.performance.logs_in_timings) + .with_console_config(if platformConfig.logging.performance.logs_in_console { ConsoleConfig::ReportWithConsoleColor } else { ConsoleConfig::NoReporting }) - .build(), + .with_field_filter(Some(Arc::new(|k| k != veilid_core::VEILID_LOG_KEY_FIELD))), ) .with_filter(filter.clone()); filters.insert("performance", filter); diff --git a/veilid-wasm/tests/package-lock.json b/veilid-wasm/tests/package-lock.json index e8975760..b84c4f92 100644 --- a/veilid-wasm/tests/package-lock.json +++ b/veilid-wasm/tests/package-lock.json @@ -21,7 +21,7 @@ }, "../pkg": { "name": "veilid-wasm", - "version": "0.4.1", + "version": "0.4.3", "dev": true, "license": "MPL-2.0" },
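
A note on the unordered/ordered split that runs through this patch: the RPC processor now passes `send_data_result.is_ordered()` into the question/statement/answer recording calls, and the stats types grew parallel `*_unordered`/`*_ordered` fields. The sketch below illustrates the reliability classification this implies; the enum and helper are illustrative stand-ins, not veilid-core's actual types, and only the intent (per the new doc comments on RPCStats) comes from the patch.

    // Illustrative stand-ins only: veilid-core's real SendDataResult and
    // protocol types are not shown in this excerpt.
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum TransportKind {
        Udp, // datagram: delivery and ordering are not guaranteed
        Tcp, // stream: reliable, ordered
        Ws,  // websocket over TCP: reliable, ordered
    }

    fn is_ordered(kind: TransportKind) -> bool {
        // An answer lost over UDP may just be a dropped datagram, while an
        // answer lost over TCP/WS says more about the node itself. Keeping
        // the counters separate stops ordinary datagram loss from skewing
        // a node's reliability accounting.
        !matches!(kind, TransportKind::Udp)
    }

    fn main() {
        assert!(!is_ordered(TransportKind::Udp));
        assert!(is_ordered(TransportKind::Tcp));
        assert!(is_ordered(TransportKind::Ws));
    }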
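
The new `tm90`/`tm75`/`p90`/`p75` fields in `LatencyStats` are filled in by the `stats_accounting.rs` changes listed in the diffstat but not shown in this excerpt. Below is a minimal sketch of what those aggregates mean over the rolling latency window, assuming a nearest-rank percentile convention; the function name and exact rounding are assumptions, and only the field semantics come from the doc comments in the patch.

    /// Sketch: aggregates over a rolling window of latency samples
    /// (microseconds). Returns (tm90, tm75, p90, p75).
    fn latency_aggregates(rolling_latencies: &[u64]) -> Option<(u64, u64, u64, u64)> {
        if rolling_latencies.is_empty() {
            return None;
        }
        let mut sorted = rolling_latencies.to_vec();
        sorted.sort_unstable();
        let n = sorted.len() as u64;

        // pNN: nearest-rank percentile, the sample at rank ceil(NN% * n)
        let pick = |pct: u64| {
            let rank = ((pct * n) + 99) / 100;
            sorted[(rank.max(1) - 1) as usize]
        };

        // tmNN: trimmed mean over the lowest NN% of samples, so a few slow
        // outliers cannot skew the estimate the way a plain average can
        let trimmed_mean = |pct: u64| {
            let keep = ((n * pct) / 100).max(1) as usize;
            sorted[..keep].iter().sum::<u64>() / keep as u64
        };

        Some((trimmed_mean(90), trimmed_mean(75), pick(90), pick(75)))
    }

    fn main() {
        // Ten samples with one large outlier: the plain average is dragged
        // up to ~1.9ms, while tm90 ignores the outlier and stays near the
        // typical ~1ms.
        let samples = [900, 950, 1000, 1000, 1050, 1100, 1100, 1150, 1200, 9500];
        let (tm90, tm75, p90, p75) = latency_aggregates(&samples).unwrap();
        println!("tm90={tm90} tm75={tm75} p90={p90} p75={p75}");
    }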
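
All of the new stats fields are marked `#[serde(default)]` on the Rust side, and RecvMessage.json carries matching `"default"` entries, so state reported by peers running builds from before this change still deserializes. A minimal demonstration of that fallback behavior, assuming serde with the derive feature plus serde_json, and with `TimestampDuration` simplified to a bare `u64` (the real type serializes as a string, as the schema above shows):

    use serde::Deserialize;

    // Simplified stand-in for veilid-core's LatencyStats; only the
    // #[serde(default)] fallback behavior is demonstrated here.
    #[derive(Debug, Deserialize)]
    struct LatencyStatsCompat {
        fastest: u64,
        average: u64,
        slowest: u64,
        #[serde(default)]
        tm90: u64,
        #[serde(default)]
        tm75: u64,
        #[serde(default)]
        p90: u64,
        #[serde(default)]
        p75: u64,
    }

    fn main() {
        // Old-format JSON, as an out-of-date peer would report it: the four
        // new aggregate keys are absent.
        let old = r#"{"fastest":1234,"average":2345,"slowest":3456}"#;
        let stats: LatencyStatsCompat = serde_json::from_str(old).unwrap();
        // Missing keys fall back to Default::default() (0) instead of
        // failing the whole deserialization.
        assert_eq!((stats.tm90, stats.tm75, stats.p90, stats.p75), (0, 0, 0, 0));
        println!("{stats:?}");
    }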