flamegraph and instrumentation work

This commit is contained in:
Christien Rioux 2024-07-03 13:15:59 -04:00
parent 94a8c2a54e
commit 20e76bbed1
54 changed files with 658 additions and 203 deletions

29
Cargo.lock generated
View File

@ -2844,18 +2844,18 @@ dependencies = [
[[package]]
name = "keyvaluedb"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bdcaabe14fa83eaae1fb92480f619c5a8b413580723153da0334848eb2dff24"
checksum = "c3fe4850c4103a92a7bd14a56ecbe413c27f8c89ea642cb4753b310c30dff812"
dependencies = [
"smallvec",
]
[[package]]
name = "keyvaluedb-memorydb"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62802173041ed97845bc20f8cf6817e445fe537ed879ddf43fb96166829ec8ff"
checksum = "e5e8d196e170cdf21ee4fb0ff779278ca24253c10abee1a49914a893dce71097"
dependencies = [
"keyvaluedb",
"parking_lot 0.12.3",
@ -2863,9 +2863,9 @@ dependencies = [
[[package]]
name = "keyvaluedb-sqlite"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "144f474a27a7dadc5c179c08ef9a4662acaca8047f4e5c30d5b77582243c7538"
checksum = "aa4d3df76ca45b92891e22fbd0a0928d2ef848a65051d5bd2da43ed1c0dfeb6f"
dependencies = [
"hex",
"keyvaluedb",
@ -2876,9 +2876,9 @@ dependencies = [
[[package]]
name = "keyvaluedb-web"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d93d243dfa1643389f8b981ddc07b2a7c533f0fae38b3f5831b004b2cc7f6353"
checksum = "351f750d6c87e6af74cdead18c33bb14fa47882787873bc05faa0002078cb9e5"
dependencies = [
"async-lock 2.8.0",
"flume",
@ -5490,6 +5490,17 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-flame"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
dependencies = [
"lazy_static",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-journald"
version = "0.3.0"
@ -5881,6 +5892,7 @@ dependencies = [
"glob",
"hex",
"hickory-resolver",
"indent",
"jni",
"jni-sys",
"js-sys",
@ -6047,6 +6059,7 @@ dependencies = [
"tokio-util",
"tracing",
"tracing-appender",
"tracing-flame",
"tracing-journald",
"tracing-opentelemetry",
"tracing-subscriber",

View File

@ -80,7 +80,7 @@ thiserror = "1.0.50"
# Data structures
enumset = { version = "1.1.3", features = ["serde"] }
keyvaluedb = "0.1.1"
keyvaluedb = "0.1.2"
range-set-blaze = "0.1.13"
weak-table = "0.3.2"
hashlink = { package = "veilid-hashlink", version = "0.1.0", features = [
@ -135,6 +135,7 @@ lz4_flex = { version = "0.11.1", default-features = false, features = [
"safe-encode",
"safe-decode",
] }
indent = "0.1.1"
# Dependencies for native builds only
# Linux, Windows, Mac, iOS, Android
@ -163,7 +164,7 @@ futures-util = { version = "0.3.29", default-features = false, features = [
# Data structures
keyring-manager = "0.5.1"
keyvaluedb-sqlite = "0.1.1"
keyvaluedb-sqlite = "0.1.2"
# Network
async-tungstenite = { package = "veilid-async-tungstenite", version = "0.23.0", features = [
@ -211,7 +212,7 @@ wasm-logger = "0.2.0"
tracing-wasm = "0.2.1"
# Data Structures
keyvaluedb-web = "0.1.1"
keyvaluedb-web = "0.1.2"
### Configuration for WASM32 'web-sys' crate
[target.'cfg(target_arch = "wasm32")'.dependencies.web-sys]

View File

@ -66,6 +66,7 @@ impl Envelope {
}
}
#[instrument(level = "trace", target = "envelope", skip_all, err)]
pub fn from_signed_data(
crypto: Crypto,
data: &[u8],
@ -190,6 +191,7 @@ impl Envelope {
})
}
#[instrument(level = "trace", target = "envelope", skip_all, err)]
pub fn decrypt_body(
&self,
crypto: Crypto,
@ -222,6 +224,7 @@ impl Envelope {
Ok(body)
}
#[instrument(level = "trace", target = "envelope", skip_all, err)]
pub fn to_encrypted_data(
&self,
crypto: Crypto,

View File

@ -68,6 +68,7 @@ impl Receipt {
})
}
#[instrument(level = "trace", target = "receipt", skip_all, err)]
pub fn from_signed_data(crypto: Crypto, data: &[u8]) -> VeilidAPIResult<Receipt> {
// Ensure we are at least the length of the envelope
if data.len() < MIN_RECEIPT_SIZE {
@ -156,6 +157,7 @@ impl Receipt {
})
}
#[instrument(level = "trace", target = "receipt", skip_all, err)]
pub fn to_signed_data(&self, crypto: Crypto, secret: &SecretKey) -> VeilidAPIResult<Vec<u8>> {
// Ensure extra data isn't too long
let receipt_size: usize = self.extra_data.len() + MIN_RECEIPT_SIZE;

View File

@ -69,12 +69,14 @@ impl CryptoSystem for CryptoSystemVLD0 {
}
// Cached Operations
#[instrument(level = "trace", skip_all)]
fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret> {
self.crypto
.cached_dh_internal::<CryptoSystemVLD0>(self, key, secret)
}
// Generation
#[instrument(level = "trace", target = "crypto", skip_all)]
fn random_bytes(&self, len: u32) -> Vec<u8> {
let mut bytes = unsafe { unaligned_u8_vec_uninit(len as usize) };
random_bytes(bytes.as_mut());
@ -83,6 +85,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
fn default_salt_length(&self) -> u32 {
16
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn hash_password(&self, password: &[u8], salt: &[u8]) -> VeilidAPIResult<String> {
if salt.len() < Salt::MIN_LENGTH || salt.len() > Salt::MAX_LENGTH {
apibail_generic!("invalid salt length");
@ -100,6 +103,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
.to_string();
Ok(password_hash)
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn verify_password(&self, password: &[u8], password_hash: &str) -> VeilidAPIResult<bool> {
let parsed_hash = PasswordHash::new(password_hash).map_err(VeilidAPIError::generic)?;
// Argon2 with default params (Argon2id v19)
@ -108,6 +112,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
Ok(argon2.verify_password(password, &parsed_hash).is_ok())
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn derive_shared_secret(&self, password: &[u8], salt: &[u8]) -> VeilidAPIResult<SharedSecret> {
if salt.len() < Salt::MIN_LENGTH || salt.len() > Salt::MAX_LENGTH {
apibail_generic!("invalid salt length");
@ -123,16 +128,21 @@ impl CryptoSystem for CryptoSystemVLD0 {
Ok(SharedSecret::new(output_key_material))
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn random_nonce(&self) -> Nonce {
let mut nonce = [0u8; NONCE_LENGTH];
random_bytes(&mut nonce);
Nonce::new(nonce)
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn random_shared_secret(&self) -> SharedSecret {
let mut s = [0u8; SHARED_SECRET_LENGTH];
random_bytes(&mut s);
SharedSecret::new(s)
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn compute_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret> {
let pk_xd = public_to_x25519_pk(key)?;
let sk_xd = secret_to_x25519_sk(secret)?;
@ -146,12 +156,18 @@ impl CryptoSystem for CryptoSystemVLD0 {
Ok(SharedSecret::new(*output.as_bytes()))
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn generate_keypair(&self) -> KeyPair {
vld0_generate_keypair()
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn generate_hash(&self, data: &[u8]) -> PublicKey {
PublicKey::new(*blake3::hash(data).as_bytes())
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn generate_hash_reader(&self, reader: &mut dyn std::io::Read) -> VeilidAPIResult<PublicKey> {
let mut hasher = blake3::Hasher::new();
std::io::copy(reader, &mut hasher).map_err(VeilidAPIError::generic)?;
@ -159,6 +175,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
}
// Validation
#[instrument(level = "trace", target = "crypto", skip_all)]
fn validate_keypair(&self, dht_key: &PublicKey, dht_key_secret: &SecretKey) -> bool {
let data = vec![0u8; 512];
let Ok(sig) = self.sign(dht_key, dht_key_secret, &data) else {
@ -169,11 +186,15 @@ impl CryptoSystem for CryptoSystemVLD0 {
};
v
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn validate_hash(&self, data: &[u8], dht_key: &PublicKey) -> bool {
let bytes = *blake3::hash(data).as_bytes();
bytes == dht_key.bytes
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn validate_hash_reader(
&self,
reader: &mut dyn std::io::Read,
@ -184,7 +205,9 @@ impl CryptoSystem for CryptoSystemVLD0 {
let bytes = *hasher.finalize().as_bytes();
Ok(bytes == dht_key.bytes)
}
// Distance Metric
#[instrument(level = "trace", target = "crypto", skip_all)]
fn distance(&self, key1: &PublicKey, key2: &PublicKey) -> CryptoKeyDistance {
let mut bytes = [0u8; CRYPTO_KEY_LENGTH];
@ -196,6 +219,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
}
// Authentication
#[instrument(level = "trace", target = "crypto", skip_all)]
fn sign(
&self,
dht_key: &PublicKey,
@ -225,6 +249,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
Ok(sig)
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn verify(
&self,
dht_key: &PublicKey,
@ -251,6 +276,8 @@ impl CryptoSystem for CryptoSystemVLD0 {
fn aead_overhead(&self) -> usize {
AEAD_OVERHEAD
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn decrypt_in_place_aead(
&self,
body: &mut Vec<u8>,
@ -266,6 +293,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
.map_err(VeilidAPIError::generic)
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn decrypt_aead(
&self,
body: &[u8],
@ -280,6 +308,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
Ok(out)
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn encrypt_in_place_aead(
&self,
body: &mut Vec<u8>,
@ -296,6 +325,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
.map_err(VeilidAPIError::generic)
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn encrypt_aead(
&self,
body: &[u8],
@ -311,6 +341,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
}
// NoAuth Encrypt/Decrypt
#[instrument(level = "trace", target = "crypto", skip_all)]
fn crypt_in_place_no_auth(
&self,
body: &mut [u8],
@ -321,6 +352,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
cipher.apply_keystream(body);
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn crypt_b2b_no_auth(
&self,
in_buf: &[u8],
@ -332,6 +364,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
cipher.apply_keystream_b2b(in_buf, out_buf).unwrap();
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn crypt_no_auth_aligned_8(
&self,
in_buf: &[u8],
@ -343,6 +376,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
out_buf
}
#[instrument(level = "trace", target = "crypto", skip_all)]
fn crypt_no_auth_unaligned(
&self,
in_buf: &[u8],

View File

@ -611,7 +611,7 @@ impl NetworkManager {
}
/// Process a received out-of-band receipt
#[instrument(level = "trace", skip(self, receipt_data), ret)]
#[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn handle_out_of_band_receipt<R: AsRef<[u8]>>(
&self,
receipt_data: R,
@ -631,7 +631,7 @@ impl NetworkManager {
}
/// Process a received in-band receipt
#[instrument(level = "trace", skip(self, receipt_data), ret)]
#[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn handle_in_band_receipt<R: AsRef<[u8]>>(
&self,
receipt_data: R,
@ -652,7 +652,7 @@ impl NetworkManager {
}
/// Process a received safety receipt
#[instrument(level = "trace", skip(self, receipt_data), ret)]
#[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn handle_safety_receipt<R: AsRef<[u8]>>(
&self,
receipt_data: R,
@ -672,7 +672,7 @@ impl NetworkManager {
}
/// Process a received private receipt
#[instrument(level = "trace", skip(self, receipt_data), ret)]
#[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn handle_private_receipt<R: AsRef<[u8]>>(
&self,
receipt_data: R,
@ -693,7 +693,7 @@ impl NetworkManager {
}
// Process a received signal
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn handle_signal(
&self,
signal_flow: Flow,
@ -792,10 +792,7 @@ impl NetworkManager {
}
/// Builds an envelope for sending over the network
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, body), err)
)]
#[instrument(level = "trace", target = "receipt", skip_all)]
fn build_envelope<B: AsRef<[u8]>>(
&self,
dest_node_id: TypedKey,
@ -838,10 +835,7 @@ impl NetworkManager {
/// node_ref is the direct destination to which the envelope will be sent
/// If 'destination_node_ref' is specified, it can be different than the node_ref being sent to
/// which will cause the envelope to be relayed
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, body), ret, err)
)]
#[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn send_envelope<B: AsRef<[u8]>>(
&self,
node_ref: NodeRef,
@ -879,7 +873,7 @@ impl NetworkManager {
}
/// Called by the RPC handler when we want to issue an direct receipt
#[instrument(level = "debug", skip(self, rcpt_data), err)]
#[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn send_out_of_band_receipt(
&self,
dial_info: DialInfo,
@ -905,7 +899,7 @@ impl NetworkManager {
// Called when a packet potentially containing an RPC envelope is received by a low-level
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
// and passes it to the RPC handler
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len())))]
#[instrument(level = "trace", target = "receipt", skip_all)]
async fn on_recv_envelope(&self, data: &mut [u8], flow: Flow) -> EyreResult<bool> {
#[cfg(feature = "verbose-tracing")]
let root = span!(

View File

@ -220,7 +220,7 @@ impl NetworkConnection {
}
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(message, stats), fields(message.len = message.len()), ret))]
#[instrument(level="trace", target="net", skip_all)]
async fn send_internal(
protocol_connection: &ProtocolNetworkConnection,
stats: Arc<Mutex<NetworkConnectionStats>>,
@ -235,7 +235,7 @@ impl NetworkConnection {
Ok(NetworkResult::Value(()))
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(stats), fields(ret.len)))]
#[instrument(level="trace", target="net", skip_all)]
async fn recv_internal(
protocol_connection: &ProtocolNetworkConnection,
stats: Arc<Mutex<NetworkConnectionStats>>,
@ -265,6 +265,7 @@ impl NetworkConnection {
// Connection receiver loop
#[allow(clippy::too_many_arguments)]
#[instrument(level="trace", target="net", skip_all)]
fn process_connection(
connection_manager: ConnectionManager,
local_stop_token: StopToken,

View File

@ -198,6 +198,7 @@ impl ReceiptManager {
Ok(())
}
#[instrument(level = "trace", target = "receipt", skip_all)]
fn perform_callback(
evt: ReceiptEvent,
record_mut: &mut ReceiptRecord,
@ -221,7 +222,7 @@ impl ReceiptManager {
}
}
#[instrument(level = "trace", skip(self))]
#[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) {
// Go through all receipts and build a list of expired nonces
let mut new_next_oldest_ts: Option<Timestamp> = None;
@ -270,6 +271,7 @@ impl ReceiptManager {
}
}
#[instrument(level = "trace", target = "receipt", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> {
let (next_oldest_ts, timeout_task, stop_token) = {
let inner = self.inner.lock();
@ -318,6 +320,7 @@ impl ReceiptManager {
}
#[allow(dead_code)]
#[instrument(level = "trace", target = "receipt", skip_all)]
pub fn record_receipt(
&self,
receipt: Receipt,
@ -339,6 +342,7 @@ impl ReceiptManager {
Self::update_next_oldest_timestamp(&mut inner);
}
#[instrument(level = "trace", target = "receipt", skip_all)]
pub fn record_single_shot_receipt(
&self,
receipt: Receipt,

View File

@ -11,6 +11,7 @@ impl NetworkManager {
///
/// Sending to a node requires determining a NetworkClass compatible contact method
/// between the source and destination node
#[instrument(level="trace", target="net", skip_all, err)]
pub(crate) async fn send_data(
&self,
destination_node_ref: NodeRef,
@ -149,6 +150,7 @@ impl NetworkManager {
}
/// Send data using NodeContactMethod::Existing
#[instrument(level="trace", target="net", skip_all, err)]
async fn send_data_ncm_existing(
&self,
target_node_ref: NodeRef,
@ -185,6 +187,7 @@ impl NetworkManager {
}
/// Send data using NodeContactMethod::Unreachable
#[instrument(level="trace", target="net", skip_all, err)]
async fn send_data_ncm_unreachable(
&self,
target_node_ref: NodeRef,
@ -221,6 +224,7 @@ impl NetworkManager {
}
/// Send data using NodeContactMethod::SignalReverse
#[instrument(level="trace", target="net", skip_all, err)]
async fn send_data_ncm_signal_reverse(
&self,
relay_nr: NodeRef,
@ -268,6 +272,7 @@ impl NetworkManager {
}
/// Send data using NodeContactMethod::SignalHolePunch
#[instrument(level="trace", target="net", skip_all, err)]
async fn send_data_ncm_signal_hole_punch(
&self,
relay_nr: NodeRef,
@ -313,6 +318,7 @@ impl NetworkManager {
}
/// Send data using NodeContactMethod::Direct
#[instrument(level="trace", target="net", skip_all, err)]
async fn send_data_ncm_direct(
&self,
node_ref: NodeRef,
@ -372,6 +378,7 @@ impl NetworkManager {
/// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access
/// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not
/// allowed to use NodeRefs due to recursive locking
#[instrument(level="trace", target="net", skip_all, err)]
pub(crate) fn get_node_contact_method(
&self,
target_node_ref: NodeRef,
@ -544,10 +551,7 @@ impl NetworkManager {
/// Send a reverse connection signal and wait for the return receipt over it
/// Then send the data across the new connection
/// Only usable for PublicInternet routing domain
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, data), err)
)]
#[instrument(level="trace", target="net", skip_all, err)]
async fn do_reverse_connect(
&self,
relay_nr: NodeRef,
@ -636,10 +640,7 @@ impl NetworkManager {
/// Send a hole punch signal and do a negotiating ping and wait for the return receipt
/// Then send the data across the new connection
/// Only usable for PublicInternet routing domain
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, data), err)
)]
#[instrument(level="trace", target="net", skip_all, err)]
async fn do_hole_punch(
&self,
relay_nr: NodeRef,

View File

@ -140,6 +140,7 @@ where
})
}
#[instrument(level = "trace", target = "fanout", skip_all)]
fn evaluate_done(self: Arc<Self>, ctx: &mut FanoutContext<R>) -> bool {
// If we have a result, then we're done
if ctx.result.is_some() {
@ -151,6 +152,7 @@ where
ctx.result.is_some()
}
#[instrument(level = "trace", target = "fanout", skip_all)]
fn add_to_fanout_queue(self: Arc<Self>, new_nodes: &[NodeRef]) {
event!(target: "fanout", Level::DEBUG,
"FanoutCall::add_to_fanout_queue:\n new_nodes={{\n{}}}\n",
@ -172,6 +174,7 @@ where
});
}
#[instrument(level = "trace", target = "fanout", skip_all)]
async fn fanout_processor(self: Arc<Self>) -> bool {
// Loop until we have a result or are done
loop {
@ -229,6 +232,7 @@ where
}
}
#[instrument(level = "trace", target = "fanout", skip_all)]
fn init_closest_nodes(self: Arc<Self>) -> Result<(), RPCError> {
// Get the 'node_count' closest nodes to the key out of our routing table
let closest_nodes = {
@ -278,6 +282,7 @@ where
Ok(())
}
#[instrument(level = "trace", target = "fanout", skip_all)]
pub async fn run(
self: Arc<Self>,
init_fanout_queue: Vec<NodeRef>,

View File

@ -448,6 +448,7 @@ impl RPCProcessor {
}
/// Determine if a SignedNodeInfo can be placed into the specified routing domain
#[instrument(level="trace", target="rpc", skip_all)]
fn verify_node_info(
&self,
routing_domain: RoutingDomain,
@ -463,6 +464,7 @@ impl RPCProcessor {
/// Search the network for a single node and add it to the routing table and return the node reference
/// If no node was found in the timeout, this returns None
#[instrument(level="trace", target="rpc", skip_all)]
async fn search_for_node_id(
&self,
node_id: TypedKey,
@ -529,6 +531,7 @@ impl RPCProcessor {
/// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
/// Note: This routine can possibly be recursive, hence the SendPinBoxFuture async form
#[instrument(level="trace", target="rpc", skip_all)]
pub fn resolve_node(
&self,
node_id: TypedKey,
@ -579,10 +582,7 @@ impl RPCProcessor {
})
}
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, waitable_reply), err)
)]
#[instrument(level="trace", target="rpc", skip_all)]
async fn wait_for_reply(
&self,
waitable_reply: WaitableReply,
@ -654,6 +654,7 @@ impl RPCProcessor {
}
/// Wrap an operation with a private route inside a safety route
#[instrument(level="trace", target="rpc", skip_all)]
fn wrap_with_route(
&self,
safety_selection: SafetySelection,
@ -728,10 +729,7 @@ impl RPCProcessor {
/// Produce a byte buffer that represents the wire encoding of the entire
/// unencrypted envelope body for a RPC message. This incorporates
/// wrapping a private and/or safety route if they are specified.
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "debug", skip(self, operation), err)
)]
#[instrument(level="trace", target="rpc", skip_all)]
fn render_operation(
&self,
dest: Destination,
@ -863,10 +861,7 @@ impl RPCProcessor {
/// routing table caching when it is okay to do so
/// Also check target's timestamp of our own node info, to see if we should send that
/// And send our timestamp of the target's node info so they can determine if they should update us on their next rpc
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret)
)]
#[instrument(level="trace", target="rpc", skip_all)]
fn get_sender_peer_info(&self, dest: &Destination) -> SenderPeerInfo {
// Don't do this if the sender is to remain private
// Otherwise we would be attaching the original sender's identity to the final destination,
@ -908,6 +903,7 @@ impl RPCProcessor {
}
/// Record failure to send to node or route
#[instrument(level="trace", target="rpc", skip_all)]
fn record_send_failure(
&self,
rpc_kind: RPCKind,
@ -942,6 +938,7 @@ impl RPCProcessor {
}
/// Record question lost to node or route
#[instrument(level="trace", target="rpc", skip_all)]
fn record_question_lost(
&self,
send_ts: Timestamp,
@ -984,6 +981,7 @@ impl RPCProcessor {
}
/// Record success sending to node or route
#[instrument(level="trace", target="rpc", skip_all)]
fn record_send_success(
&self,
rpc_kind: RPCKind,
@ -1027,6 +1025,7 @@ impl RPCProcessor {
/// Record answer received from node or route
#[allow(clippy::too_many_arguments)]
#[instrument(level="trace", target="rpc", skip_all)]
fn record_answer_received(
&self,
send_ts: Timestamp,
@ -1112,6 +1111,7 @@ impl RPCProcessor {
}
/// Record question or statement received from node or route
#[instrument(level="trace", target="rpc", skip_all)]
fn record_question_received(&self, msg: &RPCMessage) {
let recv_ts = msg.header.timestamp;
let bytes = msg.header.body_len;
@ -1156,10 +1156,7 @@ impl RPCProcessor {
/// Issue a question over the network, possibly using an anonymized route
/// Optionally keeps a context to be passed to the answer processor when an answer is received
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "debug", skip(self, question), err)
)]
#[instrument(level="trace", target="rpc", skip_all)]
async fn question(
&self,
dest: Destination,
@ -1261,10 +1258,7 @@ impl RPCProcessor {
}
/// Issue a statement over the network, possibly using an anonymized route
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "debug", skip(self, statement), err)
)]
#[instrument(level="trace", target="rpc", skip_all)]
async fn statement(
&self,
dest: Destination,
@ -1336,10 +1330,7 @@ impl RPCProcessor {
}
/// Issue a reply over the network, possibly using an anonymized route
/// The request must want a response, or this routine fails
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "debug", skip(self, request, answer), err)
)]
#[instrument(level="trace", target="rpc", skip_all)]
async fn answer(
&self,
request: RPCMessage,
@ -1417,7 +1408,7 @@ impl RPCProcessor {
/// Decoding RPC from the wire
/// This performs a capnp decode on the data, and if it passes the capnp schema
/// it performs the cryptographic validation required to pass the operation up for processing
#[instrument(skip_all)]
#[instrument(level="trace", target="rpc", skip_all)]
fn decode_rpc_operation(
&self,
encoded_msg: &RPCMessageEncoded,
@ -1445,6 +1436,7 @@ impl RPCProcessor {
/// caller or receiver. This does not mean the operation is 'semantically correct'. For
/// complex operations that require stateful validation and a more robust context than
/// 'signatures', the caller must still perform whatever validation is necessary
#[instrument(level="trace", target="rpc", skip_all)]
fn validate_rpc_operation(&self, operation: &mut RPCOperation) -> Result<(), RPCError> {
// If this is an answer, get the question context for this answer
// If we received an answer for a question we did not ask, this will return an error
@ -1469,10 +1461,7 @@ impl RPCProcessor {
}
//////////////////////////////////////////////////////////////////////
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, encoded_msg), err)
)]
#[instrument(level="trace", target="rpc", skip_all)]
async fn process_rpc_message(
&self,
encoded_msg: RPCMessageEncoded,
@ -1671,6 +1660,7 @@ impl RPCProcessor {
}
}
#[instrument(level="trace", target="rpc", skip_all)]
async fn rpc_worker(
self,
stop_token: StopToken,
@ -1700,10 +1690,7 @@ impl RPCProcessor {
}
}
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, body), err)
)]
#[instrument(level="trace", target="rpc", skip_all)]
pub fn enqueue_direct_message(
&self,
envelope: Envelope,
@ -1742,10 +1729,7 @@ impl RPCProcessor {
Ok(())
}
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, body), err)
)]
#[instrument(level="trace", target="rpc", skip_all)]
fn enqueue_safety_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,
@ -1781,10 +1765,7 @@ impl RPCProcessor {
Ok(())
}
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, body), err)
)]
#[instrument(level="trace", target="rpc", skip_all)]
fn enqueue_private_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,

View File

@ -128,10 +128,7 @@ where
}
/// Complete the app call
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, message), err)
)]
#[instrument(level = "trace", target = "rpc", skip_all)]
pub async fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
let waiting_op = {
let mut inner = self.inner.lock();
@ -151,6 +148,7 @@ where
}
/// Wait for operation to complete
#[instrument(level = "trace", target = "rpc", skip_all)]
pub async fn wait_for_op(
&self,
mut handle: OperationWaitHandle<T, C>,

View File

@ -3,10 +3,7 @@ use super::*;
impl RPCProcessor {
// Sends a high level app request and wait for response
// Can be sent via all methods including relays and routes
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, message), fields(message.len = message.len(), ret.latency, ret.len), err)
)]
#[instrument(level = "trace", target = "rpc", skip(self, message), fields(message.len = message.len(), ret.latency, ret.len), err)]
pub async fn rpc_call_app_call(
self,
dest: Destination,
@ -57,7 +54,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_app_call_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
let routing_table = self.routing_table();
@ -149,6 +146,7 @@ impl RPCProcessor {
}
/// Exposed to API for apps to return app call answers
#[instrument(level = "trace", target = "rpc", skip_all)]
pub async fn app_call_reply(
&self,
call_id: OperationId,

View File

@ -3,10 +3,7 @@ use super::*;
impl RPCProcessor {
// Sends a high level app message
// Can be sent via all methods including relays and routes
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, message), fields(message.len = message.len()), err)
)]
#[instrument(level = "trace", target = "rpc", skip(self, message), fields(message.len = message.len()), err)]
pub async fn rpc_call_app_message(
self,
dest: Destination,
@ -21,7 +18,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_app_message(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
let routing_table = self.routing_table();

View File

@ -1,7 +1,7 @@
use super::*;
impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
#[cfg(feature = "unstable-tunnels")]

View File

@ -1,7 +1,7 @@
use super::*;
impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_complete_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
#[cfg(feature = "unstable-tunnels")]

View File

@ -1,7 +1,7 @@
use super::*;
impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_find_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
#[cfg(feature = "unstable-blockstore")]

View File

@ -7,10 +7,7 @@ impl RPCProcessor {
/// Because this leaks information about the identity of the node itself,
/// replying to this request received over a private route will leak
/// the identity of the node and defeat the private route.
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self), err)
)]
#[instrument(level = "trace", target = "rpc", skip(self), err)]
pub async fn rpc_call_find_node(
self,
dest: Destination,

View File

@ -16,16 +16,13 @@ impl RPCProcessor {
/// replying to this request received over a private route will leak
/// the identity of the node and defeat the private route.
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, last_descriptor),
#[instrument(level = "trace", target = "rpc", skip(self, last_descriptor),
fields(ret.value.data.len,
ret.value.data.seq,
ret.value.data.writer,
ret.peers.len,
ret.latency
),err)
)]
),err)]
pub async fn rpc_call_get_value(
self,
dest: Destination,
@ -168,7 +165,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_get_value_q(
&self,
msg: RPCMessage,

View File

@ -19,13 +19,12 @@ impl RPCProcessor {
/// * the amount requested
/// * an amount truncated to MAX_INSPECT_VALUE_A_SEQS_LEN subkeys
/// * zero if nothing was found
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, last_descriptor),
#[
instrument(level = "trace", target = "rpc", skip(self, last_descriptor),
fields(ret.peers.len,
ret.latency
),err)
)]
]
pub async fn rpc_call_inspect_value(
self,
dest: Destination,
@ -153,7 +152,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_inspect_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ensure this never came over a private route, safety route is okay though
match &msg.header.detail {

View File

@ -3,10 +3,7 @@ use super::*;
impl RPCProcessor {
// Sends a unidirectional in-band return receipt
// Can be sent via all methods including relays and routes
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, receipt), ret, err)
)]
#[instrument(level = "trace", target = "rpc", skip(self, receipt), ret, err)]
pub async fn rpc_call_return_receipt<D: AsRef<[u8]>>(
self,
dest: Destination,
@ -24,7 +21,7 @@ impl RPCProcessor {
Ok(NetworkResult::value(()))
}
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_return_receipt(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Get the statement
let (_, _, _, kind) = msg.operation.destructure();

View File

@ -1,10 +1,7 @@
use super::*;
impl RPCProcessor {
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip_all, err)
)]
#[instrument(level = "trace", target = "rpc", skip_all, err)]
async fn process_route_safety_route_hop(
&self,
routed_operation: RoutedOperation,
@ -59,10 +56,7 @@ impl RPCProcessor {
.await
}
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip_all, err)
)]
#[instrument(level = "trace", target = "rpc", skip_all, err)]
async fn process_route_private_route_hop(
&self,
routed_operation: RoutedOperation,
@ -112,10 +106,7 @@ impl RPCProcessor {
/// Note: it is important that we never respond with a safety route to questions that come
/// in without a private route. Giving away a safety route when the node id is known is
/// a privacy violation!
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip_all, err)
)]
#[instrument(level = "trace", target = "rpc", skip_all, err)]
fn process_safety_routed_operation(
&self,
detail: RPCMessageHeaderDetailDirect,
@ -159,10 +150,7 @@ impl RPCProcessor {
}
/// Process a routed operation that came in over both a safety route and a private route
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip_all, err)
)]
#[instrument(level = "trace", target = "rpc", skip_all, err)]
fn process_private_routed_operation(
&self,
detail: RPCMessageHeaderDetailDirect,
@ -235,10 +223,7 @@ impl RPCProcessor {
Ok(NetworkResult::value(()))
}
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip_all, err)
)]
#[instrument(level = "trace", target = "rpc", skip_all, err)]
fn process_routed_operation(
&self,
detail: RPCMessageHeaderDetailDirect,
@ -272,7 +257,7 @@ impl RPCProcessor {
feature = "verbose-tracing",
instrument(level = "trace", skip_all, err)
)]
#[instrument(level = "trace", target = "rpc", skip_all)]
async fn process_private_route_first_hop(
&self,
mut routed_operation: RoutedOperation,
@ -337,6 +322,7 @@ impl RPCProcessor {
}
/// Decrypt route hop data and sign routed operation
#[instrument(level = "trace", target = "rpc", skip_all)]
fn decrypt_private_route_hop_data(
&self,
route_hop_data: &RouteHopData,
@ -399,10 +385,7 @@ impl RPCProcessor {
Ok(NetworkResult::value(route_hop))
}
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err)
)]
#[instrument(level = "trace", target = "rpc", skip(self), ret, err)]
pub(crate) async fn process_route(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
let routing_table = self.routing_table();

View File

@ -14,9 +14,7 @@ impl RPCProcessor {
/// Because this leaks information about the identity of the node itself,
/// replying to this request received over a private route will leak
/// the identity of the node and defeat the private route.
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, value, descriptor),
#[instrument(level = "trace", target = "rpc", skip(self, value, descriptor),
fields(value.data.len = value.value_data().data().len(),
value.data.seq = value.value_data().seq(),
value.data.writer = value.value_data().writer().to_string(),
@ -26,8 +24,7 @@ impl RPCProcessor {
ret.value.data.writer,
ret.peers.len,
ret.latency
), err)
)]
), err)]
pub async fn rpc_call_set_value(
self,
dest: Destination,
@ -183,7 +180,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_set_value_q(
&self,
msg: RPCMessage,

View File

@ -3,10 +3,7 @@ use super::*;
impl RPCProcessor {
// Sends a unidirectional signal to a node
// Can be sent via relays but not routes. For routed 'signal' like capabilities, use AppMessage.
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err)
)]
#[instrument(level = "trace", target = "rpc", skip(self), ret, err)]
pub async fn rpc_call_signal(
self,
dest: Destination,
@ -34,7 +31,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_signal(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
let routing_table = self.routing_table();

View File

@ -1,7 +1,7 @@
use super::*;
impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_start_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
#[cfg(feature = "unstable-tunnels")]

View File

@ -15,10 +15,7 @@ impl RPCProcessor {
// direct -> node status + sender info
// safety -> node status
// private -> nothing
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err)
)]
#[instrument(level = "trace", target = "rpc", skip(self), ret, err)]
pub async fn rpc_call_status(
self,
dest: Destination,
@ -159,7 +156,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Get the question
let kind = msg.operation.kind().clone();

View File

@ -1,7 +1,7 @@
use super::*;
impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_supply_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
#[cfg(feature = "unstable-blockstore")]

View File

@ -2,10 +2,7 @@ use super::*;
impl RPCProcessor {
// Can only be sent directly, not via relays or routes
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err)
)]
#[instrument(level = "trace", target = "rpc", skip(self), ret, err)]
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
pub async fn rpc_call_validate_dial_info(
self,
@ -58,7 +55,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
let routing_table = self.routing_table();
if !routing_table.has_valid_network_class(msg.header.routing_domain()) {

View File

@ -3,10 +3,7 @@ use super::*;
impl RPCProcessor {
// Sends a dht value change notification
// Can be sent via all methods including relays and routes but never over a safety route
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, value), err)
)]
#[instrument(level = "trace", target = "rpc", skip(self, value), err)]
pub async fn rpc_call_value_changed(
self,
dest: Destination,
@ -32,6 +29,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[instrument(level = "trace", target = "rpc", skip_all)]
pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Get the statement
let (_, _, _, kind) = msg.operation.destructure();

View File

@ -16,14 +16,11 @@ impl RPCProcessor {
/// replying to this request received over a private route will leak
/// the identity of the node and defeat the private route.
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self),
#[instrument(level = "trace", target = "rpc", skip(self),
fields(ret.expiration,
ret.latency,
ret.peers.len
),err)
)]
),err)]
#[allow(clippy::too_many_arguments)]
pub async fn rpc_call_watch_value(
self,
@ -181,7 +178,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
let routing_table = self.routing_table();
let rss = routing_table.route_spec_store();

View File

@ -25,6 +25,7 @@ pub(super) struct OutboundGetValueResult {
impl StorageManager {
/// Perform a 'get value' query on the network
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_get_value(
&self,
rpc_processor: RPCProcessor,
@ -276,6 +277,7 @@ impl StorageManager {
Ok(out_rx)
}
#[instrument(level = "trace", target = "dht", skip_all)]
pub(super) fn process_deferred_outbound_get_value_result_inner(&self, inner: &mut StorageManagerInner, res_rx: flume::Receiver<Result<get_value::OutboundGetValueResult, VeilidAPIError>>, key: TypedKey, subkey: ValueSubkey, last_seq: ValueSeqNum) {
let this = self.clone();
inner.process_deferred_results(
@ -323,6 +325,7 @@ impl StorageManager {
);
}
#[instrument(level = "trace", target = "dht", skip_all)]
pub(super) async fn process_outbound_get_value_result(&self, key: TypedKey, subkey: ValueSubkey, opt_last_seq: Option<u32>, result: get_value::OutboundGetValueResult) -> Result<Option<ValueData>, VeilidAPIError> {
// See if we got a value back
let Some(get_result_value) = result.get_result.opt_value else {
@ -354,6 +357,7 @@ impl StorageManager {
}
/// Handle a received 'Get Value' query
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_get_value(
&self,
key: TypedKey,

View File

@ -49,6 +49,7 @@ pub(super) struct OutboundInspectValueResult {
impl StorageManager {
/// Perform a 'inspect value' query on the network
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_inspect_value(
&self,
rpc_processor: RPCProcessor,
@ -309,6 +310,7 @@ impl StorageManager {
}
/// Handle a received 'Inspect Value' query
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_inspect_value(
&self,
key: TypedKey,

View File

@ -217,6 +217,7 @@ impl StorageManager {
}
/// Create a local record from scratch with a new owner key, open it, and return the opened descriptor
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn create_record(
&self,
kind: CryptoKind,
@ -240,6 +241,7 @@ impl StorageManager {
}
/// Open an existing local record if it exists, and if it doesnt exist locally, try to pull it from the network and open it and return the opened descriptor
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn open_record(
&self,
key: TypedKey,
@ -325,6 +327,7 @@ impl StorageManager {
}
/// Close an opened local record
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
let (opt_opened_record, opt_rpc_processor) = {
let mut inner = self.lock().await?;
@ -376,6 +379,7 @@ impl StorageManager {
}
/// Delete a local record
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn delete_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
// Ensure the record is closed
self.close_record(key).await?;
@ -391,6 +395,7 @@ impl StorageManager {
}
/// Get the value of a subkey from an opened local record
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn get_value(
&self,
key: TypedKey,
@ -475,6 +480,7 @@ impl StorageManager {
}
/// Set the value of a subkey on an opened local record
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn set_value(
&self,
key: TypedKey,
@ -627,6 +633,7 @@ impl StorageManager {
}
/// Create,update or cancel an outbound watch to a DHT value
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn watch_values(
&self,
key: TypedKey,
@ -752,6 +759,7 @@ impl StorageManager {
Ok(owvresult.expiration_ts)
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn cancel_watch_values(
&self,
key: TypedKey,
@ -807,6 +815,7 @@ impl StorageManager {
}
/// Inspect an opened DHT record for its subkey sequence numbers
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn inspect_record(
&self,
key: TypedKey,
@ -929,7 +938,7 @@ impl StorageManager {
}
// Send single value change out to the network
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", target = "stor", skip(self), err)]
async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> {
let rpc_processor = {
let inner = self.inner.lock().await;
@ -957,7 +966,7 @@ impl StorageManager {
}
// Send a value change up through the callback
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", target = "stor", skip(self, value), err)]
async fn update_callback_value_change(
&self,
key: TypedKey,
@ -981,6 +990,7 @@ impl StorageManager {
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all)]
fn check_fanout_set_offline(
&self,
key: TypedKey,

View File

@ -208,6 +208,7 @@ where
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all)]
fn add_dead_record(&mut self, key: RecordTableKey, record: Record<D>) {
self.dead_records.push(DeadRecord {
key,
@ -216,6 +217,7 @@ where
});
}
#[instrument(level = "trace", target = "stor", skip_all)]
fn add_to_subkey_cache(&mut self, key: SubkeyTableKey, record_data: RecordData) {
let record_data_total_size = record_data.total_size();
// Write to subkey cache
@ -251,6 +253,7 @@ where
}
}
#[instrument(level = "trace", target = "stor", skip_all)]
fn remove_from_subkey_cache(&mut self, key: SubkeyTableKey) {
if let Some(dead_record_data) = self.subkey_cache.remove(&key) {
self.subkey_cache_total_size
@ -259,6 +262,7 @@ where
}
}
#[instrument(level = "trace", target = "stor", skip_all)]
async fn purge_dead_records(&mut self, lazy: bool) {
let purge_dead_records_mutex = self.purge_dead_records_mutex.clone();
let _lock = if lazy {
@ -339,6 +343,7 @@ where
}
}
#[instrument(level = "trace", target = "stor", skip_all)]
async fn flush_changed_records(&mut self) {
if self.changed_records.is_empty() {
return;
@ -361,12 +366,14 @@ where
}
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn flush(&mut self) -> EyreResult<()> {
self.flush_changed_records().await;
self.purge_dead_records(true).await;
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn new_record(&mut self, key: TypedKey, record: Record<D>) -> VeilidAPIResult<()> {
let rtk = RecordTableKey { key };
if self.record_index.contains_key(&rtk) {
@ -412,6 +419,7 @@ where
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn delete_record(&mut self, key: TypedKey) -> VeilidAPIResult<()> {
// Get the record table key
let rtk = RecordTableKey { key };
@ -437,11 +445,13 @@ where
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn contains_record(&mut self, key: TypedKey) -> bool {
let rtk = RecordTableKey { key };
self.record_index.contains_key(&rtk)
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn with_record<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
@ -465,6 +475,7 @@ where
out
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn peek_record<R, F>(&self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
@ -479,6 +490,7 @@ where
out
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn with_record_mut<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&mut Record<D>) -> R,
@ -502,6 +514,7 @@ where
out
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn get_subkey(
&mut self,
key: TypedKey,
@ -573,6 +586,7 @@ where
}))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(crate) async fn peek_subkey(
&self,
key: TypedKey,
@ -641,6 +655,7 @@ where
}))
}
#[instrument(level = "trace", target = "stor", skip_all)]
async fn update_watched_value(
&mut self,
key: TypedKey,
@ -679,6 +694,7 @@ where
}
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn set_subkey(
&mut self,
key: TypedKey,
@ -786,6 +802,7 @@ where
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn inspect_record(
&mut self,
key: TypedKey,
@ -869,6 +886,7 @@ where
}))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn _change_existing_watch(
&mut self,
key: TypedKey,
@ -905,6 +923,7 @@ where
Ok(WatchResult::Rejected)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn _create_new_watch(
&mut self,
key: TypedKey,
@ -995,7 +1014,7 @@ where
}
/// Add or update an inbound record watch for changes
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn watch_record(
&mut self,
key: TypedKey,
@ -1053,6 +1072,7 @@ where
/// Clear a specific watch for a record
/// returns true if the watch was found and cancelled
#[instrument(level = "trace", target = "stor", skip_all, err)]
async fn cancel_watch(
&mut self,
key: TypedKey,
@ -1092,6 +1112,7 @@ where
}
/// Move watches from one store to another
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn move_watches(
&mut self,
key: TypedKey,
@ -1110,6 +1131,7 @@ where
}
/// See if any watched records have expired and clear them out
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn check_watched_records(&mut self) {
let now = get_aligned_timestamp();
self.watched_records.retain(|key, watch_list| {
@ -1126,6 +1148,7 @@ where
});
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn take_value_changes(&mut self, changes: &mut Vec<ValueChangedInfo>) {
// ValueChangedInfo but without the subkey data that requires a double mutable borrow to get
struct EarlyValueChangedInfo {
@ -1220,6 +1243,7 @@ where
/// This will force a garbage collection of the space immediately
/// If zero is passed in here, a garbage collection will be performed of dead records
/// without removing any live records
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn reclaim_space(&mut self, space: usize) -> usize {
let mut reclaimed = 0usize;
while reclaimed < space {

View File

@ -25,6 +25,7 @@ pub(super) struct OutboundSetValueResult {
impl StorageManager {
/// Perform a 'set value' query on the network
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_set_value(
&self,
rpc_processor: RPCProcessor,
@ -272,6 +273,7 @@ impl StorageManager {
Ok(out_rx)
}
#[instrument(level = "trace", target = "dht", skip_all)]
pub(super) fn process_deferred_outbound_set_value_result_inner(&self, inner: &mut StorageManagerInner,
res_rx: flume::Receiver<Result<set_value::OutboundSetValueResult, VeilidAPIError>>,
key: TypedKey, subkey: ValueSubkey, last_value_data: ValueData, safety_selection: SafetySelection, ) {
@ -333,6 +335,7 @@ impl StorageManager {
);
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn process_outbound_set_value_result(&self, key: TypedKey, subkey: ValueSubkey, last_value_data: ValueData, safety_selection: SafetySelection, result: set_value::OutboundSetValueResult) -> Result<Option<ValueData>, VeilidAPIError> {
// Regain the lock after network access
@ -370,6 +373,7 @@ impl StorageManager {
/// Handle a received 'Set Value' query
/// Returns a None if the value passed in was set
/// Returns a Some(current value) if the value was older and the current value was kept
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_set_value(
&self,
key: TypedKey,

View File

@ -210,6 +210,7 @@ impl StorageManagerInner {
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn create_new_owned_local_record(
&mut self,
kind: CryptoKind,
@ -266,6 +267,7 @@ impl StorageManagerInner {
Ok((dht_key, owner))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
async fn move_remote_record_to_local(
&mut self,
key: TypedKey,
@ -326,6 +328,7 @@ impl StorageManagerInner {
Ok(Some((*remote_record.owner(), remote_record.schema())))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn open_existing_record(
&mut self,
key: TypedKey,
@ -390,6 +393,7 @@ impl StorageManagerInner {
Ok(Some(descriptor))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn open_new_record(
&mut self,
key: TypedKey,
@ -454,6 +458,7 @@ impl StorageManagerInner {
Ok(descriptor)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn get_value_nodes(&self, key: TypedKey) -> VeilidAPIResult<Option<Vec<NodeRef>>> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_ref() else {
@ -482,6 +487,7 @@ impl StorageManagerInner {
Ok(opt_value_nodes)
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn process_fanout_results<
'a,
I: IntoIterator<Item = (ValueSubkey, &'a FanoutResult)>,
@ -538,6 +544,7 @@ impl StorageManagerInner {
Ok(self.opened_records.remove(&key))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_get_local_value(
&mut self,
key: TypedKey,
@ -561,6 +568,7 @@ impl StorageManagerInner {
})
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_local_value(
&mut self,
key: TypedKey,
@ -581,6 +589,7 @@ impl StorageManagerInner {
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_inspect_local_value(
&mut self,
key: TypedKey,
@ -605,6 +614,7 @@ impl StorageManagerInner {
})
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_get_remote_value(
&mut self,
key: TypedKey,
@ -628,6 +638,7 @@ impl StorageManagerInner {
})
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_remote_value(
&mut self,
key: TypedKey,
@ -662,6 +673,7 @@ impl StorageManagerInner {
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_inspect_remote_value(
&mut self,
key: TypedKey,
@ -687,6 +699,7 @@ impl StorageManagerInner {
}
/// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
#[instrument(level = "trace", target = "stor", skip_all)]
fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey
where
D: fmt::Debug + Clone + Serialize,
@ -701,6 +714,7 @@ impl StorageManagerInner {
TypedKey::new(vcrypto.kind(), hash)
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn add_offline_subkey_write(
&mut self,
key: TypedKey,
@ -718,6 +732,7 @@ impl StorageManagerInner {
});
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn process_deferred_results<T: Send + 'static>(
&mut self,
receiver: flume::Receiver<T>,

View File

@ -2,10 +2,10 @@ use super::*;
impl StorageManager {
// Check if client-side watches on opened records either have dead nodes or if the watch has expired
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_active_watches_task_routine(
self,
stop_token: StopToken,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {

View File

@ -2,10 +2,10 @@ use super::*;
impl StorageManager {
// Check if server-side watches have expired
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_watched_records_task_routine(
self,
stop_token: StopToken,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {

View File

@ -2,10 +2,10 @@ use super::*;
impl StorageManager {
// Flush records stores to disk and remove dead records
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(crate) async fn flush_record_stores_task_routine(
self,
stop_token: StopToken,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {

View File

@ -3,7 +3,7 @@ use futures_util::*;
impl StorageManager {
// Best-effort write subkeys to the network that were written offline
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(crate) async fn offline_subkey_writes_task_routine(
self,
stop_token: StopToken,

View File

@ -4,7 +4,7 @@ use stop_token::future::FutureExt;
impl StorageManager {
// Send value change notifications across the network
#[instrument(level = "trace", skip(self), err)]
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn send_value_changes_task_routine(
self,
stop_token: StopToken,

View File

@ -22,7 +22,7 @@ pub(super) struct OutboundWatchValueResult {
impl StorageManager {
/// Perform a 'watch value cancel' on the network without fanout
#[allow(clippy::too_many_arguments)]
#[instrument(target = "dht", level = "debug", skip_all, err)]
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_watch_value_cancel(
&self,
rpc_processor: RPCProcessor,
@ -140,7 +140,7 @@ impl StorageManager {
/// Perform a 'watch value' query on the network using fanout
#[allow(clippy::too_many_arguments)]
#[instrument(target = "dht", level = "debug", skip_all, err)]
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_watch_value(
&self,
rpc_processor: RPCProcessor,
@ -367,6 +367,7 @@ impl StorageManager {
/// Handle a received 'Watch Value' query
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_watch_value(
&self,
key: TypedKey,

View File

@ -18,6 +18,53 @@ use keyvaluedb::*;
const ALL_TABLE_NAMES: &[u8] = b"all_table_names";
/// Description of column
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct ColumnInfo {
pub key_count: AlignedU64,
}
/// IO Stats for table
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct IOStatsInfo {
/// Number of transaction.
pub transactions: AlignedU64,
/// Number of read operations.
pub reads: AlignedU64,
/// Number of reads resulted in a read from cache.
pub cache_reads: AlignedU64,
/// Number of write operations.
pub writes: AlignedU64,
/// Number of bytes read
pub bytes_read: AlignedU64,
/// Number of bytes read from cache
pub cache_read_bytes: AlignedU64,
/// Number of bytes write
pub bytes_written: AlignedU64,
/// Start of the statistic period.
pub started: Timestamp,
/// Total duration of the statistic period.
pub span: TimestampDuration,
}
/// Description of table
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct TableInfo {
/// Internal table name
pub table_name: String,
/// IO statistics since previous query
pub io_stats_since_previous: IOStatsInfo,
/// IO statistics since database open
pub io_stats_overall: IOStatsInfo,
/// Total number of columns in the table
pub column_count: u32,
/// Column descriptions
pub columns: Vec<ColumnInfo>,
}
struct TableStoreInner {
opened: BTreeMap<String, Weak<TableDBUnlockedInner>>,
encryption_key: Option<TypedSharedSecret>,
@ -123,6 +170,7 @@ impl TableStore {
Ok(real_name)
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn name_delete(&self, table: &str) -> VeilidAPIResult<Option<String>> {
let name = self.namespaced_name(table)?;
let mut inner = self.inner.lock();
@ -130,6 +178,7 @@ impl TableStore {
Ok(real_name)
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn name_get(&self, table: &str) -> VeilidAPIResult<Option<String>> {
let name = self.namespaced_name(table)?;
let inner = self.inner.lock();
@ -137,6 +186,7 @@ impl TableStore {
Ok(real_name)
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn name_rename(&self, old_table: &str, new_table: &str) -> VeilidAPIResult<()> {
let old_name = self.namespaced_name(old_table)?;
let new_name = self.namespaced_name(new_table)?;
@ -156,8 +206,20 @@ impl TableStore {
Ok(())
}
/// List all known tables
#[instrument(level = "trace", target = "tstore", skip_all)]
pub fn list_all(&self) -> Vec<(String, String)> {
let inner = self.inner.lock();
inner
.all_table_names
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<(String, String)>>()
}
/// Delete all known tables
async fn delete_all(&self) {
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn delete_all(&self) {
// Get all tables
let real_names = {
let mut inner = self.inner.lock();
@ -179,6 +241,7 @@ impl TableStore {
self.flush().await;
}
#[instrument(level = "trace", target = "tstore", skip_all)]
pub(crate) fn maybe_unprotect_device_encryption_key(
&self,
dek_bytes: &[u8],
@ -230,6 +293,7 @@ impl TableStore {
))
}
#[instrument(level = "trace", target = "tstore", skip_all)]
pub(crate) fn maybe_protect_device_encryption_key(
&self,
dek: TypedSharedSecret,
@ -267,6 +331,7 @@ impl TableStore {
Ok(out)
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn load_device_encryption_key(&self) -> EyreResult<Option<TypedSharedSecret>> {
let dek_bytes: Option<Vec<u8>> = self
.protected_store
@ -288,6 +353,8 @@ impl TableStore {
&device_encryption_key_password,
)?))
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn save_device_encryption_key(
&self,
device_encryption_key: Option<TypedSharedSecret>,
@ -313,7 +380,9 @@ impl TableStore {
log_tstore!(debug "changing dek password");
self.config
.with_mut(|c| {
c.protected_store.device_encryption_key_password.clone_from(&new_device_encryption_key_password);
c.protected_store
.device_encryption_key_password
.clone_from(&new_device_encryption_key_password);
Ok(new_device_encryption_key_password)
})
.unwrap()
@ -338,6 +407,7 @@ impl TableStore {
Ok(())
}
#[instrument(level = "trace", target = "tstore", skip_all)]
pub(crate) async fn init(&self) -> EyreResult<()> {
let _async_guard = self.async_lock.lock().await;
@ -417,6 +487,7 @@ impl TableStore {
Ok(())
}
#[instrument(level = "trace", target = "tstore", skip_all)]
pub(crate) async fn terminate(&self) {
let _async_guard = self.async_lock.lock().await;
@ -434,6 +505,7 @@ impl TableStore {
inner.encryption_key = None;
}
#[instrument(level = "trace", target = "tstore", skip_all)]
pub(crate) fn on_table_db_drop(&self, table: String) {
log_rtab!("dropping table db: {}", table);
let mut inner = self.inner.lock();
@ -444,6 +516,7 @@ impl TableStore {
/// Get or create a TableDB database table. If the column count is greater than an
/// existing TableDB's column count, the database will be upgraded to add the missing columns.
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn open(&self, name: &str, column_count: u32) -> VeilidAPIResult<TableDB> {
let _async_guard = self.async_lock.lock().await;
@ -531,12 +604,13 @@ impl TableStore {
// Keep track of opened DBs
inner
.opened
.insert(table_name.clone(), table_db.weak_inner());
.insert(table_name.clone(), table_db.weak_unlocked_inner());
Ok(table_db)
}
/// Delete a TableDB table by name
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn delete(&self, name: &str) -> VeilidAPIResult<bool> {
let _async_guard = self.async_lock.lock().await;
// If we aren't initialized yet, bail
@ -575,7 +649,65 @@ impl TableStore {
Ok(true)
}
/// Get the description of a TableDB table
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn info(&self, name: &str) -> VeilidAPIResult<Option<TableInfo>> {
// Open with the default number of columns
let tdb = self.open(name, 0).await?;
let internal_name = tdb.table_name();
let io_stats_since_previous = tdb.io_stats(IoStatsKind::SincePrevious);
let io_stats_overall = tdb.io_stats(IoStatsKind::Overall);
let column_count = tdb.get_column_count()?;
let mut columns = Vec::<ColumnInfo>::with_capacity(column_count as usize);
for col in 0..column_count {
let key_count = tdb.get_key_count(col).await?;
columns.push(ColumnInfo {
key_count: AlignedU64::new(key_count),
})
}
Ok(Some(TableInfo {
table_name: internal_name,
io_stats_since_previous: IOStatsInfo {
transactions: AlignedU64::new(io_stats_since_previous.transactions),
reads: AlignedU64::new(io_stats_since_previous.reads),
cache_reads: AlignedU64::new(io_stats_since_previous.cache_reads),
writes: AlignedU64::new(io_stats_since_previous.writes),
bytes_read: AlignedU64::new(io_stats_since_previous.bytes_read),
cache_read_bytes: AlignedU64::new(io_stats_since_previous.cache_read_bytes),
bytes_written: AlignedU64::new(io_stats_since_previous.bytes_written),
started: Timestamp::new(
io_stats_since_previous
.started
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as u64,
),
span: TimestampDuration::new(io_stats_since_previous.span.as_micros() as u64),
},
io_stats_overall: IOStatsInfo {
transactions: AlignedU64::new(io_stats_overall.transactions),
reads: AlignedU64::new(io_stats_overall.reads),
cache_reads: AlignedU64::new(io_stats_overall.cache_reads),
writes: AlignedU64::new(io_stats_overall.writes),
bytes_read: AlignedU64::new(io_stats_overall.bytes_read),
cache_read_bytes: AlignedU64::new(io_stats_overall.cache_read_bytes),
bytes_written: AlignedU64::new(io_stats_overall.bytes_written),
started: Timestamp::new(
io_stats_overall
.started
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as u64,
),
span: TimestampDuration::new(io_stats_overall.span.as_micros() as u64),
},
column_count,
columns,
}))
}
/// Rename a TableDB table
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn rename(&self, old_name: &str, new_name: &str) -> VeilidAPIResult<()> {
let _async_guard = self.async_lock.lock().await;
// If we aren't initialized yet, bail

View File

@ -62,8 +62,14 @@ impl TableDB {
let encrypt_info = encryption_key.map(|ek| CryptInfo::new(crypto.clone(), ek));
let decrypt_info = decryption_key.map(|dk| CryptInfo::new(crypto.clone(), dk));
let total_columns = database.num_columns().unwrap();
Self {
opened_column_count,
opened_column_count: if opened_column_count == 0 {
total_columns
} else {
opened_column_count
},
unlocked_inner: Arc::new(TableDBUnlockedInner {
table,
table_store,
@ -78,18 +84,38 @@ impl TableDB {
weak_inner: Weak<TableDBUnlockedInner>,
opened_column_count: u32,
) -> Option<Self> {
weak_inner.upgrade().map(|table_db_unlocked_inner| Self {
opened_column_count,
unlocked_inner: table_db_unlocked_inner,
weak_inner.upgrade().map(|table_db_unlocked_inner| {
let db = &table_db_unlocked_inner.database;
let total_columns = db.num_columns().unwrap();
Self {
opened_column_count: if opened_column_count == 0 {
total_columns
} else {
opened_column_count
},
unlocked_inner: table_db_unlocked_inner,
}
})
}
pub(super) fn weak_inner(&self) -> Weak<TableDBUnlockedInner> {
pub(super) fn weak_unlocked_inner(&self) -> Weak<TableDBUnlockedInner> {
Arc::downgrade(&self.unlocked_inner)
}
/// Get the internal name of the table
pub fn table_name(&self) -> String {
self.unlocked_inner.table.clone()
}
/// Get the io stats for the table
#[instrument(level = "trace", target = "tstore", skip_all)]
pub fn io_stats(&self, kind: IoStatsKind) -> IoStats {
self.unlocked_inner.database.io_stats(kind)
}
/// Get the total number of columns in the TableDB.
/// Not the number of columns that were opened, rather the total number that could be opened.
#[instrument(level = "trace", target = "tstore", skip_all)]
pub fn get_column_count(&self) -> VeilidAPIResult<u32> {
let db = &self.unlocked_inner.database;
db.num_columns().map_err(VeilidAPIError::from)
@ -101,6 +127,7 @@ impl TableDB {
/// requirement is that they are different for each encryption
/// but if the contents are guaranteed to be unique, then a nonce
/// can be generated from the hash of the contents and the encryption key itself.
#[instrument(level = "trace", target = "tstore", skip_all)]
fn maybe_encrypt(&self, data: &[u8], keyed_nonce: bool) -> Vec<u8> {
let data = compress_prepend_size(data);
if let Some(ei) = &self.unlocked_inner.encrypt_info {
@ -132,6 +159,7 @@ impl TableDB {
}
/// Decrypt buffer using decrypt key with nonce prepended to input
#[instrument(level = "trace", target = "tstore", skip_all)]
fn maybe_decrypt(&self, data: &[u8]) -> std::io::Result<Vec<u8>> {
if let Some(di) = &self.unlocked_inner.decrypt_info {
assert!(data.len() >= NONCE_LENGTH);
@ -156,6 +184,7 @@ impl TableDB {
}
/// Get the list of keys in a column of the TableDB
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn get_keys(&self, col: u32) -> VeilidAPIResult<Vec<Vec<u8>>> {
if col >= self.opened_column_count {
apibail_generic!(format!(
@ -175,13 +204,29 @@ impl TableDB {
Ok(out)
}
/// Get the number of keys in a column of the TableDB
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn get_key_count(&self, col: u32) -> VeilidAPIResult<u64> {
if col >= self.opened_column_count {
apibail_generic!(format!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
}
let db = self.unlocked_inner.database.clone();
let key_count = db.num_keys(col).await.map_err(VeilidAPIError::from)?;
Ok(key_count)
}
/// Start a TableDB write transaction. The transaction object must be committed or rolled back before dropping.
#[instrument(level = "trace", target = "tstore", skip_all)]
pub fn transact(&self) -> TableDBTransaction {
let dbt = self.unlocked_inner.database.transaction();
TableDBTransaction::new(self.clone(), dbt)
}
/// Store a key with a value in a column in the TableDB. Performs a single transaction immediately.
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
if col >= self.opened_column_count {
apibail_generic!(format!(
@ -200,6 +245,7 @@ impl TableDB {
}
/// Store a key in json format with a value in a column in the TableDB. Performs a single transaction immediately.
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
where
T: serde::Serialize,
@ -209,6 +255,7 @@ impl TableDB {
}
/// Read a key from a column in the TableDB immediately.
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
if col >= self.opened_column_count {
apibail_generic!(format!(
@ -225,6 +272,7 @@ impl TableDB {
}
/// Read an serde-json key from a column in the TableDB immediately
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn load_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
where
T: for<'de> serde::Deserialize<'de>,
@ -237,6 +285,7 @@ impl TableDB {
}
/// Delete key with from a column in the TableDB
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
if col >= self.opened_column_count {
apibail_generic!(format!(
@ -255,6 +304,7 @@ impl TableDB {
}
/// Delete serde-json key with from a column in the TableDB
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn delete_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
where
T: for<'de> serde::Deserialize<'de>,
@ -303,6 +353,7 @@ impl TableDBTransaction {
}
/// Commit the transaction. Performs all actions atomically.
#[instrument(level = "trace", target = "tstore", skip_all)]
pub async fn commit(self) -> VeilidAPIResult<()> {
let dbt = {
let mut inner = self.inner.lock();
@ -319,12 +370,14 @@ impl TableDBTransaction {
}
/// Rollback the transaction. Does nothing to the TableDB.
#[instrument(level = "trace", target = "tstore", skip_all)]
pub fn rollback(self) {
let mut inner = self.inner.lock();
inner.dbt = None;
}
/// Store a key with a value in a column in the TableDB
#[instrument(level = "trace", target = "tstore", skip_all)]
pub fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
if col >= self.db.opened_column_count {
apibail_generic!(format!(
@ -341,6 +394,7 @@ impl TableDBTransaction {
}
/// Store a key in json format with a value in a column in the TableDB
#[instrument(level = "trace", target = "tstore", skip_all)]
pub fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
where
T: serde::Serialize,
@ -350,6 +404,7 @@ impl TableDBTransaction {
}
/// Delete key with from a column in the TableDB
#[instrument(level = "trace", target = "tstore", skip_all)]
pub fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> {
if col >= self.db.opened_column_count {
apibail_generic!(format!(

View File

@ -1910,6 +1910,73 @@ impl VeilidAPI {
}
}
async fn debug_table_list(&self, _args: Vec<String>) -> VeilidAPIResult<String> {
//
let table_store = self.table_store()?;
let table_names = table_store.list_all();
let out = format!(
"TableStore tables:\n{}",
table_names
.iter()
.map(|(k, v)| format!("{} ({})", k, v))
.collect::<Vec<String>>()
.join("\n")
);
Ok(out)
}
fn _format_columns(columns: &[table_store::ColumnInfo]) -> String {
let mut out = String::new();
for (n, col) in columns.iter().enumerate() {
//
out += &format!("Column {}:\n", n);
out += &format!(" Key Count: {}\n", col.key_count);
}
out
}
async fn debug_table_info(&self, args: Vec<String>) -> VeilidAPIResult<String> {
//
let table_store = self.table_store()?;
let table_name = get_debug_argument_at(&args, 1, "debug_table_info", "name", get_string)?;
let Some(info) = table_store.info(&table_name).await? else {
return Ok(format!("Table '{}' does not exist", table_name));
};
let info_str = format!(
"Table Name: {}\n\
Column Count: {}\n\
IO Stats (since previous query):\n{}\n\
IO Stats (overall):\n{}\n\
Columns:\n{}\n",
info.table_name,
info.column_count,
indent::indent_all_by(4, format!("{:#?}", info.io_stats_since_previous)),
indent::indent_all_by(4, format!("{:#?}", info.io_stats_overall)),
Self::_format_columns(&info.columns),
);
let out = format!("Table info for '{}':\n{}", table_name, info_str);
Ok(out)
}
async fn debug_table(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> =
shell_words::split(&args).map_err(|e| VeilidAPIError::parse_error(e, args))?;
let command = get_debug_argument_at(&args, 0, "debug_table", "command", get_string)?;
if command == "list" {
self.debug_table_list(args).await
} else if command == "info" {
self.debug_table_info(args).await
} else {
Ok(">>> Unknown command\n".to_owned())
}
}
async fn debug_punish_list(&self, _args: Vec<String>) -> VeilidAPIResult<String> {
//
let network_manager = self.network_manager()?;
@ -1988,6 +2055,7 @@ record list <local|remote|opened|offline>
watch [<key>] [<subkeys> [<expiration> [<count>]]]
cancel [<key>] [<subkeys>]
inspect [<key>] [<scope> [<subkeys>]]
table list
--------------------------------------------------------------------
<key> is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4
* also <node>, <relay>, <target>, <route>
@ -2081,6 +2149,8 @@ record list <local|remote|opened|offline>
self.debug_record(rest).await
} else if arg == "punish" {
self.debug_punish(rest).await
} else if arg == "table" {
self.debug_table(rest).await
} else {
Err(VeilidAPIError::generic("Unknown server debug command"))
}

View File

@ -234,6 +234,7 @@ impl JsonRequestProcessor {
//////////////////////////////////////////////////////////////////////////////////////
#[instrument(level = "trace", target = "json_api", skip_all)]
pub async fn process_routing_context_request(
&self,
routing_context: RoutingContext,
@ -382,6 +383,7 @@ impl JsonRequestProcessor {
}
}
#[instrument(level = "trace", target = "json_api", skip_all)]
pub async fn process_table_db_request(
&self,
table_db: TableDB,
@ -427,6 +429,7 @@ impl JsonRequestProcessor {
}
}
#[instrument(level = "trace", target = "json_api", skip_all)]
pub async fn process_table_db_transaction_request(
&self,
table_db_transaction: TableDBTransaction,
@ -461,6 +464,7 @@ impl JsonRequestProcessor {
}
}
#[instrument(level = "trace", target = "json_api", skip_all)]
pub async fn process_crypto_system_request(
&self,
csv: CryptoSystemVersion,
@ -592,6 +596,7 @@ impl JsonRequestProcessor {
}
}
#[instrument(level = "trace", target = "json_api", skip_all)]
pub async fn process_request(self, request: Request) -> Response {
let id = request.id;

View File

@ -1,10 +1,12 @@
use super::*;
use lz4_flex::block;
#[instrument(level = "trace", target = "veilid_api", skip_all)]
pub fn compress_prepend_size(input: &[u8]) -> Vec<u8> {
block::compress_prepend_size(input)
}
#[instrument(level = "trace", target = "veilid_api", skip_all)]
pub fn decompress_size_prepended(
input: &[u8],
max_size: Option<usize>,

View File

@ -1,8 +1,8 @@
use super::*;
// Don't trace these functions as they are used in the transfer of API logs, which will recurse!
// Don't trace these functions with events as they are used in the transfer of API logs, which will recurse!
// #[instrument(level = "trace", ret, err)]
#[instrument(level = "trace", target = "json", skip_all)]
pub fn deserialize_json<'a, T: de::Deserialize<'a> + Debug>(arg: &'a str) -> VeilidAPIResult<T> {
serde_json::from_str(arg).map_err(|e| VeilidAPIError::ParseError {
message: e.to_string(),
@ -13,6 +13,8 @@ pub fn deserialize_json<'a, T: de::Deserialize<'a> + Debug>(arg: &'a str) -> Vei
),
})
}
#[instrument(level = "trace", target = "json", skip_all)]
pub fn deserialize_json_bytes<'a, T: de::Deserialize<'a> + Debug>(
arg: &'a [u8],
) -> VeilidAPIResult<T> {
@ -26,7 +28,7 @@ pub fn deserialize_json_bytes<'a, T: de::Deserialize<'a> + Debug>(
})
}
// #[instrument(level = "trace", ret, err)]
#[instrument(level = "trace", target = "json", skip_all)]
pub fn deserialize_opt_json<T: de::DeserializeOwned + Debug>(
arg: Option<String>,
) -> VeilidAPIResult<T> {
@ -40,7 +42,7 @@ pub fn deserialize_opt_json<T: de::DeserializeOwned + Debug>(
deserialize_json(arg)
}
// #[instrument(level = "trace", ret, err)]
#[instrument(level = "trace", target = "json", skip_all)]
pub fn deserialize_opt_json_bytes<T: de::DeserializeOwned + Debug>(
arg: Option<Vec<u8>>,
) -> VeilidAPIResult<T> {
@ -54,7 +56,7 @@ pub fn deserialize_opt_json_bytes<T: de::DeserializeOwned + Debug>(
deserialize_json_bytes(arg.as_slice())
}
// #[instrument(level = "trace", ret)]
#[instrument(level = "trace", target = "json", skip_all)]
pub fn serialize_json<T: Serialize + Debug>(val: T) -> String {
match serde_json::to_string(&val) {
Ok(v) => v,
@ -63,6 +65,8 @@ pub fn serialize_json<T: Serialize + Debug>(val: T) -> String {
}
}
}
#[instrument(level = "trace", target = "json", skip_all)]
pub fn serialize_json_bytes<T: Serialize + Debug>(val: T) -> Vec<u8> {
match serde_json::to_vec(&val) {
Ok(v) => v,
@ -78,7 +82,9 @@ pub fn serialize_json_bytes<T: Serialize + Debug>(val: T) -> Vec<u8> {
pub mod as_human_base64 {
use data_encoding::BASE64URL_NOPAD;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing::instrument;
#[instrument(level = "trace", target = "json", skip_all)]
pub fn serialize<S: Serializer>(v: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
if s.is_human_readable() {
let base64 = BASE64URL_NOPAD.encode(v);
@ -88,6 +94,7 @@ pub mod as_human_base64 {
}
}
#[instrument(level = "trace", target = "json", skip_all)]
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
if d.is_human_readable() {
let base64 = String::deserialize(d)?;
@ -103,7 +110,9 @@ pub mod as_human_base64 {
pub mod as_human_opt_base64 {
use data_encoding::BASE64URL_NOPAD;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing::instrument;
#[instrument(level = "trace", target = "json", skip_all)]
pub fn serialize<S: Serializer>(v: &Option<Vec<u8>>, s: S) -> Result<S::Ok, S::Error> {
if s.is_human_readable() {
let base64 = v.as_ref().map(|x| BASE64URL_NOPAD.encode(x));
@ -113,6 +122,7 @@ pub mod as_human_opt_base64 {
}
}
#[instrument(level = "trace", target = "json", skip_all)]
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Option<Vec<u8>>, D::Error> {
if d.is_human_readable() {
let base64 = Option::<String>::deserialize(d)?;
@ -134,7 +144,9 @@ pub mod as_human_string {
use std::str::FromStr;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use tracing::instrument;
#[instrument(level = "trace", target = "json", skip_all)]
pub fn serialize<T, S>(value: &T, s: S) -> Result<S::Ok, S::Error>
where
T: Display + Serialize,
@ -147,6 +159,7 @@ pub mod as_human_string {
}
}
#[instrument(level = "trace", target = "json", skip_all)]
pub fn deserialize<'de, T, D>(d: D) -> Result<T, D::Error>
where
T: FromStr + Deserialize<'de>,
@ -166,7 +179,9 @@ pub mod as_human_opt_string {
use std::str::FromStr;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use tracing::instrument;
#[instrument(level = "trace", target = "json", skip_all)]
pub fn serialize<T, S>(value: &Option<T>, s: S) -> Result<S::Ok, S::Error>
where
T: Display + Serialize,
@ -182,6 +197,7 @@ pub mod as_human_opt_string {
}
}
#[instrument(level = "trace", target = "json", skip_all)]
pub fn deserialize<'de, T, D>(d: D) -> Result<Option<T>, D::Error>
where
T: FromStr + Deserialize<'de>,

View File

@ -411,4 +411,67 @@ async def test_dht_integration_writer_reader():
print(f' {n}')
n+=1
@pytest.mark.asyncio
async def test_dht_write_read_local():
async def null_update_callback(update: veilid.VeilidUpdate):
pass
try:
api0 = await api_connector(null_update_callback, 0)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server 0.")
return
async with api0:
# purge local and remote record stores to ensure we start fresh
await api0.debug("record purge local")
await api0.debug("record purge remote")
# make routing contexts
rc0 = await api0.new_routing_context()
async with rc0:
COUNT = 500
TEST_DATA = b"ABCD"*1024
TEST_DATA2 = b"ABCD"*4096
# write dht records on server 0
records = []
schema = veilid.DHTSchema.dflt(2)
print(f'writing {COUNT} records')
for n in range(COUNT):
desc = await rc0.create_dht_record(schema)
records.append(desc)
await rc0.set_dht_value(desc.key, 0, TEST_DATA)
await rc0.set_dht_value(desc.key, 1, TEST_DATA2)
print(f' {n}')
print(f'syncing records to the network')
for desc0 in records:
while True:
rr = await rc0.inspect_dht_record(desc0.key, [])
if len(rr.offline_subkeys) == 0:
await rc0.close_dht_record(desc0.key)
break
time.sleep(0.1)
# read dht records on server 0
print(f'reading {COUNT} records')
n=0
for desc0 in records:
desc1 = await rc0.open_dht_record(desc0.key)
vd0 = await rc0.get_dht_value(desc1.key, 0)
assert vd0.data == TEST_DATA
vd1 = await rc0.get_dht_value(desc1.key, 1)
assert vd1.data == TEST_DATA2
await rc0.close_dht_record(desc1.key)
print(f' {n}')
n+=1

View File

@ -81,6 +81,7 @@ hostname = "^0"
stop-token = { version = "^0", default-features = false }
sysinfo = { version = "^0.30.6" }
wg = { version = "^0.9.1", features = ["future"] }
tracing-flame = "0.2.0"
[target.'cfg(windows)'.dependencies]
windows-service = "^0"

View File

@ -0,0 +1 @@
{"meta":{"categories":[{"name":"Other","color":"grey","subcategories":["Other"]},{"name":"User","color":"yellow","subcategories":["Other"]}],"debug":false,"extensions":{"baseURL":[],"id":[],"length":0,"name":[]},"interval":0.1,"preprocessedProfileVersion":46,"processType":0,"product":"../target/profiling/veilid-server","sampleUnits":{"eventDelay":"ms","threadCPUDelta":"µs","time":"ms"},"startTime":1720014457589.647,"symbolicated":false,"pausedRanges":[],"version":24,"usesOnlyOneStackType":true,"doesNotUseFrameImplementation":true,"sourceCodeIsNotOnSearchfox":true,"markerSchema":[]},"libs":[{"name":"dyld","path":"/usr/lib/dyld","debugName":"dyld","debugPath":"/usr/lib/dyld","breakpadId":"37BBC384075531C7A8080ED49E44DD8E0","codeId":"37BBC384075531C7A8080ED49E44DD8E","arch":"arm64e"},{"name":"veilid-server","path":"/Users/dildog/code/veilid/target/profiling/veilid-server","debugName":"veilid-server","debugPath":"/Users/dildog/code/veilid/target/profiling/veilid-server","breakpadId":"0C900E21ABF83DD5848F0728125792EA0","codeId":"0C900E21ABF83DD5848F0728125792EA","arch":"arm64"},{"name":"libsystem_malloc.dylib","path":"/usr/lib/system/libsystem_malloc.dylib","debugName":"libsystem_malloc.dylib","debugPath":"/usr/lib/system/libsystem_malloc.dylib","breakpadId":"C6337A382B5C380595E8CF1786E2F4E70","codeId":"C6337A382B5C380595E8CF1786E2F4E7","arch":"arm64e"},{"name":"libsystem_kernel.dylib","path":"/usr/lib/system/libsystem_kernel.dylib","debugName":"libsystem_kernel.dylib","debugPath":"/usr/lib/system/libsystem_kernel.dylib","breakpadId":"9B8B53F9E2B636DF98E928D8FCA732F20","codeId":"9B8B53F9E2B636DF98E928D8FCA732F2","arch":"arm64e"}],"threads":[{"frameTable":{"length":18,"address":[24799,9886639,9760907,9887711,9893535,9924447,9927607,1245483,1245699,1229056,1245627,145256,9893543,1397243,1399123,4045067,4040447,19732],"inlineDepth":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"category":[1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1],"subcategory":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"func":[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17],"nativeSymbol":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"innerWindowID":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"implementation":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"line":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"column":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"optimizations":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null]},"funcTable":{"length":18,"name":[1,3,4,5,6,7,8,9,10,11,12,14,15,16,17,18,19,21],"isJS":[false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false],"relevantForJS":[false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false],"resource":[0,1,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,3],"fileName":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"lineNumber":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],"columnNumber":[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null]},"markers":{"length":0,"category":[],"data":[],"endTime":[],"name":[],"phase":[],"startTime":[]},"name":"veilid-server","isMainThread":true,"nativeSymbols":{"length":0,"address":[],"functionSize":[],"libIndex":[],"name":[]},"pausedRanges":[],"pid":"12087","processName":"veilid-server","processShutdownTime":73.451958,"processStartupTime":70.423167,"processType":"default","registerTime":70.423167,"resourceTable":{"length":4,"lib":[0,1,2,3],"name":[0,2,13,20],"host":[null,null,null,null],"type":[1,1,1,1]},"samples":{"length":4,"stack":[9,11,17,17],"time":[72.541208,72.654958,72.786542,72.872625],"weight":[1,1,1,1],"weightType":"samples","threadCPUDelta":[6619,7,56,11]},"stackTable":{"length":18,"prefix":[null,0,1,2,3,4,5,6,7,8,7,10,3,12,13,14,15,16],"frame":[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17],"category":[1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1],"subcategory":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"stringArray":["dyld","0x60df","veilid-server","0x96dbaf","0x94f08b","0x96dfdf","0x96f69f","0x976f5f","0x977bb7","0x13012b","0x130203","0x12c100","0x1301bb","libsystem_malloc.dylib","0x23768","0x96f6a7","0x1551fb","0x155953","0x3db90b","0x3da6ff","libsystem_kernel.dylib","0x4d14"],"tid":"1220572","unregisterTime":73.451958}],"pages":[],"profilerOverhead":[],"counters":[]}

View File

@ -84,6 +84,10 @@ pub struct CmdlineArgs {
#[arg(long, value_name = "endpoint")]
otlp: Option<String>,
/// Turn on flamegraph tracing (experimental, isn't terribly useful)
#[arg(long, hide = true, value_name = "PATH", num_args=0..=1, require_equals=true, default_missing_value = "")]
flame: Option<OsString>,
/// Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports
#[arg(long)]
subnode_index: Option<u16>,
@ -206,6 +210,19 @@ fn main() -> EyreResult<()> {
.wrap_err("failed to parse OTLP address")?;
settingsrw.logging.otlp.level = LogLevel::Trace;
}
if let Some(flame) = args.flame {
let flame = if flame.is_empty() {
Settings::get_default_flame_path(settingsrw.testing.subnode_index)
.to_string_lossy()
.to_string()
} else {
flame.to_string_lossy().to_string()
};
println!("Enabling flamegraph output to {}", flame);
settingsrw.logging.flame.enabled = true;
settingsrw.logging.flame.path = flame;
}
if args.no_attach {
settingsrw.auto_attach = false;
}

View File

@ -66,6 +66,9 @@ logging:
level: 'trace'
grpc_endpoint: 'localhost:4317'
ignore_log_targets: []
flame:
enabled: false
path: ''
console:
enabled: false
testing:
@ -442,6 +445,12 @@ pub struct Terminal {
pub ignore_log_targets: Vec<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Flame {
pub enabled: bool,
pub path: String,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Console {
pub enabled: bool,
@ -493,6 +502,7 @@ pub struct Logging {
pub file: File,
pub api: Api,
pub otlp: Otlp,
pub flame: Flame,
pub console: Console,
}
@ -854,6 +864,15 @@ impl Settings {
.unwrap_or_else(|| PathBuf::from("./veilid-server.conf"))
}
/// Determine default flamegraph output path
pub fn get_default_flame_path(subnode_index: u16) -> PathBuf {
std::env::temp_dir().join(if subnode_index == 0 {
"veilid-server.folded".to_owned()
} else {
format!("veilid-server-{}.folded", subnode_index)
})
}
#[allow(dead_code)]
fn get_or_create_private_directory<P: AsRef<Path>>(path: P, group_read: bool) -> bool {
let path = path.as_ref();
@ -975,6 +994,8 @@ impl Settings {
set_config_value!(inner.logging.otlp.level, value);
set_config_value!(inner.logging.otlp.grpc_endpoint, value);
set_config_value!(inner.logging.otlp.ignore_log_targets, value);
set_config_value!(inner.logging.flame.enabled, value);
set_config_value!(inner.logging.flame.path, value);
set_config_value!(inner.logging.console.enabled, value);
set_config_value!(inner.testing.subnode_index, value);
set_config_value!(inner.core.capabilities.disable, value);
@ -1542,6 +1563,8 @@ mod tests {
s.logging.otlp.grpc_endpoint,
NamedSocketAddrs::from_str("localhost:4317").unwrap()
);
assert!(!s.logging.flame.enabled);
assert_eq!(s.logging.flame.path, "");
assert!(!s.logging.console.enabled);
assert_eq!(s.testing.subnode_index, 0);

View File

@ -17,11 +17,13 @@ use std::collections::BTreeMap;
use std::path::*;
use std::sync::Arc;
use tracing_appender::*;
use tracing_flame::FlameLayer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::*;
struct VeilidLogsInner {
_guard: Option<non_blocking::WorkerGuard>,
_file_guard: Option<non_blocking::WorkerGuard>,
_flame_guard: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>>,
filters: BTreeMap<&'static str, veilid_core::VeilidLayerFilter>,
}
@ -71,6 +73,25 @@ impl VeilidLogs {
layers.push(layer.boxed());
}
// Flamegraph logger
let mut flame_guard = None;
if settingsr.logging.flame.enabled {
let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(LogLevel::Trace),
&[], //&settingsr.logging.terminal.ignore_log_targets,
);
let (flame_layer, guard) = FlameLayer::with_file(&settingsr.logging.flame.path)?;
flame_guard = Some(guard);
filters.insert("flame", filter.clone());
layers.push(
flame_layer
.with_threads_collapsed(true)
.with_empty_samples(false)
.with_filter(filter)
.boxed(),
);
}
// OpenTelemetry logger
#[cfg(feature = "opentelemetry-otlp")]
if settingsr.logging.otlp.enabled {
@ -121,7 +142,7 @@ impl VeilidLogs {
}
// File logger
let mut guard = None;
let mut file_guard = None;
if settingsr.logging.file.enabled {
let log_path = Path::new(&settingsr.logging.file.path);
let full_path = std::env::current_dir()
@ -143,7 +164,7 @@ impl VeilidLogs {
let appender = tracing_appender::rolling::never(log_parent, Path::new(log_filename));
let (non_blocking_appender, non_blocking_guard) =
tracing_appender::non_blocking(appender);
guard = Some(non_blocking_guard);
file_guard = Some(non_blocking_guard);
let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.file.level),
@ -192,7 +213,8 @@ impl VeilidLogs {
Ok(VeilidLogs {
inner: Arc::new(Mutex::new(VeilidLogsInner {
_guard: guard,
_file_guard: file_guard,
_flame_guard: flame_guard,
filters,
})),
})