diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 71eef5ab..304eaaf8 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -8,7 +8,7 @@ impl_veilid_log_facility!("net"); const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10); const PROTECTED_CONNECTION_DROP_COUNT: usize = 3; -const NEW_CONNECTION_RETRY_COUNT: usize = 1; +const NEW_CONNECTION_RETRY_COUNT: usize = 0; const NEW_CONNECTION_RETRY_DELAY_MS: u32 = 500; /////////////////////////////////////////////////////////// @@ -415,7 +415,18 @@ impl ConnectionManager { let best_port = preferred_local_address.map(|pla| pla.port()); // Async lock on the remote address for atomicity per remote - let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr).await; + // Use the initial timeout here as the 'close' timeout as well + let Ok(_lock_guard) = timeout( + self.arc.connection_initial_timeout_ms, + self.arc.address_lock_table.lock_tag(remote_addr), + ) + .await + else { + veilid_log!(self debug "== get_or_create_connection: connection busy, not connecting to dial_info={:?}", dial_info); + return Ok(NetworkResult::no_connection_other( + "connection endpoint busy", + )); + }; veilid_log!(self trace "== get_or_create_connection dial_info={:?}", dial_info); @@ -477,8 +488,9 @@ impl ConnectionManager { veilid_log!(self debug "get_or_create_connection retries left: {}", retry_count); retry_count -= 1; + // XXX: This should not be necessary // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of - preferred_local_address = None; + // preferred_local_address = None; sleep(NEW_CONNECTION_RETRY_DELAY_MS).await; }); diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index c3de6093..13ed2a9c 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -636,6 +636,10 @@ impl Network { ) -> EyreResult> { let _guard = self.startup_lock.enter()?; + let connection_initial_timeout_us = self + .config() + .with(|c| c.network.connection_initial_timeout_ms as u64 * 1000); + self.record_dial_info_failure( dial_info.clone(), async move { @@ -652,24 +656,33 @@ impl Network { )); } }; - let flow = network_result_try!(ph - .send_message(data, peer_socket_addr) - .await - .wrap_err("failed to send data to dial info")?); + let flow = network_result_try!(debug_duration( + || { ph.send_message(data, peer_socket_addr) }, + Some(connection_initial_timeout_us * 2) + ) + .await + .wrap_err("failed to send data to dial info")?); unique_flow = UniqueFlow { flow, connection_id: None, }; } else { // Handle connection-oriented protocols + let connmgr = self.network_manager().connection_manager(); let conn = network_result_try!( - self.network_manager() - .connection_manager() - .get_or_create_connection(dial_info.clone()) - .await? + debug_duration( + || { connmgr.get_or_create_connection(dial_info.clone()) }, + Some(connection_initial_timeout_us * 2) + ) + .await? ); - if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { + if let ConnectionHandleSendResult::NotSent(_) = debug_duration( + || conn.send_async(data), + Some(connection_initial_timeout_us * 2), + ) + .await + { return Ok(NetworkResult::NoConnection(io::Error::new( io::ErrorKind::ConnectionReset, "failed to send", diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 7253d45a..b3c2c58f 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -6,7 +6,7 @@ use async_tungstenite::tungstenite::handshake::server::{ Callback, ErrorResponse, Request, Response, }; use async_tungstenite::tungstenite::http::StatusCode; -use async_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message}; +use async_tungstenite::tungstenite::protocol::Message; use async_tungstenite::tungstenite::Error; use async_tungstenite::{accept_hdr_async, client_async, WebSocketStream}; use futures_util::{AsyncRead, AsyncWrite, SinkExt}; @@ -98,45 +98,27 @@ where #[instrument(level = "trace", target = "protocol", err, skip_all)] pub async fn close(&self) -> io::Result> { + let timeout_ms = self + .registry + .config() + .with(|c| c.network.connection_initial_timeout_ms); + // Make an attempt to close the stream normally let mut stream = self.stream.clone(); - let out = match stream - .send(Message::Close(Some(CloseFrame { - code: CloseCode::Normal, - reason: "".into(), - }))) - .await - { - Ok(v) => NetworkResult::value(v), - Err(e) => err_to_network_result(e), - }; // This close does not do a TCP shutdown so it is safe and will not cause TIME_WAIT - let _ = stream.close().await; - - Ok(out) - - // Drive connection to close - /* - let cur_ts = get_timestamp(); - loop { - match stream.flush().await { - Ok(()) => {} - Err(Error::Io(ioerr)) => { - break Err(ioerr).into_network_result(); - } - Err(Error::ConnectionClosed) => { - break Ok(NetworkResult::value(())); - } - Err(e) => { - break Err(to_io_error_other(e)); - } + match timeout(timeout_ms, stream.close()).await { + Ok(Ok(())) => {} + Ok(Err(e)) => { + return Ok(err_to_network_result(e)); } - if get_timestamp().saturating_sub(cur_ts) >= MAX_CONNECTION_CLOSE_WAIT_US { - return Ok(NetworkResult::Timeout); + Err(_) => { + // Timed out + return Ok(NetworkResult::timeout()); } - } - */ + }; + + Ok(NetworkResult::value(())) } #[instrument(level = "trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))] diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 51d179b6..b3adddc5 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -152,7 +152,7 @@ impl NetworkManager { // If a node is unreachable it may still have an existing inbound connection // Try that, but don't cache anything network_result_try!( - pin_future_closure!(self.send_data_ncm_existing(target_node_ref, data)).await? + pin_future_closure!(self.send_data_unreachable(target_node_ref, data)).await? ) } Some(NodeContactMethod { @@ -239,6 +239,42 @@ impl NetworkManager { })) } + /// Send data to unreachable node + #[instrument(level = "trace", target = "net", skip_all, err)] + async fn send_data_unreachable( + &self, + target_node_ref: FilteredNodeRef, + data: Vec, + ) -> EyreResult> { + // First try to send data to the last connection we've seen this peer on + let Some(flow) = target_node_ref.last_flow() else { + return Ok(NetworkResult::no_connection_other(format!( + "node was unreachable: {}", + target_node_ref + ))); + }; + + let net = self.net(); + let unique_flow = match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? + { + SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, + SendDataToExistingFlowResult::NotSent(_) => { + return Ok(NetworkResult::no_connection_other( + "failed to send to existing flow", + )); + } + }; + + // Update timestamp for this last connection since we just sent to it + self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now()); + + Ok(NetworkResult::value(unique_flow)) + } + /// Send data using NodeContactMethod::Existing #[instrument(level = "trace", target = "net", skip_all, err)] async fn send_data_ncm_existing( @@ -255,7 +291,12 @@ impl NetworkManager { }; let net = self.net(); - let unique_flow = match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { + let unique_flow = match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? + { SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, SendDataToExistingFlowResult::NotSent(_) => { return Ok(NetworkResult::no_connection_other( @@ -297,7 +338,12 @@ impl NetworkManager { // First try to send data to the last flow we've seen this peer on let data = if let Some(flow) = seq_target_node_ref.last_flow() { let net = self.net(); - match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { + match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? + { SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now()); @@ -321,9 +367,16 @@ impl NetworkManager { data }; + let connection_initial_timeout_us = self + .config() + .with(|c| c.network.connection_initial_timeout_ms as u64 * 1000); + let unique_flow = network_result_try!( - pin_future!(self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data)) - .await? + pin_future!(debug_duration( + || { self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data) }, + Some(connection_initial_timeout_us * 2) + )) + .await? ); Ok(NetworkResult::value(unique_flow)) } @@ -339,7 +392,12 @@ impl NetworkManager { // First try to send data to the last flow we've seen this peer on let data = if let Some(flow) = target_node_ref.last_flow() { let net = self.net(); - match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { + match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? + { SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now()); @@ -363,9 +421,16 @@ impl NetworkManager { data }; + let hole_punch_receipt_time_us = self + .config() + .with(|c| c.network.hole_punch_receipt_time_ms as u64 * 1000); + let unique_flow = network_result_try!( - pin_future!(self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data)) - .await? + pin_future!(debug_duration( + || { self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data) }, + Some(hole_punch_receipt_time_us * 2) + )) + .await? ); Ok(NetworkResult::value(unique_flow)) @@ -391,7 +456,12 @@ impl NetworkManager { ); let net = self.net(); - match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { + match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? + { SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it self.set_last_flow(node_ref.unfiltered(), flow, Timestamp::now()); diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index 1af70872..28d22e2b 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -52,8 +52,16 @@ impl WebsocketNetworkConnection { instrument(level = "trace", err, skip(self)) )] pub async fn close(&self) -> io::Result> { + let timeout_ms = self + .registry + .config() + .with(|c| c.network.connection_initial_timeout_ms); + #[allow(unused_variables)] - let x = self.inner.ws_meta.close().await.map_err(ws_err_to_io_error); + let x = match timeout(timeout_ms, self.inner.ws_meta.close()).await { + Ok(v) => v.map_err(ws_err_to_io_error), + Err(_) => return Ok(NetworkResult::timeout()), + }; #[cfg(feature = "verbose-tracing")] veilid_log!(self debug "close result: {:?}", x); Ok(NetworkResult::value(())) diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index 22d6283b..1da65745 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -47,6 +47,7 @@ veilid_tools_android_tests = ["dep:paranoid-android"] veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"] tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"] debug-locks = [] +debug-duration-timeout = [] virtual-network = [] virtual-network-server = [ diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index eec24484..05289a3d 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -522,13 +522,33 @@ pub fn is_debug_backtrace_enabled() -> bool { } #[track_caller] -pub fn debug_duration, T: FnOnce() -> F>(f: T) -> impl Future { - let location = std::panic::Location::caller(); +pub fn debug_duration, T: FnOnce() -> F>( + f: T, + opt_timeout_us: Option, +) -> impl Future { + let location = core::panic::Location::caller(); async move { let t1 = get_timestamp(); let out = f().await; let t2 = get_timestamp(); - debug!("duration@{}: {}", location, display_duration(t2 - t1)); + let duration_us = t2 - t1; + if let Some(timeout_us) = opt_timeout_us { + if duration_us > timeout_us { + #[cfg(not(feature = "debug-duration-timeout"))] + debug!( + "Excessive duration: {}\n{:?}", + display_duration(duration_us), + backtrace::Backtrace::new() + ); + #[cfg(feature = "debug-duration-timeout")] + panic!(format!( + "Duration panic timeout exceeded: {}", + display_duration(duration_us) + )); + } + } else { + debug!("Duration: {} = {}", location, display_duration(duration_us),); + } out } }