more instruments

This commit is contained in:
Christien Rioux 2024-07-03 22:25:29 -04:00
parent c264d6fbbe
commit 9720bbe520
12 changed files with 249 additions and 249 deletions

View File

@ -283,7 +283,7 @@ impl ConnectionManager {
/// This will kill off any connections that are in conflict with the new connection to be made /// This will kill off any connections that are in conflict with the new connection to be made
/// in order to make room for the new connection in the system's connection table /// in order to make room for the new connection in the system's connection table
/// This routine needs to be atomic, or connections may exist in the table that are not established /// This routine needs to be atomic, or connections may exist in the table that are not established
#[instrument(level = "trace", skip(self), ret, err)] //#[instrument(level = "trace", skip(self), ret, err)]
pub async fn get_or_create_connection( pub async fn get_or_create_connection(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,

View File

@ -2,7 +2,7 @@ use super::*;
impl NetworkManager { impl NetworkManager {
// Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism) // Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism)
#[instrument(level = "trace", skip(self), ret, err)] #[instrument(level = "trace", target = "net", skip(self), ret, err)]
pub(crate) async fn handle_boot_request(&self, flow: Flow) -> EyreResult<NetworkResult<()>> { pub(crate) async fn handle_boot_request(&self, flow: Flow) -> EyreResult<NetworkResult<()>> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
@ -33,7 +33,7 @@ impl NetworkManager {
} }
// Direct bootstrap request // Direct bootstrap request
#[instrument(level = "trace", err, skip(self))] #[instrument(level = "trace", target = "net", err, skip(self))]
pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult<Vec<PeerInfo>> { pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult<Vec<PeerInfo>> {
let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms); let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms);
// Send boot magic to requested peer address // Send boot magic to requested peer address

View File

@ -395,67 +395,72 @@ impl Network {
// This creates a short-lived connection in the case of connection-oriented protocols // This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message. // for the purpose of sending this one message.
// This bypasses the connection table as it is not a 'node to node' connection. // This bypasses the connection table as it is not a 'node to node' connection.
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_data_unbound_to_dial_info( pub async fn send_data_unbound_to_dial_info(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> { ) -> EyreResult<NetworkResult<()>> {
self.record_dial_info_failure(dial_info.clone(), async move { self.record_dial_info_failure(
let data_len = data.len(); dial_info.clone(),
let connect_timeout_ms = { async move {
let c = self.config.get(); let data_len = data.len();
c.network.connection_initial_timeout_ms let connect_timeout_ms = {
}; let c = self.config.get();
c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().ip_addr()) .is_ip_addr_punished(dial_info.address().ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
} }
match dial_info.protocol_type() { match dial_info.protocol_type() {
ProtocolType::UDP => { ProtocolType::UDP => {
let peer_socket_addr = dial_info.to_socket_addr(); let peer_socket_addr = dial_info.to_socket_addr();
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) let h =
RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
.await
.wrap_err("create socket failure")?;
let _ = network_result_try!(h
.send_message(data, peer_socket_addr)
.await
.map(NetworkResult::Value)
.wrap_err("send message failure")?);
}
ProtocolType::TCP => {
let peer_socket_addr = dial_info.to_socket_addr();
let pnc = network_result_try!(RawTcpProtocolHandler::connect(
None,
peer_socket_addr,
connect_timeout_ms
)
.await .await
.wrap_err("create socket failure")?; .wrap_err("connect failure")?);
let _ = network_result_try!(h network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
.send_message(data, peer_socket_addr) }
ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
None,
&dial_info,
connect_timeout_ms
)
.await .await
.map(NetworkResult::Value) .wrap_err("connect failure")?);
.wrap_err("send message failure")?); network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
} }
ProtocolType::TCP => { // Network accounting
let peer_socket_addr = dial_info.to_socket_addr(); self.network_manager()
let pnc = network_result_try!(RawTcpProtocolHandler::connect( .stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
None,
peer_socket_addr,
connect_timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
None,
&dial_info,
connect_timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
}
// Network accounting
self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::Value(())) Ok(NetworkResult::Value(()))
}) }
.in_current_span(),
)
.await .await
} }
@ -464,103 +469,122 @@ impl Network {
// This creates a short-lived connection in the case of connection-oriented protocols // This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message. // for the purpose of sending this one message.
// This bypasses the connection table as it is not a 'node to node' connection. // This bypasses the connection table as it is not a 'node to node' connection.
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_recv_data_unbound_to_dial_info( pub async fn send_recv_data_unbound_to_dial_info(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
timeout_ms: u32, timeout_ms: u32,
) -> EyreResult<NetworkResult<Vec<u8>>> { ) -> EyreResult<NetworkResult<Vec<u8>>> {
self.record_dial_info_failure(dial_info.clone(), async move { self.record_dial_info_failure(
let data_len = data.len(); dial_info.clone(),
let connect_timeout_ms = { async move {
let c = self.config.get(); let data_len = data.len();
c.network.connection_initial_timeout_ms let connect_timeout_ms = {
}; let c = self.config.get();
c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().ip_addr()) .is_ip_addr_punished(dial_info.address().ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
let peer_socket_addr = dial_info.to_socket_addr();
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
.await
.wrap_err("create socket failure")?;
network_result_try!(h
.send_message(data, peer_socket_addr)
.await
.wrap_err("send message failure")?);
self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
// receive single response
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
let (recv_len, recv_addr) = network_result_try!(timeout(
timeout_ms,
h.recv_message(&mut out).instrument(Span::current())
)
.await
.into_network_result())
.wrap_err("recv_message failure")?;
let recv_socket_addr = recv_addr.remote_address().socket_addr();
self.network_manager()
.stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64));
// if the from address is not the same as the one we sent to, then drop this
if recv_socket_addr != peer_socket_addr {
bail!("wrong address");
}
out.resize(recv_len, 0u8);
Ok(NetworkResult::Value(out))
} }
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(match dial_info.protocol_type() { match dial_info.protocol_type() {
ProtocolType::UDP => unreachable!(), ProtocolType::UDP => {
ProtocolType::TCP => { let peer_socket_addr = dial_info.to_socket_addr();
let peer_socket_addr = dial_info.to_socket_addr(); let h =
RawTcpProtocolHandler::connect( RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
None, .await
peer_socket_addr, .wrap_err("create socket failure")?;
connect_timeout_ms, network_result_try!(h
) .send_message(data, peer_socket_addr)
.await .await
.wrap_err("connect failure")? .wrap_err("send message failure")?);
self.network_manager().stats_packet_sent(
dial_info.ip_addr(),
ByteCount::new(data_len as u64),
);
// receive single response
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
let (recv_len, recv_addr) = network_result_try!(timeout(
timeout_ms,
h.recv_message(&mut out).instrument(Span::current())
)
.await
.into_network_result())
.wrap_err("recv_message failure")?;
let recv_socket_addr = recv_addr.remote_address().socket_addr();
self.network_manager().stats_packet_rcvd(
recv_socket_addr.ip(),
ByteCount::new(recv_len as u64),
);
// if the from address is not the same as the one we sent to, then drop this
if recv_socket_addr != peer_socket_addr {
bail!("wrong address");
} }
ProtocolType::WS | ProtocolType::WSS => { out.resize(recv_len, 0u8);
WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms) Ok(NetworkResult::Value(out))
}
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(match dial_info.protocol_type() {
ProtocolType::UDP => unreachable!(),
ProtocolType::TCP => {
let peer_socket_addr = dial_info.to_socket_addr();
RawTcpProtocolHandler::connect(
None,
peer_socket_addr,
connect_timeout_ms,
)
.await .await
.wrap_err("connect failure")? .wrap_err("connect failure")?
} }
}); ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::connect(
None,
&dial_info,
connect_timeout_ms,
)
.await
.wrap_err("connect failure")?
}
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?); network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
self.network_manager() self.network_manager().stats_packet_sent(
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64)); dial_info.ip_addr(),
ByteCount::new(data_len as u64),
);
let out = let out = network_result_try!(network_result_try!(timeout(
network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) timeout_ms,
.await pnc.recv()
.into_network_result()) )
.await
.into_network_result())
.wrap_err("recv failure")?); .wrap_err("recv failure")?);
self.network_manager() self.network_manager().stats_packet_rcvd(
.stats_packet_rcvd(dial_info.ip_addr(), ByteCount::new(out.len() as u64)); dial_info.ip_addr(),
ByteCount::new(out.len() as u64),
);
Ok(NetworkResult::Value(out)) Ok(NetworkResult::Value(out))
}
} }
} }
}) .in_current_span(),
)
.await .await
} }
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_data_to_existing_flow( pub async fn send_data_to_existing_flow(
&self, &self,
flow: Flow, flow: Flow,
@ -625,57 +649,61 @@ impl Network {
// Send data directly to a dial info, possibly without knowing which node it is going to // Send data directly to a dial info, possibly without knowing which node it is going to
// Returns a flow for the connection used to send the data // Returns a flow for the connection used to send the data
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_data_to_dial_info( pub async fn send_data_to_dial_info(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> { ) -> EyreResult<NetworkResult<UniqueFlow>> {
self.record_dial_info_failure(dial_info.clone(), async move { self.record_dial_info_failure(
let data_len = data.len(); dial_info.clone(),
let unique_flow; async move {
if dial_info.protocol_type() == ProtocolType::UDP { let data_len = data.len();
// Handle connectionless protocol let unique_flow;
let peer_socket_addr = dial_info.to_socket_addr(); if dial_info.protocol_type() == ProtocolType::UDP {
let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { // Handle connectionless protocol
Some(ph) => ph, let peer_socket_addr = dial_info.to_socket_addr();
None => { let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) {
return Ok(NetworkResult::no_connection_other( Some(ph) => ph,
"no appropriate UDP protocol handler for dial_info", None => {
)); return Ok(NetworkResult::no_connection_other(
"no appropriate UDP protocol handler for dial_info",
));
}
};
let flow = network_result_try!(ph
.send_message(data, peer_socket_addr)
.await
.wrap_err("failed to send data to dial info")?);
unique_flow = UniqueFlow {
flow,
connection_id: None,
};
} else {
// Handle connection-oriented protocols
let conn = network_result_try!(
self.connection_manager()
.get_or_create_connection(dial_info.clone())
.await?
);
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
return Ok(NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset,
"failed to send",
)));
} }
}; unique_flow = conn.unique_flow();
let flow = network_result_try!(ph
.send_message(data, peer_socket_addr)
.await
.wrap_err("failed to send data to dial info")?);
unique_flow = UniqueFlow {
flow,
connection_id: None,
};
} else {
// Handle connection-oriented protocols
let conn = network_result_try!(
self.connection_manager()
.get_or_create_connection(dial_info.clone())
.await?
);
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
return Ok(NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset,
"failed to send",
)));
} }
unique_flow = conn.unique_flow();
// Network accounting
self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::value(unique_flow))
} }
.in_current_span(),
// Network accounting )
self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::value(unique_flow))
})
.await .await
} }

View File

@ -22,10 +22,7 @@ impl RawTcpNetworkConnection {
self.flow self.flow
} }
#[cfg_attr( #[instrument(level = "trace", target = "protocol", err, skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", err, skip(self))
)]
pub async fn close(&self) -> io::Result<NetworkResult<()>> { pub async fn close(&self) -> io::Result<NetworkResult<()>> {
let mut stream = self.stream.clone(); let mut stream = self.stream.clone();
let _ = stream.close().await; let _ = stream.close().await;
@ -47,6 +44,7 @@ impl RawTcpNetworkConnection {
// } // }
} }
#[instrument(level = "trace", target = "protocol", err, skip_all)]
async fn send_internal( async fn send_internal(
stream: &mut AsyncPeekStream, stream: &mut AsyncPeekStream,
message: Vec<u8>, message: Vec<u8>,
@ -63,7 +61,7 @@ impl RawTcpNetworkConnection {
stream.flush().await.into_network_result() stream.flush().await.into_network_result()
} }
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, message), fields(network_result, message.len = message.len())))] #[instrument(level="trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
let mut stream = self.stream.clone(); let mut stream = self.stream.clone();
let out = Self::send_internal(&mut stream, message).await?; let out = Self::send_internal(&mut stream, message).await?;
@ -72,6 +70,7 @@ impl RawTcpNetworkConnection {
Ok(out) Ok(out)
} }
#[instrument(level = "trace", target = "protocol", err, skip_all)]
async fn recv_internal(stream: &mut AsyncPeekStream) -> io::Result<NetworkResult<Vec<u8>>> { async fn recv_internal(stream: &mut AsyncPeekStream) -> io::Result<NetworkResult<Vec<u8>>> {
let mut header = [0u8; 4]; let mut header = [0u8; 4];
@ -95,10 +94,7 @@ impl RawTcpNetworkConnection {
Ok(NetworkResult::Value(out)) Ok(NetworkResult::Value(out))
} }
#[cfg_attr( #[instrument(level = "trace", target = "protocol", err, skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", err, skip(self), fields(network_result))
)]
pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> { pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
let mut stream = self.stream.clone(); let mut stream = self.stream.clone();
let out = Self::recv_internal(&mut stream).await?; let out = Self::recv_internal(&mut stream).await?;
@ -127,7 +123,7 @@ impl RawTcpProtocolHandler {
} }
} }
#[instrument(level = "trace", err, skip(self, ps))] #[instrument(level = "trace", target = "protocol", err, skip_all)]
async fn on_accept_async( async fn on_accept_async(
self, self,
ps: AsyncPeekStream, ps: AsyncPeekStream,
@ -160,7 +156,7 @@ impl RawTcpProtocolHandler {
Ok(Some(conn)) Ok(Some(conn))
} }
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", target = "protocol", err, skip_all)]
pub async fn connect( pub async fn connect(
local_address: Option<SocketAddr>, local_address: Option<SocketAddr>,
socket_addr: SocketAddr, socket_addr: SocketAddr,

View File

@ -17,7 +17,7 @@ impl RawUdpProtocolHandler {
} }
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.flow)))] #[instrument(level = "trace", target = "protocol", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.flow))]
pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, Flow)> { pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, Flow)> {
let (message_len, flow) = loop { let (message_len, flow) = loop {
// Get a packet // Get a packet
@ -80,7 +80,7 @@ impl RawUdpProtocolHandler {
Ok((message_len, flow)) Ok((message_len, flow))
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.flow)))] #[instrument(level = "trace", target = "protocol", err, skip(self, data), fields(data.len = data.len(), ret.flow))]
pub async fn send_message( pub async fn send_message(
&self, &self,
data: Vec<u8>, data: Vec<u8>,
@ -135,7 +135,7 @@ impl RawUdpProtocolHandler {
Ok(NetworkResult::value(flow)) Ok(NetworkResult::value(flow))
} }
#[instrument(level = "trace", err)] #[instrument(level = "trace", target = "protocol", err)]
pub async fn new_unspecified_bound_handler( pub async fn new_unspecified_bound_handler(
socket_addr: &SocketAddr, socket_addr: &SocketAddr,
) -> io::Result<RawUdpProtocolHandler> { ) -> io::Result<RawUdpProtocolHandler> {

View File

@ -82,10 +82,7 @@ where
self.flow self.flow
} }
#[cfg_attr( #[instrument(level = "trace", target = "protocol", err, skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", err, skip(self))
)]
pub async fn close(&self) -> io::Result<NetworkResult<()>> { pub async fn close(&self) -> io::Result<NetworkResult<()>> {
// Make an attempt to close the stream normally // Make an attempt to close the stream normally
let mut stream = self.stream.clone(); let mut stream = self.stream.clone();
@ -127,7 +124,7 @@ where
*/ */
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))] #[instrument(level = "trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
bail_io_error_other!("sending too large WS message"); bail_io_error_other!("sending too large WS message");
@ -142,7 +139,7 @@ where
Ok(out) Ok(out)
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self), fields(network_result, ret.len)))] #[instrument(level = "trace", target="protocol", err, skip(self), fields(network_result, ret.len))]
pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> { pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
let out = match self.stream.clone().next().await { let out = match self.stream.clone().next().await {
Some(Ok(Message::Binary(v))) => { Some(Ok(Message::Binary(v))) => {
@ -212,7 +209,7 @@ impl WebsocketProtocolHandler {
} }
} }
#[instrument(level = "trace", err, skip(self, ps))] #[instrument(level = "trace", target = "protocol", err, skip(self, ps))]
pub async fn on_accept_async( pub async fn on_accept_async(
self, self,
ps: AsyncPeekStream, ps: AsyncPeekStream,
@ -294,7 +291,7 @@ impl WebsocketProtocolHandler {
Ok(Some(conn)) Ok(Some(conn))
} }
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", target = "protocol", ret, err)]
pub async fn connect( pub async fn connect(
local_address: Option<SocketAddr>, local_address: Option<SocketAddr>,
dial_info: &DialInfo, dial_info: &DialInfo,

View File

@ -265,7 +265,7 @@ impl NetworkConnection {
// Connection receiver loop // Connection receiver loop
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument(level="trace", target="net", skip_all)] //#[instrument(level="trace", target="net", skip_all)]
fn process_connection( fn process_connection(
connection_manager: ConnectionManager, connection_manager: ConnectionManager,
local_stop_token: StopToken, local_stop_token: StopToken,
@ -307,10 +307,11 @@ impl NetworkConnection {
need_sender = false; need_sender = false;
let sender_fut = receiver.recv_async().then(|res| async { let sender_fut = receiver.recv_async().then(|res| async {
match res { match res {
Ok((span_id, message)) => { Ok((_span_id, message)) => {
let span = span!(parent: span_id, Level::TRACE, "process_connection send"); // let span = span!(Level::TRACE, "process_connection send");
let _enter = span.enter(); // span.follows_from(span_id);
// let _enter = span.enter();
// Touch the LRU for this connection // Touch the LRU for this connection
connection_manager.touch_connection_by_id(connection_id); connection_manager.touch_connection_by_id(connection_id);

View File

@ -54,6 +54,7 @@ impl NetworkManager {
self.try_possibly_relayed_contact_method(possibly_relayed_contact_method, destination_node_ref, data).await self.try_possibly_relayed_contact_method(possibly_relayed_contact_method, destination_node_ref, data).await
} }
#[instrument(level="trace", target="net", skip_all)]
pub(crate) fn try_possibly_relayed_contact_method(&self, pub(crate) fn try_possibly_relayed_contact_method(&self,
possibly_relayed_contact_method: NodeContactMethod, possibly_relayed_contact_method: NodeContactMethod,
destination_node_ref: NodeRef, destination_node_ref: NodeRef,

View File

@ -69,7 +69,7 @@ impl WebsocketNetworkConnection {
Ok(NetworkResult::value(())) Ok(NetworkResult::value(()))
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))] #[instrument(level = "trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
bail_io_error_other!("sending too large WS message"); bail_io_error_other!("sending too large WS message");
@ -89,7 +89,7 @@ impl WebsocketNetworkConnection {
Ok(out) Ok(out)
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self), fields(network_result, ret.len)))] #[instrument(level = "trace", target="protocol", err, skip(self), fields(network_result, ret.len))]
pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> { pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
let out = match SendWrapper::new(self.inner.ws_stream.clone().next()).await { let out = match SendWrapper::new(self.inner.ws_stream.clone().next()).await {
Some(WsMessage::Binary(v)) => { Some(WsMessage::Binary(v)) => {
@ -121,7 +121,7 @@ impl WebsocketNetworkConnection {
pub(in crate::network_manager) struct WebsocketProtocolHandler {} pub(in crate::network_manager) struct WebsocketProtocolHandler {}
impl WebsocketProtocolHandler { impl WebsocketProtocolHandler {
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", target = "protocol", ret, err)]
pub async fn connect( pub async fn connect(
dial_info: &DialInfo, dial_info: &DialInfo,
timeout_ms: u32, timeout_ms: u32,

View File

@ -4,6 +4,7 @@ impl RoutingTable {
/// Utility to find the closest nodes to a particular key, preferring reliable nodes first, /// Utility to find the closest nodes to a particular key, preferring reliable nodes first,
/// including possibly our own node and nodes further away from the key than our own, /// including possibly our own node and nodes further away from the key than our own,
/// returning their peer info /// returning their peer info
#[instrument(level = "trace", target = "rtab", skip_all)]
pub fn find_preferred_closest_peers( pub fn find_preferred_closest_peers(
&self, &self,
key: TypedKey, key: TypedKey,
@ -72,6 +73,7 @@ impl RoutingTable {
/// Utility to find nodes that are closer to a key than our own node, /// Utility to find nodes that are closer to a key than our own node,
/// preferring reliable nodes first, and returning their peer info /// preferring reliable nodes first, and returning their peer info
/// Can filter based on a particular set of capabilities /// Can filter based on a particular set of capabilities
#[instrument(level = "trace", target = "rtab", skip_all)]
pub fn find_preferred_peers_closer_to_key( pub fn find_preferred_peers_closer_to_key(
&self, &self,
key: TypedKey, key: TypedKey,
@ -167,6 +169,7 @@ impl RoutingTable {
} }
/// Determine if set of peers is closer to key_near than key_far is to key_near /// Determine if set of peers is closer to key_near than key_far is to key_near
#[instrument(level = "trace", target = "rtab", skip_all, err)]
pub(crate) fn verify_peers_closer( pub(crate) fn verify_peers_closer(
vcrypto: CryptoSystemVersion, vcrypto: CryptoSystemVersion,
key_far: TypedKey, key_far: TypedKey,

View File

@ -77,7 +77,7 @@ impl RouteSpecStore {
} }
} }
#[instrument(level = "trace", skip(routing_table), err)] #[instrument(level = "trace", target = "route", skip(routing_table), err)]
pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStore> { pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStore> {
let (max_route_hop_count, default_route_hop_count) = { let (max_route_hop_count, default_route_hop_count) = {
let config = routing_table.network_manager().config(); let config = routing_table.network_manager().config();
@ -115,7 +115,7 @@ impl RouteSpecStore {
Ok(rss) Ok(rss)
} }
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", target = "route", skip(self), err)]
pub async fn save(&self) -> EyreResult<()> { pub async fn save(&self) -> EyreResult<()> {
let content = { let content = {
let inner = self.inner.lock(); let inner = self.inner.lock();
@ -130,7 +130,7 @@ impl RouteSpecStore {
Ok(()) Ok(())
} }
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", target = "route", skip(self))]
pub fn send_route_update(&self) { pub fn send_route_update(&self) {
let (dead_routes, dead_remote_routes) = { let (dead_routes, dead_remote_routes) = {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -170,7 +170,7 @@ impl RouteSpecStore {
/// Returns Err(VeilidAPIError::TryAgain) if no route could be allocated at this time /// Returns Err(VeilidAPIError::TryAgain) if no route could be allocated at this time
/// Returns other errors on failure /// Returns other errors on failure
/// Returns Ok(route id string) on success /// Returns Ok(route id string) on success
#[instrument(level = "trace", skip(self), ret, err(level=Level::TRACE))] #[instrument(level = "trace", target="route", skip(self), ret, err(level=Level::TRACE))]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn allocate_route( pub fn allocate_route(
&self, &self,
@ -199,7 +199,7 @@ impl RouteSpecStore {
) )
} }
#[instrument(level = "trace", skip(self, inner, rti), ret, err(level=Level::TRACE))] #[instrument(level = "trace", target="route", skip(self, inner, rti), ret, err(level=Level::TRACE))]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn allocate_route_inner( fn allocate_route_inner(
&self, &self,
@ -640,10 +640,7 @@ impl RouteSpecStore {
} }
/// validate data using a private route's key and signature chain /// validate data using a private route's key and signature chain
#[cfg_attr( #[instrument(level = "trace", target = "route", skip(self, data, callback), ret)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self, data, callback), ret)
)]
pub fn with_signature_validated_route<F, R>( pub fn with_signature_validated_route<F, R>(
&self, &self,
public_key: &TypedKey, public_key: &TypedKey,
@ -711,10 +708,7 @@ impl RouteSpecStore {
Some(callback(rssd, rsd)) Some(callback(rssd, rsd))
} }
#[cfg_attr( #[instrument(level = "trace", target = "route", skip(self), ret, err)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err)
)]
async fn test_allocated_route(&self, private_route_id: RouteId) -> VeilidAPIResult<bool> { async fn test_allocated_route(&self, private_route_id: RouteId) -> VeilidAPIResult<bool> {
// Make loopback route to test with // Make loopback route to test with
let (dest, hops) = { let (dest, hops) = {
@ -781,7 +775,7 @@ impl RouteSpecStore {
Ok(true) Ok(true)
} }
#[instrument(level = "trace", skip(self), ret, err)] #[instrument(level = "trace", target = "route", skip(self), ret, err)]
async fn test_remote_route(&self, private_route_id: RouteId) -> VeilidAPIResult<bool> { async fn test_remote_route(&self, private_route_id: RouteId) -> VeilidAPIResult<bool> {
// Make private route test // Make private route test
let dest = { let dest = {
@ -825,7 +819,7 @@ impl RouteSpecStore {
} }
/// Release an allocated route that is no longer in use /// Release an allocated route that is no longer in use
#[instrument(level = "trace", skip(self), ret)] #[instrument(level = "trace", target = "route", skip(self), ret)]
fn release_allocated_route(&self, id: RouteId) -> bool { fn release_allocated_route(&self, id: RouteId) -> bool {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let Some(rssd) = inner.content.remove_detail(&id) else { let Some(rssd) = inner.content.remove_detail(&id) else {
@ -852,10 +846,7 @@ impl RouteSpecStore {
} }
/// Test an allocated route for continuity /// Test an allocated route for continuity
#[cfg_attr( #[instrument(level = "trace", target = "route", skip(self), ret, err)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err)
)]
pub async fn test_route(&self, id: RouteId) -> VeilidAPIResult<bool> { pub async fn test_route(&self, id: RouteId) -> VeilidAPIResult<bool> {
let is_remote = self.is_route_id_remote(&id); let is_remote = self.is_route_id_remote(&id);
if is_remote { if is_remote {
@ -866,7 +857,7 @@ impl RouteSpecStore {
} }
/// Release an allocated or remote route that is no longer in use /// Release an allocated or remote route that is no longer in use
#[instrument(level = "trace", skip(self), ret)] #[instrument(level = "trace", target = "route", skip(self), ret)]
pub fn release_route(&self, id: RouteId) -> bool { pub fn release_route(&self, id: RouteId) -> bool {
let is_remote = self.is_route_id_remote(&id); let is_remote = self.is_route_id_remote(&id);
if is_remote { if is_remote {
@ -879,6 +870,7 @@ impl RouteSpecStore {
/// Find first matching unpublished route that fits into the selection criteria /// Find first matching unpublished route that fits into the selection criteria
/// Don't pick any routes that have failed and haven't been tested yet /// Don't pick any routes that have failed and haven't been tested yet
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", target = "route", skip_all)]
fn first_available_route_inner( fn first_available_route_inner(
inner: &RouteSpecStoreInner, inner: &RouteSpecStoreInner,
crypto_kind: CryptoKind, crypto_kind: CryptoKind,
@ -1002,6 +994,8 @@ impl RouteSpecStore {
/// Returns Err(VeilidAPIError::TryAgain) if no allocation could happen at this time (not an error) /// Returns Err(VeilidAPIError::TryAgain) if no allocation could happen at this time (not an error)
/// Returns other Err() if the parameters are wrong /// Returns other Err() if the parameters are wrong
/// Returns Ok(compiled route) on success /// Returns Ok(compiled route) on success
#[instrument(level = "trace", target = "route", skip_all)]
pub fn compile_safety_route( pub fn compile_safety_route(
&self, &self,
safety_selection: SafetySelection, safety_selection: SafetySelection,
@ -1269,10 +1263,7 @@ impl RouteSpecStore {
} }
/// Get an allocated route that matches a particular safety spec /// Get an allocated route that matches a particular safety spec
#[cfg_attr( #[instrument(level = "trace", target = "route", skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self, inner, rti), ret, err)
)]
fn get_route_for_safety_spec_inner( fn get_route_for_safety_spec_inner(
&self, &self,
inner: &mut RouteSpecStoreInner, inner: &mut RouteSpecStoreInner,
@ -1353,10 +1344,7 @@ impl RouteSpecStore {
} }
/// Get a private route to use for the answer to question /// Get a private route to use for the answer to question
#[cfg_attr( #[instrument(level = "trace", target = "route", skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err)
)]
pub fn get_private_route_for_safety_spec( pub fn get_private_route_for_safety_spec(
&self, &self,
crypto_kind: CryptoKind, crypto_kind: CryptoKind,
@ -1477,10 +1465,7 @@ impl RouteSpecStore {
/// Assemble a single private route for publication /// Assemble a single private route for publication
/// Returns a PrivateRoute object for an allocated private route key /// Returns a PrivateRoute object for an allocated private route key
#[cfg_attr( #[instrument(level = "trace", target = "route", skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self), err)
)]
pub fn assemble_private_route( pub fn assemble_private_route(
&self, &self,
key: &PublicKey, key: &PublicKey,
@ -1510,10 +1495,7 @@ impl RouteSpecStore {
/// Assemble private route set for publication /// Assemble private route set for publication
/// Returns a vec of PrivateRoute objects for an allocated private route /// Returns a vec of PrivateRoute objects for an allocated private route
#[cfg_attr( #[instrument(level = "trace", target = "route", skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self), err)
)]
pub fn assemble_private_routes( pub fn assemble_private_routes(
&self, &self,
id: &RouteId, id: &RouteId,
@ -1541,10 +1523,7 @@ impl RouteSpecStore {
/// Import a remote private route set blob for compilation /// Import a remote private route set blob for compilation
/// It is safe to import the same route more than once and it will return the same route id /// It is safe to import the same route more than once and it will return the same route id
/// Returns a route set id /// Returns a route set id
#[cfg_attr( #[instrument(level = "trace", target = "route", skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self, blob), ret, err)
)]
pub fn import_remote_private_route_blob(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> { pub fn import_remote_private_route_blob(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> {
let cur_ts = get_aligned_timestamp(); let cur_ts = get_aligned_timestamp();
@ -1581,10 +1560,7 @@ impl RouteSpecStore {
/// Add a single remote private route for compilation /// Add a single remote private route for compilation
/// It is safe to add the same route more than once and it will return the same route id /// It is safe to add the same route more than once and it will return the same route id
/// Returns a route set id /// Returns a route set id
#[cfg_attr( #[instrument(level = "trace", target = "route", skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err)
)]
pub fn add_remote_private_route( pub fn add_remote_private_route(
&self, &self,
private_route: PrivateRoute, private_route: PrivateRoute,
@ -1619,10 +1595,7 @@ impl RouteSpecStore {
} }
/// Release a remote private route that is no longer in use /// Release a remote private route that is no longer in use
#[cfg_attr( #[instrument(level = "trace", target = "route", skip_all)]
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret)
)]
pub fn release_remote_private_route(&self, id: RouteId) -> bool { pub fn release_remote_private_route(&self, id: RouteId) -> bool {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
inner.cache.remove_remote_private_route(id) inner.cache.remove_remote_private_route(id)
@ -1739,7 +1712,7 @@ impl RouteSpecStore {
} }
/// Clear caches when local our local node info changes /// Clear caches when local our local node info changes
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", target = "route", skip(self))]
pub fn reset(&self) { pub fn reset(&self) {
log_rtab!(debug "flushing route spec store"); log_rtab!(debug "flushing route spec store");

View File

@ -22,6 +22,7 @@ pub type PermFunc<'t> = Box<dyn FnMut(&[usize]) -> Option<PermReturnType> + Send
/// for a set of 'hop_count' nodes. the first node is always fixed, and the maximum /// for a set of 'hop_count' nodes. the first node is always fixed, and the maximum
/// number of permutations is given by get_route_permutation_count() /// number of permutations is given by get_route_permutation_count()
#[instrument(level = "trace", target = "route", skip_all)]
pub fn with_route_permutations( pub fn with_route_permutations(
hop_count: usize, hop_count: usize,
start: usize, start: usize,