[ci skip] debugging

This commit is contained in:
Christien Rioux 2025-03-03 16:06:56 -05:00
parent 7c75cf02dd
commit 9a3cab071a
7 changed files with 165 additions and 59 deletions

View File

@ -8,7 +8,7 @@ impl_veilid_log_facility!("net");
const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10); const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10);
const PROTECTED_CONNECTION_DROP_COUNT: usize = 3; const PROTECTED_CONNECTION_DROP_COUNT: usize = 3;
const NEW_CONNECTION_RETRY_COUNT: usize = 1; const NEW_CONNECTION_RETRY_COUNT: usize = 0;
const NEW_CONNECTION_RETRY_DELAY_MS: u32 = 500; const NEW_CONNECTION_RETRY_DELAY_MS: u32 = 500;
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
@ -415,7 +415,18 @@ impl ConnectionManager {
let best_port = preferred_local_address.map(|pla| pla.port()); let best_port = preferred_local_address.map(|pla| pla.port());
// Async lock on the remote address for atomicity per remote // Async lock on the remote address for atomicity per remote
let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr).await; // Use the initial timeout here as the 'close' timeout as well
let Ok(_lock_guard) = timeout(
self.arc.connection_initial_timeout_ms,
self.arc.address_lock_table.lock_tag(remote_addr),
)
.await
else {
veilid_log!(self debug "== get_or_create_connection: connection busy, not connecting to dial_info={:?}", dial_info);
return Ok(NetworkResult::no_connection_other(
"connection endpoint busy",
));
};
veilid_log!(self trace "== get_or_create_connection dial_info={:?}", dial_info); veilid_log!(self trace "== get_or_create_connection dial_info={:?}", dial_info);
@ -477,8 +488,9 @@ impl ConnectionManager {
veilid_log!(self debug "get_or_create_connection retries left: {}", retry_count); veilid_log!(self debug "get_or_create_connection retries left: {}", retry_count);
retry_count -= 1; retry_count -= 1;
// XXX: This should not be necessary
// Release the preferred local address if things can't connect due to a low-level collision we dont have a record of // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of
preferred_local_address = None; // preferred_local_address = None;
sleep(NEW_CONNECTION_RETRY_DELAY_MS).await; sleep(NEW_CONNECTION_RETRY_DELAY_MS).await;
}); });

View File

@ -636,6 +636,10 @@ impl Network {
) -> EyreResult<NetworkResult<UniqueFlow>> { ) -> EyreResult<NetworkResult<UniqueFlow>> {
let _guard = self.startup_lock.enter()?; let _guard = self.startup_lock.enter()?;
let connection_initial_timeout_us = self
.config()
.with(|c| c.network.connection_initial_timeout_ms as u64 * 1000);
self.record_dial_info_failure( self.record_dial_info_failure(
dial_info.clone(), dial_info.clone(),
async move { async move {
@ -652,8 +656,10 @@ impl Network {
)); ));
} }
}; };
let flow = network_result_try!(ph let flow = network_result_try!(debug_duration(
.send_message(data, peer_socket_addr) || { ph.send_message(data, peer_socket_addr) },
Some(connection_initial_timeout_us * 2)
)
.await .await
.wrap_err("failed to send data to dial info")?); .wrap_err("failed to send data to dial info")?);
unique_flow = UniqueFlow { unique_flow = UniqueFlow {
@ -662,14 +668,21 @@ impl Network {
}; };
} else { } else {
// Handle connection-oriented protocols // Handle connection-oriented protocols
let connmgr = self.network_manager().connection_manager();
let conn = network_result_try!( let conn = network_result_try!(
self.network_manager() debug_duration(
.connection_manager() || { connmgr.get_or_create_connection(dial_info.clone()) },
.get_or_create_connection(dial_info.clone()) Some(connection_initial_timeout_us * 2)
)
.await? .await?
); );
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { if let ConnectionHandleSendResult::NotSent(_) = debug_duration(
|| conn.send_async(data),
Some(connection_initial_timeout_us * 2),
)
.await
{
return Ok(NetworkResult::NoConnection(io::Error::new( return Ok(NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset, io::ErrorKind::ConnectionReset,
"failed to send", "failed to send",

View File

@ -6,7 +6,7 @@ use async_tungstenite::tungstenite::handshake::server::{
Callback, ErrorResponse, Request, Response, Callback, ErrorResponse, Request, Response,
}; };
use async_tungstenite::tungstenite::http::StatusCode; use async_tungstenite::tungstenite::http::StatusCode;
use async_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message}; use async_tungstenite::tungstenite::protocol::Message;
use async_tungstenite::tungstenite::Error; use async_tungstenite::tungstenite::Error;
use async_tungstenite::{accept_hdr_async, client_async, WebSocketStream}; use async_tungstenite::{accept_hdr_async, client_async, WebSocketStream};
use futures_util::{AsyncRead, AsyncWrite, SinkExt}; use futures_util::{AsyncRead, AsyncWrite, SinkExt};
@ -98,45 +98,27 @@ where
#[instrument(level = "trace", target = "protocol", err, skip_all)] #[instrument(level = "trace", target = "protocol", err, skip_all)]
pub async fn close(&self) -> io::Result<NetworkResult<()>> { pub async fn close(&self) -> io::Result<NetworkResult<()>> {
let timeout_ms = self
.registry
.config()
.with(|c| c.network.connection_initial_timeout_ms);
// Make an attempt to close the stream normally // Make an attempt to close the stream normally
let mut stream = self.stream.clone(); let mut stream = self.stream.clone();
let out = match stream
.send(Message::Close(Some(CloseFrame {
code: CloseCode::Normal,
reason: "".into(),
})))
.await
{
Ok(v) => NetworkResult::value(v),
Err(e) => err_to_network_result(e),
};
// This close does not do a TCP shutdown so it is safe and will not cause TIME_WAIT // This close does not do a TCP shutdown so it is safe and will not cause TIME_WAIT
let _ = stream.close().await; match timeout(timeout_ms, stream.close()).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
return Ok(err_to_network_result(e));
}
Err(_) => {
// Timed out
return Ok(NetworkResult::timeout());
}
};
Ok(out) Ok(NetworkResult::value(()))
// Drive connection to close
/*
let cur_ts = get_timestamp();
loop {
match stream.flush().await {
Ok(()) => {}
Err(Error::Io(ioerr)) => {
break Err(ioerr).into_network_result();
}
Err(Error::ConnectionClosed) => {
break Ok(NetworkResult::value(()));
}
Err(e) => {
break Err(to_io_error_other(e));
}
}
if get_timestamp().saturating_sub(cur_ts) >= MAX_CONNECTION_CLOSE_WAIT_US {
return Ok(NetworkResult::Timeout);
}
}
*/
} }
#[instrument(level = "trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))] #[instrument(level = "trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))]

View File

@ -152,7 +152,7 @@ impl NetworkManager {
// If a node is unreachable it may still have an existing inbound connection // If a node is unreachable it may still have an existing inbound connection
// Try that, but don't cache anything // Try that, but don't cache anything
network_result_try!( network_result_try!(
pin_future_closure!(self.send_data_ncm_existing(target_node_ref, data)).await? pin_future_closure!(self.send_data_unreachable(target_node_ref, data)).await?
) )
} }
Some(NodeContactMethod { Some(NodeContactMethod {
@ -239,6 +239,42 @@ impl NetworkManager {
})) }))
} }
/// Send data to unreachable node
#[instrument(level = "trace", target = "net", skip_all, err)]
async fn send_data_unreachable(
&self,
target_node_ref: FilteredNodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
// First try to send data to the last connection we've seen this peer on
let Some(flow) = target_node_ref.last_flow() else {
return Ok(NetworkResult::no_connection_other(format!(
"node was unreachable: {}",
target_node_ref
)));
};
let net = self.net();
let unique_flow = match pin_future!(debug_duration(
|| { net.send_data_to_existing_flow(flow, data) },
Some(1_000_000)
))
.await?
{
SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow,
SendDataToExistingFlowResult::NotSent(_) => {
return Ok(NetworkResult::no_connection_other(
"failed to send to existing flow",
));
}
};
// Update timestamp for this last connection since we just sent to it
self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
Ok(NetworkResult::value(unique_flow))
}
/// Send data using NodeContactMethod::Existing /// Send data using NodeContactMethod::Existing
#[instrument(level = "trace", target = "net", skip_all, err)] #[instrument(level = "trace", target = "net", skip_all, err)]
async fn send_data_ncm_existing( async fn send_data_ncm_existing(
@ -255,7 +291,12 @@ impl NetworkManager {
}; };
let net = self.net(); let net = self.net();
let unique_flow = match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { let unique_flow = match pin_future!(debug_duration(
|| { net.send_data_to_existing_flow(flow, data) },
Some(1_000_000)
))
.await?
{
SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow,
SendDataToExistingFlowResult::NotSent(_) => { SendDataToExistingFlowResult::NotSent(_) => {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::no_connection_other(
@ -297,7 +338,12 @@ impl NetworkManager {
// First try to send data to the last flow we've seen this peer on // First try to send data to the last flow we've seen this peer on
let data = if let Some(flow) = seq_target_node_ref.last_flow() { let data = if let Some(flow) = seq_target_node_ref.last_flow() {
let net = self.net(); let net = self.net();
match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { match pin_future!(debug_duration(
|| { net.send_data_to_existing_flow(flow, data) },
Some(1_000_000)
))
.await?
{
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now()); self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
@ -321,8 +367,15 @@ impl NetworkManager {
data data
}; };
let connection_initial_timeout_us = self
.config()
.with(|c| c.network.connection_initial_timeout_ms as u64 * 1000);
let unique_flow = network_result_try!( let unique_flow = network_result_try!(
pin_future!(self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data)) pin_future!(debug_duration(
|| { self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data) },
Some(connection_initial_timeout_us * 2)
))
.await? .await?
); );
Ok(NetworkResult::value(unique_flow)) Ok(NetworkResult::value(unique_flow))
@ -339,7 +392,12 @@ impl NetworkManager {
// First try to send data to the last flow we've seen this peer on // First try to send data to the last flow we've seen this peer on
let data = if let Some(flow) = target_node_ref.last_flow() { let data = if let Some(flow) = target_node_ref.last_flow() {
let net = self.net(); let net = self.net();
match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { match pin_future!(debug_duration(
|| { net.send_data_to_existing_flow(flow, data) },
Some(1_000_000)
))
.await?
{
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now()); self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
@ -363,8 +421,15 @@ impl NetworkManager {
data data
}; };
let hole_punch_receipt_time_us = self
.config()
.with(|c| c.network.hole_punch_receipt_time_ms as u64 * 1000);
let unique_flow = network_result_try!( let unique_flow = network_result_try!(
pin_future!(self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data)) pin_future!(debug_duration(
|| { self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data) },
Some(hole_punch_receipt_time_us * 2)
))
.await? .await?
); );
@ -391,7 +456,12 @@ impl NetworkManager {
); );
let net = self.net(); let net = self.net();
match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { match pin_future!(debug_duration(
|| { net.send_data_to_existing_flow(flow, data) },
Some(1_000_000)
))
.await?
{
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
self.set_last_flow(node_ref.unfiltered(), flow, Timestamp::now()); self.set_last_flow(node_ref.unfiltered(), flow, Timestamp::now());

View File

@ -52,8 +52,16 @@ impl WebsocketNetworkConnection {
instrument(level = "trace", err, skip(self)) instrument(level = "trace", err, skip(self))
)] )]
pub async fn close(&self) -> io::Result<NetworkResult<()>> { pub async fn close(&self) -> io::Result<NetworkResult<()>> {
let timeout_ms = self
.registry
.config()
.with(|c| c.network.connection_initial_timeout_ms);
#[allow(unused_variables)] #[allow(unused_variables)]
let x = self.inner.ws_meta.close().await.map_err(ws_err_to_io_error); let x = match timeout(timeout_ms, self.inner.ws_meta.close()).await {
Ok(v) => v.map_err(ws_err_to_io_error),
Err(_) => return Ok(NetworkResult::timeout()),
};
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
veilid_log!(self debug "close result: {:?}", x); veilid_log!(self debug "close result: {:?}", x);
Ok(NetworkResult::value(())) Ok(NetworkResult::value(()))

View File

@ -47,6 +47,7 @@ veilid_tools_android_tests = ["dep:paranoid-android"]
veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"] veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"]
tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"] tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"]
debug-locks = [] debug-locks = []
debug-duration-timeout = []
virtual-network = [] virtual-network = []
virtual-network-server = [ virtual-network-server = [

View File

@ -522,13 +522,33 @@ pub fn is_debug_backtrace_enabled() -> bool {
} }
#[track_caller] #[track_caller]
pub fn debug_duration<R, F: Future<Output = R>, T: FnOnce() -> F>(f: T) -> impl Future<Output = R> { pub fn debug_duration<R, F: Future<Output = R>, T: FnOnce() -> F>(
let location = std::panic::Location::caller(); f: T,
opt_timeout_us: Option<u64>,
) -> impl Future<Output = R> {
let location = core::panic::Location::caller();
async move { async move {
let t1 = get_timestamp(); let t1 = get_timestamp();
let out = f().await; let out = f().await;
let t2 = get_timestamp(); let t2 = get_timestamp();
debug!("duration@{}: {}", location, display_duration(t2 - t1)); let duration_us = t2 - t1;
if let Some(timeout_us) = opt_timeout_us {
if duration_us > timeout_us {
#[cfg(not(feature = "debug-duration-timeout"))]
debug!(
"Excessive duration: {}\n{:?}",
display_duration(duration_us),
backtrace::Backtrace::new()
);
#[cfg(feature = "debug-duration-timeout")]
panic!(format!(
"Duration panic timeout exceeded: {}",
display_duration(duration_us)
));
}
} else {
debug!("Duration: {} = {}", location, display_duration(duration_us),);
}
out out
} }
} }