instruments

This commit is contained in:
Christien Rioux 2024-07-03 21:00:12 -04:00
parent c69dabf721
commit 76f5052960
28 changed files with 997 additions and 757 deletions

View File

@ -211,7 +211,7 @@ impl AttachmentManager {
} }
} }
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip_all)]
async fn attachment_maintainer(self) { async fn attachment_maintainer(self) {
log_net!(debug "attachment starting"); log_net!(debug "attachment starting");
self.update_attaching_detaching_state(AttachmentState::Attaching); self.update_attaching_detaching_state(AttachmentState::Attaching);
@ -286,7 +286,7 @@ impl AttachmentManager {
Ok(()) Ok(())
} }
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip_all)]
pub async fn terminate(&self) { pub async fn terminate(&self) {
// Ensure we detached // Ensure we detached
self.detach().await; self.detach().await;
@ -294,7 +294,7 @@ impl AttachmentManager {
self.inner.lock().update_callback = None; self.inner.lock().update_callback = None;
} }
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", skip_all)]
pub async fn attach(&self) -> bool { pub async fn attach(&self) -> bool {
// Create long-running connection maintenance routine // Create long-running connection maintenance routine
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -307,7 +307,7 @@ impl AttachmentManager {
true true
} }
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", skip_all)]
pub async fn detach(&self) -> bool { pub async fn detach(&self) -> bool {
let attachment_maintainer_jh = { let attachment_maintainer_jh = {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();

View File

@ -62,7 +62,7 @@ impl ServicesContext {
} }
} }
#[instrument(err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
pub async fn startup(&mut self) -> EyreResult<()> { pub async fn startup(&mut self) -> EyreResult<()> {
info!("Veilid API starting up"); info!("Veilid API starting up");
@ -151,7 +151,7 @@ impl ServicesContext {
Ok(()) Ok(())
} }
#[instrument(skip_all)] #[instrument(level = "trace", target = "core_context", skip_all)]
pub async fn shutdown(&mut self) { pub async fn shutdown(&mut self) {
info!("Veilid API shutting down"); info!("Veilid API shutting down");
@ -200,7 +200,7 @@ pub(crate) struct VeilidCoreContext {
} }
impl VeilidCoreContext { impl VeilidCoreContext {
#[instrument(err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
async fn new_with_config_callback( async fn new_with_config_callback(
update_callback: UpdateCallback, update_callback: UpdateCallback,
config_callback: ConfigCallback, config_callback: ConfigCallback,
@ -212,7 +212,7 @@ impl VeilidCoreContext {
Self::new_common(update_callback, config).await Self::new_common(update_callback, config).await
} }
#[instrument(err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
async fn new_with_config_json( async fn new_with_config_json(
update_callback: UpdateCallback, update_callback: UpdateCallback,
config_json: String, config_json: String,
@ -223,7 +223,7 @@ impl VeilidCoreContext {
Self::new_common(update_callback, config).await Self::new_common(update_callback, config).await
} }
#[instrument(err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
async fn new_with_config( async fn new_with_config(
update_callback: UpdateCallback, update_callback: UpdateCallback,
config_inner: VeilidConfigInner, config_inner: VeilidConfigInner,
@ -234,7 +234,7 @@ impl VeilidCoreContext {
Self::new_common(update_callback, config).await Self::new_common(update_callback, config).await
} }
#[instrument(err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
async fn new_common( async fn new_common(
update_callback: UpdateCallback, update_callback: UpdateCallback,
config: VeilidConfig, config: VeilidConfig,
@ -263,7 +263,7 @@ impl VeilidCoreContext {
}) })
} }
#[instrument(skip_all)] #[instrument(level = "trace", target = "core_context", skip_all)]
async fn shutdown(self) { async fn shutdown(self) {
let mut sc = ServicesContext::new_full( let mut sc = ServicesContext::new_full(
self.config.clone(), self.config.clone(),
@ -294,7 +294,7 @@ lazy_static::lazy_static! {
/// * `config_callback` - called at startup to supply a configuration object directly to Veilid. /// * `config_callback` - called at startup to supply a configuration object directly to Veilid.
/// ///
/// Returns a [VeilidAPI] object that can be used to operate the node. /// Returns a [VeilidAPI] object that can be used to operate the node.
#[instrument(err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
pub async fn api_startup( pub async fn api_startup(
update_callback: UpdateCallback, update_callback: UpdateCallback,
config_callback: ConfigCallback, config_callback: ConfigCallback,
@ -325,7 +325,7 @@ pub async fn api_startup(
/// * `config_json` - called at startup to supply a JSON configuration object. /// * `config_json` - called at startup to supply a JSON configuration object.
/// ///
/// Returns a [VeilidAPI] object that can be used to operate the node. /// Returns a [VeilidAPI] object that can be used to operate the node.
#[instrument(err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
pub async fn api_startup_json( pub async fn api_startup_json(
update_callback: UpdateCallback, update_callback: UpdateCallback,
config_json: String, config_json: String,
@ -355,7 +355,7 @@ pub async fn api_startup_json(
/// * `config` - called at startup to supply a configuration object. /// * `config` - called at startup to supply a configuration object.
/// ///
/// Returns a [VeilidAPI] object that can be used to operate the node. /// Returns a [VeilidAPI] object that can be used to operate the node.
#[instrument(err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
pub async fn api_startup_config( pub async fn api_startup_config(
update_callback: UpdateCallback, update_callback: UpdateCallback,
config: VeilidConfigInner, config: VeilidConfigInner,
@ -377,7 +377,7 @@ pub async fn api_startup_config(
Ok(veilid_api) Ok(veilid_api)
} }
#[instrument(skip_all)] #[instrument(level = "trace", target = "core_context", skip_all)]
pub(crate) async fn api_shutdown(context: VeilidCoreContext) { pub(crate) async fn api_shutdown(context: VeilidCoreContext) {
let mut initialized_lock = INITIALIZED.lock().await; let mut initialized_lock = INITIALIZED.lock().await;
context.shutdown().await; context.shutdown().await;

View File

@ -128,7 +128,7 @@ impl Crypto {
self.unlocked_inner.config.clone() self.unlocked_inner.config.clone()
} }
#[instrument(skip_all, err)] #[instrument(level = "trace", target = "crypto", skip_all, err)]
pub async fn init(&self) -> EyreResult<()> { pub async fn init(&self) -> EyreResult<()> {
let table_store = self.unlocked_inner.table_store.clone(); let table_store = self.unlocked_inner.table_store.clone();
// Init node id from config // Init node id from config

View File

@ -61,7 +61,7 @@ mod wasm_helpers;
pub use self::core_context::{api_startup, api_startup_config, api_startup_json, UpdateCallback}; pub use self::core_context::{api_startup, api_startup_config, api_startup_json, UpdateCallback};
pub use self::logging::{ pub use self::logging::{
ApiTracingLayer, VeilidLayerFilter, DEFAULT_LOG_FACILITIES_ENABLED_LIST, ApiTracingLayer, VeilidLayerFilter, DEFAULT_LOG_FACILITIES_ENABLED_LIST,
DEFAULT_LOG_FACILITIES_IGNORE_LIST, DURATION_LOG_FACILITIES, DEFAULT_LOG_FACILITIES_IGNORE_LIST, DURATION_LOG_FACILITIES, FLAME_LOG_FACILITIES_IGNORE_LIST,
}; };
pub use self::veilid_api::*; pub use self::veilid_api::*;
pub use self::veilid_config::*; pub use self::veilid_config::*;

View File

@ -29,6 +29,31 @@ pub static DEFAULT_LOG_FACILITIES_IGNORE_LIST: [&str; 28] = [
"receipt", "receipt",
]; ];
pub static FLAME_LOG_FACILITIES_IGNORE_LIST: [&str; 22] = [
"mio",
"h2",
"hyper",
"tower",
"tonic",
"tokio",
"runtime",
"tokio_util",
"want",
"serial_test",
"async_std",
"async_io",
"polling",
"rustls",
"async_tungstenite",
"tungstenite",
"netlink_proto",
"netlink_sys",
"hickory_resolver",
"hickory_proto",
"attohttpc",
"ws_stream_wasm",
];
pub static DEFAULT_LOG_FACILITIES_ENABLED_LIST: [&str; 8] = [ pub static DEFAULT_LOG_FACILITIES_ENABLED_LIST: [&str; 8] = [
"net", "net",
"rpc", "rpc",

View File

@ -16,12 +16,12 @@ pub struct VeilidLayerFilter {
impl VeilidLayerFilter { impl VeilidLayerFilter {
pub fn new( pub fn new(
max_level: VeilidConfigLogLevel, max_level: VeilidConfigLogLevel,
ignore_log_targets: &[String], ignore_change_list: &[String],
) -> VeilidLayerFilter { ) -> VeilidLayerFilter {
let mut ignore_list = DEFAULT_LOG_FACILITIES_IGNORE_LIST let mut ignore_list = DEFAULT_LOG_FACILITIES_IGNORE_LIST
.map(|x| x.to_owned()) .map(|x| x.to_owned())
.to_vec(); .to_vec();
Self::apply_ignore_change_list(&mut ignore_list, ignore_log_targets); Self::apply_ignore_change_list(&mut ignore_list, ignore_change_list);
Self { Self {
inner: Arc::new(RwLock::new(VeilidLayerFilterInner { inner: Arc::new(RwLock::new(VeilidLayerFilterInner {
max_level: max_level.to_tracing_level_filter(), max_level: max_level.to_tracing_level_filter(),
@ -29,6 +29,17 @@ impl VeilidLayerFilter {
})), })),
} }
} }
pub fn new_no_default(
max_level: VeilidConfigLogLevel,
ignore_list: &[String],
) -> VeilidLayerFilter {
Self {
inner: Arc::new(RwLock::new(VeilidLayerFilterInner {
max_level: max_level.to_tracing_level_filter(),
ignore_list: ignore_list.to_vec(),
})),
}
}
pub fn max_level(&self) -> VeilidConfigLogLevel { pub fn max_level(&self) -> VeilidConfigLogLevel {
let inner = self.inner.read(); let inner = self.inner.read();
@ -102,6 +113,9 @@ impl VeilidLayerFilter {
.map(|x| x.to_owned()) .map(|x| x.to_owned())
.collect(); .collect();
continue; continue;
} else if change == "none" {
ignore_list.clear();
continue;
} else if change == "default" { } else if change == "default" {
*ignore_list = [DEFAULT_LOG_FACILITIES_IGNORE_LIST.to_vec()] *ignore_list = [DEFAULT_LOG_FACILITIES_IGNORE_LIST.to_vec()]
.concat() .concat()

View File

@ -186,7 +186,7 @@ impl ConnectionManager {
// Internal routine to register new connection atomically. // Internal routine to register new connection atomically.
// Registers connection in the connection table for later access // Registers connection in the connection table for later access
// and spawns a message processing loop for the connection // and spawns a message processing loop for the connection
#[instrument(level = "trace", skip(self, inner), ret, err)] //#[instrument(level = "trace", skip(self, inner), ret, err)]
fn on_new_protocol_network_connection( fn on_new_protocol_network_connection(
&self, &self,
inner: &mut ConnectionManagerInner, inner: &mut ConnectionManagerInner,
@ -420,7 +420,7 @@ impl ConnectionManager {
} }
} }
#[instrument(level = "trace", skip_all)] //#[instrument(level = "trace", skip_all)]
async fn async_processor( async fn async_processor(
self, self,
stop_token: StopToken, stop_token: StopToken,
@ -438,7 +438,7 @@ impl ConnectionManager {
// Called by low-level network when any connection-oriented protocol connection appears // Called by low-level network when any connection-oriented protocol connection appears
// either from incoming connections. // either from incoming connections.
#[cfg_attr(target_arch = "wasm32", allow(dead_code))] //#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
pub(super) async fn on_accepted_protocol_network_connection( pub(super) async fn on_accepted_protocol_network_connection(
&self, &self,
protocol_connection: ProtocolNetworkConnection, protocol_connection: ProtocolNetworkConnection,

View File

@ -901,17 +901,6 @@ impl NetworkManager {
// and passes it to the RPC handler // and passes it to the RPC handler
#[instrument(level = "trace", target = "receipt", skip_all)] #[instrument(level = "trace", target = "receipt", skip_all)]
async fn on_recv_envelope(&self, data: &mut [u8], flow: Flow) -> EyreResult<bool> { async fn on_recv_envelope(&self, data: &mut [u8], flow: Flow) -> EyreResult<bool> {
#[cfg(feature = "verbose-tracing")]
let root = span!(
parent: None,
Level::TRACE,
"on_recv_envelope",
"data.len" = data.len(),
"flow" = ?flow
);
#[cfg(feature = "verbose-tracing")]
let _root_enter = root.enter();
log_net!("envelope of {} bytes received from {:?}", data.len(), flow); log_net!("envelope of {} bytes received from {:?}", data.len(), flow);
let remote_addr = flow.remote_address().ip_addr(); let remote_addr = flow.remote_address().ip_addr();

View File

@ -59,6 +59,7 @@ impl IGDManager {
} }
} }
#[instrument(level = "trace", target = "net", skip_all)]
fn get_routed_local_ip_address(address_type: AddressType) -> Option<IpAddr> { fn get_routed_local_ip_address(address_type: AddressType) -> Option<IpAddr> {
let socket = match UdpSocket::bind(match address_type { let socket = match UdpSocket::bind(match address_type {
AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
@ -90,6 +91,7 @@ impl IGDManager {
Some(socket.local_addr().ok()?.ip()) Some(socket.local_addr().ok()?.ip())
} }
#[instrument(level = "trace", target = "net", skip_all)]
fn find_local_ip(inner: &mut IGDManagerInner, fn find_local_ip(inner: &mut IGDManagerInner,
address_type: AddressType, address_type: AddressType,
) -> Option<IpAddr> { ) -> Option<IpAddr> {
@ -109,6 +111,7 @@ impl IGDManager {
Some(ip) Some(ip)
} }
#[instrument(level = "trace", target = "net", skip_all)]
fn get_local_ip( fn get_local_ip(
inner: &mut IGDManagerInner, inner: &mut IGDManagerInner,
address_type: AddressType, address_type: AddressType,
@ -119,6 +122,7 @@ impl IGDManager {
None None
} }
#[instrument(level = "trace", target = "net", skip_all)]
fn find_gateway( fn find_gateway(
inner: &mut IGDManagerInner, inner: &mut IGDManagerInner,
local_ip: IpAddr, local_ip: IpAddr,
@ -165,6 +169,7 @@ impl IGDManager {
Some(gw) Some(gw)
} }
#[instrument(level = "trace", target = "net", skip_all)]
fn get_gateway( fn get_gateway(
inner: &mut IGDManagerInner, inner: &mut IGDManagerInner,
local_ip: IpAddr, local_ip: IpAddr,
@ -179,6 +184,7 @@ impl IGDManager {
format!("{} map {} for port {}", self.config.get().program_name, convert_llpt(llpt), local_port ) format!("{} map {} for port {}", self.config.get().program_name, convert_llpt(llpt), local_port )
} }
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn unmap_port(&self, pub async fn unmap_port(&self,
llpt: LowLevelProtocolType, llpt: LowLevelProtocolType,
at: AddressType, at: AddressType,
@ -220,6 +226,7 @@ impl IGDManager {
.await .await
} }
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn map_any_port( pub async fn map_any_port(
&self, &self,
llpt: LowLevelProtocolType, llpt: LowLevelProtocolType,
@ -303,6 +310,7 @@ impl IGDManager {
.await .await
} }
#[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn tick(&self) -> EyreResult<bool> { pub async fn tick(&self) -> EyreResult<bool> {
// Refresh mappings if we have them // Refresh mappings if we have them
// If an error is received, then return false to restart the local network // If an error is received, then return false to restart the local network
@ -426,6 +434,6 @@ impl IGDManager {
// Normal exit, no restart // Normal exit, no restart
Ok(true) Ok(true)
}, Err(eyre!("failed to process blocking task"))).await }, Err(eyre!("failed to process blocking task"))).in_current_span().await
} }
} }

View File

@ -976,10 +976,10 @@ impl Network {
////////////////////////////////////////// //////////////////////////////////////////
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn network_interfaces_task_routine( pub async fn network_interfaces_task_routine(
self, self,
stop_token: StopToken, _stop_token: StopToken,
_l: u64, _l: u64,
_t: u64, _t: u64,
) -> EyreResult<()> { ) -> EyreResult<()> {
@ -988,10 +988,10 @@ impl Network {
Ok(()) Ok(())
} }
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn upnp_task_routine( pub async fn upnp_task_routine(
self, self,
stop_token: StopToken, _stop_token: StopToken,
_l: u64, _l: u64,
_t: u64, _t: u64,
) -> EyreResult<()> { ) -> EyreResult<()> {
@ -1004,6 +1004,7 @@ impl Network {
Ok(()) Ok(())
} }
#[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
let (detect_address_changes, upnp) = { let (detect_address_changes, upnp) = {
let config = self.network_manager().config(); let config = self.network_manager().config();

View File

@ -299,7 +299,7 @@ impl NetworkConnection {
}; };
let timer = MutableFuture::new(new_timer()); let timer = MutableFuture::new(new_timer());
unord.push(system_boxed(timer.clone().instrument(Span::current()))); unord.push(system_boxed(timer.clone().in_current_span()));
loop { loop {
// Add another message sender future if necessary // Add another message sender future if necessary
@ -309,10 +309,6 @@ impl NetworkConnection {
match res { match res {
Ok((_span_id, message)) => { Ok((_span_id, message)) => {
let recv_span = span!(Level::TRACE, "process_connection recv");
// xxx: causes crash (Missing otel data span extensions)
// recv_span.follows_from(span_id);
// Touch the LRU for this connection // Touch the LRU for this connection
connection_manager.touch_connection_by_id(connection_id); connection_manager.touch_connection_by_id(connection_id);
@ -321,7 +317,7 @@ impl NetworkConnection {
&protocol_connection, &protocol_connection,
stats.clone(), stats.clone(),
message, message,
).instrument(recv_span) ).in_current_span()
.await .await
{ {
// Sending the packet along can fail, if so, this connection is dead // Sending the packet along can fail, if so, this connection is dead
@ -338,7 +334,7 @@ impl NetworkConnection {
} }
} }
}); });
unord.push(system_boxed(sender_fut.instrument(Span::current()))); unord.push(system_boxed(sender_fut.in_current_span()));
} }
// Add another message receiver future if necessary // Add another message receiver future if necessary
@ -393,9 +389,9 @@ impl NetworkConnection {
RecvLoopAction::Finish RecvLoopAction::Finish
} }
} }
}); }.in_current_span());
unord.push(system_boxed(receiver_fut.instrument(Span::current()))); unord.push(system_boxed(receiver_fut.in_current_span()));
} }
// Process futures // Process futures
@ -445,7 +441,7 @@ impl NetworkConnection {
log_net!(debug "Protocol connection close error: {}", e); log_net!(debug "Protocol connection close error: {}", e);
} }
}.instrument(trace_span!("process_connection"))) }.in_current_span())
} }
pub fn debug_print(&self, cur_ts: Timestamp) -> String { pub fn debug_print(&self, cur_ts: Timestamp) -> String {

View File

@ -290,7 +290,11 @@ impl ReceiptManager {
if now >= next_oldest_ts { if now >= next_oldest_ts {
// Single-spawn the timeout task routine // Single-spawn the timeout task routine
let _ = timeout_task let _ = timeout_task
.single_spawn(self.clone().timeout_task_routine(now, stop_token)) .single_spawn(
self.clone()
.timeout_task_routine(now, stop_token)
.in_current_span(),
)
.await; .await;
} }
} }

View File

@ -11,14 +11,11 @@ impl NetworkManager {
self.unlocked_inner self.unlocked_inner
.rolling_transfers_task .rolling_transfers_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().rolling_transfers_task_routine(
this.clone() s,
.rolling_transfers_task_routine(s, Timestamp::new(l), Timestamp::new(t)) Timestamp::new(l),
.instrument(trace_span!( Timestamp::new(t),
parent: None, ))
"NetworkManager rolling transfers task routine"
)),
)
}); });
} }
@ -28,18 +25,11 @@ impl NetworkManager {
self.unlocked_inner self.unlocked_inner
.public_address_check_task .public_address_check_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().public_address_check_task_routine(
this.clone() s,
.public_address_check_task_routine( Timestamp::new(l),
s, Timestamp::new(t),
Timestamp::new(l), ))
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"public address check task routine"
)),
)
}); });
} }
@ -49,11 +39,11 @@ impl NetworkManager {
self.unlocked_inner self.unlocked_inner
.address_filter_task .address_filter_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.address_filter().address_filter_task_routine(
this.address_filter() s,
.address_filter_task_routine(s, Timestamp::new(l), Timestamp::new(t)) Timestamp::new(l),
.instrument(trace_span!(parent: None, "address filter task routine")), Timestamp::new(t),
) ))
}); });
} }
} }

View File

@ -653,6 +653,7 @@ impl RoutingTable {
} }
/// Resolve an existing routing table entry and return a filtered reference to it /// Resolve an existing routing table entry and return a filtered reference to it
#[instrument(level = "trace", skip_all)]
pub fn lookup_and_filter_noderef( pub fn lookup_and_filter_noderef(
&self, &self,
node_id: TypedKey, node_id: TypedKey,
@ -670,6 +671,7 @@ impl RoutingTable {
/// Shortcut function to add a node to our routing table if it doesn't exist /// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the dial info we have for it. Returns a noderef filtered to /// and add the dial info we have for it. Returns a noderef filtered to
/// the routing domain in which this node was registered for convenience. /// the routing domain in which this node was registered for convenience.
#[instrument(level = "trace", skip_all, err)]
pub fn register_node_with_peer_info( pub fn register_node_with_peer_info(
&self, &self,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
@ -686,6 +688,7 @@ impl RoutingTable {
/// Shortcut function to add a node to our routing table if it doesn't exist /// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the last peer address we have for it, since that's pretty common /// and add the last peer address we have for it, since that's pretty common
#[instrument(level = "trace", skip_all, err)]
pub fn register_node_with_existing_connection( pub fn register_node_with_existing_connection(
&self, &self,
node_id: TypedKey, node_id: TypedKey,
@ -707,6 +710,7 @@ impl RoutingTable {
self.inner.read().get_routing_table_health() self.inner.read().get_routing_table_health()
} }
#[instrument(level = "trace", skip_all)]
pub fn get_recent_peers(&self) -> Vec<(TypedKey, RecentPeersEntry)> { pub fn get_recent_peers(&self) -> Vec<(TypedKey, RecentPeersEntry)> {
let mut recent_peers = Vec::new(); let mut recent_peers = Vec::new();
let mut dead_peers = Vec::new(); let mut dead_peers = Vec::new();

View File

@ -627,6 +627,7 @@ impl RoutingTableInner {
/// Create a node reference, possibly creating a bucket entry /// Create a node reference, possibly creating a bucket entry
/// the 'update_func' closure is called on the node, and, if created, /// the 'update_func' closure is called on the node, and, if created,
/// in a locked fashion as to ensure the bucket entry state is always valid /// in a locked fashion as to ensure the bucket entry state is always valid
#[instrument(level = "trace", skip_all, err)]
fn create_node_ref<F>( fn create_node_ref<F>(
&mut self, &mut self,
outer_self: RoutingTable, outer_self: RoutingTable,
@ -707,6 +708,7 @@ impl RoutingTableInner {
} }
/// Resolve an existing routing table entry using any crypto kind and return a reference to it /// Resolve an existing routing table entry using any crypto kind and return a reference to it
#[instrument(level = "trace", skip_all, err)]
pub fn lookup_any_node_ref( pub fn lookup_any_node_ref(
&self, &self,
outer_self: RoutingTable, outer_self: RoutingTable,
@ -723,6 +725,7 @@ impl RoutingTableInner {
} }
/// Resolve an existing routing table entry and return a reference to it /// Resolve an existing routing table entry and return a reference to it
#[instrument(level = "trace", skip_all, err)]
pub fn lookup_node_ref( pub fn lookup_node_ref(
&self, &self,
outer_self: RoutingTable, outer_self: RoutingTable,
@ -743,6 +746,7 @@ impl RoutingTableInner {
} }
/// Resolve an existing routing table entry and return a filtered reference to it /// Resolve an existing routing table entry and return a filtered reference to it
#[instrument(level = "trace", skip_all, err)]
pub fn lookup_and_filter_noderef( pub fn lookup_and_filter_noderef(
&self, &self,
outer_self: RoutingTable, outer_self: RoutingTable,
@ -781,6 +785,7 @@ impl RoutingTableInner {
/// Shortcut function to add a node to our routing table if it doesn't exist /// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the dial info we have for it. Returns a noderef filtered to /// and add the dial info we have for it. Returns a noderef filtered to
/// the routing domain in which this node was registered for convenience. /// the routing domain in which this node was registered for convenience.
#[instrument(level = "trace", skip_all, err)]
pub fn register_node_with_peer_info( pub fn register_node_with_peer_info(
&mut self, &mut self,
outer_self: RoutingTable, outer_self: RoutingTable,
@ -853,6 +858,7 @@ impl RoutingTableInner {
/// Shortcut function to add a node to our routing table if it doesn't exist /// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the last peer address we have for it, since that's pretty common /// and add the last peer address we have for it, since that's pretty common
#[instrument(level = "trace", skip_all, err)]
pub fn register_node_with_existing_connection( pub fn register_node_with_existing_connection(
&mut self, &mut self,
outer_self: RoutingTable, outer_self: RoutingTable,
@ -933,6 +939,7 @@ impl RoutingTableInner {
// Find Nodes // Find Nodes
// Retrieve the fastest nodes in the routing table matching an entry filter // Retrieve the fastest nodes in the routing table matching an entry filter
#[instrument(level = "trace", skip_all)]
pub fn find_fast_public_nodes_filtered( pub fn find_fast_public_nodes_filtered(
&self, &self,
outer_self: RoutingTable, outer_self: RoutingTable,
@ -965,6 +972,7 @@ impl RoutingTableInner {
) )
} }
#[instrument(level = "trace", skip_all)]
pub fn filter_has_valid_signed_node_info( pub fn filter_has_valid_signed_node_info(
&self, &self,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
@ -984,6 +992,7 @@ impl RoutingTableInner {
} }
} }
#[instrument(level = "trace", skip_all)]
pub fn transform_to_peer_info( pub fn transform_to_peer_info(
&self, &self,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
@ -996,6 +1005,7 @@ impl RoutingTableInner {
} }
} }
#[instrument(level = "trace", skip_all)]
pub fn find_peers_with_sort_and_filter<C, T, O>( pub fn find_peers_with_sort_and_filter<C, T, O>(
&self, &self,
node_count: usize, node_count: usize,
@ -1058,6 +1068,7 @@ impl RoutingTableInner {
out out
} }
#[instrument(level = "trace", skip_all)]
pub fn find_preferred_fastest_nodes<T, O>( pub fn find_preferred_fastest_nodes<T, O>(
&self, &self,
node_count: usize, node_count: usize,
@ -1136,6 +1147,7 @@ impl RoutingTableInner {
self.find_peers_with_sort_and_filter(node_count, cur_ts, filters, sort, transform) self.find_peers_with_sort_and_filter(node_count, cur_ts, filters, sort, transform)
} }
#[instrument(level = "trace", skip_all)]
pub fn find_preferred_closest_nodes<T, O>( pub fn find_preferred_closest_nodes<T, O>(
&self, &self,
node_count: usize, node_count: usize,
@ -1222,6 +1234,7 @@ impl RoutingTableInner {
Ok(out) Ok(out)
} }
#[instrument(level = "trace", skip_all)]
pub fn sort_and_clean_closest_noderefs( pub fn sort_and_clean_closest_noderefs(
&self, &self,
node_id: TypedKey, node_id: TypedKey,
@ -1250,6 +1263,7 @@ impl RoutingTableInner {
} }
} }
#[instrument(level = "trace", skip_all)]
pub(crate) fn make_closest_noderef_sort( pub(crate) fn make_closest_noderef_sort(
crypto: Crypto, crypto: Crypto,
node_id: TypedKey, node_id: TypedKey,

View File

@ -17,14 +17,11 @@ impl RoutingTable {
self.unlocked_inner self.unlocked_inner
.rolling_transfers_task .rolling_transfers_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().rolling_transfers_task_routine(
this.clone() s,
.rolling_transfers_task_routine(s, Timestamp::new(l), Timestamp::new(t)) Timestamp::new(l),
.instrument(trace_span!( Timestamp::new(t),
parent: None, ))
"RoutingTable rolling transfers task routine"
)),
)
}); });
} }
@ -34,11 +31,11 @@ impl RoutingTable {
self.unlocked_inner self.unlocked_inner
.kick_buckets_task .kick_buckets_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().kick_buckets_task_routine(
this.clone() s,
.kick_buckets_task_routine(s, Timestamp::new(l), Timestamp::new(t)) Timestamp::new(l),
.instrument(trace_span!(parent: None, "kick buckets task routine")), Timestamp::new(t),
) ))
}); });
} }
@ -47,13 +44,7 @@ impl RoutingTable {
let this = self.clone(); let this = self.clone();
self.unlocked_inner self.unlocked_inner
.bootstrap_task .bootstrap_task
.set_routine(move |s, _l, _t| { .set_routine(move |s, _l, _t| Box::pin(this.clone().bootstrap_task_routine(s)));
Box::pin(
this.clone()
.bootstrap_task_routine(s)
.instrument(trace_span!(parent: None, "bootstrap task routine")),
)
});
} }
// Set peer minimum refresh tick task // Set peer minimum refresh tick task
@ -62,14 +53,7 @@ impl RoutingTable {
self.unlocked_inner self.unlocked_inner
.peer_minimum_refresh_task .peer_minimum_refresh_task
.set_routine(move |s, _l, _t| { .set_routine(move |s, _l, _t| {
Box::pin( Box::pin(this.clone().peer_minimum_refresh_task_routine(s))
this.clone()
.peer_minimum_refresh_task_routine(s)
.instrument(trace_span!(
parent: None,
"peer minimum refresh task routine"
)),
)
}); });
} }
@ -79,14 +63,7 @@ impl RoutingTable {
self.unlocked_inner self.unlocked_inner
.closest_peers_refresh_task .closest_peers_refresh_task
.set_routine(move |s, _l, _t| { .set_routine(move |s, _l, _t| {
Box::pin( Box::pin(this.clone().closest_peers_refresh_task_routine(s))
this.clone()
.closest_peers_refresh_task_routine(s)
.instrument(trace_span!(
parent: None,
"closest peers refresh task routine"
)),
)
}); });
} }
@ -96,11 +73,11 @@ impl RoutingTable {
self.unlocked_inner self.unlocked_inner
.ping_validator_task .ping_validator_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().ping_validator_task_routine(
this.clone() s,
.ping_validator_task_routine(s, Timestamp::new(l), Timestamp::new(t)) Timestamp::new(l),
.instrument(trace_span!(parent: None, "ping validator task routine")), Timestamp::new(t),
) ))
}); });
} }
@ -110,11 +87,11 @@ impl RoutingTable {
self.unlocked_inner self.unlocked_inner
.relay_management_task .relay_management_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().relay_management_task_routine(
this.clone() s,
.relay_management_task_routine(s, Timestamp::new(l), Timestamp::new(t)) Timestamp::new(l),
.instrument(trace_span!(parent: None, "relay management task routine")), Timestamp::new(t),
) ))
}); });
} }
@ -124,18 +101,11 @@ impl RoutingTable {
self.unlocked_inner self.unlocked_inner
.private_route_management_task .private_route_management_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().private_route_management_task_routine(
this.clone() s,
.private_route_management_task_routine( Timestamp::new(l),
s, Timestamp::new(t),
Timestamp::new(l), ))
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"private route management task routine"
)),
)
}); });
} }
} }

View File

@ -2,6 +2,7 @@ use super::*;
impl RoutingTable { impl RoutingTable {
// Check if a relay is desired or not // Check if a relay is desired or not
#[instrument(level = "trace", skip_all)]
fn public_internet_wants_relay(&self) -> Option<RelayKind> { fn public_internet_wants_relay(&self) -> Option<RelayKind> {
let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet);
let own_node_info = own_peer_info.signed_node_info().node_info(); let own_node_info = own_peer_info.signed_node_info().node_info();
@ -47,7 +48,7 @@ impl RoutingTable {
} }
// Keep relays assigned and accessible // Keep relays assigned and accessible
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip_all, err)]
pub(crate) async fn relay_management_task_routine( pub(crate) async fn relay_management_task_routine(
self, self,
_stop_token: StopToken, _stop_token: StopToken,
@ -145,6 +146,7 @@ impl RoutingTable {
Ok(()) Ok(())
} }
#[instrument(level = "trace", skip_all)]
pub fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { pub fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool {
// Get all our outbound protocol/address types // Get all our outbound protocol/address types
let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet); let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet);

View File

@ -171,7 +171,7 @@ pub(crate) struct RPCMessage {
opt_sender_nr: Option<NodeRef>, opt_sender_nr: Option<NodeRef>,
} }
#[instrument(skip_all, err)] #[instrument(level="trace", target="rpc", skip_all, err)]
pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder<T>) -> Result<Vec<u8>, RPCError> pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder<T>) -> Result<Vec<u8>, RPCError>
where where
T: capnp::message::Allocator + 'a, T: capnp::message::Allocator + 'a,
@ -1669,13 +1669,13 @@ impl RPCProcessor {
while let Ok(Ok((_span_id, msg))) = while let Ok(Ok((_span_id, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await receiver.recv_async().timeout_at(stop_token.clone()).await
{ {
let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv"); //let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv");
// xxx: causes crash (Missing otel data span extensions) // xxx: causes crash (Missing otel data span extensions)
// rpc_worker_span.follows_from(span_id); // rpc_worker_span.follows_from(span_id);
network_result_value_or_log!(match self network_result_value_or_log!(match self
.process_rpc_message(msg) .process_rpc_message(msg).in_current_span()
.instrument(rpc_worker_span) //.instrument(rpc_worker_span)
.await .await
{ {
Err(e) => { Err(e) => {

View File

@ -182,7 +182,7 @@ impl StorageManager {
log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
Ok(NetworkResult::value(gva.answer.peers)) Ok(NetworkResult::value(gva.answer.peers))
} }.in_current_span()
} }
}; };
@ -271,7 +271,7 @@ impl StorageManager {
})) { })) {
log_dht!(debug "Sending GetValue result failed: {}", e); log_dht!(debug "Sending GetValue result failed: {}", e);
} }
})) }.in_current_span()))
.detach(); .detach();
Ok(out_rx) Ok(out_rx)
@ -319,7 +319,7 @@ impl StorageManager {
// Return done // Return done
false false
}) }.in_current_span())
}, },
), ),
); );

View File

@ -228,7 +228,7 @@ impl StorageManager {
log_network_result!(debug "InspectValue fanout call returned peers {}", answer.peers.len()); log_network_result!(debug "InspectValue fanout call returned peers {}", answer.peers.len());
Ok(NetworkResult::value(answer.peers)) Ok(NetworkResult::value(answer.peers))
} }.in_current_span()
}; };
// Routine to call to check if we're done at each step // Routine to call to check if we're done at each step

View File

@ -177,7 +177,7 @@ impl StorageManager {
ctx.send_partial_update = true; ctx.send_partial_update = true;
Ok(NetworkResult::value(sva.answer.peers)) Ok(NetworkResult::value(sva.answer.peers))
} }.in_current_span()
} }
}; };
@ -267,7 +267,7 @@ impl StorageManager {
})) { })) {
log_dht!(debug "Sending SetValue result failed: {}", e); log_dht!(debug "Sending SetValue result failed: {}", e);
} }
})) }.in_current_span()))
.detach(); .detach();
Ok(out_rx) Ok(out_rx)
@ -329,7 +329,7 @@ impl StorageManager {
// Return done // Return done
false false
}) }.in_current_span())
}, },
), ),
); );

View File

@ -15,18 +15,11 @@ impl StorageManager {
self.unlocked_inner self.unlocked_inner
.flush_record_stores_task .flush_record_stores_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().flush_record_stores_task_routine(
this.clone() s,
.flush_record_stores_task_routine( Timestamp::new(l),
s, Timestamp::new(t),
Timestamp::new(l), ))
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"StorageManager flush record stores task routine"
)),
)
}); });
} }
// Set offline subkey writes tick task // Set offline subkey writes tick task
@ -36,18 +29,11 @@ impl StorageManager {
self.unlocked_inner self.unlocked_inner
.offline_subkey_writes_task .offline_subkey_writes_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().offline_subkey_writes_task_routine(
this.clone() s,
.offline_subkey_writes_task_routine( Timestamp::new(l),
s, Timestamp::new(t),
Timestamp::new(l), ))
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"StorageManager offline subkey writes task routine"
)),
)
}); });
} }
// Set send value changes tick task // Set send value changes tick task
@ -57,18 +43,11 @@ impl StorageManager {
self.unlocked_inner self.unlocked_inner
.send_value_changes_task .send_value_changes_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().send_value_changes_task_routine(
this.clone() s,
.send_value_changes_task_routine( Timestamp::new(l),
s, Timestamp::new(t),
Timestamp::new(l), ))
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"StorageManager send value changes task routine"
)),
)
}); });
} }
// Set check active watches tick task // Set check active watches tick task
@ -78,18 +57,11 @@ impl StorageManager {
self.unlocked_inner self.unlocked_inner
.check_active_watches_task .check_active_watches_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().check_active_watches_task_routine(
this.clone() s,
.check_active_watches_task_routine( Timestamp::new(l),
s, Timestamp::new(t),
Timestamp::new(l), ))
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"StorageManager check active watches task routine"
)),
)
}); });
} }
// Set check watched records tick task // Set check watched records tick task
@ -99,18 +71,11 @@ impl StorageManager {
self.unlocked_inner self.unlocked_inner
.check_watched_records_task .check_watched_records_task
.set_routine(move |s, l, t| { .set_routine(move |s, l, t| {
Box::pin( Box::pin(this.clone().check_watched_records_task_routine(
this.clone() s,
.check_watched_records_task_routine( Timestamp::new(l),
s, Timestamp::new(t),
Timestamp::new(l), ))
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"StorageManager check watched records task routine"
)),
)
}); });
} }
} }

View File

@ -294,7 +294,7 @@ impl StorageManager {
log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node); log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node);
Ok(NetworkResult::value(wva.answer.peers)) Ok(NetworkResult::value(wva.answer.peers))
} }.in_current_span()
}; };
// Routine to call to check if we're done at each step // Routine to call to check if we're done at each step
@ -408,6 +408,7 @@ impl StorageManager {
} }
/// Handle a received 'Value Changed' statement /// Handle a received 'Value Changed' statement
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_value_changed( pub async fn inbound_value_changed(
&self, &self,
key: TypedKey, key: TypedKey,

File diff suppressed because it is too large Load Diff

View File

@ -58,6 +58,7 @@ impl DartIsolateWrapper {
Err(e) => self.err_json(e), Err(e) => self.err_json(e),
} }
} }
pub fn result_json<T: Serialize + Debug, E: Serialize + Debug>( pub fn result_json<T: Serialize + Debug, E: Serialize + Debug>(
self, self,
result: Result<T, E>, result: Result<T, E>,
@ -67,6 +68,7 @@ impl DartIsolateWrapper {
Err(e) => self.err_json(e), Err(e) => self.err_json(e),
} }
} }
pub fn ok<T: IntoDart>(self, value: T) -> bool { pub fn ok<T: IntoDart>(self, value: T) -> bool {
self.isolate self.isolate
.post(vec![MESSAGE_OK.into_dart(), value.into_dart()]) .post(vec![MESSAGE_OK.into_dart(), value.into_dart()])

View File

@ -151,7 +151,7 @@ pub struct CmdlineArgs {
console: bool, console: bool,
} }
#[instrument(err)] #[instrument(level = "trace", skip_all, err)]
fn main() -> EyreResult<()> { fn main() -> EyreResult<()> {
#[cfg(windows)] #[cfg(windows)]
let _ = ansi_term::enable_ansi_support(); let _ = ansi_term::enable_ansi_support();

View File

@ -8,7 +8,7 @@ use signal_hook::consts::signal::*;
use signal_hook_async_std::Signals; use signal_hook_async_std::Signals;
use veilid_core::tools::*; use veilid_core::tools::*;
#[instrument(skip(signals))] #[instrument(level = "trace", skip_all)]
async fn handle_signals(mut signals: Signals) { async fn handle_signals(mut signals: Signals) {
while let Some(signal) = signals.next().await { while let Some(signal) = signals.next().await {
match signal { match signal {
@ -47,7 +47,7 @@ pub async fn run_veilid_server_with_signals(
} }
#[warn(missing_docs)] #[warn(missing_docs)]
#[instrument(err)] #[instrument(level = "trace", skip_all, err)]
pub fn run_daemon(settings: Settings, _args: CmdlineArgs) -> EyreResult<()> { pub fn run_daemon(settings: Settings, _args: CmdlineArgs) -> EyreResult<()> {
let daemon = { let daemon = {
let mut daemon = daemonize::Daemonize::new(); let mut daemon = daemonize::Daemonize::new();

View File

@ -76,13 +76,14 @@ impl VeilidLogs {
// Flamegraph logger // Flamegraph logger
let mut flame_guard = None; let mut flame_guard = None;
if settingsr.logging.flame.enabled { if settingsr.logging.flame.enabled {
let filter = veilid_core::VeilidLayerFilter::new( let filter = veilid_core::VeilidLayerFilter::new_no_default(
convert_loglevel(LogLevel::Trace), veilid_core::VeilidConfigLogLevel::Trace,
&[], //&settingsr.logging.terminal.ignore_log_targets, &veilid_core::FLAME_LOG_FACILITIES_IGNORE_LIST.map(|x| x.to_string()),
); );
let (flame_layer, guard) = FlameLayer::with_file(&settingsr.logging.flame.path)?; let (flame_layer, guard) = FlameLayer::with_file(&settingsr.logging.flame.path)?;
flame_guard = Some(guard); flame_guard = Some(guard);
filters.insert("flame", filter.clone()); // Do not include this in change_log_level changes, so we keep trace level
// filters.insert("flame", filter.clone());
layers.push( layers.push(
flame_layer flame_layer
.with_threads_collapsed(true) .with_threads_collapsed(true)