diff --git a/Cargo.lock b/Cargo.lock index 90fe8fd2..c2aa676c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2844,18 +2844,18 @@ dependencies = [ [[package]] name = "keyvaluedb" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bdcaabe14fa83eaae1fb92480f619c5a8b413580723153da0334848eb2dff24" +checksum = "c3fe4850c4103a92a7bd14a56ecbe413c27f8c89ea642cb4753b310c30dff812" dependencies = [ "smallvec", ] [[package]] name = "keyvaluedb-memorydb" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62802173041ed97845bc20f8cf6817e445fe537ed879ddf43fb96166829ec8ff" +checksum = "e5e8d196e170cdf21ee4fb0ff779278ca24253c10abee1a49914a893dce71097" dependencies = [ "keyvaluedb", "parking_lot 0.12.3", @@ -2863,9 +2863,9 @@ dependencies = [ [[package]] name = "keyvaluedb-sqlite" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "144f474a27a7dadc5c179c08ef9a4662acaca8047f4e5c30d5b77582243c7538" +checksum = "aa4d3df76ca45b92891e22fbd0a0928d2ef848a65051d5bd2da43ed1c0dfeb6f" dependencies = [ "hex", "keyvaluedb", @@ -2876,9 +2876,9 @@ dependencies = [ [[package]] name = "keyvaluedb-web" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d93d243dfa1643389f8b981ddc07b2a7c533f0fae38b3f5831b004b2cc7f6353" +checksum = "351f750d6c87e6af74cdead18c33bb14fa47882787873bc05faa0002078cb9e5" dependencies = [ "async-lock 2.8.0", "flume", @@ -5490,6 +5490,17 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tracing-flame" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9" +dependencies = [ + "lazy_static", + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-journald" version = "0.3.0" @@ -5881,6 +5892,7 @@ dependencies = [ "glob", "hex", "hickory-resolver", + "indent", "jni", "jni-sys", "js-sys", @@ -6047,6 +6059,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-appender", + "tracing-flame", "tracing-journald", "tracing-opentelemetry", "tracing-subscriber", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index ae11c84d..e5ad2a53 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -80,7 +80,7 @@ thiserror = "1.0.50" # Data structures enumset = { version = "1.1.3", features = ["serde"] } -keyvaluedb = "0.1.1" +keyvaluedb = "0.1.2" range-set-blaze = "0.1.13" weak-table = "0.3.2" hashlink = { package = "veilid-hashlink", version = "0.1.0", features = [ @@ -135,6 +135,7 @@ lz4_flex = { version = "0.11.1", default-features = false, features = [ "safe-encode", "safe-decode", ] } +indent = "0.1.1" # Dependencies for native builds only # Linux, Windows, Mac, iOS, Android @@ -163,7 +164,7 @@ futures-util = { version = "0.3.29", default-features = false, features = [ # Data structures keyring-manager = "0.5.1" -keyvaluedb-sqlite = "0.1.1" +keyvaluedb-sqlite = "0.1.2" # Network async-tungstenite = { package = "veilid-async-tungstenite", version = "0.23.0", features = [ @@ -211,7 +212,7 @@ wasm-logger = "0.2.0" tracing-wasm = "0.2.1" # Data Structures -keyvaluedb-web = "0.1.1" +keyvaluedb-web = "0.1.2" ### Configuration for WASM32 'web-sys' crate [target.'cfg(target_arch = "wasm32")'.dependencies.web-sys] diff --git a/veilid-core/src/crypto/envelope.rs b/veilid-core/src/crypto/envelope.rs index d371e182..8a4f3933 100644 --- a/veilid-core/src/crypto/envelope.rs +++ b/veilid-core/src/crypto/envelope.rs @@ -66,6 +66,7 @@ impl Envelope { } } + #[instrument(level = "trace", target = "envelope", skip_all, err)] pub fn from_signed_data( crypto: Crypto, data: &[u8], @@ -190,6 +191,7 @@ impl Envelope { }) } + #[instrument(level = "trace", target = "envelope", skip_all, err)] pub fn decrypt_body( &self, crypto: Crypto, @@ -222,6 +224,7 @@ impl Envelope { Ok(body) } + #[instrument(level = "trace", target = "envelope", skip_all, err)] pub fn to_encrypted_data( &self, crypto: Crypto, diff --git a/veilid-core/src/crypto/receipt.rs b/veilid-core/src/crypto/receipt.rs index a2922172..ab8af8b2 100644 --- a/veilid-core/src/crypto/receipt.rs +++ b/veilid-core/src/crypto/receipt.rs @@ -68,6 +68,7 @@ impl Receipt { }) } + #[instrument(level = "trace", target = "receipt", skip_all, err)] pub fn from_signed_data(crypto: Crypto, data: &[u8]) -> VeilidAPIResult { // Ensure we are at least the length of the envelope if data.len() < MIN_RECEIPT_SIZE { @@ -156,6 +157,7 @@ impl Receipt { }) } + #[instrument(level = "trace", target = "receipt", skip_all, err)] pub fn to_signed_data(&self, crypto: Crypto, secret: &SecretKey) -> VeilidAPIResult> { // Ensure extra data isn't too long let receipt_size: usize = self.extra_data.len() + MIN_RECEIPT_SIZE; diff --git a/veilid-core/src/crypto/vld0/mod.rs b/veilid-core/src/crypto/vld0/mod.rs index a8c6382d..62cf90c0 100644 --- a/veilid-core/src/crypto/vld0/mod.rs +++ b/veilid-core/src/crypto/vld0/mod.rs @@ -69,12 +69,14 @@ impl CryptoSystem for CryptoSystemVLD0 { } // Cached Operations + #[instrument(level = "trace", skip_all)] fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult { self.crypto .cached_dh_internal::(self, key, secret) } // Generation + #[instrument(level = "trace", target = "crypto", skip_all)] fn random_bytes(&self, len: u32) -> Vec { let mut bytes = unsafe { unaligned_u8_vec_uninit(len as usize) }; random_bytes(bytes.as_mut()); @@ -83,6 +85,7 @@ impl CryptoSystem for CryptoSystemVLD0 { fn default_salt_length(&self) -> u32 { 16 } + #[instrument(level = "trace", target = "crypto", skip_all)] fn hash_password(&self, password: &[u8], salt: &[u8]) -> VeilidAPIResult { if salt.len() < Salt::MIN_LENGTH || salt.len() > Salt::MAX_LENGTH { apibail_generic!("invalid salt length"); @@ -100,6 +103,7 @@ impl CryptoSystem for CryptoSystemVLD0 { .to_string(); Ok(password_hash) } + #[instrument(level = "trace", target = "crypto", skip_all)] fn verify_password(&self, password: &[u8], password_hash: &str) -> VeilidAPIResult { let parsed_hash = PasswordHash::new(password_hash).map_err(VeilidAPIError::generic)?; // Argon2 with default params (Argon2id v19) @@ -108,6 +112,7 @@ impl CryptoSystem for CryptoSystemVLD0 { Ok(argon2.verify_password(password, &parsed_hash).is_ok()) } + #[instrument(level = "trace", target = "crypto", skip_all)] fn derive_shared_secret(&self, password: &[u8], salt: &[u8]) -> VeilidAPIResult { if salt.len() < Salt::MIN_LENGTH || salt.len() > Salt::MAX_LENGTH { apibail_generic!("invalid salt length"); @@ -123,16 +128,21 @@ impl CryptoSystem for CryptoSystemVLD0 { Ok(SharedSecret::new(output_key_material)) } + #[instrument(level = "trace", target = "crypto", skip_all)] fn random_nonce(&self) -> Nonce { let mut nonce = [0u8; NONCE_LENGTH]; random_bytes(&mut nonce); Nonce::new(nonce) } + + #[instrument(level = "trace", target = "crypto", skip_all)] fn random_shared_secret(&self) -> SharedSecret { let mut s = [0u8; SHARED_SECRET_LENGTH]; random_bytes(&mut s); SharedSecret::new(s) } + + #[instrument(level = "trace", target = "crypto", skip_all)] fn compute_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult { let pk_xd = public_to_x25519_pk(key)?; let sk_xd = secret_to_x25519_sk(secret)?; @@ -146,12 +156,18 @@ impl CryptoSystem for CryptoSystemVLD0 { Ok(SharedSecret::new(*output.as_bytes())) } + + #[instrument(level = "trace", target = "crypto", skip_all)] fn generate_keypair(&self) -> KeyPair { vld0_generate_keypair() } + + #[instrument(level = "trace", target = "crypto", skip_all)] fn generate_hash(&self, data: &[u8]) -> PublicKey { PublicKey::new(*blake3::hash(data).as_bytes()) } + + #[instrument(level = "trace", target = "crypto", skip_all)] fn generate_hash_reader(&self, reader: &mut dyn std::io::Read) -> VeilidAPIResult { let mut hasher = blake3::Hasher::new(); std::io::copy(reader, &mut hasher).map_err(VeilidAPIError::generic)?; @@ -159,6 +175,7 @@ impl CryptoSystem for CryptoSystemVLD0 { } // Validation + #[instrument(level = "trace", target = "crypto", skip_all)] fn validate_keypair(&self, dht_key: &PublicKey, dht_key_secret: &SecretKey) -> bool { let data = vec![0u8; 512]; let Ok(sig) = self.sign(dht_key, dht_key_secret, &data) else { @@ -169,11 +186,15 @@ impl CryptoSystem for CryptoSystemVLD0 { }; v } + + #[instrument(level = "trace", target = "crypto", skip_all)] fn validate_hash(&self, data: &[u8], dht_key: &PublicKey) -> bool { let bytes = *blake3::hash(data).as_bytes(); bytes == dht_key.bytes } + + #[instrument(level = "trace", target = "crypto", skip_all)] fn validate_hash_reader( &self, reader: &mut dyn std::io::Read, @@ -184,7 +205,9 @@ impl CryptoSystem for CryptoSystemVLD0 { let bytes = *hasher.finalize().as_bytes(); Ok(bytes == dht_key.bytes) } + // Distance Metric + #[instrument(level = "trace", target = "crypto", skip_all)] fn distance(&self, key1: &PublicKey, key2: &PublicKey) -> CryptoKeyDistance { let mut bytes = [0u8; CRYPTO_KEY_LENGTH]; @@ -196,6 +219,7 @@ impl CryptoSystem for CryptoSystemVLD0 { } // Authentication + #[instrument(level = "trace", target = "crypto", skip_all)] fn sign( &self, dht_key: &PublicKey, @@ -225,6 +249,7 @@ impl CryptoSystem for CryptoSystemVLD0 { Ok(sig) } + #[instrument(level = "trace", target = "crypto", skip_all)] fn verify( &self, dht_key: &PublicKey, @@ -251,6 +276,8 @@ impl CryptoSystem for CryptoSystemVLD0 { fn aead_overhead(&self) -> usize { AEAD_OVERHEAD } + + #[instrument(level = "trace", target = "crypto", skip_all)] fn decrypt_in_place_aead( &self, body: &mut Vec, @@ -266,6 +293,7 @@ impl CryptoSystem for CryptoSystemVLD0 { .map_err(VeilidAPIError::generic) } + #[instrument(level = "trace", target = "crypto", skip_all)] fn decrypt_aead( &self, body: &[u8], @@ -280,6 +308,7 @@ impl CryptoSystem for CryptoSystemVLD0 { Ok(out) } + #[instrument(level = "trace", target = "crypto", skip_all)] fn encrypt_in_place_aead( &self, body: &mut Vec, @@ -296,6 +325,7 @@ impl CryptoSystem for CryptoSystemVLD0 { .map_err(VeilidAPIError::generic) } + #[instrument(level = "trace", target = "crypto", skip_all)] fn encrypt_aead( &self, body: &[u8], @@ -311,6 +341,7 @@ impl CryptoSystem for CryptoSystemVLD0 { } // NoAuth Encrypt/Decrypt + #[instrument(level = "trace", target = "crypto", skip_all)] fn crypt_in_place_no_auth( &self, body: &mut [u8], @@ -321,6 +352,7 @@ impl CryptoSystem for CryptoSystemVLD0 { cipher.apply_keystream(body); } + #[instrument(level = "trace", target = "crypto", skip_all)] fn crypt_b2b_no_auth( &self, in_buf: &[u8], @@ -332,6 +364,7 @@ impl CryptoSystem for CryptoSystemVLD0 { cipher.apply_keystream_b2b(in_buf, out_buf).unwrap(); } + #[instrument(level = "trace", target = "crypto", skip_all)] fn crypt_no_auth_aligned_8( &self, in_buf: &[u8], @@ -343,6 +376,7 @@ impl CryptoSystem for CryptoSystemVLD0 { out_buf } + #[instrument(level = "trace", target = "crypto", skip_all)] fn crypt_no_auth_unaligned( &self, in_buf: &[u8], diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index c2b2be15..694b8368 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -611,7 +611,7 @@ impl NetworkManager { } /// Process a received out-of-band receipt - #[instrument(level = "trace", skip(self, receipt_data), ret)] + #[instrument(level = "trace", target = "receipt", skip_all)] pub async fn handle_out_of_band_receipt>( &self, receipt_data: R, @@ -631,7 +631,7 @@ impl NetworkManager { } /// Process a received in-band receipt - #[instrument(level = "trace", skip(self, receipt_data), ret)] + #[instrument(level = "trace", target = "receipt", skip_all)] pub async fn handle_in_band_receipt>( &self, receipt_data: R, @@ -652,7 +652,7 @@ impl NetworkManager { } /// Process a received safety receipt - #[instrument(level = "trace", skip(self, receipt_data), ret)] + #[instrument(level = "trace", target = "receipt", skip_all)] pub async fn handle_safety_receipt>( &self, receipt_data: R, @@ -672,7 +672,7 @@ impl NetworkManager { } /// Process a received private receipt - #[instrument(level = "trace", skip(self, receipt_data), ret)] + #[instrument(level = "trace", target = "receipt", skip_all)] pub async fn handle_private_receipt>( &self, receipt_data: R, @@ -693,7 +693,7 @@ impl NetworkManager { } // Process a received signal - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", target = "receipt", skip_all)] pub async fn handle_signal( &self, signal_flow: Flow, @@ -792,10 +792,7 @@ impl NetworkManager { } /// Builds an envelope for sending over the network - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, body), err) - )] + #[instrument(level = "trace", target = "receipt", skip_all)] fn build_envelope>( &self, dest_node_id: TypedKey, @@ -838,10 +835,7 @@ impl NetworkManager { /// node_ref is the direct destination to which the envelope will be sent /// If 'destination_node_ref' is specified, it can be different than the node_ref being sent to /// which will cause the envelope to be relayed - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, body), ret, err) - )] + #[instrument(level = "trace", target = "receipt", skip_all)] pub async fn send_envelope>( &self, node_ref: NodeRef, @@ -879,7 +873,7 @@ impl NetworkManager { } /// Called by the RPC handler when we want to issue an direct receipt - #[instrument(level = "debug", skip(self, rcpt_data), err)] + #[instrument(level = "trace", target = "receipt", skip_all)] pub async fn send_out_of_band_receipt( &self, dial_info: DialInfo, @@ -905,7 +899,7 @@ impl NetworkManager { // Called when a packet potentially containing an RPC envelope is received by a low-level // network protocol handler. Processes the envelope, authenticates and decrypts the RPC message // and passes it to the RPC handler - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len())))] + #[instrument(level = "trace", target = "receipt", skip_all)] async fn on_recv_envelope(&self, data: &mut [u8], flow: Flow) -> EyreResult { #[cfg(feature = "verbose-tracing")] let root = span!( diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 7cad975d..e4ecbba6 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -220,7 +220,7 @@ impl NetworkConnection { } } - #[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(message, stats), fields(message.len = message.len()), ret))] + #[instrument(level="trace", target="net", skip_all)] async fn send_internal( protocol_connection: &ProtocolNetworkConnection, stats: Arc>, @@ -235,7 +235,7 @@ impl NetworkConnection { Ok(NetworkResult::Value(())) } - #[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(stats), fields(ret.len)))] + #[instrument(level="trace", target="net", skip_all)] async fn recv_internal( protocol_connection: &ProtocolNetworkConnection, stats: Arc>, @@ -265,6 +265,7 @@ impl NetworkConnection { // Connection receiver loop #[allow(clippy::too_many_arguments)] + #[instrument(level="trace", target="net", skip_all)] fn process_connection( connection_manager: ConnectionManager, local_stop_token: StopToken, diff --git a/veilid-core/src/network_manager/receipt_manager.rs b/veilid-core/src/network_manager/receipt_manager.rs index f06a9a60..d510a56c 100644 --- a/veilid-core/src/network_manager/receipt_manager.rs +++ b/veilid-core/src/network_manager/receipt_manager.rs @@ -198,6 +198,7 @@ impl ReceiptManager { Ok(()) } + #[instrument(level = "trace", target = "receipt", skip_all)] fn perform_callback( evt: ReceiptEvent, record_mut: &mut ReceiptRecord, @@ -221,7 +222,7 @@ impl ReceiptManager { } } - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace", target = "receipt", skip_all)] pub async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) { // Go through all receipts and build a list of expired nonces let mut new_next_oldest_ts: Option = None; @@ -270,6 +271,7 @@ impl ReceiptManager { } } + #[instrument(level = "trace", target = "receipt", skip_all, err)] pub async fn tick(&self) -> EyreResult<()> { let (next_oldest_ts, timeout_task, stop_token) = { let inner = self.inner.lock(); @@ -318,6 +320,7 @@ impl ReceiptManager { } #[allow(dead_code)] + #[instrument(level = "trace", target = "receipt", skip_all)] pub fn record_receipt( &self, receipt: Receipt, @@ -339,6 +342,7 @@ impl ReceiptManager { Self::update_next_oldest_timestamp(&mut inner); } + #[instrument(level = "trace", target = "receipt", skip_all)] pub fn record_single_shot_receipt( &self, receipt: Receipt, diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 82e5f308..b3202646 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -11,6 +11,7 @@ impl NetworkManager { /// /// Sending to a node requires determining a NetworkClass compatible contact method /// between the source and destination node + #[instrument(level="trace", target="net", skip_all, err)] pub(crate) async fn send_data( &self, destination_node_ref: NodeRef, @@ -149,6 +150,7 @@ impl NetworkManager { } /// Send data using NodeContactMethod::Existing + #[instrument(level="trace", target="net", skip_all, err)] async fn send_data_ncm_existing( &self, target_node_ref: NodeRef, @@ -185,6 +187,7 @@ impl NetworkManager { } /// Send data using NodeContactMethod::Unreachable + #[instrument(level="trace", target="net", skip_all, err)] async fn send_data_ncm_unreachable( &self, target_node_ref: NodeRef, @@ -221,6 +224,7 @@ impl NetworkManager { } /// Send data using NodeContactMethod::SignalReverse + #[instrument(level="trace", target="net", skip_all, err)] async fn send_data_ncm_signal_reverse( &self, relay_nr: NodeRef, @@ -268,6 +272,7 @@ impl NetworkManager { } /// Send data using NodeContactMethod::SignalHolePunch + #[instrument(level="trace", target="net", skip_all, err)] async fn send_data_ncm_signal_hole_punch( &self, relay_nr: NodeRef, @@ -313,6 +318,7 @@ impl NetworkManager { } /// Send data using NodeContactMethod::Direct + #[instrument(level="trace", target="net", skip_all, err)] async fn send_data_ncm_direct( &self, node_ref: NodeRef, @@ -372,6 +378,7 @@ impl NetworkManager { /// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access /// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not /// allowed to use NodeRefs due to recursive locking + #[instrument(level="trace", target="net", skip_all, err)] pub(crate) fn get_node_contact_method( &self, target_node_ref: NodeRef, @@ -544,10 +551,7 @@ impl NetworkManager { /// Send a reverse connection signal and wait for the return receipt over it /// Then send the data across the new connection /// Only usable for PublicInternet routing domain - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, data), err) - )] + #[instrument(level="trace", target="net", skip_all, err)] async fn do_reverse_connect( &self, relay_nr: NodeRef, @@ -636,10 +640,7 @@ impl NetworkManager { /// Send a hole punch signal and do a negotiating ping and wait for the return receipt /// Then send the data across the new connection /// Only usable for PublicInternet routing domain - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, data), err) - )] + #[instrument(level="trace", target="net", skip_all, err)] async fn do_hole_punch( &self, relay_nr: NodeRef, diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index 22daf74c..a8e7cff9 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -140,6 +140,7 @@ where }) } + #[instrument(level = "trace", target = "fanout", skip_all)] fn evaluate_done(self: Arc, ctx: &mut FanoutContext) -> bool { // If we have a result, then we're done if ctx.result.is_some() { @@ -151,6 +152,7 @@ where ctx.result.is_some() } + #[instrument(level = "trace", target = "fanout", skip_all)] fn add_to_fanout_queue(self: Arc, new_nodes: &[NodeRef]) { event!(target: "fanout", Level::DEBUG, "FanoutCall::add_to_fanout_queue:\n new_nodes={{\n{}}}\n", @@ -172,6 +174,7 @@ where }); } + #[instrument(level = "trace", target = "fanout", skip_all)] async fn fanout_processor(self: Arc) -> bool { // Loop until we have a result or are done loop { @@ -229,6 +232,7 @@ where } } + #[instrument(level = "trace", target = "fanout", skip_all)] fn init_closest_nodes(self: Arc) -> Result<(), RPCError> { // Get the 'node_count' closest nodes to the key out of our routing table let closest_nodes = { @@ -278,6 +282,7 @@ where Ok(()) } + #[instrument(level = "trace", target = "fanout", skip_all)] pub async fn run( self: Arc, init_fanout_queue: Vec, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 40e4eaa8..04d323d8 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -448,6 +448,7 @@ impl RPCProcessor { } /// Determine if a SignedNodeInfo can be placed into the specified routing domain + #[instrument(level="trace", target="rpc", skip_all)] fn verify_node_info( &self, routing_domain: RoutingDomain, @@ -463,6 +464,7 @@ impl RPCProcessor { /// Search the network for a single node and add it to the routing table and return the node reference /// If no node was found in the timeout, this returns None + #[instrument(level="trace", target="rpc", skip_all)] async fn search_for_node_id( &self, node_id: TypedKey, @@ -529,6 +531,7 @@ impl RPCProcessor { /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference /// Note: This routine can possibly be recursive, hence the SendPinBoxFuture async form + #[instrument(level="trace", target="rpc", skip_all)] pub fn resolve_node( &self, node_id: TypedKey, @@ -579,10 +582,7 @@ impl RPCProcessor { }) } - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, waitable_reply), err) - )] + #[instrument(level="trace", target="rpc", skip_all)] async fn wait_for_reply( &self, waitable_reply: WaitableReply, @@ -654,6 +654,7 @@ impl RPCProcessor { } /// Wrap an operation with a private route inside a safety route + #[instrument(level="trace", target="rpc", skip_all)] fn wrap_with_route( &self, safety_selection: SafetySelection, @@ -728,10 +729,7 @@ impl RPCProcessor { /// Produce a byte buffer that represents the wire encoding of the entire /// unencrypted envelope body for a RPC message. This incorporates /// wrapping a private and/or safety route if they are specified. - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "debug", skip(self, operation), err) - )] + #[instrument(level="trace", target="rpc", skip_all)] fn render_operation( &self, dest: Destination, @@ -863,10 +861,7 @@ impl RPCProcessor { /// routing table caching when it is okay to do so /// Also check target's timestamp of our own node info, to see if we should send that /// And send our timestamp of the target's node info so they can determine if they should update us on their next rpc - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self), ret) - )] + #[instrument(level="trace", target="rpc", skip_all)] fn get_sender_peer_info(&self, dest: &Destination) -> SenderPeerInfo { // Don't do this if the sender is to remain private // Otherwise we would be attaching the original sender's identity to the final destination, @@ -908,6 +903,7 @@ impl RPCProcessor { } /// Record failure to send to node or route + #[instrument(level="trace", target="rpc", skip_all)] fn record_send_failure( &self, rpc_kind: RPCKind, @@ -942,6 +938,7 @@ impl RPCProcessor { } /// Record question lost to node or route + #[instrument(level="trace", target="rpc", skip_all)] fn record_question_lost( &self, send_ts: Timestamp, @@ -984,6 +981,7 @@ impl RPCProcessor { } /// Record success sending to node or route + #[instrument(level="trace", target="rpc", skip_all)] fn record_send_success( &self, rpc_kind: RPCKind, @@ -1027,6 +1025,7 @@ impl RPCProcessor { /// Record answer received from node or route #[allow(clippy::too_many_arguments)] + #[instrument(level="trace", target="rpc", skip_all)] fn record_answer_received( &self, send_ts: Timestamp, @@ -1112,6 +1111,7 @@ impl RPCProcessor { } /// Record question or statement received from node or route + #[instrument(level="trace", target="rpc", skip_all)] fn record_question_received(&self, msg: &RPCMessage) { let recv_ts = msg.header.timestamp; let bytes = msg.header.body_len; @@ -1156,10 +1156,7 @@ impl RPCProcessor { /// Issue a question over the network, possibly using an anonymized route /// Optionally keeps a context to be passed to the answer processor when an answer is received - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "debug", skip(self, question), err) - )] + #[instrument(level="trace", target="rpc", skip_all)] async fn question( &self, dest: Destination, @@ -1261,10 +1258,7 @@ impl RPCProcessor { } /// Issue a statement over the network, possibly using an anonymized route - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "debug", skip(self, statement), err) - )] + #[instrument(level="trace", target="rpc", skip_all)] async fn statement( &self, dest: Destination, @@ -1336,10 +1330,7 @@ impl RPCProcessor { } /// Issue a reply over the network, possibly using an anonymized route /// The request must want a response, or this routine fails - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "debug", skip(self, request, answer), err) - )] + #[instrument(level="trace", target="rpc", skip_all)] async fn answer( &self, request: RPCMessage, @@ -1417,7 +1408,7 @@ impl RPCProcessor { /// Decoding RPC from the wire /// This performs a capnp decode on the data, and if it passes the capnp schema /// it performs the cryptographic validation required to pass the operation up for processing - #[instrument(skip_all)] + #[instrument(level="trace", target="rpc", skip_all)] fn decode_rpc_operation( &self, encoded_msg: &RPCMessageEncoded, @@ -1445,6 +1436,7 @@ impl RPCProcessor { /// caller or receiver. This does not mean the operation is 'semantically correct'. For /// complex operations that require stateful validation and a more robust context than /// 'signatures', the caller must still perform whatever validation is necessary + #[instrument(level="trace", target="rpc", skip_all)] fn validate_rpc_operation(&self, operation: &mut RPCOperation) -> Result<(), RPCError> { // If this is an answer, get the question context for this answer // If we received an answer for a question we did not ask, this will return an error @@ -1469,10 +1461,7 @@ impl RPCProcessor { } ////////////////////////////////////////////////////////////////////// - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, encoded_msg), err) - )] + #[instrument(level="trace", target="rpc", skip_all)] async fn process_rpc_message( &self, encoded_msg: RPCMessageEncoded, @@ -1671,6 +1660,7 @@ impl RPCProcessor { } } + #[instrument(level="trace", target="rpc", skip_all)] async fn rpc_worker( self, stop_token: StopToken, @@ -1700,10 +1690,7 @@ impl RPCProcessor { } } - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, body), err) - )] + #[instrument(level="trace", target="rpc", skip_all)] pub fn enqueue_direct_message( &self, envelope: Envelope, @@ -1742,10 +1729,7 @@ impl RPCProcessor { Ok(()) } - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, body), err) - )] + #[instrument(level="trace", target="rpc", skip_all)] fn enqueue_safety_routed_message( &self, direct: RPCMessageHeaderDetailDirect, @@ -1781,10 +1765,7 @@ impl RPCProcessor { Ok(()) } - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, body), err) - )] + #[instrument(level="trace", target="rpc", skip_all)] fn enqueue_private_routed_message( &self, direct: RPCMessageHeaderDetailDirect, diff --git a/veilid-core/src/rpc_processor/operation_waiter.rs b/veilid-core/src/rpc_processor/operation_waiter.rs index 3c8ab352..0bf5ba01 100644 --- a/veilid-core/src/rpc_processor/operation_waiter.rs +++ b/veilid-core/src/rpc_processor/operation_waiter.rs @@ -128,10 +128,7 @@ where } /// Complete the app call - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, message), err) - )] + #[instrument(level = "trace", target = "rpc", skip_all)] pub async fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> { let waiting_op = { let mut inner = self.inner.lock(); @@ -151,6 +148,7 @@ where } /// Wait for operation to complete + #[instrument(level = "trace", target = "rpc", skip_all)] pub async fn wait_for_op( &self, mut handle: OperationWaitHandle, diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index 1a26bbf9..333b9b83 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -3,10 +3,7 @@ use super::*; impl RPCProcessor { // Sends a high level app request and wait for response // Can be sent via all methods including relays and routes - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, message), fields(message.len = message.len(), ret.latency, ret.len), err) - )] + #[instrument(level = "trace", target = "rpc", skip(self, message), fields(message.len = message.len(), ret.latency, ret.len), err)] pub async fn rpc_call_app_call( self, dest: Destination, @@ -57,7 +54,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_app_call_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled let routing_table = self.routing_table(); @@ -149,6 +146,7 @@ impl RPCProcessor { } /// Exposed to API for apps to return app call answers + #[instrument(level = "trace", target = "rpc", skip_all)] pub async fn app_call_reply( &self, call_id: OperationId, diff --git a/veilid-core/src/rpc_processor/rpc_app_message.rs b/veilid-core/src/rpc_processor/rpc_app_message.rs index 5c397883..07fa2d08 100644 --- a/veilid-core/src/rpc_processor/rpc_app_message.rs +++ b/veilid-core/src/rpc_processor/rpc_app_message.rs @@ -3,10 +3,7 @@ use super::*; impl RPCProcessor { // Sends a high level app message // Can be sent via all methods including relays and routes - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, message), fields(message.len = message.len()), err) - )] + #[instrument(level = "trace", target = "rpc", skip(self, message), fields(message.len = message.len()), err)] pub async fn rpc_call_app_message( self, dest: Destination, @@ -21,7 +18,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_app_message(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled let routing_table = self.routing_table(); diff --git a/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs index 133ded54..2fcf4054 100644 --- a/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs @@ -1,7 +1,7 @@ use super::*; impl RPCProcessor { - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled #[cfg(feature = "unstable-tunnels")] diff --git a/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs index 4b97985f..d726f507 100644 --- a/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs @@ -1,7 +1,7 @@ use super::*; impl RPCProcessor { - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_complete_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled #[cfg(feature = "unstable-tunnels")] diff --git a/veilid-core/src/rpc_processor/rpc_find_block.rs b/veilid-core/src/rpc_processor/rpc_find_block.rs index 786cef37..40850be0 100644 --- a/veilid-core/src/rpc_processor/rpc_find_block.rs +++ b/veilid-core/src/rpc_processor/rpc_find_block.rs @@ -1,7 +1,7 @@ use super::*; impl RPCProcessor { - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_find_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled #[cfg(feature = "unstable-blockstore")] diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 8d456238..ee53205a 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -7,10 +7,7 @@ impl RPCProcessor { /// Because this leaks information about the identity of the node itself, /// replying to this request received over a private route will leak /// the identity of the node and defeat the private route. - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self), err) - )] + #[instrument(level = "trace", target = "rpc", skip(self), err)] pub async fn rpc_call_find_node( self, dest: Destination, diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 5060d6e2..98586df1 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -16,16 +16,13 @@ impl RPCProcessor { /// replying to this request received over a private route will leak /// the identity of the node and defeat the private route. - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, last_descriptor), + #[instrument(level = "trace", target = "rpc", skip(self, last_descriptor), fields(ret.value.data.len, ret.value.data.seq, ret.value.data.writer, ret.peers.len, ret.latency - ),err) - )] + ),err)] pub async fn rpc_call_get_value( self, dest: Destination, @@ -168,7 +165,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_get_value_q( &self, msg: RPCMessage, diff --git a/veilid-core/src/rpc_processor/rpc_inspect_value.rs b/veilid-core/src/rpc_processor/rpc_inspect_value.rs index f1c23a6d..537d3386 100644 --- a/veilid-core/src/rpc_processor/rpc_inspect_value.rs +++ b/veilid-core/src/rpc_processor/rpc_inspect_value.rs @@ -19,13 +19,12 @@ impl RPCProcessor { /// * the amount requested /// * an amount truncated to MAX_INSPECT_VALUE_A_SEQS_LEN subkeys /// * zero if nothing was found - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, last_descriptor), + #[ + instrument(level = "trace", target = "rpc", skip(self, last_descriptor), fields(ret.peers.len, ret.latency ),err) - )] + ] pub async fn rpc_call_inspect_value( self, dest: Destination, @@ -153,7 +152,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_inspect_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ensure this never came over a private route, safety route is okay though match &msg.header.detail { diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index 8d3caaa1..2e6c4478 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -3,10 +3,7 @@ use super::*; impl RPCProcessor { // Sends a unidirectional in-band return receipt // Can be sent via all methods including relays and routes - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, receipt), ret, err) - )] + #[instrument(level = "trace", target = "rpc", skip(self, receipt), ret, err)] pub async fn rpc_call_return_receipt>( self, dest: Destination, @@ -24,7 +21,7 @@ impl RPCProcessor { Ok(NetworkResult::value(())) } - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_return_receipt(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Get the statement let (_, _, _, kind) = msg.operation.destructure(); diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index 0325f82c..601fa699 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -1,10 +1,7 @@ use super::*; impl RPCProcessor { - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip_all, err) - )] + #[instrument(level = "trace", target = "rpc", skip_all, err)] async fn process_route_safety_route_hop( &self, routed_operation: RoutedOperation, @@ -59,10 +56,7 @@ impl RPCProcessor { .await } - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip_all, err) - )] + #[instrument(level = "trace", target = "rpc", skip_all, err)] async fn process_route_private_route_hop( &self, routed_operation: RoutedOperation, @@ -112,10 +106,7 @@ impl RPCProcessor { /// Note: it is important that we never respond with a safety route to questions that come /// in without a private route. Giving away a safety route when the node id is known is /// a privacy violation! - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip_all, err) - )] + #[instrument(level = "trace", target = "rpc", skip_all, err)] fn process_safety_routed_operation( &self, detail: RPCMessageHeaderDetailDirect, @@ -159,10 +150,7 @@ impl RPCProcessor { } /// Process a routed operation that came in over both a safety route and a private route - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip_all, err) - )] + #[instrument(level = "trace", target = "rpc", skip_all, err)] fn process_private_routed_operation( &self, detail: RPCMessageHeaderDetailDirect, @@ -235,10 +223,7 @@ impl RPCProcessor { Ok(NetworkResult::value(())) } - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip_all, err) - )] + #[instrument(level = "trace", target = "rpc", skip_all, err)] fn process_routed_operation( &self, detail: RPCMessageHeaderDetailDirect, @@ -272,7 +257,7 @@ impl RPCProcessor { feature = "verbose-tracing", instrument(level = "trace", skip_all, err) )] - + #[instrument(level = "trace", target = "rpc", skip_all)] async fn process_private_route_first_hop( &self, mut routed_operation: RoutedOperation, @@ -337,6 +322,7 @@ impl RPCProcessor { } /// Decrypt route hop data and sign routed operation + #[instrument(level = "trace", target = "rpc", skip_all)] fn decrypt_private_route_hop_data( &self, route_hop_data: &RouteHopData, @@ -399,10 +385,7 @@ impl RPCProcessor { Ok(NetworkResult::value(route_hop)) } - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self), ret, err) - )] + #[instrument(level = "trace", target = "rpc", skip(self), ret, err)] pub(crate) async fn process_route(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled let routing_table = self.routing_table(); diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 380eb160..57b853ed 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -14,9 +14,7 @@ impl RPCProcessor { /// Because this leaks information about the identity of the node itself, /// replying to this request received over a private route will leak /// the identity of the node and defeat the private route. - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, value, descriptor), + #[instrument(level = "trace", target = "rpc", skip(self, value, descriptor), fields(value.data.len = value.value_data().data().len(), value.data.seq = value.value_data().seq(), value.data.writer = value.value_data().writer().to_string(), @@ -26,8 +24,7 @@ impl RPCProcessor { ret.value.data.writer, ret.peers.len, ret.latency - ), err) - )] + ), err)] pub async fn rpc_call_set_value( self, dest: Destination, @@ -183,7 +180,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_set_value_q( &self, msg: RPCMessage, diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 6c8c73ef..e6a02a08 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -3,10 +3,7 @@ use super::*; impl RPCProcessor { // Sends a unidirectional signal to a node // Can be sent via relays but not routes. For routed 'signal' like capabilities, use AppMessage. - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self), ret, err) - )] + #[instrument(level = "trace", target = "rpc", skip(self), ret, err)] pub async fn rpc_call_signal( self, dest: Destination, @@ -34,7 +31,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_signal(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled let routing_table = self.routing_table(); diff --git a/veilid-core/src/rpc_processor/rpc_start_tunnel.rs b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs index f6d8b3c7..2a163bbc 100644 --- a/veilid-core/src/rpc_processor/rpc_start_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs @@ -1,7 +1,7 @@ use super::*; impl RPCProcessor { - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_start_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled #[cfg(feature = "unstable-tunnels")] diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 364f01fc..03cf9008 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -15,10 +15,7 @@ impl RPCProcessor { // direct -> node status + sender info // safety -> node status // private -> nothing - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self), ret, err) - )] + #[instrument(level = "trace", target = "rpc", skip(self), ret, err)] pub async fn rpc_call_status( self, dest: Destination, @@ -159,7 +156,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Get the question let kind = msg.operation.kind().clone(); diff --git a/veilid-core/src/rpc_processor/rpc_supply_block.rs b/veilid-core/src/rpc_processor/rpc_supply_block.rs index 368bd44b..12118cd7 100644 --- a/veilid-core/src/rpc_processor/rpc_supply_block.rs +++ b/veilid-core/src/rpc_processor/rpc_supply_block.rs @@ -1,7 +1,7 @@ use super::*; impl RPCProcessor { - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_supply_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ignore if disabled #[cfg(feature = "unstable-blockstore")] diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 07970dda..12d5646a 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -2,10 +2,7 @@ use super::*; impl RPCProcessor { // Can only be sent directly, not via relays or routes - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self), ret, err) - )] + #[instrument(level = "trace", target = "rpc", skip(self), ret, err)] #[cfg_attr(target_arch = "wasm32", allow(dead_code))] pub async fn rpc_call_validate_dial_info( self, @@ -58,7 +55,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> RPCNetworkResult<()> { let routing_table = self.routing_table(); if !routing_table.has_valid_network_class(msg.header.routing_domain()) { diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index 9a7b5b49..2d5c3380 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -3,10 +3,7 @@ use super::*; impl RPCProcessor { // Sends a dht value change notification // Can be sent via all methods including relays and routes but never over a safety route - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self, value), err) - )] + #[instrument(level = "trace", target = "rpc", skip(self, value), err)] pub async fn rpc_call_value_changed( self, dest: Destination, @@ -32,6 +29,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// + #[instrument(level = "trace", target = "rpc", skip_all)] pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Get the statement let (_, _, _, kind) = msg.operation.destructure(); diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index 4fff8dfc..04281d95 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -16,14 +16,11 @@ impl RPCProcessor { /// replying to this request received over a private route will leak /// the identity of the node and defeat the private route. - #[cfg_attr( - feature = "verbose-tracing", - instrument(level = "trace", skip(self), + #[instrument(level = "trace", target = "rpc", skip(self), fields(ret.expiration, ret.latency, ret.peers.len - ),err) - )] + ),err)] #[allow(clippy::too_many_arguments)] pub async fn rpc_call_watch_value( self, @@ -181,7 +178,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { let routing_table = self.routing_table(); let rss = routing_table.route_spec_store(); diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index ad29d5a9..2c4627f4 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -25,6 +25,7 @@ pub(super) struct OutboundGetValueResult { impl StorageManager { /// Perform a 'get value' query on the network + #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_get_value( &self, rpc_processor: RPCProcessor, @@ -276,6 +277,7 @@ impl StorageManager { Ok(out_rx) } + #[instrument(level = "trace", target = "dht", skip_all)] pub(super) fn process_deferred_outbound_get_value_result_inner(&self, inner: &mut StorageManagerInner, res_rx: flume::Receiver>, key: TypedKey, subkey: ValueSubkey, last_seq: ValueSeqNum) { let this = self.clone(); inner.process_deferred_results( @@ -323,6 +325,7 @@ impl StorageManager { ); } + #[instrument(level = "trace", target = "dht", skip_all)] pub(super) async fn process_outbound_get_value_result(&self, key: TypedKey, subkey: ValueSubkey, opt_last_seq: Option, result: get_value::OutboundGetValueResult) -> Result, VeilidAPIError> { // See if we got a value back let Some(get_result_value) = result.get_result.opt_value else { @@ -354,6 +357,7 @@ impl StorageManager { } /// Handle a received 'Get Value' query + #[instrument(level = "trace", target = "dht", skip_all)] pub async fn inbound_get_value( &self, key: TypedKey, diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index 4d032e52..77bb9292 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -49,6 +49,7 @@ pub(super) struct OutboundInspectValueResult { impl StorageManager { /// Perform a 'inspect value' query on the network + #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_inspect_value( &self, rpc_processor: RPCProcessor, @@ -309,6 +310,7 @@ impl StorageManager { } /// Handle a received 'Inspect Value' query + #[instrument(level = "trace", target = "dht", skip_all)] pub async fn inbound_inspect_value( &self, key: TypedKey, diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 1f110bbd..c841e616 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -217,6 +217,7 @@ impl StorageManager { } /// Create a local record from scratch with a new owner key, open it, and return the opened descriptor + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn create_record( &self, kind: CryptoKind, @@ -240,6 +241,7 @@ impl StorageManager { } /// Open an existing local record if it exists, and if it doesnt exist locally, try to pull it from the network and open it and return the opened descriptor + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn open_record( &self, key: TypedKey, @@ -325,6 +327,7 @@ impl StorageManager { } /// Close an opened local record + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> { let (opt_opened_record, opt_rpc_processor) = { let mut inner = self.lock().await?; @@ -376,6 +379,7 @@ impl StorageManager { } /// Delete a local record + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn delete_record(&self, key: TypedKey) -> VeilidAPIResult<()> { // Ensure the record is closed self.close_record(key).await?; @@ -391,6 +395,7 @@ impl StorageManager { } /// Get the value of a subkey from an opened local record + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn get_value( &self, key: TypedKey, @@ -475,6 +480,7 @@ impl StorageManager { } /// Set the value of a subkey on an opened local record + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn set_value( &self, key: TypedKey, @@ -627,6 +633,7 @@ impl StorageManager { } /// Create,update or cancel an outbound watch to a DHT value + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn watch_values( &self, key: TypedKey, @@ -752,6 +759,7 @@ impl StorageManager { Ok(owvresult.expiration_ts) } + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn cancel_watch_values( &self, key: TypedKey, @@ -807,6 +815,7 @@ impl StorageManager { } /// Inspect an opened DHT record for its subkey sequence numbers + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn inspect_record( &self, key: TypedKey, @@ -929,7 +938,7 @@ impl StorageManager { } // Send single value change out to the network - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", target = "stor", skip(self), err)] async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> { let rpc_processor = { let inner = self.inner.lock().await; @@ -957,7 +966,7 @@ impl StorageManager { } // Send a value change up through the callback - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", target = "stor", skip(self, value), err)] async fn update_callback_value_change( &self, key: TypedKey, @@ -981,6 +990,7 @@ impl StorageManager { Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all)] fn check_fanout_set_offline( &self, key: TypedKey, diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index f094f0ec..3adb034b 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -208,6 +208,7 @@ where Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all)] fn add_dead_record(&mut self, key: RecordTableKey, record: Record) { self.dead_records.push(DeadRecord { key, @@ -216,6 +217,7 @@ where }); } + #[instrument(level = "trace", target = "stor", skip_all)] fn add_to_subkey_cache(&mut self, key: SubkeyTableKey, record_data: RecordData) { let record_data_total_size = record_data.total_size(); // Write to subkey cache @@ -251,6 +253,7 @@ where } } + #[instrument(level = "trace", target = "stor", skip_all)] fn remove_from_subkey_cache(&mut self, key: SubkeyTableKey) { if let Some(dead_record_data) = self.subkey_cache.remove(&key) { self.subkey_cache_total_size @@ -259,6 +262,7 @@ where } } + #[instrument(level = "trace", target = "stor", skip_all)] async fn purge_dead_records(&mut self, lazy: bool) { let purge_dead_records_mutex = self.purge_dead_records_mutex.clone(); let _lock = if lazy { @@ -339,6 +343,7 @@ where } } + #[instrument(level = "trace", target = "stor", skip_all)] async fn flush_changed_records(&mut self) { if self.changed_records.is_empty() { return; @@ -361,12 +366,14 @@ where } } + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn flush(&mut self) -> EyreResult<()> { self.flush_changed_records().await; self.purge_dead_records(true).await; Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn new_record(&mut self, key: TypedKey, record: Record) -> VeilidAPIResult<()> { let rtk = RecordTableKey { key }; if self.record_index.contains_key(&rtk) { @@ -412,6 +419,7 @@ where Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn delete_record(&mut self, key: TypedKey) -> VeilidAPIResult<()> { // Get the record table key let rtk = RecordTableKey { key }; @@ -437,11 +445,13 @@ where Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all)] pub(super) fn contains_record(&mut self, key: TypedKey) -> bool { let rtk = RecordTableKey { key }; self.record_index.contains_key(&rtk) } + #[instrument(level = "trace", target = "stor", skip_all)] pub(super) fn with_record(&mut self, key: TypedKey, f: F) -> Option where F: FnOnce(&Record) -> R, @@ -465,6 +475,7 @@ where out } + #[instrument(level = "trace", target = "stor", skip_all)] pub(super) fn peek_record(&self, key: TypedKey, f: F) -> Option where F: FnOnce(&Record) -> R, @@ -479,6 +490,7 @@ where out } + #[instrument(level = "trace", target = "stor", skip_all)] pub(super) fn with_record_mut(&mut self, key: TypedKey, f: F) -> Option where F: FnOnce(&mut Record) -> R, @@ -502,6 +514,7 @@ where out } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn get_subkey( &mut self, key: TypedKey, @@ -573,6 +586,7 @@ where })) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(crate) async fn peek_subkey( &self, key: TypedKey, @@ -641,6 +655,7 @@ where })) } + #[instrument(level = "trace", target = "stor", skip_all)] async fn update_watched_value( &mut self, key: TypedKey, @@ -679,6 +694,7 @@ where } } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn set_subkey( &mut self, key: TypedKey, @@ -786,6 +802,7 @@ where Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn inspect_record( &mut self, key: TypedKey, @@ -869,6 +886,7 @@ where })) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn _change_existing_watch( &mut self, key: TypedKey, @@ -905,6 +923,7 @@ where Ok(WatchResult::Rejected) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn _create_new_watch( &mut self, key: TypedKey, @@ -995,7 +1014,7 @@ where } /// Add or update an inbound record watch for changes - #[allow(clippy::too_many_arguments)] + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn watch_record( &mut self, key: TypedKey, @@ -1053,6 +1072,7 @@ where /// Clear a specific watch for a record /// returns true if the watch was found and cancelled + #[instrument(level = "trace", target = "stor", skip_all, err)] async fn cancel_watch( &mut self, key: TypedKey, @@ -1092,6 +1112,7 @@ where } /// Move watches from one store to another + #[instrument(level = "trace", target = "stor", skip_all)] pub fn move_watches( &mut self, key: TypedKey, @@ -1110,6 +1131,7 @@ where } /// See if any watched records have expired and clear them out + #[instrument(level = "trace", target = "stor", skip_all)] pub fn check_watched_records(&mut self) { let now = get_aligned_timestamp(); self.watched_records.retain(|key, watch_list| { @@ -1126,6 +1148,7 @@ where }); } + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn take_value_changes(&mut self, changes: &mut Vec) { // ValueChangedInfo but without the subkey data that requires a double mutable borrow to get struct EarlyValueChangedInfo { @@ -1220,6 +1243,7 @@ where /// This will force a garbage collection of the space immediately /// If zero is passed in here, a garbage collection will be performed of dead records /// without removing any live records + #[instrument(level = "trace", target = "stor", skip_all)] pub async fn reclaim_space(&mut self, space: usize) -> usize { let mut reclaimed = 0usize; while reclaimed < space { diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 5b60625c..0b293e77 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -25,6 +25,7 @@ pub(super) struct OutboundSetValueResult { impl StorageManager { /// Perform a 'set value' query on the network + #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_set_value( &self, rpc_processor: RPCProcessor, @@ -272,6 +273,7 @@ impl StorageManager { Ok(out_rx) } + #[instrument(level = "trace", target = "dht", skip_all)] pub(super) fn process_deferred_outbound_set_value_result_inner(&self, inner: &mut StorageManagerInner, res_rx: flume::Receiver>, key: TypedKey, subkey: ValueSubkey, last_value_data: ValueData, safety_selection: SafetySelection, ) { @@ -333,6 +335,7 @@ impl StorageManager { ); } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn process_outbound_set_value_result(&self, key: TypedKey, subkey: ValueSubkey, last_value_data: ValueData, safety_selection: SafetySelection, result: set_value::OutboundSetValueResult) -> Result, VeilidAPIError> { // Regain the lock after network access @@ -370,6 +373,7 @@ impl StorageManager { /// Handle a received 'Set Value' query /// Returns a None if the value passed in was set /// Returns a Some(current value) if the value was older and the current value was kept + #[instrument(level = "trace", target = "dht", skip_all)] pub async fn inbound_set_value( &self, key: TypedKey, diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index b43c78a1..70f59092 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -210,6 +210,7 @@ impl StorageManagerInner { Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn create_new_owned_local_record( &mut self, kind: CryptoKind, @@ -266,6 +267,7 @@ impl StorageManagerInner { Ok((dht_key, owner)) } + #[instrument(level = "trace", target = "stor", skip_all, err)] async fn move_remote_record_to_local( &mut self, key: TypedKey, @@ -326,6 +328,7 @@ impl StorageManagerInner { Ok(Some((*remote_record.owner(), remote_record.schema()))) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn open_existing_record( &mut self, key: TypedKey, @@ -390,6 +393,7 @@ impl StorageManagerInner { Ok(Some(descriptor)) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub async fn open_new_record( &mut self, key: TypedKey, @@ -454,6 +458,7 @@ impl StorageManagerInner { Ok(descriptor) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub fn get_value_nodes(&self, key: TypedKey) -> VeilidAPIResult>> { // Get local record store let Some(local_record_store) = self.local_record_store.as_ref() else { @@ -482,6 +487,7 @@ impl StorageManagerInner { Ok(opt_value_nodes) } + #[instrument(level = "trace", target = "stor", skip_all)] pub(super) fn process_fanout_results< 'a, I: IntoIterator, @@ -538,6 +544,7 @@ impl StorageManagerInner { Ok(self.opened_records.remove(&key)) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn handle_get_local_value( &mut self, key: TypedKey, @@ -561,6 +568,7 @@ impl StorageManagerInner { }) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn handle_set_local_value( &mut self, key: TypedKey, @@ -581,6 +589,7 @@ impl StorageManagerInner { Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn handle_inspect_local_value( &mut self, key: TypedKey, @@ -605,6 +614,7 @@ impl StorageManagerInner { }) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn handle_get_remote_value( &mut self, key: TypedKey, @@ -628,6 +638,7 @@ impl StorageManagerInner { }) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn handle_set_remote_value( &mut self, key: TypedKey, @@ -662,6 +673,7 @@ impl StorageManagerInner { Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn handle_inspect_remote_value( &mut self, key: TypedKey, @@ -687,6 +699,7 @@ impl StorageManagerInner { } /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] + #[instrument(level = "trace", target = "stor", skip_all)] fn get_key(vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey where D: fmt::Debug + Clone + Serialize, @@ -701,6 +714,7 @@ impl StorageManagerInner { TypedKey::new(vcrypto.kind(), hash) } + #[instrument(level = "trace", target = "stor", skip_all)] pub(super) fn add_offline_subkey_write( &mut self, key: TypedKey, @@ -718,6 +732,7 @@ impl StorageManagerInner { }); } + #[instrument(level = "trace", target = "stor", skip_all)] pub fn process_deferred_results( &mut self, receiver: flume::Receiver, diff --git a/veilid-core/src/storage_manager/tasks/check_active_watches.rs b/veilid-core/src/storage_manager/tasks/check_active_watches.rs index f999749a..db201845 100644 --- a/veilid-core/src/storage_manager/tasks/check_active_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_active_watches.rs @@ -2,10 +2,10 @@ use super::*; impl StorageManager { // Check if client-side watches on opened records either have dead nodes or if the watch has expired - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn check_active_watches_task_routine( self, - stop_token: StopToken, + _stop_token: StopToken, _last_ts: Timestamp, _cur_ts: Timestamp, ) -> EyreResult<()> { diff --git a/veilid-core/src/storage_manager/tasks/check_watched_records.rs b/veilid-core/src/storage_manager/tasks/check_watched_records.rs index fff2a392..792e5df5 100644 --- a/veilid-core/src/storage_manager/tasks/check_watched_records.rs +++ b/veilid-core/src/storage_manager/tasks/check_watched_records.rs @@ -2,10 +2,10 @@ use super::*; impl StorageManager { // Check if server-side watches have expired - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn check_watched_records_task_routine( self, - stop_token: StopToken, + _stop_token: StopToken, _last_ts: Timestamp, _cur_ts: Timestamp, ) -> EyreResult<()> { diff --git a/veilid-core/src/storage_manager/tasks/flush_record_stores.rs b/veilid-core/src/storage_manager/tasks/flush_record_stores.rs index a596baec..b0229228 100644 --- a/veilid-core/src/storage_manager/tasks/flush_record_stores.rs +++ b/veilid-core/src/storage_manager/tasks/flush_record_stores.rs @@ -2,10 +2,10 @@ use super::*; impl StorageManager { // Flush records stores to disk and remove dead records - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(crate) async fn flush_record_stores_task_routine( self, - stop_token: StopToken, + _stop_token: StopToken, _last_ts: Timestamp, _cur_ts: Timestamp, ) -> EyreResult<()> { diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index 5c4eb8ce..d565b030 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -3,7 +3,7 @@ use futures_util::*; impl StorageManager { // Best-effort write subkeys to the network that were written offline - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(crate) async fn offline_subkey_writes_task_routine( self, stop_token: StopToken, diff --git a/veilid-core/src/storage_manager/tasks/send_value_changes.rs b/veilid-core/src/storage_manager/tasks/send_value_changes.rs index 5fe7866e..457fc897 100644 --- a/veilid-core/src/storage_manager/tasks/send_value_changes.rs +++ b/veilid-core/src/storage_manager/tasks/send_value_changes.rs @@ -4,7 +4,7 @@ use stop_token::future::FutureExt; impl StorageManager { // Send value change notifications across the network - #[instrument(level = "trace", skip(self), err)] + #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn send_value_changes_task_routine( self, stop_token: StopToken, diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 19fe8509..1e94be85 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -22,7 +22,7 @@ pub(super) struct OutboundWatchValueResult { impl StorageManager { /// Perform a 'watch value cancel' on the network without fanout #[allow(clippy::too_many_arguments)] - #[instrument(target = "dht", level = "debug", skip_all, err)] + #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_watch_value_cancel( &self, rpc_processor: RPCProcessor, @@ -140,7 +140,7 @@ impl StorageManager { /// Perform a 'watch value' query on the network using fanout #[allow(clippy::too_many_arguments)] - #[instrument(target = "dht", level = "debug", skip_all, err)] + #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_watch_value( &self, rpc_processor: RPCProcessor, @@ -367,6 +367,7 @@ impl StorageManager { /// Handle a received 'Watch Value' query #[allow(clippy::too_many_arguments)] + #[instrument(level = "trace", target = "dht", skip_all)] pub async fn inbound_watch_value( &self, key: TypedKey, diff --git a/veilid-core/src/table_store/mod.rs b/veilid-core/src/table_store/mod.rs index 699a8d69..4daa4717 100644 --- a/veilid-core/src/table_store/mod.rs +++ b/veilid-core/src/table_store/mod.rs @@ -18,6 +18,53 @@ use keyvaluedb::*; const ALL_TABLE_NAMES: &[u8] = b"all_table_names"; +/// Description of column +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[cfg_attr(target_arch = "wasm32", derive(Tsify))] +pub struct ColumnInfo { + pub key_count: AlignedU64, +} + +/// IO Stats for table +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[cfg_attr(target_arch = "wasm32", derive(Tsify))] +pub struct IOStatsInfo { + /// Number of transaction. + pub transactions: AlignedU64, + /// Number of read operations. + pub reads: AlignedU64, + /// Number of reads resulted in a read from cache. + pub cache_reads: AlignedU64, + /// Number of write operations. + pub writes: AlignedU64, + /// Number of bytes read + pub bytes_read: AlignedU64, + /// Number of bytes read from cache + pub cache_read_bytes: AlignedU64, + /// Number of bytes write + pub bytes_written: AlignedU64, + /// Start of the statistic period. + pub started: Timestamp, + /// Total duration of the statistic period. + pub span: TimestampDuration, +} + +/// Description of table +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[cfg_attr(target_arch = "wasm32", derive(Tsify))] +pub struct TableInfo { + /// Internal table name + pub table_name: String, + /// IO statistics since previous query + pub io_stats_since_previous: IOStatsInfo, + /// IO statistics since database open + pub io_stats_overall: IOStatsInfo, + /// Total number of columns in the table + pub column_count: u32, + /// Column descriptions + pub columns: Vec, +} + struct TableStoreInner { opened: BTreeMap>, encryption_key: Option, @@ -123,6 +170,7 @@ impl TableStore { Ok(real_name) } + #[instrument(level = "trace", target = "tstore", skip_all)] async fn name_delete(&self, table: &str) -> VeilidAPIResult> { let name = self.namespaced_name(table)?; let mut inner = self.inner.lock(); @@ -130,6 +178,7 @@ impl TableStore { Ok(real_name) } + #[instrument(level = "trace", target = "tstore", skip_all)] async fn name_get(&self, table: &str) -> VeilidAPIResult> { let name = self.namespaced_name(table)?; let inner = self.inner.lock(); @@ -137,6 +186,7 @@ impl TableStore { Ok(real_name) } + #[instrument(level = "trace", target = "tstore", skip_all)] async fn name_rename(&self, old_table: &str, new_table: &str) -> VeilidAPIResult<()> { let old_name = self.namespaced_name(old_table)?; let new_name = self.namespaced_name(new_table)?; @@ -156,8 +206,20 @@ impl TableStore { Ok(()) } + /// List all known tables + #[instrument(level = "trace", target = "tstore", skip_all)] + pub fn list_all(&self) -> Vec<(String, String)> { + let inner = self.inner.lock(); + inner + .all_table_names + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect::>() + } + /// Delete all known tables - async fn delete_all(&self) { + #[instrument(level = "trace", target = "tstore", skip_all)] + pub async fn delete_all(&self) { // Get all tables let real_names = { let mut inner = self.inner.lock(); @@ -179,6 +241,7 @@ impl TableStore { self.flush().await; } + #[instrument(level = "trace", target = "tstore", skip_all)] pub(crate) fn maybe_unprotect_device_encryption_key( &self, dek_bytes: &[u8], @@ -230,6 +293,7 @@ impl TableStore { )) } + #[instrument(level = "trace", target = "tstore", skip_all)] pub(crate) fn maybe_protect_device_encryption_key( &self, dek: TypedSharedSecret, @@ -267,6 +331,7 @@ impl TableStore { Ok(out) } + #[instrument(level = "trace", target = "tstore", skip_all)] async fn load_device_encryption_key(&self) -> EyreResult> { let dek_bytes: Option> = self .protected_store @@ -288,6 +353,8 @@ impl TableStore { &device_encryption_key_password, )?)) } + + #[instrument(level = "trace", target = "tstore", skip_all)] async fn save_device_encryption_key( &self, device_encryption_key: Option, @@ -313,7 +380,9 @@ impl TableStore { log_tstore!(debug "changing dek password"); self.config .with_mut(|c| { - c.protected_store.device_encryption_key_password.clone_from(&new_device_encryption_key_password); + c.protected_store + .device_encryption_key_password + .clone_from(&new_device_encryption_key_password); Ok(new_device_encryption_key_password) }) .unwrap() @@ -338,6 +407,7 @@ impl TableStore { Ok(()) } + #[instrument(level = "trace", target = "tstore", skip_all)] pub(crate) async fn init(&self) -> EyreResult<()> { let _async_guard = self.async_lock.lock().await; @@ -417,6 +487,7 @@ impl TableStore { Ok(()) } + #[instrument(level = "trace", target = "tstore", skip_all)] pub(crate) async fn terminate(&self) { let _async_guard = self.async_lock.lock().await; @@ -434,6 +505,7 @@ impl TableStore { inner.encryption_key = None; } + #[instrument(level = "trace", target = "tstore", skip_all)] pub(crate) fn on_table_db_drop(&self, table: String) { log_rtab!("dropping table db: {}", table); let mut inner = self.inner.lock(); @@ -444,6 +516,7 @@ impl TableStore { /// Get or create a TableDB database table. If the column count is greater than an /// existing TableDB's column count, the database will be upgraded to add the missing columns. + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn open(&self, name: &str, column_count: u32) -> VeilidAPIResult { let _async_guard = self.async_lock.lock().await; @@ -531,12 +604,13 @@ impl TableStore { // Keep track of opened DBs inner .opened - .insert(table_name.clone(), table_db.weak_inner()); + .insert(table_name.clone(), table_db.weak_unlocked_inner()); Ok(table_db) } /// Delete a TableDB table by name + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn delete(&self, name: &str) -> VeilidAPIResult { let _async_guard = self.async_lock.lock().await; // If we aren't initialized yet, bail @@ -575,7 +649,65 @@ impl TableStore { Ok(true) } + /// Get the description of a TableDB table + #[instrument(level = "trace", target = "tstore", skip_all)] + pub async fn info(&self, name: &str) -> VeilidAPIResult> { + // Open with the default number of columns + let tdb = self.open(name, 0).await?; + let internal_name = tdb.table_name(); + let io_stats_since_previous = tdb.io_stats(IoStatsKind::SincePrevious); + let io_stats_overall = tdb.io_stats(IoStatsKind::Overall); + let column_count = tdb.get_column_count()?; + let mut columns = Vec::::with_capacity(column_count as usize); + for col in 0..column_count { + let key_count = tdb.get_key_count(col).await?; + columns.push(ColumnInfo { + key_count: AlignedU64::new(key_count), + }) + } + Ok(Some(TableInfo { + table_name: internal_name, + io_stats_since_previous: IOStatsInfo { + transactions: AlignedU64::new(io_stats_since_previous.transactions), + reads: AlignedU64::new(io_stats_since_previous.reads), + cache_reads: AlignedU64::new(io_stats_since_previous.cache_reads), + writes: AlignedU64::new(io_stats_since_previous.writes), + bytes_read: AlignedU64::new(io_stats_since_previous.bytes_read), + cache_read_bytes: AlignedU64::new(io_stats_since_previous.cache_read_bytes), + bytes_written: AlignedU64::new(io_stats_since_previous.bytes_written), + started: Timestamp::new( + io_stats_since_previous + .started + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_micros() as u64, + ), + span: TimestampDuration::new(io_stats_since_previous.span.as_micros() as u64), + }, + io_stats_overall: IOStatsInfo { + transactions: AlignedU64::new(io_stats_overall.transactions), + reads: AlignedU64::new(io_stats_overall.reads), + cache_reads: AlignedU64::new(io_stats_overall.cache_reads), + writes: AlignedU64::new(io_stats_overall.writes), + bytes_read: AlignedU64::new(io_stats_overall.bytes_read), + cache_read_bytes: AlignedU64::new(io_stats_overall.cache_read_bytes), + bytes_written: AlignedU64::new(io_stats_overall.bytes_written), + started: Timestamp::new( + io_stats_overall + .started + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_micros() as u64, + ), + span: TimestampDuration::new(io_stats_overall.span.as_micros() as u64), + }, + column_count, + columns, + })) + } + /// Rename a TableDB table + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn rename(&self, old_name: &str, new_name: &str) -> VeilidAPIResult<()> { let _async_guard = self.async_lock.lock().await; // If we aren't initialized yet, bail diff --git a/veilid-core/src/table_store/table_db.rs b/veilid-core/src/table_store/table_db.rs index 435360d1..f26e68db 100644 --- a/veilid-core/src/table_store/table_db.rs +++ b/veilid-core/src/table_store/table_db.rs @@ -62,8 +62,14 @@ impl TableDB { let encrypt_info = encryption_key.map(|ek| CryptInfo::new(crypto.clone(), ek)); let decrypt_info = decryption_key.map(|dk| CryptInfo::new(crypto.clone(), dk)); + let total_columns = database.num_columns().unwrap(); + Self { - opened_column_count, + opened_column_count: if opened_column_count == 0 { + total_columns + } else { + opened_column_count + }, unlocked_inner: Arc::new(TableDBUnlockedInner { table, table_store, @@ -78,18 +84,38 @@ impl TableDB { weak_inner: Weak, opened_column_count: u32, ) -> Option { - weak_inner.upgrade().map(|table_db_unlocked_inner| Self { - opened_column_count, - unlocked_inner: table_db_unlocked_inner, + weak_inner.upgrade().map(|table_db_unlocked_inner| { + let db = &table_db_unlocked_inner.database; + let total_columns = db.num_columns().unwrap(); + Self { + opened_column_count: if opened_column_count == 0 { + total_columns + } else { + opened_column_count + }, + unlocked_inner: table_db_unlocked_inner, + } }) } - pub(super) fn weak_inner(&self) -> Weak { + pub(super) fn weak_unlocked_inner(&self) -> Weak { Arc::downgrade(&self.unlocked_inner) } + /// Get the internal name of the table + pub fn table_name(&self) -> String { + self.unlocked_inner.table.clone() + } + + /// Get the io stats for the table + #[instrument(level = "trace", target = "tstore", skip_all)] + pub fn io_stats(&self, kind: IoStatsKind) -> IoStats { + self.unlocked_inner.database.io_stats(kind) + } + /// Get the total number of columns in the TableDB. /// Not the number of columns that were opened, rather the total number that could be opened. + #[instrument(level = "trace", target = "tstore", skip_all)] pub fn get_column_count(&self) -> VeilidAPIResult { let db = &self.unlocked_inner.database; db.num_columns().map_err(VeilidAPIError::from) @@ -101,6 +127,7 @@ impl TableDB { /// requirement is that they are different for each encryption /// but if the contents are guaranteed to be unique, then a nonce /// can be generated from the hash of the contents and the encryption key itself. + #[instrument(level = "trace", target = "tstore", skip_all)] fn maybe_encrypt(&self, data: &[u8], keyed_nonce: bool) -> Vec { let data = compress_prepend_size(data); if let Some(ei) = &self.unlocked_inner.encrypt_info { @@ -132,6 +159,7 @@ impl TableDB { } /// Decrypt buffer using decrypt key with nonce prepended to input + #[instrument(level = "trace", target = "tstore", skip_all)] fn maybe_decrypt(&self, data: &[u8]) -> std::io::Result> { if let Some(di) = &self.unlocked_inner.decrypt_info { assert!(data.len() >= NONCE_LENGTH); @@ -156,6 +184,7 @@ impl TableDB { } /// Get the list of keys in a column of the TableDB + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn get_keys(&self, col: u32) -> VeilidAPIResult>> { if col >= self.opened_column_count { apibail_generic!(format!( @@ -175,13 +204,29 @@ impl TableDB { Ok(out) } + /// Get the number of keys in a column of the TableDB + #[instrument(level = "trace", target = "tstore", skip_all)] + pub async fn get_key_count(&self, col: u32) -> VeilidAPIResult { + if col >= self.opened_column_count { + apibail_generic!(format!( + "Column exceeds opened column count {} >= {}", + col, self.opened_column_count + )); + } + let db = self.unlocked_inner.database.clone(); + let key_count = db.num_keys(col).await.map_err(VeilidAPIError::from)?; + Ok(key_count) + } + /// Start a TableDB write transaction. The transaction object must be committed or rolled back before dropping. + #[instrument(level = "trace", target = "tstore", skip_all)] pub fn transact(&self) -> TableDBTransaction { let dbt = self.unlocked_inner.database.transaction(); TableDBTransaction::new(self.clone(), dbt) } /// Store a key with a value in a column in the TableDB. Performs a single transaction immediately. + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> { if col >= self.opened_column_count { apibail_generic!(format!( @@ -200,6 +245,7 @@ impl TableDB { } /// Store a key in json format with a value in a column in the TableDB. Performs a single transaction immediately. + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn store_json(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()> where T: serde::Serialize, @@ -209,6 +255,7 @@ impl TableDB { } /// Read a key from a column in the TableDB immediately. + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult>> { if col >= self.opened_column_count { apibail_generic!(format!( @@ -225,6 +272,7 @@ impl TableDB { } /// Read an serde-json key from a column in the TableDB immediately + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn load_json(&self, col: u32, key: &[u8]) -> VeilidAPIResult> where T: for<'de> serde::Deserialize<'de>, @@ -237,6 +285,7 @@ impl TableDB { } /// Delete key with from a column in the TableDB + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult>> { if col >= self.opened_column_count { apibail_generic!(format!( @@ -255,6 +304,7 @@ impl TableDB { } /// Delete serde-json key with from a column in the TableDB + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn delete_json(&self, col: u32, key: &[u8]) -> VeilidAPIResult> where T: for<'de> serde::Deserialize<'de>, @@ -303,6 +353,7 @@ impl TableDBTransaction { } /// Commit the transaction. Performs all actions atomically. + #[instrument(level = "trace", target = "tstore", skip_all)] pub async fn commit(self) -> VeilidAPIResult<()> { let dbt = { let mut inner = self.inner.lock(); @@ -319,12 +370,14 @@ impl TableDBTransaction { } /// Rollback the transaction. Does nothing to the TableDB. + #[instrument(level = "trace", target = "tstore", skip_all)] pub fn rollback(self) { let mut inner = self.inner.lock(); inner.dbt = None; } /// Store a key with a value in a column in the TableDB + #[instrument(level = "trace", target = "tstore", skip_all)] pub fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> { if col >= self.db.opened_column_count { apibail_generic!(format!( @@ -341,6 +394,7 @@ impl TableDBTransaction { } /// Store a key in json format with a value in a column in the TableDB + #[instrument(level = "trace", target = "tstore", skip_all)] pub fn store_json(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()> where T: serde::Serialize, @@ -350,6 +404,7 @@ impl TableDBTransaction { } /// Delete key with from a column in the TableDB + #[instrument(level = "trace", target = "tstore", skip_all)] pub fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> { if col >= self.db.opened_column_count { apibail_generic!(format!( diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 7a30a12d..05720f2b 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -1910,6 +1910,73 @@ impl VeilidAPI { } } + async fn debug_table_list(&self, _args: Vec) -> VeilidAPIResult { + // + let table_store = self.table_store()?; + let table_names = table_store.list_all(); + let out = format!( + "TableStore tables:\n{}", + table_names + .iter() + .map(|(k, v)| format!("{} ({})", k, v)) + .collect::>() + .join("\n") + ); + Ok(out) + } + + fn _format_columns(columns: &[table_store::ColumnInfo]) -> String { + let mut out = String::new(); + for (n, col) in columns.iter().enumerate() { + // + out += &format!("Column {}:\n", n); + out += &format!(" Key Count: {}\n", col.key_count); + } + out + } + + async fn debug_table_info(&self, args: Vec) -> VeilidAPIResult { + // + let table_store = self.table_store()?; + + let table_name = get_debug_argument_at(&args, 1, "debug_table_info", "name", get_string)?; + + let Some(info) = table_store.info(&table_name).await? else { + return Ok(format!("Table '{}' does not exist", table_name)); + }; + + let info_str = format!( + "Table Name: {}\n\ + Column Count: {}\n\ + IO Stats (since previous query):\n{}\n\ + IO Stats (overall):\n{}\n\ + Columns:\n{}\n", + info.table_name, + info.column_count, + indent::indent_all_by(4, format!("{:#?}", info.io_stats_since_previous)), + indent::indent_all_by(4, format!("{:#?}", info.io_stats_overall)), + Self::_format_columns(&info.columns), + ); + + let out = format!("Table info for '{}':\n{}", table_name, info_str); + Ok(out) + } + + async fn debug_table(&self, args: String) -> VeilidAPIResult { + let args: Vec = + shell_words::split(&args).map_err(|e| VeilidAPIError::parse_error(e, args))?; + + let command = get_debug_argument_at(&args, 0, "debug_table", "command", get_string)?; + + if command == "list" { + self.debug_table_list(args).await + } else if command == "info" { + self.debug_table_info(args).await + } else { + Ok(">>> Unknown command\n".to_owned()) + } + } + async fn debug_punish_list(&self, _args: Vec) -> VeilidAPIResult { // let network_manager = self.network_manager()?; @@ -1988,6 +2055,7 @@ record list watch [] [ [ []]] cancel [] [] inspect [] [ []] +table list -------------------------------------------------------------------- is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4 * also , , , @@ -2081,6 +2149,8 @@ record list self.debug_record(rest).await } else if arg == "punish" { self.debug_punish(rest).await + } else if arg == "table" { + self.debug_table(rest).await } else { Err(VeilidAPIError::generic("Unknown server debug command")) } diff --git a/veilid-core/src/veilid_api/json_api/process.rs b/veilid-core/src/veilid_api/json_api/process.rs index c4d6d338..38e1c90d 100644 --- a/veilid-core/src/veilid_api/json_api/process.rs +++ b/veilid-core/src/veilid_api/json_api/process.rs @@ -234,6 +234,7 @@ impl JsonRequestProcessor { ////////////////////////////////////////////////////////////////////////////////////// + #[instrument(level = "trace", target = "json_api", skip_all)] pub async fn process_routing_context_request( &self, routing_context: RoutingContext, @@ -382,6 +383,7 @@ impl JsonRequestProcessor { } } + #[instrument(level = "trace", target = "json_api", skip_all)] pub async fn process_table_db_request( &self, table_db: TableDB, @@ -427,6 +429,7 @@ impl JsonRequestProcessor { } } + #[instrument(level = "trace", target = "json_api", skip_all)] pub async fn process_table_db_transaction_request( &self, table_db_transaction: TableDBTransaction, @@ -461,6 +464,7 @@ impl JsonRequestProcessor { } } + #[instrument(level = "trace", target = "json_api", skip_all)] pub async fn process_crypto_system_request( &self, csv: CryptoSystemVersion, @@ -592,6 +596,7 @@ impl JsonRequestProcessor { } } + #[instrument(level = "trace", target = "json_api", skip_all)] pub async fn process_request(self, request: Request) -> Response { let id = request.id; diff --git a/veilid-core/src/veilid_api/serialize_helpers/compression.rs b/veilid-core/src/veilid_api/serialize_helpers/compression.rs index 627b21fe..0fcf2b2b 100644 --- a/veilid-core/src/veilid_api/serialize_helpers/compression.rs +++ b/veilid-core/src/veilid_api/serialize_helpers/compression.rs @@ -1,10 +1,12 @@ use super::*; use lz4_flex::block; +#[instrument(level = "trace", target = "veilid_api", skip_all)] pub fn compress_prepend_size(input: &[u8]) -> Vec { block::compress_prepend_size(input) } +#[instrument(level = "trace", target = "veilid_api", skip_all)] pub fn decompress_size_prepended( input: &[u8], max_size: Option, diff --git a/veilid-core/src/veilid_api/serialize_helpers/serialize_json.rs b/veilid-core/src/veilid_api/serialize_helpers/serialize_json.rs index f30e6deb..764cc33b 100644 --- a/veilid-core/src/veilid_api/serialize_helpers/serialize_json.rs +++ b/veilid-core/src/veilid_api/serialize_helpers/serialize_json.rs @@ -1,8 +1,8 @@ use super::*; -// Don't trace these functions as they are used in the transfer of API logs, which will recurse! +// Don't trace these functions with events as they are used in the transfer of API logs, which will recurse! -// #[instrument(level = "trace", ret, err)] +#[instrument(level = "trace", target = "json", skip_all)] pub fn deserialize_json<'a, T: de::Deserialize<'a> + Debug>(arg: &'a str) -> VeilidAPIResult { serde_json::from_str(arg).map_err(|e| VeilidAPIError::ParseError { message: e.to_string(), @@ -13,6 +13,8 @@ pub fn deserialize_json<'a, T: de::Deserialize<'a> + Debug>(arg: &'a str) -> Vei ), }) } + +#[instrument(level = "trace", target = "json", skip_all)] pub fn deserialize_json_bytes<'a, T: de::Deserialize<'a> + Debug>( arg: &'a [u8], ) -> VeilidAPIResult { @@ -26,7 +28,7 @@ pub fn deserialize_json_bytes<'a, T: de::Deserialize<'a> + Debug>( }) } -// #[instrument(level = "trace", ret, err)] +#[instrument(level = "trace", target = "json", skip_all)] pub fn deserialize_opt_json( arg: Option, ) -> VeilidAPIResult { @@ -40,7 +42,7 @@ pub fn deserialize_opt_json( deserialize_json(arg) } -// #[instrument(level = "trace", ret, err)] +#[instrument(level = "trace", target = "json", skip_all)] pub fn deserialize_opt_json_bytes( arg: Option>, ) -> VeilidAPIResult { @@ -54,7 +56,7 @@ pub fn deserialize_opt_json_bytes( deserialize_json_bytes(arg.as_slice()) } -// #[instrument(level = "trace", ret)] +#[instrument(level = "trace", target = "json", skip_all)] pub fn serialize_json(val: T) -> String { match serde_json::to_string(&val) { Ok(v) => v, @@ -63,6 +65,8 @@ pub fn serialize_json(val: T) -> String { } } } + +#[instrument(level = "trace", target = "json", skip_all)] pub fn serialize_json_bytes(val: T) -> Vec { match serde_json::to_vec(&val) { Ok(v) => v, @@ -78,7 +82,9 @@ pub fn serialize_json_bytes(val: T) -> Vec { pub mod as_human_base64 { use data_encoding::BASE64URL_NOPAD; use serde::{Deserialize, Deserializer, Serialize, Serializer}; + use tracing::instrument; + #[instrument(level = "trace", target = "json", skip_all)] pub fn serialize(v: &Vec, s: S) -> Result { if s.is_human_readable() { let base64 = BASE64URL_NOPAD.encode(v); @@ -88,6 +94,7 @@ pub mod as_human_base64 { } } + #[instrument(level = "trace", target = "json", skip_all)] pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result, D::Error> { if d.is_human_readable() { let base64 = String::deserialize(d)?; @@ -103,7 +110,9 @@ pub mod as_human_base64 { pub mod as_human_opt_base64 { use data_encoding::BASE64URL_NOPAD; use serde::{Deserialize, Deserializer, Serialize, Serializer}; + use tracing::instrument; + #[instrument(level = "trace", target = "json", skip_all)] pub fn serialize(v: &Option>, s: S) -> Result { if s.is_human_readable() { let base64 = v.as_ref().map(|x| BASE64URL_NOPAD.encode(x)); @@ -113,6 +122,7 @@ pub mod as_human_opt_base64 { } } + #[instrument(level = "trace", target = "json", skip_all)] pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result>, D::Error> { if d.is_human_readable() { let base64 = Option::::deserialize(d)?; @@ -134,7 +144,9 @@ pub mod as_human_string { use std::str::FromStr; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; + use tracing::instrument; + #[instrument(level = "trace", target = "json", skip_all)] pub fn serialize(value: &T, s: S) -> Result where T: Display + Serialize, @@ -147,6 +159,7 @@ pub mod as_human_string { } } + #[instrument(level = "trace", target = "json", skip_all)] pub fn deserialize<'de, T, D>(d: D) -> Result where T: FromStr + Deserialize<'de>, @@ -166,7 +179,9 @@ pub mod as_human_opt_string { use std::str::FromStr; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; + use tracing::instrument; + #[instrument(level = "trace", target = "json", skip_all)] pub fn serialize(value: &Option, s: S) -> Result where T: Display + Serialize, @@ -182,6 +197,7 @@ pub mod as_human_opt_string { } } + #[instrument(level = "trace", target = "json", skip_all)] pub fn deserialize<'de, T, D>(d: D) -> Result, D::Error> where T: FromStr + Deserialize<'de>, diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 779ffa95..fbc741ce 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -411,4 +411,67 @@ async def test_dht_integration_writer_reader(): print(f' {n}') n+=1 +@pytest.mark.asyncio +async def test_dht_write_read_local(): + + async def null_update_callback(update: veilid.VeilidUpdate): + pass + + try: + api0 = await api_connector(null_update_callback, 0) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server 0.") + return + + async with api0: + # purge local and remote record stores to ensure we start fresh + await api0.debug("record purge local") + await api0.debug("record purge remote") + + # make routing contexts + rc0 = await api0.new_routing_context() + async with rc0: + + COUNT = 500 + TEST_DATA = b"ABCD"*1024 + TEST_DATA2 = b"ABCD"*4096 + + # write dht records on server 0 + records = [] + schema = veilid.DHTSchema.dflt(2) + print(f'writing {COUNT} records') + for n in range(COUNT): + desc = await rc0.create_dht_record(schema) + records.append(desc) + + await rc0.set_dht_value(desc.key, 0, TEST_DATA) + await rc0.set_dht_value(desc.key, 1, TEST_DATA2) + + print(f' {n}') + print(f'syncing records to the network') + for desc0 in records: + while True: + rr = await rc0.inspect_dht_record(desc0.key, []) + if len(rr.offline_subkeys) == 0: + await rc0.close_dht_record(desc0.key) + break + time.sleep(0.1) + + # read dht records on server 0 + print(f'reading {COUNT} records') + n=0 + for desc0 in records: + desc1 = await rc0.open_dht_record(desc0.key) + + vd0 = await rc0.get_dht_value(desc1.key, 0) + assert vd0.data == TEST_DATA + + vd1 = await rc0.get_dht_value(desc1.key, 1) + assert vd1.data == TEST_DATA2 + await rc0.close_dht_record(desc1.key) + + print(f' {n}') + n+=1 + + \ No newline at end of file diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index 6cc76f3b..21743437 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -81,6 +81,7 @@ hostname = "^0" stop-token = { version = "^0", default-features = false } sysinfo = { version = "^0.30.6" } wg = { version = "^0.9.1", features = ["future"] } +tracing-flame = "0.2.0" [target.'cfg(windows)'.dependencies] windows-service = "^0" diff --git a/veilid-server/profile.json b/veilid-server/profile.json new file mode 100644 index 00000000..41776ded --- /dev/null +++ b/veilid-server/profile.json @@ -0,0 +1 @@ +{"meta":{"categories":[{"name":"Other","color":"grey","subcategories":["Other"]},{"name":"User","color":"yellow","subcategories":["Other"]}],"debug":false,"extensions":{"baseURL":[],"id":[],"length":0,"name":[]},"interval":0.1,"preprocessedProfileVersion":46,"processType":0,"product":"../target/profiling/veilid-server","sampleUnits":{"eventDelay":"ms","threadCPUDelta":"µs","time":"ms"},"startTime":1720014457589.647,"symbolicated":false,"pausedRanges":[],"version":24,"usesOnlyOneStackType":true,"doesNotUseFrameImplementation":true,"sourceCodeIsNotOnSearchfox":true,"markerSchema":[]},"libs":[{"name":"dyld","path":"/usr/lib/dyld","debugName":"dyld","debugPath":"/usr/lib/dyld","breakpadId":"37BBC384075531C7A8080ED49E44DD8E0","codeId":"37BBC384075531C7A8080ED49E44DD8E","arch":"arm64e"},{"name":"veilid-server","path":"/Users/dildog/code/veilid/target/profiling/veilid-server","debugName":"veilid-server","debugPath":"/Users/dildog/code/veilid/target/profiling/veilid-server","breakpadId":"0C900E21ABF83DD5848F0728125792EA0","codeId":"0C900E21ABF83DD5848F0728125792EA","arch":"arm64"},{"name":"libsystem_malloc.dylib","path":"/usr/lib/system/libsystem_malloc.dylib","debugName":"libsystem_malloc.dylib","debugPath":"/usr/lib/system/libsystem_malloc.dylib","breakpadId":"C6337A382B5C380595E8CF1786E2F4E70","codeId":"C6337A382B5C380595E8CF1786E2F4E7","arch":"arm64e"},{"name":"libsystem_kernel.dylib","path":"/usr/lib/system/libsystem_kernel.dylib","debugName":"libsystem_kernel.dylib","debugPath":"/usr/lib/system/libsystem_kernel.dylib","breakpadId":"9B8B53F9E2B636DF98E928D8FCA732F20","codeId":"9B8B53F9E2B636DF98E928D8FCA732F2","arch":"arm64e"}],"threads":[{"frameTable":{"length":18,"address":[24799,9886639,9760907,9887711,9893535,9924447,9927607,1245483,1245699,1229056,1245627,145256,9893543,1397243,1399123,4045067,4040447,19732],"inlineDepth":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"category":[1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1],"subcategory":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"func":[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17],"nativeSymbol":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"innerWindowID":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"implementation":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"line":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"column":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"optimizations":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null]},"funcTable":{"length":18,"name":[1,3,4,5,6,7,8,9,10,11,12,14,15,16,17,18,19,21],"isJS":[false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false],"relevantForJS":[false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false],"resource":[0,1,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,3],"fileName":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"lineNumber":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"columnNumber":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null]},"markers":{"length":0,"category":[],"data":[],"endTime":[],"name":[],"phase":[],"startTime":[]},"name":"veilid-server","isMainThread":true,"nativeSymbols":{"length":0,"address":[],"functionSize":[],"libIndex":[],"name":[]},"pausedRanges":[],"pid":"12087","processName":"veilid-server","processShutdownTime":73.451958,"processStartupTime":70.423167,"processType":"default","registerTime":70.423167,"resourceTable":{"length":4,"lib":[0,1,2,3],"name":[0,2,13,20],"host":[null,null,null,null],"type":[1,1,1,1]},"samples":{"length":4,"stack":[9,11,17,17],"time":[72.541208,72.654958,72.786542,72.872625],"weight":[1,1,1,1],"weightType":"samples","threadCPUDelta":[6619,7,56,11]},"stackTable":{"length":18,"prefix":[null,0,1,2,3,4,5,6,7,8,7,10,3,12,13,14,15,16],"frame":[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17],"category":[1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1],"subcategory":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"stringArray":["dyld","0x60df","veilid-server","0x96dbaf","0x94f08b","0x96dfdf","0x96f69f","0x976f5f","0x977bb7","0x13012b","0x130203","0x12c100","0x1301bb","libsystem_malloc.dylib","0x23768","0x96f6a7","0x1551fb","0x155953","0x3db90b","0x3da6ff","libsystem_kernel.dylib","0x4d14"],"tid":"1220572","unregisterTime":73.451958}],"pages":[],"profilerOverhead":[],"counters":[]} \ No newline at end of file diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index 7ebab500..53dcdc03 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -84,6 +84,10 @@ pub struct CmdlineArgs { #[arg(long, value_name = "endpoint")] otlp: Option, + /// Turn on flamegraph tracing (experimental, isn't terribly useful) + #[arg(long, hide = true, value_name = "PATH", num_args=0..=1, require_equals=true, default_missing_value = "")] + flame: Option, + /// Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports #[arg(long)] subnode_index: Option, @@ -206,6 +210,19 @@ fn main() -> EyreResult<()> { .wrap_err("failed to parse OTLP address")?; settingsrw.logging.otlp.level = LogLevel::Trace; } + if let Some(flame) = args.flame { + let flame = if flame.is_empty() { + Settings::get_default_flame_path(settingsrw.testing.subnode_index) + .to_string_lossy() + .to_string() + } else { + flame.to_string_lossy().to_string() + }; + println!("Enabling flamegraph output to {}", flame); + settingsrw.logging.flame.enabled = true; + settingsrw.logging.flame.path = flame; + } + if args.no_attach { settingsrw.auto_attach = false; } diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index b82b6c89..e4455fd6 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -66,6 +66,9 @@ logging: level: 'trace' grpc_endpoint: 'localhost:4317' ignore_log_targets: [] + flame: + enabled: false + path: '' console: enabled: false testing: @@ -442,6 +445,12 @@ pub struct Terminal { pub ignore_log_targets: Vec, } +#[derive(Debug, Deserialize, Serialize)] +pub struct Flame { + pub enabled: bool, + pub path: String, +} + #[derive(Debug, Deserialize, Serialize)] pub struct Console { pub enabled: bool, @@ -493,6 +502,7 @@ pub struct Logging { pub file: File, pub api: Api, pub otlp: Otlp, + pub flame: Flame, pub console: Console, } @@ -854,6 +864,15 @@ impl Settings { .unwrap_or_else(|| PathBuf::from("./veilid-server.conf")) } + /// Determine default flamegraph output path + pub fn get_default_flame_path(subnode_index: u16) -> PathBuf { + std::env::temp_dir().join(if subnode_index == 0 { + "veilid-server.folded".to_owned() + } else { + format!("veilid-server-{}.folded", subnode_index) + }) + } + #[allow(dead_code)] fn get_or_create_private_directory>(path: P, group_read: bool) -> bool { let path = path.as_ref(); @@ -975,6 +994,8 @@ impl Settings { set_config_value!(inner.logging.otlp.level, value); set_config_value!(inner.logging.otlp.grpc_endpoint, value); set_config_value!(inner.logging.otlp.ignore_log_targets, value); + set_config_value!(inner.logging.flame.enabled, value); + set_config_value!(inner.logging.flame.path, value); set_config_value!(inner.logging.console.enabled, value); set_config_value!(inner.testing.subnode_index, value); set_config_value!(inner.core.capabilities.disable, value); @@ -1542,6 +1563,8 @@ mod tests { s.logging.otlp.grpc_endpoint, NamedSocketAddrs::from_str("localhost:4317").unwrap() ); + assert!(!s.logging.flame.enabled); + assert_eq!(s.logging.flame.path, ""); assert!(!s.logging.console.enabled); assert_eq!(s.testing.subnode_index, 0); diff --git a/veilid-server/src/veilid_logs.rs b/veilid-server/src/veilid_logs.rs index 1e4fa33f..55e7e2b7 100644 --- a/veilid-server/src/veilid_logs.rs +++ b/veilid-server/src/veilid_logs.rs @@ -17,11 +17,13 @@ use std::collections::BTreeMap; use std::path::*; use std::sync::Arc; use tracing_appender::*; +use tracing_flame::FlameLayer; use tracing_subscriber::prelude::*; use tracing_subscriber::*; struct VeilidLogsInner { - _guard: Option, + _file_guard: Option, + _flame_guard: Option>>, filters: BTreeMap<&'static str, veilid_core::VeilidLayerFilter>, } @@ -71,6 +73,25 @@ impl VeilidLogs { layers.push(layer.boxed()); } + // Flamegraph logger + let mut flame_guard = None; + if settingsr.logging.flame.enabled { + let filter = veilid_core::VeilidLayerFilter::new( + convert_loglevel(LogLevel::Trace), + &[], //&settingsr.logging.terminal.ignore_log_targets, + ); + let (flame_layer, guard) = FlameLayer::with_file(&settingsr.logging.flame.path)?; + flame_guard = Some(guard); + filters.insert("flame", filter.clone()); + layers.push( + flame_layer + .with_threads_collapsed(true) + .with_empty_samples(false) + .with_filter(filter) + .boxed(), + ); + } + // OpenTelemetry logger #[cfg(feature = "opentelemetry-otlp")] if settingsr.logging.otlp.enabled { @@ -121,7 +142,7 @@ impl VeilidLogs { } // File logger - let mut guard = None; + let mut file_guard = None; if settingsr.logging.file.enabled { let log_path = Path::new(&settingsr.logging.file.path); let full_path = std::env::current_dir() @@ -143,7 +164,7 @@ impl VeilidLogs { let appender = tracing_appender::rolling::never(log_parent, Path::new(log_filename)); let (non_blocking_appender, non_blocking_guard) = tracing_appender::non_blocking(appender); - guard = Some(non_blocking_guard); + file_guard = Some(non_blocking_guard); let filter = veilid_core::VeilidLayerFilter::new( convert_loglevel(settingsr.logging.file.level), @@ -192,7 +213,8 @@ impl VeilidLogs { Ok(VeilidLogs { inner: Arc::new(Mutex::new(VeilidLogsInner { - _guard: guard, + _file_guard: file_guard, + _flame_guard: flame_guard, filters, })), })