Merge branch 'dht-performance' into 'main'

Relay workers

See merge request veilid/veilid!360
This commit is contained in:
Christien Rioux 2025-03-02 17:36:01 +00:00
commit 7c75cf02dd
17 changed files with 790 additions and 403 deletions

View File

@ -83,7 +83,7 @@ directories = "5.0.1"
# Logging
tracing = { version = "0.1.40", features = ["log", "attributes"] }
tracing-subscriber = "0.3.18"
tracing-subscriber = "0.3.19"
tracing-error = "0.2.0"
eyre = "0.6.12"
thiserror = "1.0.63"

View File

@ -50,6 +50,7 @@ mod logging;
mod network_manager;
mod routing_table;
mod rpc_processor;
mod stats_accounting;
mod storage_manager;
mod table_store;
mod veilid_api;
@ -64,6 +65,7 @@ pub use self::logging::{
DEFAULT_LOG_FACILITIES_ENABLED_LIST, DEFAULT_LOG_FACILITIES_IGNORE_LIST,
DURATION_LOG_FACILITIES, FLAME_LOG_FACILITIES_IGNORE_LIST, VEILID_LOG_KEY_FIELD,
};
pub(crate) use self::stats_accounting::*;
pub use self::veilid_api::*;
pub use self::veilid_config::*;
pub use veilid_tools as tools;

View File

@ -0,0 +1,37 @@
use super::*;
impl NetworkManager {
    /// Render relay worker latency statistics as a human-readable string.
    ///
    /// Reports dequeue latency (time a request waited in the relay queue)
    /// and process latency (time spent handling a dequeued request), both
    /// read from the stats held under the inner lock.
    pub fn debug_info_nodeinfo(&self) -> String {
        let mut out = String::new();
        let inner = self.inner.lock();
        out += &format!(
            "Relay Worker Dequeue Latency:\n{}",
            indent_all_string(&inner.stats.relay_worker_dequeue_latency)
        );
        out += "\n";
        out += &format!(
            "Relay Worker Process Latency:\n{}",
            indent_all_string(&inner.stats.relay_worker_process_latency)
        );
        out
    }

    /// Render a full debug report for the network manager: header, transfer
    /// stats, relay worker latencies, and the node contact method cache.
    pub fn debug(&self) -> String {
        let stats = self.get_stats();

        let mut out = String::new();
        out += "Network Manager\n";
        out += "---------------\n";
        // BUG FIX: previously re-bound `out` with a fresh `format!` here,
        // which discarded the two header lines above. Append instead.
        out += &format!(
            "Transfer stats:\n{}\n",
            indent_all_string(&stats.self_stats.transfer_stats)
        );
        out += &self.debug_info_nodeinfo();

        out += "Node Contact Method Cache\n";
        out += "-------------------------\n";
        out += &self.inner.lock().node_contact_method_cache.debug();
        out
    }
}

View File

@ -10,10 +10,12 @@ mod address_filter;
mod connection_handle;
mod connection_manager;
mod connection_table;
mod debug;
mod direct_boot;
mod network_connection;
mod node_contact_method_cache;
mod receipt_manager;
mod relay_worker;
mod send_data;
mod stats;
mod tasks;
@ -26,9 +28,10 @@ pub mod tests;
pub use connection_manager::*;
pub use network_connection::*;
pub(crate) use node_contact_method_cache::*;
pub use receipt_manager::*;
pub use stats::*;
pub(crate) use node_contact_method_cache::*;
pub(crate) use types::*;
////////////////////////////////////////////////////////////////////////////////////////
@ -42,6 +45,7 @@ use hashlink::LruCache;
use native::*;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
pub use native::{MAX_CAPABILITIES, PUBLIC_INTERNET_CAPABILITIES};
use relay_worker::*;
use routing_table::*;
use rpc_processor::*;
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
@ -60,6 +64,7 @@ pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration =
pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60;
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
pub const HOLE_PUNCH_DELAY_MS: u32 = 100;
pub const RELAY_WORKERS_PER_CORE: u32 = 16;
// Things we get when we start up and go away when we shut down
// Routing table is not in here because we want it to survive a network shutdown/startup restart
@ -171,7 +176,6 @@ impl Default for NetworkManagerStartupContext {
Self::new()
}
}
// The mutable state of the network manager
#[derive(Debug)]
struct NetworkManagerInner {
@ -181,6 +185,11 @@ struct NetworkManagerInner {
address_check: Option<AddressCheck>,
peer_info_change_subscription: Option<EventBusSubscription>,
socket_address_change_subscription: Option<EventBusSubscription>,
// Relay workers
relay_stop_source: Option<StopSource>,
relay_send_channel: Option<flume::Sender<RelayWorkerRequest>>,
relay_worker_join_handles: Vec<MustJoinHandle<()>>,
}
pub(crate) struct NetworkManager {
@ -202,6 +211,10 @@ pub(crate) struct NetworkManager {
// Startup context
startup_context: NetworkManagerStartupContext,
// Relay workers config
concurrency: u32,
queue_size: u32,
}
impl_veilid_component!(NetworkManager);
@ -214,6 +227,8 @@ impl fmt::Debug for NetworkManager {
.field("address_filter", &self.address_filter)
.field("network_key", &self.network_key)
.field("startup_context", &self.startup_context)
.field("concurrency", &self.concurrency)
.field("queue_size", &self.queue_size)
.finish()
}
}
@ -227,6 +242,10 @@ impl NetworkManager {
address_check: None,
peer_info_change_subscription: None,
socket_address_change_subscription: None,
//
relay_send_channel: None,
relay_stop_source: None,
relay_worker_join_handles: Vec::new(),
}
}
@ -264,6 +283,26 @@ impl NetworkManager {
network_key
};
// make local copy of node id for easy access
let (concurrency, queue_size) = {
let config = registry.config();
let c = config.get();
// set up channel
let mut concurrency = c.network.rpc.concurrency;
let queue_size = c.network.rpc.queue_size;
if concurrency == 0 {
concurrency = get_concurrency();
if concurrency == 0 {
concurrency = 1;
}
// Default relay concurrency is the number of CPUs * 16 relay workers per core
concurrency *= RELAY_WORKERS_PER_CORE;
}
(concurrency, queue_size)
};
let inner = Self::new_inner();
let address_filter = AddressFilter::new(registry.clone());
@ -282,6 +321,8 @@ impl NetworkManager {
),
network_key,
startup_context,
concurrency,
queue_size,
};
this.setup_tasks();
@ -360,7 +401,8 @@ impl NetworkManager {
receipt_manager: receipt_manager.clone(),
});
let address_check = AddressCheck::new(net.clone());
// Startup relay workers
self.startup_relay_workers()?;
// Register event handlers
let peer_info_change_subscription =
@ -371,6 +413,7 @@ impl NetworkManager {
{
let mut inner = self.inner.lock();
let address_check = AddressCheck::new(net.clone());
inner.address_check = Some(address_check);
inner.peer_info_change_subscription = Some(peer_info_change_subscription);
inner.socket_address_change_subscription = Some(socket_address_change_subscription);
@ -426,6 +469,9 @@ impl NetworkManager {
inner.address_check = None;
}
// Shutdown relay workers
self.shutdown_relay_workers().await;
// Shutdown network components if they started up
veilid_log!(self debug "shutting down network components");
@ -1099,10 +1145,11 @@ impl NetworkManager {
relay_nr.set_sequencing(Sequencing::EnsureOrdered);
};
// Relay the packet to the desired destination
veilid_log!(self trace "relaying {} bytes to {}", data.len(), relay_nr);
if let Err(e) = pin_future!(self.send_data(relay_nr, data.to_vec())).await {
veilid_log!(self debug "failed to relay envelope: {}" ,e);
// Pass relay to RPC system
if let Err(e) = self.enqueue_relay(relay_nr, data.to_vec()) {
// Couldn't enqueue, but not the sender's fault
veilid_log!(self debug "failed to enqueue relay: {}", e);
return Ok(false);
}
}
// Inform caller that we dealt with the envelope, but did not process it locally

View File

@ -0,0 +1,120 @@
use futures_util::StreamExt as _;
use stop_token::future::FutureExt as _;
use super::*;
/// The kinds of work a relay worker can be asked to perform.
#[derive(Debug)]
pub(super) enum RelayWorkerRequestKind {
    /// Forward `data` to the destination described by `relay_nr`.
    Relay {
        relay_nr: FilteredNodeRef,
        data: Vec<u8>,
    },
}

/// One unit of work queued for the relay worker pool.
#[derive(Debug)]
pub(super) struct RelayWorkerRequest {
    // When the request was enqueued; used to measure dequeue latency.
    enqueued_ts: Timestamp,
    // Tracing span active at enqueue time; workers link to it via follows_from.
    span: Span,
    // What the worker should do.
    kind: RelayWorkerRequestKind,
}
impl NetworkManager {
pub(super) fn startup_relay_workers(&self) -> EyreResult<()> {
let mut inner = self.inner.lock();
// Relay workers
let channel = flume::bounded(self.queue_size as usize);
inner.relay_send_channel = Some(channel.0.clone());
inner.relay_stop_source = Some(StopSource::new());
// spin up N workers
veilid_log!(self debug "Starting {} relay workers", self.concurrency);
for task_n in 0..self.concurrency {
let registry = self.registry();
let receiver = channel.1.clone();
let stop_token = inner.relay_stop_source.as_ref().unwrap().token();
let jh = spawn(&format!("relay worker {}", task_n), async move {
let this = registry.network_manager();
Box::pin(this.relay_worker(stop_token, receiver)).await
});
inner.relay_worker_join_handles.push(jh);
}
Ok(())
}
pub(super) async fn shutdown_relay_workers(&self) {
// Stop the relay workers
let mut unord = FuturesUnordered::new();
{
let mut inner = self.inner.lock();
// take the join handles out
for h in inner.relay_worker_join_handles.drain(..) {
unord.push(h);
}
// drop the stop
drop(inner.relay_stop_source.take());
}
veilid_log!(self debug "Stopping {} relay workers", unord.len());
// Wait for them to complete
while unord.next().await.is_some() {}
}
pub(super) async fn relay_worker(
&self,
stop_token: StopToken,
receiver: flume::Receiver<RelayWorkerRequest>,
) {
while let Ok(Ok(request)) = receiver.recv_async().timeout_at(stop_token.clone()).await {
let relay_request_span = tracing::trace_span!("relay request");
relay_request_span.follows_from(request.span);
// Measure dequeue time
let dequeue_ts = Timestamp::now();
let dequeue_latency = dequeue_ts.saturating_sub(request.enqueued_ts);
// Process request kind
match request.kind {
RelayWorkerRequestKind::Relay { relay_nr, data } => {
// Relay the packet to the desired destination
veilid_log!(self trace "relaying {} bytes to {}", data.len(), relay_nr);
if let Err(e) = pin_future!(self.send_data(relay_nr, data.to_vec())).await {
veilid_log!(self debug "failed to relay envelope: {}" ,e);
}
}
}
// Measure process time
let process_ts = Timestamp::now();
let process_latency = process_ts.saturating_sub(dequeue_ts);
// Accounting
self.stats_relay_processed(dequeue_latency, process_latency)
}
}
#[instrument(level = "trace", target = "rpc", skip_all)]
pub(super) fn enqueue_relay(&self, relay_nr: FilteredNodeRef, data: Vec<u8>) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.relay_send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send(RelayWorkerRequest {
enqueued_ts: Timestamp::now(),
span: Span::current(),
kind: RelayWorkerRequestKind::Relay { relay_nr, data },
})
.map_err(|e| eyre!("failed to enqueue relay: {}", e))?;
Ok(())
}
}

View File

@ -22,6 +22,10 @@ impl Default for PerAddressStatsKey {
pub struct NetworkManagerStats {
pub self_stats: PerAddressStats,
pub per_address_stats: LruCache<PerAddressStatsKey, PerAddressStats>,
pub relay_worker_dequeue_latency: LatencyStats,
pub relay_worker_process_latency: LatencyStats,
pub relay_worker_dequeue_latency_accounting: LatencyStatsAccounting,
pub relay_worker_process_latency_accounting: LatencyStatsAccounting,
}
impl Default for NetworkManagerStats {
@ -29,6 +33,10 @@ impl Default for NetworkManagerStats {
Self {
self_stats: PerAddressStats::default(),
per_address_stats: LruCache::new(IPADDR_TABLE_SIZE),
relay_worker_dequeue_latency: LatencyStats::default(),
relay_worker_process_latency: LatencyStats::default(),
relay_worker_dequeue_latency_accounting: LatencyStatsAccounting::new(),
relay_worker_process_latency_accounting: LatencyStatsAccounting::new(),
}
}
}
@ -36,7 +44,7 @@ impl Default for NetworkManagerStats {
impl NetworkManager {
// Callbacks from low level network for statistics gathering
pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
let mut inner = self.inner.lock();
inner
.stats
.self_stats
@ -53,7 +61,7 @@ impl NetworkManager {
}
pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
let mut inner = self.inner.lock();
inner
.stats
.self_stats
@ -69,28 +77,27 @@ impl NetworkManager {
.add_down(bytes);
}
/// Record one relay worker request's latencies into the rolling stats.
///
/// `dequeue_latency` is the time the request spent waiting in the relay
/// queue; `process_latency` is the time spent handling it once dequeued.
pub fn stats_relay_processed(
    &self,
    dequeue_latency: TimestampDuration,
    process_latency: TimestampDuration,
) {
    let mut inner = self.inner.lock();
    // record_latency returns the refreshed LatencyStats snapshot, which is
    // cached on stats so get_stats()/debug output can read it cheaply.
    inner.stats.relay_worker_dequeue_latency = inner
        .stats
        .relay_worker_dequeue_latency_accounting
        .record_latency(dequeue_latency);
    inner.stats.relay_worker_process_latency = inner
        .stats
        .relay_worker_process_latency_accounting
        .record_latency(process_latency);
}
/// Return a snapshot (clone) of the current network manager statistics.
pub fn get_stats(&self) -> NetworkManagerStats {
    let inner = self.inner.lock();
    inner.stats.clone()
}
pub fn debug(&self) -> String {
let stats = self.get_stats();
let mut out = String::new();
out += "Network Manager\n";
out += "---------------\n";
let mut out = format!(
"Transfer stats:\n{}\n",
indent_all_string(&stats.self_stats.transfer_stats)
);
out += "Node Contact Method Cache\n";
out += "-------------------------\n";
out += &self.inner.lock().node_contact_method_cache.debug();
out
}
pub fn get_veilid_state(&self) -> Box<VeilidStateNetwork> {
if !self.network_is_started() {
return Box::new(VeilidStateNetwork {

View File

@ -43,7 +43,9 @@ pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1;
/// How frequently we optimize relays
pub const RELAY_OPTIMIZATION_INTERVAL_SECS: u32 = 10;
/// What percentile to keep our relays optimized to
pub const RELAY_OPTIMIZATION_PERCENTILE: f32 = 75.0;
pub const RELAY_OPTIMIZATION_PERCENTILE: f32 = 66.0;
/// What percentile to choose our relays from (must be greater than RELAY_OPTIMIZATION_PERCENTILE)
pub const RELAY_SELECTION_PERCENTILE: f32 = 85.0;
/// How frequently we tick the private route management routine
pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1;
@ -1146,6 +1148,18 @@ impl RoutingTable {
inner.find_fastest_node(cur_ts, filter, metric)
}
/// Pick a random node from the fastest slice of the table, per `percentile`.
/// Thin delegation to `RoutingTableInner::find_random_fast_node` under the
/// read lock; see that method for the selection semantics.
#[instrument(level = "trace", skip(self, filter, metric), ret)]
pub fn find_random_fast_node(
    &self,
    cur_ts: Timestamp,
    filter: impl Fn(&BucketEntryInner) -> bool,
    percentile: f32,
    metric: impl Fn(&LatencyStats) -> TimestampDuration,
) -> Option<NodeRef> {
    self.inner
        .read()
        .find_random_fast_node(cur_ts, filter, percentile, metric)
}
#[instrument(level = "trace", skip(self, filter, metric), ret)]
pub fn get_node_speed_percentile(
&self,

View File

@ -1445,6 +1445,54 @@ impl RoutingTableInner {
fastest_node.map(|e| NodeRef::new(self.registry(), e))
}
/// Select a random node from among the fastest candidates in the table.
///
/// Collects every entry (at least Unreliable) that passes `filter`, sorts
/// them fastest-first by `metric`, then picks uniformly at random from the
/// fastest `(100 - percentile)%` of the candidates (e.g. percentile = 85.0
/// chooses among the fastest 15%). Returns None when nothing matches.
#[instrument(level = "trace", skip(self, filter, metric), ret)]
pub fn find_random_fast_node(
    &self,
    cur_ts: Timestamp,
    filter: impl Fn(&BucketEntryInner) -> bool,
    percentile: f32,
    metric: impl Fn(&LatencyStats) -> TimestampDuration,
) -> Option<NodeRef> {
    // Go through all entries and find all entries that matches filter function
    let mut all_filtered_nodes: Vec<Arc<BucketEntry>> = Vec::new();

    // Iterate all known nodes for candidates
    self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
        let entry2 = entry.clone();
        entry.with(rti, |_rti, e| {
            // Filter this node
            if filter(e) {
                all_filtered_nodes.push(entry2);
            }
        });
        // Don't end early, iterate through all entries
        Option::<()>::None
    });

    // Nothing matched the filter; check before sorting so the sort is skipped.
    if all_filtered_nodes.is_empty() {
        return None;
    }

    // Sort by fastest tm90 reliable
    all_filtered_nodes.sort_by(|a, b| {
        a.with(self, |rti, ea| {
            b.with(rti, |_rti, eb| {
                BucketEntryInner::cmp_fastest_reliable(cur_ts, ea, eb, &metric)
            })
        })
    });

    // max_index is the last eligible index in the fastest slice; the +1 below
    // makes the modulus cover [0, max_index] inclusive.
    let max_index =
        (((all_filtered_nodes.len() - 1) as f32) * (100.0 - percentile) / 100.0) as u32;
    let chosen_index = (get_random_u32() % (max_index + 1)) as usize;

    // Return the chosen node
    Some(NodeRef::new(
        self.registry(),
        all_filtered_nodes[chosen_index].clone(),
    ))
}
#[instrument(level = "trace", skip(self, filter, metric), ret)]
pub fn get_node_relative_performance(
&self,

View File

@ -1,15 +1,5 @@
use super::*;
// Latency entry is per round-trip packet (ping or data)
// - Size is number of entries
const ROLLING_LATENCIES_SIZE: usize = 50;
// Transfers entries are in bytes total for the interval
// - Size is number of entries
// - Interval is number of seconds in each entry
const ROLLING_TRANSFERS_SIZE: usize = 10;
pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 1;
// State entry is per state reason change
// - Size is number of entries
const ROLLING_STATE_REASON_SPAN_SIZE: usize = 32;
@ -20,149 +10,6 @@ pub const UPDATE_STATE_STATS_INTERVAL_SECS: u32 = 1;
// - Interval is number of seconds in each entry
const ROLLING_ANSWERS_SIZE: usize = 10;
pub const ROLLING_ANSWER_INTERVAL_SECS: u32 = 60;
/// Byte totals for one accounting interval (download / upload).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TransferCount {
    down: ByteCount,
    up: ByteCount,
}

/// Rolling window of per-interval transfer counts plus the currently open
/// interval that add_down/add_up accumulate into.
#[derive(Debug, Clone, Default)]
pub struct TransferStatsAccounting {
    rolling_transfers: VecDeque<TransferCount>,
    current_transfer: TransferCount,
}
impl TransferStatsAccounting {
    /// Create an empty transfer accounting window.
    pub fn new() -> Self {
        Self {
            rolling_transfers: VecDeque::new(),
            current_transfer: TransferCount::default(),
        }
    }

    /// Add received bytes to the current (open) interval.
    pub fn add_down(&mut self, bytes: ByteCount) {
        self.current_transfer.down += bytes;
    }

    /// Add sent bytes to the current (open) interval.
    pub fn add_up(&mut self, bytes: ByteCount) {
        self.current_transfer.up += bytes;
    }

    /// Close out the current interval, push it into the rolling window, and
    /// recompute `transfer_stats`: running byte totals plus min/max/average
    /// transfer rates (bytes/sec) over the retained intervals.
    pub fn roll_transfers(
        &mut self,
        last_ts: Timestamp,
        cur_ts: Timestamp,
        transfer_stats: &mut TransferStatsDownUp,
    ) {
        // Interval duration in milliseconds.
        // NOTE(review): if cur_ts - last_ts is under 1000us this is zero and
        // the divisions below divide by zero — presumably callers only roll
        // at ROLLING_TRANSFERS_INTERVAL_SECS granularity; confirm.
        // NOTE(review): this single duration is applied to every retained
        // interval, assuming uniform roll intervals — confirm.
        let dur_ms = cur_ts.saturating_sub(last_ts) / 1000u64;
        // Evict oldest intervals to keep the window bounded.
        while self.rolling_transfers.len() >= ROLLING_TRANSFERS_SIZE {
            self.rolling_transfers.pop_front();
        }
        self.rolling_transfers.push_back(self.current_transfer);

        // Totals accumulate across the object's lifetime; only rates are windowed.
        transfer_stats.down.total += self.current_transfer.down;
        transfer_stats.up.total += self.current_transfer.up;
        self.current_transfer = TransferCount::default();

        // Recompute min/max/average rates from scratch over the window.
        transfer_stats.down.maximum = 0.into();
        transfer_stats.up.maximum = 0.into();
        transfer_stats.down.minimum = u64::MAX.into();
        transfer_stats.up.minimum = u64::MAX.into();
        transfer_stats.down.average = 0.into();
        transfer_stats.up.average = 0.into();
        for xfer in &self.rolling_transfers {
            // Convert per-interval byte counts into bytes-per-second.
            let bpsd = xfer.down * 1000u64 / dur_ms;
            let bpsu = xfer.up * 1000u64 / dur_ms;
            transfer_stats.down.maximum.max_assign(bpsd);
            transfer_stats.up.maximum.max_assign(bpsu);
            transfer_stats.down.minimum.min_assign(bpsd);
            transfer_stats.up.minimum.min_assign(bpsu);
            transfer_stats.down.average += bpsd;
            transfer_stats.up.average += bpsu;
        }
        let len = self.rolling_transfers.len() as u64;
        if len > 0 {
            transfer_stats.down.average /= len;
            transfer_stats.up.average /= len;
        }
    }
}
/// Rolling window of recent round-trip latencies used to derive LatencyStats.
#[derive(Debug, Clone, Default)]
pub struct LatencyStatsAccounting {
    rolling_latencies: VecDeque<TimestampDuration>,
}

impl LatencyStatsAccounting {
    /// Create an empty latency window.
    pub fn new() -> Self {
        Self {
            rolling_latencies: VecDeque::new(),
        }
    }

    /// Trimmed mean over the fastest `n` percent of `sorted_latencies`
    /// (which must be sorted ascending). Returns None when the window is
    /// too small for the cut to contain any samples.
    fn get_tm_n(sorted_latencies: &[TimestampDuration], n: usize) -> Option<TimestampDuration> {
        let tmcount = sorted_latencies.len() * n / 100;
        if tmcount == 0 {
            None
        } else {
            let mut tm = TimestampDuration::new(0);
            for l in &sorted_latencies[..tmcount] {
                tm += *l;
            }
            tm /= tmcount as u64;
            Some(tm)
        }
    }

    /// `n`th percentile sample of `sorted_latencies` (ascending).
    /// Indexes directly, so the slice must be non-empty; callers only invoke
    /// this after pushing at least one sample.
    fn get_p_n(sorted_latencies: &[TimestampDuration], n: usize) -> TimestampDuration {
        let pindex = (sorted_latencies.len() * n / 100).saturating_sub(1);
        sorted_latencies[pindex]
    }

    /// Record one latency sample and return the recomputed stats
    /// (fastest/slowest/average plus tm90/tm75/p90/p75) over the window.
    pub fn record_latency(&mut self, latency: TimestampDuration) -> LatencyStats {
        // Evict oldest samples to keep the window bounded.
        while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE {
            self.rolling_latencies.pop_front();
        }
        self.rolling_latencies.push_back(latency);

        // Calculate latency stats
        let mut fastest = TimestampDuration::new(u64::MAX);
        let mut slowest = TimestampDuration::new(0u64);
        let mut average = TimestampDuration::new(0u64);
        for rl in &self.rolling_latencies {
            fastest.min_assign(*rl);
            slowest.max_assign(*rl);
            average += *rl;
        }
        let len = self.rolling_latencies.len() as u64;
        if len > 0 {
            average /= len;
        }

        let mut sorted_latencies: Vec<_> = self.rolling_latencies.iter().copied().collect();
        sorted_latencies.sort();
        // Trimmed means fall back to the plain average for tiny windows.
        let tm90 = Self::get_tm_n(&sorted_latencies, 90).unwrap_or(average);
        let tm75 = Self::get_tm_n(&sorted_latencies, 75).unwrap_or(average);
        let p90 = Self::get_p_n(&sorted_latencies, 90);
        let p75 = Self::get_p_n(&sorted_latencies, 75);
        LatencyStats {
            fastest,
            average,
            slowest,
            tm90,
            tm75,
            p90,
            p75,
        }
    }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StateReasonSpan {
state_reason: BucketEntryStateReason,

View File

@ -202,7 +202,12 @@ impl RoutingTable {
}
if !got_outbound_relay {
// Find a node in our routing table that is an acceptable inbound relay
if let Some(nr) = self.find_fastest_node(cur_ts, &relay_node_filter, |ls| ls.tm90) {
if let Some(nr) = self.find_random_fast_node(
cur_ts,
&relay_node_filter,
RELAY_SELECTION_PERCENTILE,
|ls| ls.tm90,
) {
veilid_log!(self debug "Inbound relay node selected: {}", nr);
editor.set_relay_node(Some(nr));
}

View File

@ -0,0 +1,18 @@
use super::*;
impl RPCProcessor {
    /// Produce a human-readable summary of RPC worker queue latencies:
    /// dequeue latency (queue wait) and process latency (handling time).
    pub fn debug_info_nodeinfo(&self) -> String {
        let inner = self.inner.lock();
        format!(
            "RPC Worker Dequeue Latency:\n{}\nRPC Worker Process Latency:\n{}",
            indent_all_string(&inner.rpc_worker_dequeue_latency),
            indent_all_string(&inner.rpc_worker_process_latency)
        )
    }
}

View File

@ -2,6 +2,7 @@ use super::*;
mod answer;
mod coders;
mod debug;
mod destination;
mod error;
mod fanout;
@ -22,6 +23,7 @@ mod rpc_status;
mod rpc_validate_dial_info;
mod rpc_value_changed;
mod rpc_watch_value;
mod rpc_worker;
mod sender_info;
mod sender_peer_info;
@ -48,7 +50,7 @@ pub(crate) use error::*;
pub(crate) use fanout::*;
pub(crate) use sender_info::*;
use futures_util::StreamExt;
use futures_util::StreamExt as _;
use stop_token::future::FutureExt as _;
use coders::*;
@ -56,6 +58,7 @@ use message::*;
use message_header::*;
use operation_waiter::*;
use rendered_operation::*;
use rpc_worker::*;
use sender_peer_info::*;
use crypto::*;
@ -67,6 +70,10 @@ impl_veilid_log_facility!("rpc");
/////////////////////////////////////////////////////////////////////
const RPC_WORKERS_PER_CORE: u32 = 16;
/////////////////////////////////////////////////////////////////////
#[derive(Debug)]
#[must_use]
struct WaitableReplyContext {
@ -122,9 +129,13 @@ impl Default for RPCProcessorStartupContext {
#[derive(Debug)]
#[must_use]
struct RPCProcessorInner {
send_channel: Option<flume::Sender<(Span, MessageEncoded)>>,
stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>,
rpc_send_channel: Option<flume::Sender<RPCWorkerRequest>>,
rpc_stop_source: Option<StopSource>,
rpc_worker_join_handles: Vec<MustJoinHandle<()>>,
rpc_worker_dequeue_latency: LatencyStats,
rpc_worker_process_latency: LatencyStats,
rpc_worker_dequeue_latency_accounting: LatencyStatsAccounting,
rpc_worker_process_latency_accounting: LatencyStatsAccounting,
}
#[derive(Debug)]
@ -146,9 +157,13 @@ impl_veilid_component!(RPCProcessor);
impl RPCProcessor {
fn new_inner() -> RPCProcessorInner {
RPCProcessorInner {
send_channel: None,
stop_source: None,
worker_join_handles: Vec::new(),
rpc_send_channel: None,
rpc_stop_source: None,
rpc_worker_join_handles: Vec::new(),
rpc_worker_dequeue_latency: LatencyStats::default(),
rpc_worker_process_latency: LatencyStats::default(),
rpc_worker_dequeue_latency_accounting: LatencyStatsAccounting::new(),
rpc_worker_process_latency_accounting: LatencyStatsAccounting::new(),
}
}
@ -173,7 +188,7 @@ impl RPCProcessor {
}
// Default RPC concurrency is the number of CPUs * 16 rpc workers per core, as a single worker takes about 1% CPU when relaying and 16% is reasonable for baseline plus relay
concurrency *= 16;
concurrency *= RPC_WORKERS_PER_CORE;
}
(concurrency, queue_size, max_route_hop_count, timeout_us)
};
@ -227,22 +242,12 @@ impl RPCProcessor {
let mut inner = self.inner.lock();
let channel = flume::bounded(self.queue_size as usize);
inner.send_channel = Some(channel.0.clone());
inner.stop_source = Some(StopSource::new());
// spin up N workers
veilid_log!(self trace "Spinning up {} RPC workers", self.concurrency);
for task_n in 0..self.concurrency {
let registry = self.registry();
let receiver = channel.1.clone();
let stop_token = inner.stop_source.as_ref().unwrap().token();
let jh = spawn(&format!("rpc worker {}", task_n), async move {
let this = registry.rpc_processor();
Box::pin(this.rpc_worker(stop_token, receiver)).await
});
inner.worker_join_handles.push(jh);
}
inner.rpc_send_channel = Some(channel.0.clone());
inner.rpc_stop_source = Some(StopSource::new());
}
self.startup_rpc_workers()?;
guard.success();
veilid_log!(self debug "finished rpc processor startup");
@ -260,21 +265,7 @@ impl RPCProcessor {
.await
.expect("should be started up");
// Stop the rpc workers
let mut unord = FuturesUnordered::new();
{
let mut inner = self.inner.lock();
// take the join handles out
for h in inner.worker_join_handles.drain(..) {
unord.push(h);
}
// drop the stop
drop(inner.stop_source.take());
}
veilid_log!(self debug "stopping {} rpc worker tasks", unord.len());
// Wait for them to complete
while unord.next().await.is_some() {}
self.shutdown_rpc_workers().await;
veilid_log!(self debug "resetting rpc processor state");
@ -1620,164 +1611,4 @@ impl RPCProcessor {
}
}
}
async fn rpc_worker(
&self,
stop_token: StopToken,
receiver: flume::Receiver<(Span, MessageEncoded)>,
) {
while let Ok(Ok((prev_span, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await
{
let rpc_message_span = tracing::trace_span!("rpc message");
rpc_message_span.follows_from(prev_span);
network_result_value_or_log!(self match self
.process_rpc_message(msg).instrument(rpc_message_span)
.await
{
Err(e) => {
veilid_log!(self error "couldn't process rpc message: {}", e);
continue;
}
Ok(v) => {
v
}
} => [ format!(": msg.header={:?}", msg.header) ] {});
}
}
#[instrument(level = "trace", target = "rpc", skip_all)]
pub fn enqueue_direct_message(
&self,
envelope: Envelope,
sender_noderef: FilteredNodeRef,
flow: Flow,
routing_domain: RoutingDomain,
body: Vec<u8>,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
if sender_noderef.routing_domain_set() != routing_domain {
bail!("routing domain should match peer noderef filter");
}
let header = MessageHeader {
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
envelope,
sender_noderef,
flow,
routing_domain,
}),
timestamp: Timestamp::now(),
body_len: ByteCount::new(body.len() as u64),
};
let msg = MessageEncoded {
header,
data: MessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send((Span::current(), msg))
.map_err(|e| eyre!("failed to enqueue direct RPC message: {}", e))?;
Ok(())
}
#[instrument(level = "trace", target = "rpc", skip_all)]
fn enqueue_safety_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,
remote_safety_route: PublicKey,
sequencing: Sequencing,
body: Vec<u8>,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
let header = MessageHeader {
detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted {
direct,
remote_safety_route,
sequencing,
}),
timestamp: Timestamp::now(),
body_len: (body.len() as u64).into(),
};
let msg = MessageEncoded {
header,
data: MessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send((Span::current(), msg))
.map_err(|e| eyre!("failed to enqueue safety routed RPC message: {}", e))?;
Ok(())
}
#[instrument(level = "trace", target = "rpc", skip_all)]
fn enqueue_private_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,
remote_safety_route: PublicKey,
private_route: PublicKey,
safety_spec: SafetySpec,
body: Vec<u8>,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
let header = MessageHeader {
detail: RPCMessageHeaderDetail::PrivateRouted(RPCMessageHeaderDetailPrivateRouted {
direct,
remote_safety_route,
private_route,
safety_spec,
}),
timestamp: Timestamp::now(),
body_len: (body.len() as u64).into(),
};
let msg = MessageEncoded {
header,
data: MessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send((Span::current(), msg))
.map_err(|e| eyre!("failed to enqueue private routed RPC message: {}", e))?;
Ok(())
}
}

View File

@ -0,0 +1,247 @@
use futures_util::StreamExt as _;
use stop_token::future::FutureExt as _;
use super::*;
/// The kinds of work an RPC worker can be asked to perform.
#[derive(Debug)]
pub(super) enum RPCWorkerRequestKind {
    /// Process one received, still-encoded RPC message.
    Message { message_encoded: MessageEncoded },
}

/// One unit of work queued for the RPC worker pool.
#[derive(Debug)]
pub(super) struct RPCWorkerRequest {
    // When the request was enqueued; used to measure dequeue latency.
    enqueued_ts: Timestamp,
    // Tracing span active at enqueue time; workers link to it via follows_from.
    span: Span,
    // What the worker should do.
    kind: RPCWorkerRequestKind,
}
impl RPCProcessor {
pub(super) fn startup_rpc_workers(&self) -> EyreResult<()> {
let mut inner = self.inner.lock();
// Relay workers
let channel = flume::bounded(self.queue_size as usize);
inner.rpc_send_channel = Some(channel.0.clone());
inner.rpc_stop_source = Some(StopSource::new());
// spin up N workers
veilid_log!(self debug "Starting {} RPC workers", self.concurrency);
for task_n in 0..self.concurrency {
let registry = self.registry();
let receiver = channel.1.clone();
let stop_token = inner.rpc_stop_source.as_ref().unwrap().token();
let jh = spawn(&format!("relay worker {}", task_n), async move {
let this = registry.rpc_processor();
Box::pin(this.rpc_worker(stop_token, receiver)).await
});
inner.rpc_worker_join_handles.push(jh);
}
Ok(())
}
pub(super) async fn shutdown_rpc_workers(&self) {
// Stop the rpc workers
let mut unord = FuturesUnordered::new();
{
let mut inner = self.inner.lock();
// take the join handles out
for h in inner.rpc_worker_join_handles.drain(..) {
unord.push(h);
}
// drop the stop
drop(inner.rpc_stop_source.take());
}
veilid_log!(self debug "Stopping {} RPC workers", unord.len());
// Wait for them to complete
while unord.next().await.is_some() {}
}
async fn rpc_worker(&self, stop_token: StopToken, receiver: flume::Receiver<RPCWorkerRequest>) {
while let Ok(Ok(request)) = receiver.recv_async().timeout_at(stop_token.clone()).await {
let rpc_request_span = tracing::trace_span!("rpc request");
rpc_request_span.follows_from(request.span);
// Measure dequeue time
let dequeue_ts = Timestamp::now();
let dequeue_latency = dequeue_ts.saturating_sub(request.enqueued_ts);
// Process request kind
match request.kind {
// Process RPC Message
RPCWorkerRequestKind::Message { message_encoded } => {
network_result_value_or_log!(self match self
.process_rpc_message(message_encoded).instrument(rpc_request_span)
.await
{
Err(e) => {
veilid_log!(self error "couldn't process rpc message: {}", e);
continue;
}
Ok(v) => {
v
}
} => [ format!(": msg.header={:?}", message_encoded.header) ] {});
}
}
// Measure process time
let process_ts = Timestamp::now();
let process_latency = process_ts.saturating_sub(dequeue_ts);
// Accounting
let mut inner = self.inner.lock();
inner.rpc_worker_dequeue_latency = inner
.rpc_worker_dequeue_latency_accounting
.record_latency(dequeue_latency);
inner.rpc_worker_process_latency = inner
.rpc_worker_process_latency_accounting
.record_latency(process_latency);
}
}
#[instrument(level = "trace", target = "rpc", skip_all)]
pub fn enqueue_direct_message(
&self,
envelope: Envelope,
sender_noderef: FilteredNodeRef,
flow: Flow,
routing_domain: RoutingDomain,
body: Vec<u8>,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
if sender_noderef.routing_domain_set() != routing_domain {
bail!("routing domain should match peer noderef filter");
}
let header = MessageHeader {
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
envelope,
sender_noderef,
flow,
routing_domain,
}),
timestamp: Timestamp::now(),
body_len: ByteCount::new(body.len() as u64),
};
let message_encoded = MessageEncoded {
header,
data: MessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.rpc_send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send(RPCWorkerRequest {
enqueued_ts: Timestamp::now(),
span: Span::current(),
kind: RPCWorkerRequestKind::Message { message_encoded },
})
.map_err(|e| eyre!("failed to enqueue direct RPC message: {}", e))?;
Ok(())
}
#[instrument(level = "trace", target = "rpc", skip_all)]
pub(super) fn enqueue_safety_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,
remote_safety_route: PublicKey,
sequencing: Sequencing,
body: Vec<u8>,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
let header = MessageHeader {
detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted {
direct,
remote_safety_route,
sequencing,
}),
timestamp: Timestamp::now(),
body_len: (body.len() as u64).into(),
};
let message_encoded = MessageEncoded {
header,
data: MessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.rpc_send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send(RPCWorkerRequest {
enqueued_ts: Timestamp::now(),
span: Span::current(),
kind: RPCWorkerRequestKind::Message { message_encoded },
})
.map_err(|e| eyre!("failed to enqueue safety routed RPC message: {}", e))?;
Ok(())
}
#[instrument(level = "trace", target = "rpc", skip_all)]
pub(super) fn enqueue_private_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,
remote_safety_route: PublicKey,
private_route: PublicKey,
safety_spec: SafetySpec,
body: Vec<u8>,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
let header = MessageHeader {
detail: RPCMessageHeaderDetail::PrivateRouted(RPCMessageHeaderDetailPrivateRouted {
direct,
remote_safety_route,
private_route,
safety_spec,
}),
timestamp: Timestamp::now(),
body_len: (body.len() as u64).into(),
};
let message_encoded = MessageEncoded {
header,
data: MessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.rpc_send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send(RPCWorkerRequest {
enqueued_ts: Timestamp::now(),
span: Span::current(),
kind: RPCWorkerRequestKind::Message { message_encoded },
})
.map_err(|e| eyre!("failed to enqueue private routed RPC message: {}", e))?;
Ok(())
}
}

View File

@ -0,0 +1,153 @@
use super::*;
// Latency entry is per round-trip packet (ping or data)
// - Size is number of entries
const ROLLING_LATENCIES_SIZE: usize = 50;
// Transfers entries are in bytes total for the interval
// - Size is number of entries
// - Interval is number of seconds in each entry
const ROLLING_TRANSFERS_SIZE: usize = 10;
pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 1;
/// Byte totals for one rolling-stats interval, split by direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TransferCount {
    // Bytes received during the interval
    down: ByteCount,
    // Bytes sent during the interval
    up: ByteCount,
}
/// Accumulates per-interval transfer byte counts and keeps a rolling window
/// of completed intervals from which bytes-per-second statistics are derived.
#[derive(Debug, Clone, Default)]
pub struct TransferStatsAccounting {
    // Completed intervals, oldest first; bounded by ROLLING_TRANSFERS_SIZE
    rolling_transfers: VecDeque<TransferCount>,
    // Bytes accumulated since the last roll_transfers() call
    current_transfer: TransferCount,
}
impl TransferStatsAccounting {
    /// Creates an empty accounting object with no rolling history.
    pub fn new() -> Self {
        Self {
            rolling_transfers: VecDeque::new(),
            current_transfer: TransferCount::default(),
        }
    }

    /// Records bytes received during the current interval.
    pub fn add_down(&mut self, bytes: ByteCount) {
        self.current_transfer.down += bytes;
    }

    /// Records bytes sent during the current interval.
    pub fn add_up(&mut self, bytes: ByteCount) {
        self.current_transfer.up += bytes;
    }

    /// Closes out the current interval (spanning `last_ts..cur_ts`) and
    /// recomputes `transfer_stats` (totals plus min/max/average
    /// bytes-per-second) over the rolling window.
    pub fn roll_transfers(
        &mut self,
        last_ts: Timestamp,
        cur_ts: Timestamp,
        transfer_stats: &mut TransferStatsDownUp,
    ) {
        // Interval length in milliseconds.
        // Clamp to at least 1 so the bytes-per-second division below cannot
        // panic with a divide-by-zero if this is called twice within the
        // same millisecond.
        let mut dur_ms = cur_ts.saturating_sub(last_ts) / 1000u64;
        if dur_ms == 0u64.into() {
            dur_ms = 1u64.into();
        }
        // Evict oldest intervals to keep the window bounded, then record
        while self.rolling_transfers.len() >= ROLLING_TRANSFERS_SIZE {
            self.rolling_transfers.pop_front();
        }
        self.rolling_transfers.push_back(self.current_transfer);

        // Totals are monotonically accumulated across all intervals
        transfer_stats.down.total += self.current_transfer.down;
        transfer_stats.up.total += self.current_transfer.up;
        self.current_transfer = TransferCount::default();

        // Recompute min/max/average rates over the rolling window
        transfer_stats.down.maximum = 0.into();
        transfer_stats.up.maximum = 0.into();
        transfer_stats.down.minimum = u64::MAX.into();
        transfer_stats.up.minimum = u64::MAX.into();
        transfer_stats.down.average = 0.into();
        transfer_stats.up.average = 0.into();
        for xfer in &self.rolling_transfers {
            // Bytes per second for this interval
            let bpsd = xfer.down * 1000u64 / dur_ms;
            let bpsu = xfer.up * 1000u64 / dur_ms;
            transfer_stats.down.maximum.max_assign(bpsd);
            transfer_stats.up.maximum.max_assign(bpsu);
            transfer_stats.down.minimum.min_assign(bpsd);
            transfer_stats.up.minimum.min_assign(bpsu);
            transfer_stats.down.average += bpsd;
            transfer_stats.up.average += bpsu;
        }
        let len = self.rolling_transfers.len() as u64;
        if len > 0 {
            transfer_stats.down.average /= len;
            transfer_stats.up.average /= len;
        }
    }
}
/// Maintains a rolling window of round-trip latency samples from which
/// summary statistics are derived.
#[derive(Debug, Clone, Default)]
pub struct LatencyStatsAccounting {
    // Most recent samples, oldest first; bounded by ROLLING_LATENCIES_SIZE
    rolling_latencies: VecDeque<TimestampDuration>,
}
impl LatencyStatsAccounting {
    /// Creates an accounting object with an empty latency window.
    pub fn new() -> Self {
        Self {
            rolling_latencies: VecDeque::new(),
        }
    }

    /// Trimmed mean over the lowest `n` percent of `sorted_latencies`, or
    /// `None` when the window is too small to contain any samples below
    /// that cutoff.
    fn get_tm_n(sorted_latencies: &[TimestampDuration], n: usize) -> Option<TimestampDuration> {
        let tmcount = sorted_latencies.len() * n / 100;
        if tmcount == 0 {
            return None;
        }
        let mut total = TimestampDuration::new(0);
        for sample in sorted_latencies.iter().take(tmcount) {
            total += *sample;
        }
        total /= tmcount as u64;
        Some(total)
    }

    /// The `n`th-percentile sample from `sorted_latencies`.
    /// Callers must pass a non-empty slice.
    fn get_p_n(sorted_latencies: &[TimestampDuration], n: usize) -> TimestampDuration {
        let pindex = (sorted_latencies.len() * n / 100).saturating_sub(1);
        sorted_latencies[pindex]
    }

    /// Adds `latency` to the rolling window and returns freshly computed
    /// statistics (fastest/average/slowest plus trimmed means and
    /// percentiles) over the window.
    pub fn record_latency(&mut self, latency: TimestampDuration) -> LatencyStats {
        // Evict oldest samples to keep the window bounded, then record
        while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE {
            self.rolling_latencies.pop_front();
        }
        self.rolling_latencies.push_back(latency);

        // Single pass over the window for min / max / sum
        let mut fastest = TimestampDuration::new(u64::MAX);
        let mut slowest = TimestampDuration::new(0u64);
        let mut average = TimestampDuration::new(0u64);
        for sample in &self.rolling_latencies {
            fastest.min_assign(*sample);
            slowest.max_assign(*sample);
            average += *sample;
        }
        let count = self.rolling_latencies.len() as u64;
        if count > 0 {
            average /= count;
        }

        // Order statistics need a sorted copy of the window
        let mut sorted_latencies: Vec<_> = self.rolling_latencies.iter().copied().collect();
        sorted_latencies.sort_unstable();

        LatencyStats {
            fastest,
            average,
            slowest,
            tm90: Self::get_tm_n(&sorted_latencies, 90).unwrap_or(average),
            tm75: Self::get_tm_n(&sorted_latencies, 75).unwrap_or(average),
            p90: Self::get_p_n(&sorted_latencies, 90),
            p75: Self::get_p_n(&sorted_latencies, 75),
        }
    }
}

View File

@ -761,7 +761,9 @@ impl VeilidAPI {
async fn debug_nodeinfo(&self, _args: String) -> VeilidAPIResult<String> {
// Dump routing table entry
let registry = self.core_context()?.registry();
let nodeinfo = registry.routing_table().debug_info_nodeinfo();
let nodeinfo_rtab = registry.routing_table().debug_info_nodeinfo();
let nodeinfo_net = registry.network_manager().debug_info_nodeinfo();
let nodeinfo_rpc = registry.rpc_processor().debug_info_nodeinfo();
// Dump core state
let state = self.get_state().await?;
@ -790,7 +792,10 @@ impl VeilidAPI {
"Connection manager unavailable when detached".to_owned()
};
Ok(format!("{}\n{}\n{}\n", nodeinfo, peertable, connman))
Ok(format!(
"{}\n{}\n{}\n{}\n{}\n",
nodeinfo_rtab, nodeinfo_net, nodeinfo_rpc, peertable, connman
))
}
fn debug_nodeid(&self, _args: String) -> VeilidAPIResult<String> {

View File

@ -36,7 +36,7 @@ debug-load = ["dep:ctor", "dep:libc-print", "dep:android_log-sys", "dep:oslog"]
[dependencies]
veilid-core = { path = "../../veilid-core", default-features = false }
tracing = { version = "0.1.40", features = ["log", "attributes"] }
tracing-subscriber = "0.3.18"
tracing-subscriber = "0.3.19"
parking_lot = "0.12.3"
backtrace = "0.3.71"
serde_json = "1.0.120"

View File

@ -125,27 +125,33 @@ pub fn display_duration(dur: u64) -> String {
let secs = dur / SEC;
let dur = dur % SEC;
let msecs = dur / MSEC;
let dur = dur % MSEC;
format!(
"{}{}{}{}.{:03}s",
if days != 0 {
format!("{}d", days)
} else {
"".to_owned()
},
if hours != 0 {
format!("{}h", hours)
} else {
"".to_owned()
},
if mins != 0 {
format!("{}m", mins)
} else {
"".to_owned()
},
secs,
msecs
)
// microseconds format
if days == 0 && hours == 0 && mins == 0 && secs == 0 {
format!("{}.{:03}ms", msecs, dur)
} else {
format!(
"{}{}{}{}.{:03}s",
if days != 0 {
format!("{}d", days)
} else {
"".to_owned()
},
if hours != 0 {
format!("{}h", hours)
} else {
"".to_owned()
},
if mins != 0 {
format!("{}m", mins)
} else {
"".to_owned()
},
secs,
msecs
)
}
}
#[must_use]