diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index d8b72852..a4125dcf 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -83,7 +83,7 @@ directories = "5.0.1" # Logging tracing = { version = "0.1.40", features = ["log", "attributes"] } -tracing-subscriber = "0.3.18" +tracing-subscriber = "0.3.19" tracing-error = "0.2.0" eyre = "0.6.12" thiserror = "1.0.63" diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index 503264ee..04e74320 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -50,6 +50,7 @@ mod logging; mod network_manager; mod routing_table; mod rpc_processor; +mod stats_accounting; mod storage_manager; mod table_store; mod veilid_api; @@ -64,6 +65,7 @@ pub use self::logging::{ DEFAULT_LOG_FACILITIES_ENABLED_LIST, DEFAULT_LOG_FACILITIES_IGNORE_LIST, DURATION_LOG_FACILITIES, FLAME_LOG_FACILITIES_IGNORE_LIST, VEILID_LOG_KEY_FIELD, }; +pub(crate) use self::stats_accounting::*; pub use self::veilid_api::*; pub use self::veilid_config::*; pub use veilid_tools as tools; diff --git a/veilid-core/src/network_manager/debug.rs b/veilid-core/src/network_manager/debug.rs new file mode 100644 index 00000000..c06a92be --- /dev/null +++ b/veilid-core/src/network_manager/debug.rs @@ -0,0 +1,37 @@ +use super::*; + +impl NetworkManager { + pub fn debug_info_nodeinfo(&self) -> String { + let mut out = String::new(); + let inner = self.inner.lock(); + out += &format!( + "Relay Worker Deque Latency:\n{}", + indent_all_string(&inner.stats.relay_worker_dequeue_latency) + ); + out += "\n"; + out += &format!( + "Relay Worker Process Latency:\n{}", + indent_all_string(&inner.stats.relay_worker_process_latency) + ); + out + } + + pub fn debug(&self) -> String { + let stats = self.get_stats(); + + let mut out = String::new(); + out += "Network Manager\n"; + out += "---------------\n"; + let mut out = format!( + "Transfer stats:\n{}\n", + indent_all_string(&stats.self_stats.transfer_stats) + ); + out += &self.debug_info_nodeinfo(); + + out += "Node Contact Method Cache\n"; + out += "-------------------------\n"; + out += &self.inner.lock().node_contact_method_cache.debug(); + + out + } +} diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 968e3e42..c9464395 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -10,10 +10,12 @@ mod address_filter; mod connection_handle; mod connection_manager; mod connection_table; +mod debug; mod direct_boot; mod network_connection; mod node_contact_method_cache; mod receipt_manager; +mod relay_worker; mod send_data; mod stats; mod tasks; @@ -26,9 +28,10 @@ pub mod tests; pub use connection_manager::*; pub use network_connection::*; -pub(crate) use node_contact_method_cache::*; pub use receipt_manager::*; pub use stats::*; + +pub(crate) use node_contact_method_cache::*; pub(crate) use types::*; //////////////////////////////////////////////////////////////////////////////////////// @@ -42,6 +45,7 @@ use hashlink::LruCache; use native::*; #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] pub use native::{MAX_CAPABILITIES, PUBLIC_INTERNET_CAPABILITIES}; +use relay_worker::*; use routing_table::*; use rpc_processor::*; #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] @@ -60,6 +64,7 @@ pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration = pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60; pub const BOOT_MAGIC: &[u8; 4] = b"BOOT"; pub const HOLE_PUNCH_DELAY_MS: u32 = 100; +pub const 
RELAY_WORKERS_PER_CORE: u32 = 16; // Things we get when we start up and go away when we shut down // Routing table is not in here because we want it to survive a network shutdown/startup restart @@ -171,7 +176,6 @@ impl Default for NetworkManagerStartupContext { Self::new() } } - // The mutable state of the network manager #[derive(Debug)] struct NetworkManagerInner { @@ -181,6 +185,11 @@ struct NetworkManagerInner { address_check: Option, peer_info_change_subscription: Option, socket_address_change_subscription: Option, + + // Relay workers + relay_stop_source: Option, + relay_send_channel: Option>, + relay_worker_join_handles: Vec>, } pub(crate) struct NetworkManager { @@ -202,6 +211,10 @@ pub(crate) struct NetworkManager { // Startup context startup_context: NetworkManagerStartupContext, + + // Relay workers config + concurrency: u32, + queue_size: u32, } impl_veilid_component!(NetworkManager); @@ -214,6 +227,8 @@ impl fmt::Debug for NetworkManager { .field("address_filter", &self.address_filter) .field("network_key", &self.network_key) .field("startup_context", &self.startup_context) + .field("concurrency", &self.concurrency) + .field("queue_size", &self.queue_size) .finish() } } @@ -227,6 +242,10 @@ impl NetworkManager { address_check: None, peer_info_change_subscription: None, socket_address_change_subscription: None, + // + relay_send_channel: None, + relay_stop_source: None, + relay_worker_join_handles: Vec::new(), } } @@ -264,6 +283,26 @@ impl NetworkManager { network_key }; + // make local copy of node id for easy access + let (concurrency, queue_size) = { + let config = registry.config(); + let c = config.get(); + + // set up channel + let mut concurrency = c.network.rpc.concurrency; + let queue_size = c.network.rpc.queue_size; + if concurrency == 0 { + concurrency = get_concurrency(); + if concurrency == 0 { + concurrency = 1; + } + + // Default relay concurrency is the number of CPUs * 16 relay workers per core + concurrency *= RELAY_WORKERS_PER_CORE; + } + (concurrency, queue_size) + }; + let inner = Self::new_inner(); let address_filter = AddressFilter::new(registry.clone()); @@ -282,6 +321,8 @@ impl NetworkManager { ), network_key, startup_context, + concurrency, + queue_size, }; this.setup_tasks(); @@ -360,7 +401,8 @@ impl NetworkManager { receipt_manager: receipt_manager.clone(), }); - let address_check = AddressCheck::new(net.clone()); + // Startup relay workers + self.startup_relay_workers()?; // Register event handlers let peer_info_change_subscription = @@ -371,6 +413,7 @@ impl NetworkManager { { let mut inner = self.inner.lock(); + let address_check = AddressCheck::new(net.clone()); inner.address_check = Some(address_check); inner.peer_info_change_subscription = Some(peer_info_change_subscription); inner.socket_address_change_subscription = Some(socket_address_change_subscription); @@ -426,6 +469,9 @@ impl NetworkManager { inner.address_check = None; } + // Shutdown relay workers + self.shutdown_relay_workers().await; + // Shutdown network components if they started up veilid_log!(self debug "shutting down network components"); @@ -1099,13 +1145,12 @@ impl NetworkManager { relay_nr.set_sequencing(Sequencing::EnsureOrdered); }; - // Relay the packet to the desired destination - veilid_log!(self trace "relaying {} bytes to {}", data.len(), relay_nr); - let cur_ts = Timestamp::now(); - if let Err(e) = pin_future!(self.send_data(relay_nr, data.to_vec())).await { - veilid_log!(self debug "failed to relay envelope: {}" ,e); + // Pass relay to RPC system + if let Err(e) = 
self.enqueue_relay(relay_nr, data.to_vec()) { + // Couldn't enqueue, but not the sender's fault + veilid_log!(self debug "failed to enqueue relay: {}", e); + return Ok(false); } - veilid_log!(self debug target:"network_result", "relay time: {}", Timestamp::now() - cur_ts); } // Inform caller that we dealt with the envelope, but did not process it locally return Ok(false); diff --git a/veilid-core/src/network_manager/relay_worker.rs b/veilid-core/src/network_manager/relay_worker.rs new file mode 100644 index 00000000..61dc7951 --- /dev/null +++ b/veilid-core/src/network_manager/relay_worker.rs @@ -0,0 +1,120 @@ +use futures_util::StreamExt as _; +use stop_token::future::FutureExt as _; + +use super::*; + +#[derive(Debug)] +pub(super) enum RelayWorkerRequestKind { + Relay { + relay_nr: FilteredNodeRef, + data: Vec, + }, +} + +#[derive(Debug)] +pub(super) struct RelayWorkerRequest { + enqueued_ts: Timestamp, + span: Span, + kind: RelayWorkerRequestKind, +} + +impl NetworkManager { + pub(super) fn startup_relay_workers(&self) -> EyreResult<()> { + let mut inner = self.inner.lock(); + + // Relay workers + let channel = flume::bounded(self.queue_size as usize); + inner.relay_send_channel = Some(channel.0.clone()); + inner.relay_stop_source = Some(StopSource::new()); + + // spin up N workers + veilid_log!(self debug "Starting {} relay workers", self.concurrency); + for task_n in 0..self.concurrency { + let registry = self.registry(); + let receiver = channel.1.clone(); + let stop_token = inner.relay_stop_source.as_ref().unwrap().token(); + let jh = spawn(&format!("relay worker {}", task_n), async move { + let this = registry.network_manager(); + Box::pin(this.relay_worker(stop_token, receiver)).await + }); + inner.relay_worker_join_handles.push(jh); + } + Ok(()) + } + + pub(super) async fn shutdown_relay_workers(&self) { + // Stop the relay workers + let mut unord = FuturesUnordered::new(); + { + let mut inner = self.inner.lock(); + // take the join handles out + for h in inner.relay_worker_join_handles.drain(..) 
{ + unord.push(h); + } + // drop the stop + drop(inner.relay_stop_source.take()); + } + veilid_log!(self debug "Stopping {} relay workers", unord.len()); + + // Wait for them to complete + while unord.next().await.is_some() {} + } + + pub(super) async fn relay_worker( + &self, + stop_token: StopToken, + receiver: flume::Receiver, + ) { + while let Ok(Ok(request)) = receiver.recv_async().timeout_at(stop_token.clone()).await { + let relay_request_span = tracing::trace_span!("relay request"); + relay_request_span.follows_from(request.span); + + // Measure dequeue time + let dequeue_ts = Timestamp::now(); + let dequeue_latency = dequeue_ts.saturating_sub(request.enqueued_ts); + + // Process request kind + match request.kind { + RelayWorkerRequestKind::Relay { relay_nr, data } => { + // Relay the packet to the desired destination + veilid_log!(self trace "relaying {} bytes to {}", data.len(), relay_nr); + if let Err(e) = pin_future!(self.send_data(relay_nr, data.to_vec())).await { + veilid_log!(self debug "failed to relay envelope: {}" ,e); + } + } + } + + // Measure process time + let process_ts = Timestamp::now(); + let process_latency = process_ts.saturating_sub(dequeue_ts); + + // Accounting + self.stats_relay_processed(dequeue_latency, process_latency) + } + } + + #[instrument(level = "trace", target = "rpc", skip_all)] + pub(super) fn enqueue_relay(&self, relay_nr: FilteredNodeRef, data: Vec) -> EyreResult<()> { + let _guard = self + .startup_context + .startup_lock + .enter() + .wrap_err("not started up")?; + + let send_channel = { + let inner = self.inner.lock(); + let Some(send_channel) = inner.relay_send_channel.as_ref().cloned() else { + bail!("send channel is closed"); + }; + send_channel + }; + send_channel + .try_send(RelayWorkerRequest { + enqueued_ts: Timestamp::now(), + span: Span::current(), + kind: RelayWorkerRequestKind::Relay { relay_nr, data }, + }) + .map_err(|e| eyre!("failed to enqueue relay: {}", e))?; + Ok(()) + } +} diff --git a/veilid-core/src/network_manager/stats.rs b/veilid-core/src/network_manager/stats.rs index 0e897968..4cdca5ad 100644 --- a/veilid-core/src/network_manager/stats.rs +++ b/veilid-core/src/network_manager/stats.rs @@ -22,6 +22,10 @@ impl Default for PerAddressStatsKey { pub struct NetworkManagerStats { pub self_stats: PerAddressStats, pub per_address_stats: LruCache, + pub relay_worker_dequeue_latency: LatencyStats, + pub relay_worker_process_latency: LatencyStats, + pub relay_worker_dequeue_latency_accounting: LatencyStatsAccounting, + pub relay_worker_process_latency_accounting: LatencyStatsAccounting, } impl Default for NetworkManagerStats { @@ -29,6 +33,10 @@ impl Default for NetworkManagerStats { Self { self_stats: PerAddressStats::default(), per_address_stats: LruCache::new(IPADDR_TABLE_SIZE), + relay_worker_dequeue_latency: LatencyStats::default(), + relay_worker_process_latency: LatencyStats::default(), + relay_worker_dequeue_latency_accounting: LatencyStatsAccounting::new(), + relay_worker_process_latency_accounting: LatencyStatsAccounting::new(), } } } @@ -36,7 +44,7 @@ impl Default for NetworkManagerStats { impl NetworkManager { // Callbacks from low level network for statistics gathering pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) { - let inner = &mut *self.inner.lock(); + let mut inner = self.inner.lock(); inner .stats .self_stats @@ -53,7 +61,7 @@ impl NetworkManager { } pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) { - let inner = &mut *self.inner.lock(); + let mut inner = self.inner.lock(); 
inner .stats .self_stats @@ -69,28 +77,27 @@ impl NetworkManager { .add_down(bytes); } + pub fn stats_relay_processed( + &self, + dequeue_latency: TimestampDuration, + process_latency: TimestampDuration, + ) { + let mut inner = self.inner.lock(); + inner.stats.relay_worker_dequeue_latency = inner + .stats + .relay_worker_dequeue_latency_accounting + .record_latency(dequeue_latency); + inner.stats.relay_worker_process_latency = inner + .stats + .relay_worker_process_latency_accounting + .record_latency(process_latency); + } + pub fn get_stats(&self) -> NetworkManagerStats { let inner = self.inner.lock(); inner.stats.clone() } - pub fn debug(&self) -> String { - let stats = self.get_stats(); - - let mut out = String::new(); - out += "Network Manager\n"; - out += "---------------\n"; - let mut out = format!( - "Transfer stats:\n{}\n", - indent_all_string(&stats.self_stats.transfer_stats) - ); - out += "Node Contact Method Cache\n"; - out += "-------------------------\n"; - out += &self.inner.lock().node_contact_method_cache.debug(); - - out - } - pub fn get_veilid_state(&self) -> Box { if !self.network_is_started() { return Box::new(VeilidStateNetwork { diff --git a/veilid-core/src/routing_table/stats_accounting.rs b/veilid-core/src/routing_table/stats_accounting.rs index a10b6a61..9a8552da 100644 --- a/veilid-core/src/routing_table/stats_accounting.rs +++ b/veilid-core/src/routing_table/stats_accounting.rs @@ -1,15 +1,5 @@ use super::*; -// Latency entry is per round-trip packet (ping or data) -// - Size is number of entries -const ROLLING_LATENCIES_SIZE: usize = 50; - -// Transfers entries are in bytes total for the interval -// - Size is number of entries -// - Interval is number of seconds in each entry -const ROLLING_TRANSFERS_SIZE: usize = 10; -pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 1; - // State entry is per state reason change // - Size is number of entries const ROLLING_STATE_REASON_SPAN_SIZE: usize = 32; @@ -20,149 +10,6 @@ pub const UPDATE_STATE_STATS_INTERVAL_SECS: u32 = 1; // - Interval is number of seconds in each entry const ROLLING_ANSWERS_SIZE: usize = 10; pub const ROLLING_ANSWER_INTERVAL_SECS: u32 = 60; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] -pub struct TransferCount { - down: ByteCount, - up: ByteCount, -} - -#[derive(Debug, Clone, Default)] -pub struct TransferStatsAccounting { - rolling_transfers: VecDeque, - current_transfer: TransferCount, -} - -impl TransferStatsAccounting { - pub fn new() -> Self { - Self { - rolling_transfers: VecDeque::new(), - current_transfer: TransferCount::default(), - } - } - - pub fn add_down(&mut self, bytes: ByteCount) { - self.current_transfer.down += bytes; - } - - pub fn add_up(&mut self, bytes: ByteCount) { - self.current_transfer.up += bytes; - } - - pub fn roll_transfers( - &mut self, - last_ts: Timestamp, - cur_ts: Timestamp, - transfer_stats: &mut TransferStatsDownUp, - ) { - let dur_ms = cur_ts.saturating_sub(last_ts) / 1000u64; - while self.rolling_transfers.len() >= ROLLING_TRANSFERS_SIZE { - self.rolling_transfers.pop_front(); - } - self.rolling_transfers.push_back(self.current_transfer); - - transfer_stats.down.total += self.current_transfer.down; - transfer_stats.up.total += self.current_transfer.up; - - self.current_transfer = TransferCount::default(); - - transfer_stats.down.maximum = 0.into(); - transfer_stats.up.maximum = 0.into(); - transfer_stats.down.minimum = u64::MAX.into(); - transfer_stats.up.minimum = u64::MAX.into(); - transfer_stats.down.average = 0.into(); - transfer_stats.up.average 
= 0.into(); - for xfer in &self.rolling_transfers { - let bpsd = xfer.down * 1000u64 / dur_ms; - let bpsu = xfer.up * 1000u64 / dur_ms; - transfer_stats.down.maximum.max_assign(bpsd); - transfer_stats.up.maximum.max_assign(bpsu); - transfer_stats.down.minimum.min_assign(bpsd); - transfer_stats.up.minimum.min_assign(bpsu); - transfer_stats.down.average += bpsd; - transfer_stats.up.average += bpsu; - } - let len = self.rolling_transfers.len() as u64; - if len > 0 { - transfer_stats.down.average /= len; - transfer_stats.up.average /= len; - } - } -} - -#[derive(Debug, Clone, Default)] -pub struct LatencyStatsAccounting { - rolling_latencies: VecDeque, -} - -impl LatencyStatsAccounting { - pub fn new() -> Self { - Self { - rolling_latencies: VecDeque::new(), - } - } - - fn get_tm_n(sorted_latencies: &[TimestampDuration], n: usize) -> Option { - let tmcount = sorted_latencies.len() * n / 100; - if tmcount == 0 { - None - } else { - let mut tm = TimestampDuration::new(0); - for l in &sorted_latencies[..tmcount] { - tm += *l; - } - tm /= tmcount as u64; - Some(tm) - } - } - - fn get_p_n(sorted_latencies: &[TimestampDuration], n: usize) -> TimestampDuration { - let pindex = (sorted_latencies.len() * n / 100).saturating_sub(1); - sorted_latencies[pindex] - } - - pub fn record_latency(&mut self, latency: TimestampDuration) -> LatencyStats { - while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE { - self.rolling_latencies.pop_front(); - } - self.rolling_latencies.push_back(latency); - - // Calculate latency stats - - let mut fastest = TimestampDuration::new(u64::MAX); - let mut slowest = TimestampDuration::new(0u64); - let mut average = TimestampDuration::new(0u64); - - for rl in &self.rolling_latencies { - fastest.min_assign(*rl); - slowest.max_assign(*rl); - average += *rl; - } - let len = self.rolling_latencies.len() as u64; - if len > 0 { - average /= len; - } - - let mut sorted_latencies: Vec<_> = self.rolling_latencies.iter().copied().collect(); - sorted_latencies.sort(); - - let tm90 = Self::get_tm_n(&sorted_latencies, 90).unwrap_or(average); - let tm75 = Self::get_tm_n(&sorted_latencies, 75).unwrap_or(average); - let p90 = Self::get_p_n(&sorted_latencies, 90); - let p75 = Self::get_p_n(&sorted_latencies, 75); - - LatencyStats { - fastest, - average, - slowest, - tm90, - tm75, - p90, - p75, - } - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct StateReasonSpan { state_reason: BucketEntryStateReason, diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs new file mode 100644 index 00000000..1e19735f --- /dev/null +++ b/veilid-core/src/rpc_processor/debug.rs @@ -0,0 +1,18 @@ +use super::*; + +impl RPCProcessor { + pub fn debug_info_nodeinfo(&self) -> String { + let mut out = String::new(); + let inner = self.inner.lock(); + out += &format!( + "RPC Worker Dequeue Latency:\n{}", + indent_all_string(&inner.rpc_worker_dequeue_latency) + ); + out += "\n"; + out += &format!( + "RPC Worker Process Latency:\n{}", + indent_all_string(&inner.rpc_worker_process_latency) + ); + out + } +} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index b463c752..b623f5ba 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -2,6 +2,7 @@ use super::*; mod answer; mod coders; +mod debug; mod destination; mod error; mod fanout; @@ -22,6 +23,7 @@ mod rpc_status; mod rpc_validate_dial_info; mod rpc_value_changed; mod rpc_watch_value; +mod rpc_worker; mod sender_info; mod 
sender_peer_info; @@ -48,7 +50,7 @@ pub(crate) use error::*; pub(crate) use fanout::*; pub(crate) use sender_info::*; -use futures_util::StreamExt; +use futures_util::StreamExt as _; use stop_token::future::FutureExt as _; use coders::*; @@ -56,6 +58,7 @@ use message::*; use message_header::*; use operation_waiter::*; use rendered_operation::*; +use rpc_worker::*; use sender_peer_info::*; use crypto::*; @@ -67,6 +70,10 @@ impl_veilid_log_facility!("rpc"); ///////////////////////////////////////////////////////////////////// +const RPC_WORKERS_PER_CORE: u32 = 16; + +///////////////////////////////////////////////////////////////////// + #[derive(Debug)] #[must_use] struct WaitableReplyContext { @@ -122,9 +129,13 @@ impl Default for RPCProcessorStartupContext { #[derive(Debug)] #[must_use] struct RPCProcessorInner { - send_channel: Option>, - stop_source: Option, - worker_join_handles: Vec>, + rpc_send_channel: Option>, + rpc_stop_source: Option, + rpc_worker_join_handles: Vec>, + rpc_worker_dequeue_latency: LatencyStats, + rpc_worker_process_latency: LatencyStats, + rpc_worker_dequeue_latency_accounting: LatencyStatsAccounting, + rpc_worker_process_latency_accounting: LatencyStatsAccounting, } #[derive(Debug)] @@ -146,9 +157,13 @@ impl_veilid_component!(RPCProcessor); impl RPCProcessor { fn new_inner() -> RPCProcessorInner { RPCProcessorInner { - send_channel: None, - stop_source: None, - worker_join_handles: Vec::new(), + rpc_send_channel: None, + rpc_stop_source: None, + rpc_worker_join_handles: Vec::new(), + rpc_worker_dequeue_latency: LatencyStats::default(), + rpc_worker_process_latency: LatencyStats::default(), + rpc_worker_dequeue_latency_accounting: LatencyStatsAccounting::new(), + rpc_worker_process_latency_accounting: LatencyStatsAccounting::new(), } } @@ -173,7 +188,7 @@ impl RPCProcessor { } // Default RPC concurrency is the number of CPUs * 16 rpc workers per core, as a single worker takes about 1% CPU when relaying and 16% is reasonable for baseline plus relay - concurrency *= 16; + concurrency *= RPC_WORKERS_PER_CORE; } (concurrency, queue_size, max_route_hop_count, timeout_us) }; @@ -227,22 +242,12 @@ impl RPCProcessor { let mut inner = self.inner.lock(); let channel = flume::bounded(self.queue_size as usize); - inner.send_channel = Some(channel.0.clone()); - inner.stop_source = Some(StopSource::new()); - - // spin up N workers - veilid_log!(self trace "Spinning up {} RPC workers", self.concurrency); - for task_n in 0..self.concurrency { - let registry = self.registry(); - let receiver = channel.1.clone(); - let stop_token = inner.stop_source.as_ref().unwrap().token(); - let jh = spawn(&format!("rpc worker {}", task_n), async move { - let this = registry.rpc_processor(); - Box::pin(this.rpc_worker(stop_token, receiver)).await - }); - inner.worker_join_handles.push(jh); - } + inner.rpc_send_channel = Some(channel.0.clone()); + inner.rpc_stop_source = Some(StopSource::new()); } + + self.startup_rpc_workers()?; + guard.success(); veilid_log!(self debug "finished rpc processor startup"); @@ -260,21 +265,7 @@ impl RPCProcessor { .await .expect("should be started up"); - // Stop the rpc workers - let mut unord = FuturesUnordered::new(); - { - let mut inner = self.inner.lock(); - // take the join handles out - for h in inner.worker_join_handles.drain(..) 
{ - unord.push(h); - } - // drop the stop - drop(inner.stop_source.take()); - } - veilid_log!(self debug "stopping {} rpc worker tasks", unord.len()); - - // Wait for them to complete - while unord.next().await.is_some() {} + self.shutdown_rpc_workers().await; veilid_log!(self debug "resetting rpc processor state"); @@ -1620,164 +1611,4 @@ impl RPCProcessor { } } } - - async fn rpc_worker( - &self, - stop_token: StopToken, - receiver: flume::Receiver<(Span, MessageEncoded)>, - ) { - while let Ok(Ok((prev_span, msg))) = - receiver.recv_async().timeout_at(stop_token.clone()).await - { - let rpc_message_span = tracing::trace_span!("rpc message"); - rpc_message_span.follows_from(prev_span); - - network_result_value_or_log!(self match self - .process_rpc_message(msg).instrument(rpc_message_span) - .await - { - Err(e) => { - veilid_log!(self error "couldn't process rpc message: {}", e); - continue; - } - - Ok(v) => { - v - } - } => [ format!(": msg.header={:?}", msg.header) ] {}); - } - } - - #[instrument(level = "trace", target = "rpc", skip_all)] - pub fn enqueue_direct_message( - &self, - envelope: Envelope, - sender_noderef: FilteredNodeRef, - flow: Flow, - routing_domain: RoutingDomain, - body: Vec, - ) -> EyreResult<()> { - let _guard = self - .startup_context - .startup_lock - .enter() - .wrap_err("not started up")?; - - if sender_noderef.routing_domain_set() != routing_domain { - bail!("routing domain should match peer noderef filter"); - } - - let header = MessageHeader { - detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect { - envelope, - sender_noderef, - flow, - routing_domain, - }), - timestamp: Timestamp::now(), - body_len: ByteCount::new(body.len() as u64), - }; - - let msg = MessageEncoded { - header, - data: MessageData { contents: body }, - }; - - let send_channel = { - let inner = self.inner.lock(); - let Some(send_channel) = inner.send_channel.as_ref().cloned() else { - bail!("send channel is closed"); - }; - send_channel - }; - send_channel - .try_send((Span::current(), msg)) - .map_err(|e| eyre!("failed to enqueue direct RPC message: {}", e))?; - Ok(()) - } - - #[instrument(level = "trace", target = "rpc", skip_all)] - fn enqueue_safety_routed_message( - &self, - direct: RPCMessageHeaderDetailDirect, - remote_safety_route: PublicKey, - sequencing: Sequencing, - body: Vec, - ) -> EyreResult<()> { - let _guard = self - .startup_context - .startup_lock - .enter() - .wrap_err("not started up")?; - - let header = MessageHeader { - detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted { - direct, - remote_safety_route, - sequencing, - }), - timestamp: Timestamp::now(), - body_len: (body.len() as u64).into(), - }; - - let msg = MessageEncoded { - header, - data: MessageData { contents: body }, - }; - let send_channel = { - let inner = self.inner.lock(); - let Some(send_channel) = inner.send_channel.as_ref().cloned() else { - bail!("send channel is closed"); - }; - send_channel - }; - send_channel - .try_send((Span::current(), msg)) - .map_err(|e| eyre!("failed to enqueue safety routed RPC message: {}", e))?; - Ok(()) - } - - #[instrument(level = "trace", target = "rpc", skip_all)] - fn enqueue_private_routed_message( - &self, - direct: RPCMessageHeaderDetailDirect, - remote_safety_route: PublicKey, - private_route: PublicKey, - safety_spec: SafetySpec, - body: Vec, - ) -> EyreResult<()> { - let _guard = self - .startup_context - .startup_lock - .enter() - .wrap_err("not started up")?; - - let header = MessageHeader { - detail: 
RPCMessageHeaderDetail::PrivateRouted(RPCMessageHeaderDetailPrivateRouted { - direct, - remote_safety_route, - private_route, - safety_spec, - }), - timestamp: Timestamp::now(), - body_len: (body.len() as u64).into(), - }; - - let msg = MessageEncoded { - header, - data: MessageData { contents: body }, - }; - - let send_channel = { - let inner = self.inner.lock(); - let Some(send_channel) = inner.send_channel.as_ref().cloned() else { - bail!("send channel is closed"); - }; - send_channel - }; - send_channel - .try_send((Span::current(), msg)) - .map_err(|e| eyre!("failed to enqueue private routed RPC message: {}", e))?; - Ok(()) - } } diff --git a/veilid-core/src/rpc_processor/rpc_worker.rs b/veilid-core/src/rpc_processor/rpc_worker.rs new file mode 100644 index 00000000..8141f965 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_worker.rs @@ -0,0 +1,247 @@ +use futures_util::StreamExt as _; +use stop_token::future::FutureExt as _; + +use super::*; + +#[derive(Debug)] +pub(super) enum RPCWorkerRequestKind { + Message { message_encoded: MessageEncoded }, +} + +#[derive(Debug)] +pub(super) struct RPCWorkerRequest { + enqueued_ts: Timestamp, + span: Span, + kind: RPCWorkerRequestKind, +} + +impl RPCProcessor { + pub(super) fn startup_rpc_workers(&self) -> EyreResult<()> { + let mut inner = self.inner.lock(); + + // Relay workers + let channel = flume::bounded(self.queue_size as usize); + inner.rpc_send_channel = Some(channel.0.clone()); + inner.rpc_stop_source = Some(StopSource::new()); + + // spin up N workers + veilid_log!(self debug "Starting {} RPC workers", self.concurrency); + for task_n in 0..self.concurrency { + let registry = self.registry(); + let receiver = channel.1.clone(); + let stop_token = inner.rpc_stop_source.as_ref().unwrap().token(); + let jh = spawn(&format!("relay worker {}", task_n), async move { + let this = registry.rpc_processor(); + Box::pin(this.rpc_worker(stop_token, receiver)).await + }); + inner.rpc_worker_join_handles.push(jh); + } + Ok(()) + } + + pub(super) async fn shutdown_rpc_workers(&self) { + // Stop the rpc workers + let mut unord = FuturesUnordered::new(); + { + let mut inner = self.inner.lock(); + // take the join handles out + for h in inner.rpc_worker_join_handles.drain(..) 
{ + unord.push(h); + } + // drop the stop + drop(inner.rpc_stop_source.take()); + } + veilid_log!(self debug "Stopping {} RPC workers", unord.len()); + + // Wait for them to complete + while unord.next().await.is_some() {} + } + + async fn rpc_worker(&self, stop_token: StopToken, receiver: flume::Receiver) { + while let Ok(Ok(request)) = receiver.recv_async().timeout_at(stop_token.clone()).await { + let rpc_request_span = tracing::trace_span!("rpc request"); + rpc_request_span.follows_from(request.span); + + // Measure dequeue time + let dequeue_ts = Timestamp::now(); + let dequeue_latency = dequeue_ts.saturating_sub(request.enqueued_ts); + + // Process request kind + match request.kind { + // Process RPC Message + RPCWorkerRequestKind::Message { message_encoded } => { + network_result_value_or_log!(self match self + .process_rpc_message(message_encoded).instrument(rpc_request_span) + .await + { + Err(e) => { + veilid_log!(self error "couldn't process rpc message: {}", e); + continue; + } + Ok(v) => { + v + } + } => [ format!(": msg.header={:?}", message_encoded.header) ] {}); + } + } + + // Measure process time + let process_ts = Timestamp::now(); + let process_latency = process_ts.saturating_sub(dequeue_ts); + + // Accounting + let mut inner = self.inner.lock(); + inner.rpc_worker_dequeue_latency = inner + .rpc_worker_dequeue_latency_accounting + .record_latency(dequeue_latency); + inner.rpc_worker_process_latency = inner + .rpc_worker_process_latency_accounting + .record_latency(process_latency); + } + } + + #[instrument(level = "trace", target = "rpc", skip_all)] + pub fn enqueue_direct_message( + &self, + envelope: Envelope, + sender_noderef: FilteredNodeRef, + flow: Flow, + routing_domain: RoutingDomain, + body: Vec, + ) -> EyreResult<()> { + let _guard = self + .startup_context + .startup_lock + .enter() + .wrap_err("not started up")?; + + if sender_noderef.routing_domain_set() != routing_domain { + bail!("routing domain should match peer noderef filter"); + } + + let header = MessageHeader { + detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect { + envelope, + sender_noderef, + flow, + routing_domain, + }), + timestamp: Timestamp::now(), + body_len: ByteCount::new(body.len() as u64), + }; + + let message_encoded = MessageEncoded { + header, + data: MessageData { contents: body }, + }; + + let send_channel = { + let inner = self.inner.lock(); + let Some(send_channel) = inner.rpc_send_channel.as_ref().cloned() else { + bail!("send channel is closed"); + }; + send_channel + }; + send_channel + .try_send(RPCWorkerRequest { + enqueued_ts: Timestamp::now(), + span: Span::current(), + kind: RPCWorkerRequestKind::Message { message_encoded }, + }) + .map_err(|e| eyre!("failed to enqueue direct RPC message: {}", e))?; + Ok(()) + } + + #[instrument(level = "trace", target = "rpc", skip_all)] + pub(super) fn enqueue_safety_routed_message( + &self, + direct: RPCMessageHeaderDetailDirect, + remote_safety_route: PublicKey, + sequencing: Sequencing, + body: Vec, + ) -> EyreResult<()> { + let _guard = self + .startup_context + .startup_lock + .enter() + .wrap_err("not started up")?; + + let header = MessageHeader { + detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted { + direct, + remote_safety_route, + sequencing, + }), + timestamp: Timestamp::now(), + body_len: (body.len() as u64).into(), + }; + + let message_encoded = MessageEncoded { + header, + data: MessageData { contents: body }, + }; + let send_channel = { + let inner = self.inner.lock(); + let 
Some(send_channel) = inner.rpc_send_channel.as_ref().cloned() else { + bail!("send channel is closed"); + }; + send_channel + }; + send_channel + .try_send(RPCWorkerRequest { + enqueued_ts: Timestamp::now(), + span: Span::current(), + kind: RPCWorkerRequestKind::Message { message_encoded }, + }) + .map_err(|e| eyre!("failed to enqueue safety routed RPC message: {}", e))?; + Ok(()) + } + + #[instrument(level = "trace", target = "rpc", skip_all)] + pub(super) fn enqueue_private_routed_message( + &self, + direct: RPCMessageHeaderDetailDirect, + remote_safety_route: PublicKey, + private_route: PublicKey, + safety_spec: SafetySpec, + body: Vec, + ) -> EyreResult<()> { + let _guard = self + .startup_context + .startup_lock + .enter() + .wrap_err("not started up")?; + + let header = MessageHeader { + detail: RPCMessageHeaderDetail::PrivateRouted(RPCMessageHeaderDetailPrivateRouted { + direct, + remote_safety_route, + private_route, + safety_spec, + }), + timestamp: Timestamp::now(), + body_len: (body.len() as u64).into(), + }; + + let message_encoded = MessageEncoded { + header, + data: MessageData { contents: body }, + }; + + let send_channel = { + let inner = self.inner.lock(); + let Some(send_channel) = inner.rpc_send_channel.as_ref().cloned() else { + bail!("send channel is closed"); + }; + send_channel + }; + send_channel + .try_send(RPCWorkerRequest { + enqueued_ts: Timestamp::now(), + span: Span::current(), + kind: RPCWorkerRequestKind::Message { message_encoded }, + }) + .map_err(|e| eyre!("failed to enqueue private routed RPC message: {}", e))?; + Ok(()) + } +} diff --git a/veilid-core/src/stats_accounting.rs b/veilid-core/src/stats_accounting.rs new file mode 100644 index 00000000..acbc0d20 --- /dev/null +++ b/veilid-core/src/stats_accounting.rs @@ -0,0 +1,153 @@ +use super::*; + +// Latency entry is per round-trip packet (ping or data) +// - Size is number of entries +const ROLLING_LATENCIES_SIZE: usize = 50; + +// Transfers entries are in bytes total for the interval +// - Size is number of entries +// - Interval is number of seconds in each entry +const ROLLING_TRANSFERS_SIZE: usize = 10; +pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 1; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct TransferCount { + down: ByteCount, + up: ByteCount, +} + +#[derive(Debug, Clone, Default)] +pub struct TransferStatsAccounting { + rolling_transfers: VecDeque, + current_transfer: TransferCount, +} + +impl TransferStatsAccounting { + pub fn new() -> Self { + Self { + rolling_transfers: VecDeque::new(), + current_transfer: TransferCount::default(), + } + } + + pub fn add_down(&mut self, bytes: ByteCount) { + self.current_transfer.down += bytes; + } + + pub fn add_up(&mut self, bytes: ByteCount) { + self.current_transfer.up += bytes; + } + + pub fn roll_transfers( + &mut self, + last_ts: Timestamp, + cur_ts: Timestamp, + transfer_stats: &mut TransferStatsDownUp, + ) { + let dur_ms = cur_ts.saturating_sub(last_ts) / 1000u64; + while self.rolling_transfers.len() >= ROLLING_TRANSFERS_SIZE { + self.rolling_transfers.pop_front(); + } + self.rolling_transfers.push_back(self.current_transfer); + + transfer_stats.down.total += self.current_transfer.down; + transfer_stats.up.total += self.current_transfer.up; + + self.current_transfer = TransferCount::default(); + + transfer_stats.down.maximum = 0.into(); + transfer_stats.up.maximum = 0.into(); + transfer_stats.down.minimum = u64::MAX.into(); + transfer_stats.up.minimum = u64::MAX.into(); + transfer_stats.down.average = 0.into(); + 
transfer_stats.up.average = 0.into(); + for xfer in &self.rolling_transfers { + let bpsd = xfer.down * 1000u64 / dur_ms; + let bpsu = xfer.up * 1000u64 / dur_ms; + transfer_stats.down.maximum.max_assign(bpsd); + transfer_stats.up.maximum.max_assign(bpsu); + transfer_stats.down.minimum.min_assign(bpsd); + transfer_stats.up.minimum.min_assign(bpsu); + transfer_stats.down.average += bpsd; + transfer_stats.up.average += bpsu; + } + let len = self.rolling_transfers.len() as u64; + if len > 0 { + transfer_stats.down.average /= len; + transfer_stats.up.average /= len; + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct LatencyStatsAccounting { + rolling_latencies: VecDeque, +} + +impl LatencyStatsAccounting { + pub fn new() -> Self { + Self { + rolling_latencies: VecDeque::new(), + } + } + + fn get_tm_n(sorted_latencies: &[TimestampDuration], n: usize) -> Option { + let tmcount = sorted_latencies.len() * n / 100; + if tmcount == 0 { + None + } else { + let mut tm = TimestampDuration::new(0); + for l in &sorted_latencies[..tmcount] { + tm += *l; + } + tm /= tmcount as u64; + Some(tm) + } + } + + fn get_p_n(sorted_latencies: &[TimestampDuration], n: usize) -> TimestampDuration { + let pindex = (sorted_latencies.len() * n / 100).saturating_sub(1); + sorted_latencies[pindex] + } + + pub fn record_latency(&mut self, latency: TimestampDuration) -> LatencyStats { + while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE { + self.rolling_latencies.pop_front(); + } + self.rolling_latencies.push_back(latency); + + // Calculate latency stats + + let mut fastest = TimestampDuration::new(u64::MAX); + let mut slowest = TimestampDuration::new(0u64); + let mut average = TimestampDuration::new(0u64); + + for rl in &self.rolling_latencies { + fastest.min_assign(*rl); + slowest.max_assign(*rl); + average += *rl; + } + let len = self.rolling_latencies.len() as u64; + if len > 0 { + average /= len; + } + + let mut sorted_latencies: Vec<_> = self.rolling_latencies.iter().copied().collect(); + sorted_latencies.sort(); + + let tm90 = Self::get_tm_n(&sorted_latencies, 90).unwrap_or(average); + let tm75 = Self::get_tm_n(&sorted_latencies, 75).unwrap_or(average); + let p90 = Self::get_p_n(&sorted_latencies, 90); + let p75 = Self::get_p_n(&sorted_latencies, 75); + + LatencyStats { + fastest, + average, + slowest, + tm90, + tm75, + p90, + p75, + } + } +} diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index ab7a4da0..34f58d55 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -761,7 +761,9 @@ impl VeilidAPI { async fn debug_nodeinfo(&self, _args: String) -> VeilidAPIResult { // Dump routing table entry let registry = self.core_context()?.registry(); - let nodeinfo = registry.routing_table().debug_info_nodeinfo(); + let nodeinfo_rtab = registry.routing_table().debug_info_nodeinfo(); + let nodeinfo_net = registry.network_manager().debug_info_nodeinfo(); + let nodeinfo_rpc = registry.rpc_processor().debug_info_nodeinfo(); // Dump core state let state = self.get_state().await?; @@ -790,7 +792,10 @@ impl VeilidAPI { "Connection manager unavailable when detached".to_owned() }; - Ok(format!("{}\n{}\n{}\n", nodeinfo, peertable, connman)) + Ok(format!( + "{}\n{}\n{}\n{}\n{}\n", + nodeinfo_rtab, nodeinfo_net, nodeinfo_rpc, peertable, connman + )) } fn debug_nodeid(&self, _args: String) -> VeilidAPIResult { diff --git a/veilid-flutter/rust/Cargo.toml b/veilid-flutter/rust/Cargo.toml index b98347de..ec297ff8 100644 --- 
a/veilid-flutter/rust/Cargo.toml +++ b/veilid-flutter/rust/Cargo.toml @@ -36,7 +36,7 @@ debug-load = ["dep:ctor", "dep:libc-print", "dep:android_log-sys", "dep:oslog"] [dependencies] veilid-core = { path = "../../veilid-core", default-features = false } tracing = { version = "0.1.40", features = ["log", "attributes"] } -tracing-subscriber = "0.3.18" +tracing-subscriber = "0.3.19" parking_lot = "0.12.3" backtrace = "0.3.71" serde_json = "1.0.120" diff --git a/veilid-tools/src/timestamp.rs b/veilid-tools/src/timestamp.rs index 81b00b20..7fc8ca08 100644 --- a/veilid-tools/src/timestamp.rs +++ b/veilid-tools/src/timestamp.rs @@ -125,27 +125,33 @@ pub fn display_duration(dur: u64) -> String { let secs = dur / SEC; let dur = dur % SEC; let msecs = dur / MSEC; + let dur = dur % MSEC; - format!( - "{}{}{}{}.{:03}s", - if days != 0 { - format!("{}d", days) - } else { - "".to_owned() - }, - if hours != 0 { - format!("{}h", hours) - } else { - "".to_owned() - }, - if mins != 0 { - format!("{}m", mins) - } else { - "".to_owned() - }, - secs, - msecs - ) + // microseconds format + if days == 0 && hours == 0 && mins == 0 && secs == 0 { + format!("{}.{:03}ms", msecs, dur) + } else { + format!( + "{}{}{}{}.{:03}s", + if days != 0 { + format!("{}d", days) + } else { + "".to_owned() + }, + if hours != 0 { + format!("{}h", hours) + } else { + "".to_owned() + }, + if mins != 0 { + format!("{}m", mins) + } else { + "".to_owned() + }, + secs, + msecs + ) + } } #[must_use]
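The display_duration change in the final hunk means sub-second values, such as the new relay and RPC worker dequeue/process latencies, now render with millisecond.microsecond precision instead of collapsing to "0.000s". Below is a minimal, standalone sketch of that formatting logic, assuming the microsecond time base used elsewhere in veilid-tools (MSEC = 1_000, SEC = 1_000_000, and so on); the constant and function names here are illustrative, not the crate's actual items.

// Sketch only: mirrors the new display_duration branch, assuming durations are
// expressed in microseconds. MSEC/SEC/MIN/HOUR/DAY values are assumptions.
const MSEC: u64 = 1_000;
const SEC: u64 = 1_000 * MSEC;
const MIN: u64 = 60 * SEC;
const HOUR: u64 = 60 * MIN;
const DAY: u64 = 24 * HOUR;

fn display_duration_sketch(dur: u64) -> String {
    let days = dur / DAY;
    let dur = dur % DAY;
    let hours = dur / HOUR;
    let dur = dur % HOUR;
    let mins = dur / MIN;
    let dur = dur % MIN;
    let secs = dur / SEC;
    let dur = dur % SEC;
    let msecs = dur / MSEC;
    let dur = dur % MSEC;

    if days == 0 && hours == 0 && mins == 0 && secs == 0 {
        // Sub-second durations get millisecond.microsecond precision,
        // which is the range worker-queue latencies typically fall into.
        format!("{}.{:03}ms", msecs, dur)
    } else {
        format!(
            "{}{}{}{}.{:03}s",
            if days != 0 { format!("{}d", days) } else { String::new() },
            if hours != 0 { format!("{}h", hours) } else { String::new() },
            if mins != 0 { format!("{}m", mins) } else { String::new() },
            secs,
            msecs
        )
    }
}

fn main() {
    assert_eq!(display_duration_sketch(1_500), "1.500ms");        // 1.5 ms
    assert_eq!(display_duration_sketch(2_000_000), "2.000s");     // 2 s
    assert_eq!(display_duration_sketch(90_000_000), "1m30.000s"); // 1 min 30 s
    println!("ok");
}

The design choice is simply a second format branch rather than a new unit system: anything at or above one second keeps the existing "d/h/m/s.mmm" rendering, so existing transfer and uptime displays are unchanged.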