remove owo colors

This commit is contained in:
John Smith 2023-06-25 14:09:22 -04:00
parent 0f3e7010f2
commit 297908796c
21 changed files with 218 additions and 121 deletions

2
Cargo.lock generated
View File

@ -6469,7 +6469,6 @@ dependencies = [
"nix 0.26.2", "nix 0.26.2",
"once_cell", "once_cell",
"owning_ref", "owning_ref",
"owo-colors",
"paranoid-android", "paranoid-android",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"paste", "paste",
@ -6623,7 +6622,6 @@ dependencies = [
"nix 0.26.2", "nix 0.26.2",
"once_cell", "once_cell",
"oslog", "oslog",
"owo-colors",
"paranoid-android", "paranoid-android",
"parking_lot 0.11.2", "parking_lot 0.11.2",
"rand 0.7.3", "rand 0.7.3",

View File

@ -23,6 +23,7 @@ rt-wasm-bindgen = ["veilid-tools/rt-wasm-bindgen", "async_executors/bindgen"]
veilid_core_android_tests = ["dep:paranoid-android"] veilid_core_android_tests = ["dep:paranoid-android"]
veilid_core_ios_tests = ["dep:tracing-oslog"] veilid_core_ios_tests = ["dep:tracing-oslog"]
tracking = [] tracking = []
network-result-extra = ["veilid-tools/network-result-extra"]
[dependencies] [dependencies]
veilid-tools = { path = "../veilid-tools", features = [ "tracing" ] } veilid-tools = { path = "../veilid-tools", features = [ "tracing" ] }
@ -53,7 +54,6 @@ owning_ref = "^0"
flume = { version = "^0", features = ["async"] } flume = { version = "^0", features = ["async"] }
enumset = { version= "^1", features = ["serde"] } enumset = { version= "^1", features = ["serde"] }
backtrace = { version = "^0" } backtrace = { version = "^0" }
owo-colors = "^3"
stop-token = { version = "^0", default-features = false } stop-token = { version = "^0", default-features = false }
ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] }

View File

@ -97,7 +97,6 @@ use tracing::*;
use veilid_tools::*; use veilid_tools::*;
type RkyvDefaultValidator<'t> = rkyv::validation::validators::DefaultValidator<'t>; type RkyvDefaultValidator<'t> = rkyv::validation::validators::DefaultValidator<'t>;
use futures_util::stream::FuturesUnordered; use futures_util::stream::FuturesUnordered;
use owo_colors::OwoColorize;
use schemars::{schema_for, JsonSchema}; use schemars::{schema_for, JsonSchema};
use serde::*; use serde::*;
use stop_token::*; use stop_token::*;

View File

@ -256,8 +256,8 @@ impl ConnectionManager {
log_net!( log_net!(
"== get_or_create_connection local_addr={:?} dial_info={:?}", "== get_or_create_connection local_addr={:?} dial_info={:?}",
local_addr.green(), local_addr,
dial_info.green() dial_info
); );
// Kill off any possibly conflicting connections // Kill off any possibly conflicting connections
@ -273,8 +273,8 @@ impl ConnectionManager {
{ {
log_net!( log_net!(
"== Returning existing connection local_addr={:?} peer_address={:?}", "== Returning existing connection local_addr={:?} peer_address={:?}",
local_addr.green(), local_addr,
peer_address.green() peer_address
); );
return Ok(NetworkResult::Value(conn)); return Ok(NetworkResult::Value(conn));
@ -300,8 +300,8 @@ impl ConnectionManager {
{ {
log_net!( log_net!(
"== Returning existing connection in race local_addr={:?} peer_address={:?}", "== Returning existing connection in race local_addr={:?} peer_address={:?}",
local_addr.green(), local_addr,
peer_address.green() peer_address
); );
return Ok(NetworkResult::Value(conn)); return Ok(NetworkResult::Value(conn));

View File

@ -45,7 +45,7 @@ impl NetworkManager {
let out_data: Vec<u8> = network_result_value_or_log!(self let out_data: Vec<u8> = network_result_value_or_log!(self
.net() .net()
.send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms) .send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms)
.await? => .await? => [ format!(": dial_info={}, data.len={}", dial_info, data.len()) ]
{ {
return Ok(Vec::new()); return Ok(Vec::new());
}); });

View File

@ -845,7 +845,7 @@ impl NetworkManager {
} }
/// Called by the RPC handler when we want to issue an direct receipt /// Called by the RPC handler when we want to issue an direct receipt
#[instrument(level = "trace", skip(self, rcpt_data), err)] #[instrument(level = "debug", skip(self, rcpt_data), err)]
pub async fn send_out_of_band_receipt( pub async fn send_out_of_band_receipt(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
@ -858,11 +858,10 @@ impl NetworkManager {
// should not be subject to our ability to decode it // should not be subject to our ability to decode it
// Send receipt directly // Send receipt directly
log_net!(debug "send_out_of_band_receipt: dial_info={}", dial_info);
network_result_value_or_log!(self network_result_value_or_log!(self
.net() .net()
.send_data_unbound_to_dial_info(dial_info, rcpt_data) .send_data_unbound_to_dial_info(dial_info, rcpt_data)
.await? => { .await? => [ format!(": dial_info={}, rcpt_data.len={}", dial_info, rcpt_data.len()) ] {
return Ok(()); return Ok(());
} }
); );
@ -928,13 +927,13 @@ impl NetworkManager {
// Is this a direct bootstrap request instead of an envelope? // Is this a direct bootstrap request instead of an envelope?
if data[0..4] == *BOOT_MAGIC { if data[0..4] == *BOOT_MAGIC {
network_result_value_or_log!(self.handle_boot_request(connection_descriptor).await? => {}); network_result_value_or_log!(self.handle_boot_request(connection_descriptor).await? => [ format!(": connection_descriptor={:?}", connection_descriptor) ] {});
return Ok(true); return Ok(true);
} }
// Is this an out-of-band receipt instead of an envelope? // Is this an out-of-band receipt instead of an envelope?
if data[0..3] == *RECEIPT_MAGIC { if data[0..3] == *RECEIPT_MAGIC {
network_result_value_or_log!(self.handle_out_of_band_receipt(data).await => {}); network_result_value_or_log!(self.handle_out_of_band_receipt(data).await => [ format!(": data.len={}", data.len()) ] {});
return Ok(true); return Ok(true);
} }
@ -1040,7 +1039,7 @@ impl NetworkManager {
log_net!(debug "failed to forward envelope: {}" ,e); log_net!(debug "failed to forward envelope: {}" ,e);
return Ok(false); return Ok(false);
} }
} => { } => [ format!(": relay_nr={}, data.len={}", relay_nr, data.len()) ] {
return Ok(false); return Ok(false);
} }
); );

View File

@ -515,7 +515,8 @@ impl Network {
network_result_value_or_log!(ph.clone() network_result_value_or_log!(ph.clone()
.send_message(data.clone(), peer_socket_addr) .send_message(data.clone(), peer_socket_addr)
.await .await
.wrap_err("sending data to existing conection")? => { return Ok(Some(data)); } ); .wrap_err("sending data to existing conection")? => [ format!(": data.len={}, descriptor={:?}", data.len(), descriptor) ]
{ return Ok(Some(data)); } );
// Network accounting // Network accounting
self.network_manager() self.network_manager()

View File

@ -92,7 +92,7 @@ impl DiscoveryContext {
); );
return None; return None;
} }
} => { } => [ format!(": node_ref={}", node_ref) ] {
return None; return None;
} }
); );

View File

@ -28,7 +28,7 @@ impl RawUdpProtocolHandler {
// Check length of reassembled message (same for all protocols) // Check length of reassembled message (same for all protocols)
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
log_net!(debug "{}({}) at {}@{}:{}", "Invalid message".green(), "received too large UDP message", file!(), line!(), column!()); log_net!(debug "{}({}) at {}@{}:{}", "Invalid message", "received too large UDP message", file!(), line!(), column!());
continue; continue;
} }

View File

@ -236,7 +236,7 @@ impl NetworkConnection {
Box::pin(async move { Box::pin(async move {
log_net!( log_net!(
"== Starting process_connection loop for id={}, {:?}", connection_id, "== Starting process_connection loop for id={}, {:?}", connection_id,
descriptor.green() descriptor
); );
let network_manager = connection_manager.network_manager(); let network_manager = connection_manager.network_manager();
@ -249,7 +249,7 @@ impl NetworkConnection {
let new_timer = || { let new_timer = || {
sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async { sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async {
// timeout // timeout
log_net!("== Connection timeout on {:?}", descriptor.green()); log_net!("== Connection timeout on {:?}", descriptor);
RecvLoopAction::Timeout RecvLoopAction::Timeout
}) })
}; };
@ -306,7 +306,7 @@ impl NetworkConnection {
log_net!(debug "Connection closed from: {} ({})", peer_address.socket_address().to_socket_addr(), peer_address.protocol_type()); log_net!(debug "Connection closed from: {} ({})", peer_address.socket_address().to_socket_addr(), peer_address.protocol_type());
return RecvLoopAction::Finish; return RecvLoopAction::Finish;
} }
let mut message = network_result_value_or_log!(v => { let mut message = network_result_value_or_log!(v => [ format!(": protocol_connection={:?}", protocol_connection) ] {
return RecvLoopAction::Finish; return RecvLoopAction::Finish;
}); });
@ -366,7 +366,7 @@ impl NetworkConnection {
log_net!( log_net!(
"== Connection loop finished descriptor={:?}", "== Connection loop finished descriptor={:?}",
descriptor.green() descriptor
); );
// Let the connection manager know the receive loop exited // Let the connection manager know the receive loop exited

View File

@ -1121,7 +1121,7 @@ impl RoutingTable {
return; return;
} }
Ok(v) => v, Ok(v) => v,
} => { } => [ format!(": crypto_kind={} node_ref={} wide={}", crypto_kind, node_ref, wide) ] {
return; return;
}); });
@ -1137,7 +1137,7 @@ impl RoutingTable {
continue; continue;
} }
Ok(v) => v, Ok(v) => v,
} => { } => [ format!(": crypto_kind={} closest_nr={} wide={}", crypto_kind, closest_nr, wide) ] {
// Do nothing with non-values // Do nothing with non-values
continue; continue;
}); });

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Clone)]
pub struct RoutedOperation { pub struct RoutedOperation {
sequencing: Sequencing, sequencing: Sequencing,
signatures: Vec<Signature>, signatures: Vec<Signature>,
@ -8,6 +8,17 @@ pub struct RoutedOperation {
data: Vec<u8>, data: Vec<u8>,
} }
impl fmt::Debug for RoutedOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RoutedOperation")
.field("sequencing", &self.sequencing)
.field("signatures.len", &self.signatures.len())
.field("nonce", &self.nonce)
.field("data(len)", &self.data.len())
.finish()
}
}
impl RoutedOperation { impl RoutedOperation {
pub fn new(sequencing: Sequencing, nonce: Nonce, data: Vec<u8>) -> Self { pub fn new(sequencing: Sequencing, nonce: Nonce, data: Vec<u8>) -> Self {
Self { Self {

View File

@ -182,7 +182,7 @@ impl<T> Answer<T> {
} }
} }
/// An operation that has been fully prepared for envelope r /// An operation that has been fully prepared for envelope
struct RenderedOperation { struct RenderedOperation {
/// The rendered operation bytes /// The rendered operation bytes
message: Vec<u8>, message: Vec<u8>,
@ -200,6 +200,20 @@ struct RenderedOperation {
reply_private_route: Option<PublicKey>, reply_private_route: Option<PublicKey>,
} }
impl fmt::Debug for RenderedOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RenderedOperation")
.field("message(len)", &self.message.len())
.field("destination_node_ref", &self.destination_node_ref)
.field("node_ref", &self.node_ref)
.field("hop_count", &self.hop_count)
.field("safety_route", &self.safety_route)
.field("remote_private_route", &self.remote_private_route)
.field("reply_private_route", &self.reply_private_route)
.finish()
}
}
/// Node information exchanged during every RPC message /// Node information exchanged during every RPC message
#[derive(Default, Debug, Clone)] #[derive(Default, Debug, Clone)]
pub struct SenderPeerInfo { pub struct SenderPeerInfo {
@ -431,7 +445,7 @@ impl RPCProcessor {
.await .await
{ {
Ok(v) => { Ok(v) => {
let v = network_result_value_or_log!(v => { let v = network_result_value_or_log!(v => [ format!(": node_id={} count={} fanout={} fanout={} safety_selection={:?}", node_id, count, fanout, timeout_us, safety_selection) ] {
// Any other failures, just try the next node // Any other failures, just try the next node
return Ok(None); return Ok(None);
}); });
@ -541,8 +555,7 @@ impl RPCProcessor {
.await; .await;
match &out { match &out {
Err(e) => { Err(e) => {
let msg = format!("RPC Lost ({}): {}", debug_string, e); log_rpc!(debug "RPC Lost ({}): {}", debug_string, e);
log_rpc!(debug "{}", msg.bright_magenta());
self.record_question_lost( self.record_question_lost(
waitable_reply.send_ts, waitable_reply.send_ts,
waitable_reply.node_ref.clone(), waitable_reply.node_ref.clone(),
@ -552,8 +565,7 @@ impl RPCProcessor {
); );
} }
Ok(TimeoutOr::Timeout) => { Ok(TimeoutOr::Timeout) => {
let msg = format!("RPC Lost ({}): Timeout", debug_string); log_rpc!(debug "RPC Lost ({}): Timeout", debug_string);
log_rpc!(debug "{}", msg.bright_cyan());
self.record_question_lost( self.record_question_lost(
waitable_reply.send_ts, waitable_reply.send_ts,
waitable_reply.node_ref.clone(), waitable_reply.node_ref.clone(),
@ -1148,17 +1160,31 @@ impl RPCProcessor {
// Send question // Send question
let bytes: ByteCount = (message.len() as u64).into(); let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp(); let send_ts = get_aligned_timestamp();
let send_data_kind = network_result_try!(self #[allow(unused_variables)]
let message_len = message.len();
let res = self
.network_manager() .network_manager()
.send_envelope(node_ref.clone(), Some(destination_node_ref), message) .send_envelope(
node_ref.clone(),
Some(destination_node_ref.clone()),
message,
)
.await .await
.map_err(|e| { .map_err(|e| {
// If we're returning an error, clean up // If we're returning an error, clean up
self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route); self.record_send_failure(
RPCKind::Question,
send_ts,
node_ref.clone(),
safety_route,
remote_private_route,
);
RPCError::network(e) RPCError::network(e)
})? => { })?;
let send_data_kind = network_result_value_or_log!( res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up // If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route); self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route);
network_result_raise!(res);
} }
); );
@ -1218,17 +1244,31 @@ impl RPCProcessor {
// Send statement // Send statement
let bytes: ByteCount = (message.len() as u64).into(); let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp(); let send_ts = get_aligned_timestamp();
let _send_data_kind = network_result_try!(self #[allow(unused_variables)]
let message_len = message.len();
let res = self
.network_manager() .network_manager()
.send_envelope(node_ref.clone(), Some(destination_node_ref), message) .send_envelope(
node_ref.clone(),
Some(destination_node_ref.clone()),
message,
)
.await .await
.map_err(|e| { .map_err(|e| {
// If we're returning an error, clean up // If we're returning an error, clean up
self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route); self.record_send_failure(
RPCKind::Statement,
send_ts,
node_ref.clone(),
safety_route,
remote_private_route,
);
RPCError::network(e) RPCError::network(e)
})? => { })?;
let _send_data_kind = network_result_value_or_log!( res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up // If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route); self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route);
network_result_raise!(res);
} }
); );
@ -1281,16 +1321,31 @@ impl RPCProcessor {
// Send the reply // Send the reply
let bytes: ByteCount = (message.len() as u64).into(); let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp(); let send_ts = get_aligned_timestamp();
network_result_try!(self.network_manager() #[allow(unused_variables)]
.send_envelope(node_ref.clone(), Some(destination_node_ref), message) let message_len = message.len();
let res = self
.network_manager()
.send_envelope(
node_ref.clone(),
Some(destination_node_ref.clone()),
message,
)
.await .await
.map_err(|e| { .map_err(|e| {
// If we're returning an error, clean up // If we're returning an error, clean up
self.record_send_failure(RPCKind::Answer, send_ts, node_ref.clone(), safety_route, remote_private_route); self.record_send_failure(
RPCKind::Answer,
send_ts,
node_ref.clone(),
safety_route,
remote_private_route,
);
RPCError::network(e) RPCError::network(e)
})? => { })?;
let _send_data_kind = network_result_value_or_log!( res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up // If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Answer, send_ts, node_ref.clone(), safety_route, remote_private_route); self.record_send_failure(RPCKind::Answer, send_ts, node_ref.clone(), safety_route, remote_private_route);
network_result_raise!(res);
} }
); );
@ -1513,7 +1568,8 @@ impl RPCProcessor {
let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv"); let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv");
// xxx: causes crash (Missing otel data span extensions) // xxx: causes crash (Missing otel data span extensions)
// rpc_worker_span.follows_from(span_id); // rpc_worker_span.follows_from(span_id);
let res = match self
network_result_value_or_log!(match self
.process_rpc_message(msg) .process_rpc_message(msg)
.instrument(rpc_worker_span) .instrument(rpc_worker_span)
.await .await
@ -1524,9 +1580,7 @@ impl RPCProcessor {
} }
Ok(v) => v, Ok(v) => v,
}; } => [ format!(": msg.header={:?}", msg.header) ] {});
network_result_value_or_log!(res => {});
} }
} }
@ -1542,8 +1596,7 @@ impl RPCProcessor {
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
body: Vec<u8>, body: Vec<u8>,
) -> EyreResult<()> { ) -> EyreResult<()> {
let msg = RPCMessageEncoded { let header = RPCMessageHeader {
header: RPCMessageHeader {
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect { detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
envelope, envelope,
peer_noderef, peer_noderef,
@ -1552,9 +1605,13 @@ impl RPCProcessor {
}), }),
timestamp: get_aligned_timestamp(), timestamp: get_aligned_timestamp(),
body_len: ByteCount::new(body.len() as u64), body_len: ByteCount::new(body.len() as u64),
}, };
let msg = RPCMessageEncoded {
header,
data: RPCMessageData { contents: body }, data: RPCMessageData { contents: body },
}; };
let send_channel = { let send_channel = {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner.send_channel.as_ref().unwrap().clone() inner.send_channel.as_ref().unwrap().clone()
@ -1577,8 +1634,7 @@ impl RPCProcessor {
sequencing: Sequencing, sequencing: Sequencing,
body: Vec<u8>, body: Vec<u8>,
) -> EyreResult<()> { ) -> EyreResult<()> {
let msg = RPCMessageEncoded { let header = RPCMessageHeader {
header: RPCMessageHeader {
detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted { detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted {
direct, direct,
remote_safety_route, remote_safety_route,
@ -1586,7 +1642,10 @@ impl RPCProcessor {
}), }),
timestamp: get_aligned_timestamp(), timestamp: get_aligned_timestamp(),
body_len: (body.len() as u64).into(), body_len: (body.len() as u64).into(),
}, };
let msg = RPCMessageEncoded {
header,
data: RPCMessageData { contents: body }, data: RPCMessageData { contents: body },
}; };
let send_channel = { let send_channel = {
@ -1612,21 +1671,22 @@ impl RPCProcessor {
safety_spec: SafetySpec, safety_spec: SafetySpec,
body: Vec<u8>, body: Vec<u8>,
) -> EyreResult<()> { ) -> EyreResult<()> {
let msg = RPCMessageEncoded { let header = RPCMessageHeader {
header: RPCMessageHeader { detail: RPCMessageHeaderDetail::PrivateRouted(RPCMessageHeaderDetailPrivateRouted {
detail: RPCMessageHeaderDetail::PrivateRouted(
RPCMessageHeaderDetailPrivateRouted {
direct, direct,
remote_safety_route, remote_safety_route,
private_route, private_route,
safety_spec, safety_spec,
}, }),
),
timestamp: get_aligned_timestamp(), timestamp: get_aligned_timestamp(),
body_len: (body.len() as u64).into(), body_len: (body.len() as u64).into(),
}, };
let msg = RPCMessageEncoded {
header,
data: RPCMessageData { contents: body }, data: RPCMessageData { contents: body },
}; };
let send_channel = { let send_channel = {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner.send_channel.as_ref().unwrap().clone() inner.send_channel.as_ref().unwrap().clone()

View File

@ -360,7 +360,7 @@ impl RPCProcessor {
Ok(NetworkResult::value(route_hop)) Ok(NetworkResult::value(route_hop))
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self), ret, err))]
pub(crate) async fn process_route( pub(crate) async fn process_route(
&self, &self,
msg: RPCMessage, msg: RPCMessage,

View File

@ -26,7 +26,7 @@ impl RPCProcessor {
// Send the validate_dial_info request // Send the validate_dial_info request
// This can only be sent directly, as relays can not validate dial info // This can only be sent directly, as relays can not validate dial info
network_result_value_or_log!(self.statement(Destination::direct(peer), statement) network_result_value_or_log!(self.statement(Destination::direct(peer), statement)
.await? => { .await? => [ format!(": peer={} statement={:?}", peer, statement) ] {
return Ok(false); return Ok(false);
} }
); );
@ -36,7 +36,7 @@ impl RPCProcessor {
ReceiptEvent::ReturnedPrivate { private_route: _ } ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedInBand { inbound_noderef: _ } | ReceiptEvent::ReturnedInBand { inbound_noderef: _ }
| ReceiptEvent::ReturnedSafety => { | ReceiptEvent::ReturnedSafety => {
log_net!(debug "validate_dial_info receipt should be returned out-of-band".green()); log_net!(debug "validate_dial_info receipt should be returned out-of-band");
Ok(false) Ok(false)
} }
ReceiptEvent::ReturnedOutOfBand => { ReceiptEvent::ReturnedOutOfBand => {
@ -44,7 +44,7 @@ impl RPCProcessor {
Ok(true) Ok(true)
} }
ReceiptEvent::Expired => { ReceiptEvent::Expired => {
log_net!(debug "validate_dial_info receipt expired".green()); log_net!(debug "validate_dial_info receipt expired");
Ok(false) Ok(false)
} }
ReceiptEvent::Cancelled => { ReceiptEvent::Cancelled => {
@ -141,7 +141,7 @@ impl RPCProcessor {
// Send the validate_dial_info request // Send the validate_dial_info request
// This can only be sent directly, as relays can not validate dial info // This can only be sent directly, as relays can not validate dial info
network_result_value_or_log!(self.statement(Destination::direct(peer), statement) network_result_value_or_log!(self.statement(Destination::direct(peer), statement)
.await? => { .await? => [ format!(": peer={} statement={:?}", peer, statement) ] {
continue; continue;
} }
); );

View File

@ -58,13 +58,13 @@ impl StorageManager {
let vres = rpc_processor let vres = rpc_processor
.clone() .clone()
.rpc_call_get_value( .rpc_call_get_value(
Destination::direct(next_node).with_safety(safety_selection), Destination::direct(next_node.clone()).with_safety(safety_selection),
key, key,
subkey, subkey,
last_descriptor, last_descriptor,
) )
.await?; .await?;
let gva = network_result_value_or_log!(vres => { let gva = network_result_value_or_log!(vres => [ format!(": next_node={} safety_selection={:?} key={} subkey={}", next_node, safety_selection, key, subkey) ] {
// Any other failures, just try the next node // Any other failures, just try the next node
return Ok(None); return Ok(None);
}); });

View File

@ -62,7 +62,7 @@ impl StorageManager {
let vres = rpc_processor let vres = rpc_processor
.clone() .clone()
.rpc_call_set_value( .rpc_call_set_value(
Destination::direct(next_node).with_safety(safety_selection), Destination::direct(next_node.clone()).with_safety(safety_selection),
key, key,
subkey, subkey,
value, value,
@ -70,7 +70,7 @@ impl StorageManager {
send_descriptor, send_descriptor,
) )
.await?; .await?;
let sva = network_result_value_or_log!(vres => { let sva = network_result_value_or_log!(vres => [ format!(": next_node={} safety_selection={:?} key={} subkey={} send_descriptor={}", next_node, safety_selection, key, subkey, send_descriptor) ] {
// Any other failures, just try the next node and pretend this one never happened // Any other failures, just try the next node and pretend this one never happened
return Ok(None); return Ok(None);
}); });

View File

@ -17,6 +17,7 @@ crypto-test-none = ["rt-tokio", "veilid-core/crypto-test-none"]
rt-async-std = ["veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys"] rt-async-std = ["veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys"]
rt-tokio = ["veilid-core/rt-tokio", "tokio", "tokio-stream", "tokio-util", "opentelemetry/rt-tokio", "console-subscriber"] rt-tokio = ["veilid-core/rt-tokio", "tokio", "tokio-stream", "tokio-util", "opentelemetry/rt-tokio", "console-subscriber"]
tracking = ["veilid-core/tracking"] tracking = ["veilid-core/tracking"]
network-result-extra = ["veilid-core/network-result-extra"]
[dependencies] [dependencies]
veilid-core = { path = "../veilid-core", default-features = false } veilid-core = { path = "../veilid-core", default-features = false }

View File

@ -18,6 +18,8 @@ rt-wasm-bindgen = [ "async_executors/bindgen", "async_executors/timer"]
veilid_tools_android_tests = [ "dep:paranoid-android" ] veilid_tools_android_tests = [ "dep:paranoid-android" ]
veilid_tools_ios_tests = [ "dep:oslog", "dep:tracing-oslog" ] veilid_tools_ios_tests = [ "dep:oslog", "dep:tracing-oslog" ]
tracing = [ "dep:tracing", "dep:tracing-subscriber" ] tracing = [ "dep:tracing", "dep:tracing-subscriber" ]
network-result-extra = []
network-result-info = []
[dependencies] [dependencies]
tracing = { version = "^0", features = ["log", "attributes"], optional = true } tracing = { version = "^0", features = ["log", "attributes"], optional = true }
@ -30,7 +32,6 @@ thiserror = "^1"
futures-util = { version = "^0", default_features = false, features = ["alloc"] } futures-util = { version = "^0", default_features = false, features = ["alloc"] }
parking_lot = "^0" parking_lot = "^0"
once_cell = "^1" once_cell = "^1"
owo-colors = "^3"
stop-token = { version = "^0", default-features = false } stop-token = { version = "^0", default-features = false }
rand = "^0.7" rand = "^0.7"
rust-fsm = "^0" rust-fsm = "^0"

View File

@ -271,6 +271,20 @@ impl<T: Debug + Display> Error for NetworkResult<T> {}
////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////
// Non-fallible network result macros // Non-fallible network result macros
#[macro_export]
macro_rules! network_result_raise {
($r: expr) => {
match $r {
NetworkResult::Timeout => return Ok(NetworkResult::Timeout),
NetworkResult::ServiceUnavailable(s) => return Ok(NetworkResult::ServiceUnavailable(s)),
NetworkResult::NoConnection(e) => return Ok(NetworkResult::NoConnection(e)),
NetworkResult::AlreadyExists(e) => return Ok(NetworkResult::AlreadyExists(e)),
NetworkResult::InvalidMessage(s) => return Ok(NetworkResult::InvalidMessage(s)),
NetworkResult::Value(_) => panic!("Can not raise value"),
}
};
}
#[macro_export] #[macro_export]
macro_rules! network_result_try { macro_rules! network_result_try {
($r: expr) => { ($r: expr) => {
@ -313,89 +327,102 @@ macro_rules! network_result_try {
#[macro_export] #[macro_export]
macro_rules! log_network_result { macro_rules! log_network_result {
($text:expr) => { ($text:expr) => {
// cfg_if::cfg_if! { cfg_if::cfg_if! {
// if #[cfg(debug_assertions)] { if #[cfg(feature="network-result-info")] {
// info!(target: "network_result", "{}", $text) info!(target: "network_result", "{}", format!("{}", $text))
// } else { } else {
debug!(target: "network_result", "{}", $text) debug!(target: "network_result", "{}", format!("{}", $text))
// } }
// } }
}; };
($fmt:literal, $($arg:expr),+) => { ($fmt:literal, $($arg:expr),+) => {
// cfg_if::cfg_if! { cfg_if::cfg_if! {
// if #[cfg(debug_assertions)] { if #[cfg(feature="network-result-info")] {
// info!(target: "network_result", $fmt, $($arg),+); info!(target: "network_result", "{}", format!($fmt, $($arg),+));
// } else { } else {
debug!(target: "network_result", $fmt, $($arg),+); debug!(target: "network_result", "{}", format!($fmt, $($arg),+));
// } }
// } }
}; };
} }
#[macro_export] #[macro_export]
macro_rules! network_result_value_or_log { macro_rules! network_result_value_or_log {
($r: expr => $f:tt) => { ($r:expr => $f:expr) => {
network_result_value_or_log!($r => [ "" ] $f )
};
($r:expr => [ $d:expr ] $f:expr) => { {
#[cfg(feature="network-result-extra")]
let __extra_message = $d;
#[cfg(not(feature="network-result-extra"))]
let __extra_message = "";
match $r { match $r {
NetworkResult::Timeout => { NetworkResult::Timeout => {
log_network_result!( log_network_result!(
"{} at {}@{}:{} in {}", "{} at {}@{}:{} in {}{}",
"Timeout", "Timeout",
file!(), file!(),
line!(), line!(),
column!(), column!(),
fn_name::uninstantiated!() fn_name::uninstantiated!(),
__extra_message
); );
$f $f
} }
NetworkResult::ServiceUnavailable(s) => { NetworkResult::ServiceUnavailable(ref s) => {
log_network_result!( log_network_result!(
"{}({}) at {}@{}:{} in {}", "{}({}) at {}@{}:{} in {}{}",
"ServiceUnavailable", "ServiceUnavailable",
s, s,
file!(), file!(),
line!(), line!(),
column!(), column!(),
fn_name::uninstantiated!() fn_name::uninstantiated!(),
__extra_message
); );
$f $f
} }
NetworkResult::NoConnection(e) => { NetworkResult::NoConnection(ref e) => {
log_network_result!( log_network_result!(
"{}({}) at {}@{}:{} in {}", "{}({}) at {}@{}:{} in {}{}",
"No connection", "No connection",
e.to_string(), e.to_string(),
file!(), file!(),
line!(), line!(),
column!(), column!(),
fn_name::uninstantiated!() fn_name::uninstantiated!(),
__extra_message
); );
$f $f
} }
NetworkResult::AlreadyExists(e) => { NetworkResult::AlreadyExists(ref e) => {
log_network_result!( log_network_result!(
"{}({}) at {}@{}:{} in {}", "{}({}) at {}@{}:{} in {}{}",
"Already exists", "Already exists",
e.to_string(), e.to_string(),
file!(), file!(),
line!(), line!(),
column!(), column!(),
fn_name::uninstantiated!() fn_name::uninstantiated!(),
__extra_message
); );
$f $f
} }
NetworkResult::InvalidMessage(s) => { NetworkResult::InvalidMessage(ref s) => {
log_network_result!( log_network_result!(
"{}({}) at {}@{}:{} in {}", "{}({}) at {}@{}:{} in {}{}",
"Invalid message", "Invalid message",
s, s,
file!(), file!(),
line!(), line!(),
column!(), column!(),
fn_name::uninstantiated!() fn_name::uninstantiated!(),
__extra_message
); );
$f $f
} }
NetworkResult::Value(v) => v, NetworkResult::Value(v) => v,
} }
}; } };
} }