From 9a3cab071afbfee8ad33ba6711b8f03a1a9e3bdb Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 3 Mar 2025 16:06:56 -0500 Subject: [PATCH 1/4] [ci skip] debugging --- .../src/network_manager/connection_manager.rs | 18 +++- veilid-core/src/network_manager/native/mod.rs | 31 +++++-- .../src/network_manager/native/protocol/ws.rs | 50 ++++------- veilid-core/src/network_manager/send_data.rs | 88 +++++++++++++++++-- .../src/network_manager/wasm/protocol/ws.rs | 10 ++- veilid-tools/Cargo.toml | 1 + veilid-tools/src/tools.rs | 26 +++++- 7 files changed, 165 insertions(+), 59 deletions(-) diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 71eef5ab..304eaaf8 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -8,7 +8,7 @@ impl_veilid_log_facility!("net"); const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10); const PROTECTED_CONNECTION_DROP_COUNT: usize = 3; -const NEW_CONNECTION_RETRY_COUNT: usize = 1; +const NEW_CONNECTION_RETRY_COUNT: usize = 0; const NEW_CONNECTION_RETRY_DELAY_MS: u32 = 500; /////////////////////////////////////////////////////////// @@ -415,7 +415,18 @@ impl ConnectionManager { let best_port = preferred_local_address.map(|pla| pla.port()); // Async lock on the remote address for atomicity per remote - let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr).await; + // Use the initial timeout here as the 'close' timeout as well + let Ok(_lock_guard) = timeout( + self.arc.connection_initial_timeout_ms, + self.arc.address_lock_table.lock_tag(remote_addr), + ) + .await + else { + veilid_log!(self debug "== get_or_create_connection: connection busy, not connecting to dial_info={:?}", dial_info); + return Ok(NetworkResult::no_connection_other( + "connection endpoint busy", + )); + }; veilid_log!(self trace "== get_or_create_connection 
dial_info={:?}", dial_info); @@ -477,8 +488,9 @@ impl ConnectionManager { veilid_log!(self debug "get_or_create_connection retries left: {}", retry_count); retry_count -= 1; + // XXX: This should not be necessary // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of - preferred_local_address = None; + // preferred_local_address = None; sleep(NEW_CONNECTION_RETRY_DELAY_MS).await; }); diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index c3de6093..13ed2a9c 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -636,6 +636,10 @@ impl Network { ) -> EyreResult> { let _guard = self.startup_lock.enter()?; + let connection_initial_timeout_us = self + .config() + .with(|c| c.network.connection_initial_timeout_ms as u64 * 1000); + self.record_dial_info_failure( dial_info.clone(), async move { @@ -652,24 +656,33 @@ impl Network { )); } }; - let flow = network_result_try!(ph - .send_message(data, peer_socket_addr) - .await - .wrap_err("failed to send data to dial info")?); + let flow = network_result_try!(debug_duration( + || { ph.send_message(data, peer_socket_addr) }, + Some(connection_initial_timeout_us * 2) + ) + .await + .wrap_err("failed to send data to dial info")?); unique_flow = UniqueFlow { flow, connection_id: None, }; } else { // Handle connection-oriented protocols + let connmgr = self.network_manager().connection_manager(); let conn = network_result_try!( - self.network_manager() - .connection_manager() - .get_or_create_connection(dial_info.clone()) - .await? + debug_duration( + || { connmgr.get_or_create_connection(dial_info.clone()) }, + Some(connection_initial_timeout_us * 2) + ) + .await? 
); - if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { + if let ConnectionHandleSendResult::NotSent(_) = debug_duration( + || conn.send_async(data), + Some(connection_initial_timeout_us * 2), + ) + .await + { return Ok(NetworkResult::NoConnection(io::Error::new( io::ErrorKind::ConnectionReset, "failed to send", diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 7253d45a..b3c2c58f 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -6,7 +6,7 @@ use async_tungstenite::tungstenite::handshake::server::{ Callback, ErrorResponse, Request, Response, }; use async_tungstenite::tungstenite::http::StatusCode; -use async_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message}; +use async_tungstenite::tungstenite::protocol::Message; use async_tungstenite::tungstenite::Error; use async_tungstenite::{accept_hdr_async, client_async, WebSocketStream}; use futures_util::{AsyncRead, AsyncWrite, SinkExt}; @@ -98,45 +98,27 @@ where #[instrument(level = "trace", target = "protocol", err, skip_all)] pub async fn close(&self) -> io::Result> { + let timeout_ms = self + .registry + .config() + .with(|c| c.network.connection_initial_timeout_ms); + // Make an attempt to close the stream normally let mut stream = self.stream.clone(); - let out = match stream - .send(Message::Close(Some(CloseFrame { - code: CloseCode::Normal, - reason: "".into(), - }))) - .await - { - Ok(v) => NetworkResult::value(v), - Err(e) => err_to_network_result(e), - }; // This close does not do a TCP shutdown so it is safe and will not cause TIME_WAIT - let _ = stream.close().await; - - Ok(out) - - // Drive connection to close - /* - let cur_ts = get_timestamp(); - loop { - match stream.flush().await { - Ok(()) => {} - Err(Error::Io(ioerr)) => { - break Err(ioerr).into_network_result(); - } - 
Err(Error::ConnectionClosed) => { - break Ok(NetworkResult::value(())); - } - Err(e) => { - break Err(to_io_error_other(e)); - } + match timeout(timeout_ms, stream.close()).await { + Ok(Ok(())) => {} + Ok(Err(e)) => { + return Ok(err_to_network_result(e)); } - if get_timestamp().saturating_sub(cur_ts) >= MAX_CONNECTION_CLOSE_WAIT_US { - return Ok(NetworkResult::Timeout); + Err(_) => { + // Timed out + return Ok(NetworkResult::timeout()); } - } - */ + }; + + Ok(NetworkResult::value(())) } #[instrument(level = "trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))] diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 51d179b6..b3adddc5 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -152,7 +152,7 @@ impl NetworkManager { // If a node is unreachable it may still have an existing inbound connection // Try that, but don't cache anything network_result_try!( - pin_future_closure!(self.send_data_ncm_existing(target_node_ref, data)).await? + pin_future_closure!(self.send_data_unreachable(target_node_ref, data)).await? ) } Some(NodeContactMethod { @@ -239,6 +239,42 @@ impl NetworkManager { })) } + /// Send data to unreachable node + #[instrument(level = "trace", target = "net", skip_all, err)] + async fn send_data_unreachable( + &self, + target_node_ref: FilteredNodeRef, + data: Vec, + ) -> EyreResult> { + // First try to send data to the last connection we've seen this peer on + let Some(flow) = target_node_ref.last_flow() else { + return Ok(NetworkResult::no_connection_other(format!( + "node was unreachable: {}", + target_node_ref + ))); + }; + + let net = self.net(); + let unique_flow = match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? 
+ { + SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, + SendDataToExistingFlowResult::NotSent(_) => { + return Ok(NetworkResult::no_connection_other( + "failed to send to existing flow", + )); + } + }; + + // Update timestamp for this last connection since we just sent to it + self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now()); + + Ok(NetworkResult::value(unique_flow)) + } + /// Send data using NodeContactMethod::Existing #[instrument(level = "trace", target = "net", skip_all, err)] async fn send_data_ncm_existing( @@ -255,7 +291,12 @@ impl NetworkManager { }; let net = self.net(); - let unique_flow = match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { + let unique_flow = match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? + { SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, SendDataToExistingFlowResult::NotSent(_) => { return Ok(NetworkResult::no_connection_other( @@ -297,7 +338,12 @@ impl NetworkManager { // First try to send data to the last flow we've seen this peer on let data = if let Some(flow) = seq_target_node_ref.last_flow() { let net = self.net(); - match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { + match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? + { SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now()); @@ -321,9 +367,16 @@ impl NetworkManager { data }; + let connection_initial_timeout_us = self + .config() + .with(|c| c.network.connection_initial_timeout_ms as u64 * 1000); + let unique_flow = network_result_try!( - pin_future!(self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data)) - .await? 
+ pin_future!(debug_duration( + || { self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data) }, + Some(connection_initial_timeout_us * 2) + )) + .await? ); Ok(NetworkResult::value(unique_flow)) } @@ -339,7 +392,12 @@ impl NetworkManager { // First try to send data to the last flow we've seen this peer on let data = if let Some(flow) = target_node_ref.last_flow() { let net = self.net(); - match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { + match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? + { SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now()); @@ -363,9 +421,16 @@ impl NetworkManager { data }; + let hole_punch_receipt_time_us = self + .config() + .with(|c| c.network.hole_punch_receipt_time_ms as u64 * 1000); + let unique_flow = network_result_try!( - pin_future!(self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data)) - .await? + pin_future!(debug_duration( + || { self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data) }, + Some(hole_punch_receipt_time_us * 2) + )) + .await? ); Ok(NetworkResult::value(unique_flow)) @@ -391,7 +456,12 @@ impl NetworkManager { ); let net = self.net(); - match pin_future!(net.send_data_to_existing_flow(flow, data)).await? { + match pin_future!(debug_duration( + || { net.send_data_to_existing_flow(flow, data) }, + Some(1_000_000) + )) + .await? 
+ { SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it self.set_last_flow(node_ref.unfiltered(), flow, Timestamp::now()); diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index 1af70872..28d22e2b 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -52,8 +52,16 @@ impl WebsocketNetworkConnection { instrument(level = "trace", err, skip(self)) )] pub async fn close(&self) -> io::Result> { + let timeout_ms = self + .registry + .config() + .with(|c| c.network.connection_initial_timeout_ms); + #[allow(unused_variables)] - let x = self.inner.ws_meta.close().await.map_err(ws_err_to_io_error); + let x = match timeout(timeout_ms, self.inner.ws_meta.close()).await { + Ok(v) => v.map_err(ws_err_to_io_error), + Err(_) => return Ok(NetworkResult::timeout()), + }; #[cfg(feature = "verbose-tracing")] veilid_log!(self debug "close result: {:?}", x); Ok(NetworkResult::value(())) diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index 22d6283b..1da65745 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -47,6 +47,7 @@ veilid_tools_android_tests = ["dep:paranoid-android"] veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"] tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"] debug-locks = [] +debug-duration-timeout = [] virtual-network = [] virtual-network-server = [ diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index eec24484..05289a3d 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -522,13 +522,33 @@ pub fn is_debug_backtrace_enabled() -> bool { } #[track_caller] -pub fn debug_duration, T: FnOnce() -> F>(f: T) -> impl Future { - let location = std::panic::Location::caller(); +pub fn debug_duration, T: FnOnce() -> F>( + f: T, + opt_timeout_us: Option, +) 
-> impl Future { + let location = core::panic::Location::caller(); async move { let t1 = get_timestamp(); let out = f().await; let t2 = get_timestamp(); - debug!("duration@{}: {}", location, display_duration(t2 - t1)); + let duration_us = t2 - t1; + if let Some(timeout_us) = opt_timeout_us { + if duration_us > timeout_us { + #[cfg(not(feature = "debug-duration-timeout"))] + debug!( + "Excessive duration: {}\n{:?}", + display_duration(duration_us), + backtrace::Backtrace::new() + ); + #[cfg(feature = "debug-duration-timeout")] + panic!(format!( + "Duration panic timeout exceeded: {}", + display_duration(duration_us) + )); + } + } else { + debug!("Duration: {} = {}", location, display_duration(duration_us),); + } out } } From 500547cfa81d33254e0d6b4d3a86164df055b6bc Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 3 Mar 2025 17:03:54 -0500 Subject: [PATCH 2/4] [ci skip] debugging --- .../node_ref/filtered_node_ref.rs | 18 ++++++++++++++++++ veilid-core/src/routing_table/node_ref/mod.rs | 18 ++++++++++++++++++ .../routing_table/node_ref/node_ref_lock.rs | 15 +++++++++++++++ .../node_ref/node_ref_lock_mut.rs | 16 ++++++++++++++++ .../src/routing_table/node_ref/traits.rs | 17 +++++++++++++++-- 5 files changed, 82 insertions(+), 2 deletions(-) diff --git a/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs b/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs index e22b039b..16faf44d 100644 --- a/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs +++ b/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs @@ -119,6 +119,24 @@ impl NodeRefOperateTrait for FilteredNodeRef { let inner = &mut *routing_table.inner.write(); self.entry.with_mut(inner, f) } + + fn with_inner(&self, f: F) -> T + where + F: FnOnce(&RoutingTableInner) -> T, + { + let routing_table = self.registry.routing_table(); + let inner = &*routing_table.inner.read(); + f(inner) + } + + fn with_inner_mut(&self, f: F) -> T + where + F: FnOnce(&mut RoutingTableInner) -> 
T, + { + let routing_table = self.registry.routing_table(); + let inner = &mut *routing_table.inner.write(); + f(inner) + } } impl NodeRefCommonTrait for FilteredNodeRef {} diff --git a/veilid-core/src/routing_table/node_ref/mod.rs b/veilid-core/src/routing_table/node_ref/mod.rs index 4a997892..7a3a41cc 100644 --- a/veilid-core/src/routing_table/node_ref/mod.rs +++ b/veilid-core/src/routing_table/node_ref/mod.rs @@ -139,6 +139,24 @@ impl NodeRefOperateTrait for NodeRef { let inner = &mut *routing_table.inner.write(); self.entry.with_mut(inner, f) } + + fn with_inner(&self, f: F) -> T + where + F: FnOnce(&RoutingTableInner) -> T, + { + let routing_table = self.routing_table(); + let inner = &*routing_table.inner.read(); + f(inner) + } + + fn with_inner_mut(&self, f: F) -> T + where + F: FnOnce(&mut RoutingTableInner) -> T, + { + let routing_table = self.routing_table(); + let inner = &mut *routing_table.inner.write(); + f(inner) + } } impl NodeRefCommonTrait for NodeRef {} diff --git a/veilid-core/src/routing_table/node_ref/node_ref_lock.rs b/veilid-core/src/routing_table/node_ref/node_ref_lock.rs index b8e33f38..d07a8abf 100644 --- a/veilid-core/src/routing_table/node_ref/node_ref_lock.rs +++ b/veilid-core/src/routing_table/node_ref/node_ref_lock.rs @@ -90,6 +90,21 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp { panic!("need to locked_mut() for this operation") } + + fn with_inner(&self, f: F) -> T + where + F: FnOnce(&RoutingTableInner) -> T, + { + let inner = &*self.inner.lock(); + f(inner) + } + + fn with_inner_mut(&self, _f: F) -> T + where + F: FnOnce(&mut RoutingTableInner) -> T, + { + panic!("need to locked_mut() for this operation") + } } impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone> diff --git a/veilid-core/src/routing_table/node_ref/node_ref_lock_mut.rs b/veilid-core/src/routing_table/node_ref/node_ref_lock_mut.rs index c132e85c..8b0acb9b 100644 --- 
a/veilid-core/src/routing_table/node_ref/node_ref_lock_mut.rs +++ b/veilid-core/src/routing_table/node_ref/node_ref_lock_mut.rs @@ -92,6 +92,22 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp let inner = &mut *self.inner.lock(); self.nr.entry().with_mut(inner, f) } + + fn with_inner(&self, f: F) -> T + where + F: FnOnce(&RoutingTableInner) -> T, + { + let inner = &*self.inner.lock(); + f(inner) + } + + fn with_inner_mut(&self, f: F) -> T + where + F: FnOnce(&mut RoutingTableInner) -> T, + { + let inner = &mut *self.inner.lock(); + f(inner) + } } impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone> diff --git a/veilid-core/src/routing_table/node_ref/traits.rs b/veilid-core/src/routing_table/node_ref/traits.rs index 02649e1e..4c499afd 100644 --- a/veilid-core/src/routing_table/node_ref/traits.rs +++ b/veilid-core/src/routing_table/node_ref/traits.rs @@ -20,6 +20,13 @@ pub(crate) trait NodeRefOperateTrait { fn operate_mut(&self, f: F) -> T where F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T; + #[expect(dead_code)] + fn with_inner(&self, f: F) -> T + where + F: FnOnce(&RoutingTableInner) -> T; + fn with_inner_mut(&self, f: F) -> T + where + F: FnOnce(&mut RoutingTableInner) -> T; } // Common Operations @@ -115,7 +122,7 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait // } fn relay(&self, routing_domain: RoutingDomain) -> EyreResult> { - self.operate_mut(|rti, e| { + let Some(rpi) = self.operate(|rti, e| { let Some(sni) = e.signed_node_info(routing_domain) else { return Ok(None); }; @@ -127,8 +134,14 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait if rti.routing_table().matches_own_node_id(rpi.node_ids()) { bail!("Can't relay though ourselves"); } + Ok(Some(rpi)) + })? 
+ else { + return Ok(None); + }; - // Register relay node and return noderef + // Register relay node and return noderef + self.with_inner_mut(|rti| { let nr = rti.register_node_with_peer_info(rpi, false)?; Ok(Some(nr)) }) From 909fea721afa0bf22addc01756db3f814e3d6007 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 3 Mar 2025 21:02:20 -0500 Subject: [PATCH 3/4] [ci skip] connection table debugging --- .../src/network_manager/connection_manager.rs | 13 ++++----- veilid-core/src/network_manager/native/mod.rs | 27 +++++-------------- .../src/network_manager/network_connection.rs | 13 ++++----- veilid-tools/src/async_tag_lock.rs | 9 ++++--- 4 files changed, 25 insertions(+), 37 deletions(-) diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 304eaaf8..8cb1bdb7 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -415,7 +415,8 @@ impl ConnectionManager { let best_port = preferred_local_address.map(|pla| pla.port()); // Async lock on the remote address for atomicity per remote - // Use the initial timeout here as the 'close' timeout as well + // Use the initial connection timeout here because multiple calls to get_or_create_connection + // can be performed simultaneously and we want to wait for the first one to succeed or not let Ok(_lock_guard) = timeout( self.arc.connection_initial_timeout_ms, self.arc.address_lock_table.lock_tag(remote_addr), @@ -461,6 +462,7 @@ impl ConnectionManager { let network_manager = self.network_manager(); let prot_conn = network_result_try!(loop { + veilid_log!(self debug "get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info); let result_net_res = ProtocolNetworkConnection::connect( self.registry(), preferred_local_address, @@ -485,11 +487,10 @@ impl ConnectionManager { } } }; - veilid_log!(self debug "get_or_create_connection retries 
left: {}", retry_count); retry_count -= 1; - // XXX: This should not be necessary - // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of + // // XXX: This should not be necessary + // // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of // preferred_local_address = None; sleep(NEW_CONNECTION_RETRY_DELAY_MS).await; }); @@ -610,7 +611,7 @@ impl ConnectionManager { // Callback from network connection receive loop when it exits // cleans up the entry in the connection table - pub(super) async fn report_connection_finished(&self, connection_id: NetworkConnectionId) { + pub(super) fn report_connection_finished(&self, connection_id: NetworkConnectionId) { // Get channel sender let sender = { let mut inner = self.arc.inner.lock(); @@ -680,7 +681,7 @@ impl ConnectionManager { } } } - let _ = sender.send_async(ConnectionManagerEvent::Dead(conn)).await; + let _ = sender.send(ConnectionManagerEvent::Dead(conn)); } } diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 13ed2a9c..2facc059 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -636,10 +636,6 @@ impl Network { ) -> EyreResult> { let _guard = self.startup_lock.enter()?; - let connection_initial_timeout_us = self - .config() - .with(|c| c.network.connection_initial_timeout_ms as u64 * 1000); - self.record_dial_info_failure( dial_info.clone(), async move { @@ -656,12 +652,10 @@ impl Network { )); } }; - let flow = network_result_try!(debug_duration( - || { ph.send_message(data, peer_socket_addr) }, - Some(connection_initial_timeout_us * 2) - ) - .await - .wrap_err("failed to send data to dial info")?); + let flow = network_result_try!(ph + .send_message(data, peer_socket_addr) + .await + .wrap_err("failed to send data to dial info")?); unique_flow = UniqueFlow { flow, 
connection_id: None, @@ -670,19 +664,10 @@ impl Network { // Handle connection-oriented protocols let connmgr = self.network_manager().connection_manager(); let conn = network_result_try!( - debug_duration( - || { connmgr.get_or_create_connection(dial_info.clone()) }, - Some(connection_initial_timeout_us * 2) - ) - .await? + connmgr.get_or_create_connection(dial_info.clone()).await? ); - if let ConnectionHandleSendResult::NotSent(_) = debug_duration( - || conn.send_async(data), - Some(connection_initial_timeout_us * 2), - ) - .await - { + if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { return Ok(NetworkResult::NoConnection(io::Error::new( io::ErrorKind::ConnectionReset, "failed to send", diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index d52a43a2..552571f7 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -480,20 +480,21 @@ impl NetworkConnection { } } - veilid_log!(registry trace - "Connection loop finished flow={:?}", - flow - ); // Let the connection manager know the receive loop exited connection_manager - .report_connection_finished(connection_id) - .await; + .report_connection_finished(connection_id); // Close the low level socket if let Err(e) = protocol_connection.close().await { veilid_log!(registry debug "Protocol connection close error: {}", e); } + + veilid_log!(registry trace + "Connection loop exited flow={:?}", + flow + ); + }.in_current_span()) } diff --git a/veilid-tools/src/async_tag_lock.rs b/veilid-tools/src/async_tag_lock.rs index 0b8306ea..ff892c95 100644 --- a/veilid-tools/src/async_tag_lock.rs +++ b/veilid-tools/src/async_tag_lock.rs @@ -10,7 +10,7 @@ where { table: AsyncTagLockTable, tag: T, - _guard: AsyncMutexGuardArc<()>, + guard: Option>, } impl AsyncTagLockGuard @@ -21,7 +21,7 @@ where Self { table, tag, - _guard: guard, + guard: Some(guard), } } } @@ 
-45,7 +45,8 @@ where if guards == 0 { inner.table.remove(&self.tag).unwrap(); } - // Proceed with releasing _guard, which may cause some concurrent tag lock to acquire + // Proceed with releasing guard, which may cause some concurrent tag lock to acquire + drop(self.guard.take()); } } @@ -153,7 +154,7 @@ where } std::collections::hash_map::Entry::Vacant(v) => { let mutex = Arc::new(AsyncMutex::new(())); - let guard = asyncmutex_try_lock_arc!(mutex)?; + let guard = asyncmutex_try_lock_arc!(mutex).unwrap(); v.insert(AsyncTagLockTableEntry { mutex, guards: 1 }); guard } From 8f521099bd55c7131d39390b601de907900429fd Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 4 Mar 2025 21:56:55 -0500 Subject: [PATCH 4/4] rpc cleanup --- veilid-core/src/logging/facilities.rs | 88 +++++++++++++++++++ .../src/network_manager/connection_manager.rs | 8 +- veilid-core/src/routing_table/bucket_entry.rs | 7 +- veilid-core/src/routing_table/mod.rs | 16 ++-- .../src/routing_table/route_spec_store/mod.rs | 4 +- .../src/routing_table/tasks/bootstrap.rs | 4 +- veilid-core/src/rpc_processor/mod.rs | 29 ++++-- .../src/rpc_processor/operation_waiter.rs | 77 ++++++++-------- veilid-core/src/rpc_processor/rpc_worker.rs | 2 +- veilid-core/src/storage_manager/mod.rs | 2 +- veilid-tools/src/network_result.rs | 82 ----------------- 11 files changed, 174 insertions(+), 145 deletions(-) diff --git a/veilid-core/src/logging/facilities.rs b/veilid-core/src/logging/facilities.rs index c9ed341f..7963175b 100644 --- a/veilid-core/src/logging/facilities.rs +++ b/veilid-core/src/logging/facilities.rs @@ -289,3 +289,91 @@ macro_rules! veilid_log { $($k).+ = $($fields)* )}; } + +#[macro_export] +macro_rules! 
network_result_value_or_log { + ($self:ident $r:expr => $f:expr) => { + network_result_value_or_log!($self target: self::__VEILID_LOG_FACILITY, $r => [ "" ] $f ) + }; + ($self:ident $r:expr => [ $d:expr ] $f:expr) => { + network_result_value_or_log!($self target: self::__VEILID_LOG_FACILITY, $r => [ $d ] $f ) + }; + ($self:ident target: $target:expr, $r:expr => $f:expr) => { + network_result_value_or_log!($self target: $target, $r => [ "" ] $f ) + }; + ($self:ident target: $target:expr, $r:expr => [ $d:expr ] $f:expr) => { { + let __extra_message = if debug_target_enabled!("network_result") { + $d.to_string() + } else { + "".to_string() + }; + match $r { + NetworkResult::Timeout => { + veilid_log!($self debug target: $target, + "{} at {}@{}:{} in {}{}", + "Timeout", + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::ServiceUnavailable(ref s) => { + veilid_log!($self debug target: $target, + "{}({}) at {}@{}:{} in {}{}", + "ServiceUnavailable", + s, + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::NoConnection(ref e) => { + veilid_log!($self debug target: $target, + "{}({}) at {}@{}:{} in {}{}", + "No connection", + e.to_string(), + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::AlreadyExists(ref e) => { + veilid_log!($self debug target: $target, + "{}({}) at {}@{}:{} in {}{}", + "Already exists", + e.to_string(), + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::InvalidMessage(ref s) => { + veilid_log!($self debug target: $target, + "{}({}) at {}@{}:{} in {}{}", + "Invalid message", + s, + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::Value(v) => v, + } + } }; + +} diff --git a/veilid-core/src/network_manager/connection_manager.rs 
b/veilid-core/src/network_manager/connection_manager.rs index 8cb1bdb7..c63e34be 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -461,8 +461,8 @@ impl ConnectionManager { let mut retry_count = NEW_CONNECTION_RETRY_COUNT; let network_manager = self.network_manager(); - let prot_conn = network_result_try!(loop { - veilid_log!(self debug "get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info); + let nres = loop { + veilid_log!(self trace "== get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info); let result_net_res = ProtocolNetworkConnection::connect( self.registry(), preferred_local_address, @@ -493,6 +493,10 @@ impl ConnectionManager { // // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of // preferred_local_address = None; sleep(NEW_CONNECTION_RETRY_DELAY_MS).await; + }; + + let prot_conn = network_result_value_or_log!(self target:"network_result", nres => [ format!("== get_or_create_connection failed {:?} -> {}", preferred_local_address, dial_info) ] { + network_result_raise!(nres); }); // Add to the connection table diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index c0f81088..d97ffcca 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -1011,7 +1011,12 @@ impl BucketEntryInner { match latest_contact_time { None => { - error!("Peer is reliable, but not seen!"); + // Peer may appear reliable from a previous attach/detach + // But reliability uses last_seen_ts not the last_outbound_contact_time + // Regardless, if we haven't pinged it, we need to ping it. 
+ // But if it was reliable before, and pings successfully then it can + // stay reliable, so we don't make it unreliable just because we haven't + // contacted it yet during this attachment. true } Some(latest_contact_time) => { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index be252fe0..7fe044bb 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -1041,7 +1041,7 @@ impl RoutingTable { #[instrument(level = "trace", skip(self), err)] pub async fn find_nodes_close_to_node_id( &self, - node_ref: NodeRef, + node_ref: FilteredNodeRef, node_id: TypedKey, capabilities: Vec, ) -> EyreResult>> { @@ -1049,11 +1049,7 @@ impl RoutingTable { let res = network_result_try!( rpc_processor - .rpc_call_find_node( - Destination::direct(node_ref.default_filtered()), - node_id, - capabilities - ) + .rpc_call_find_node(Destination::direct(node_ref), node_id, capabilities) .await? ); @@ -1069,7 +1065,7 @@ impl RoutingTable { pub async fn find_nodes_close_to_self( &self, crypto_kind: CryptoKind, - node_ref: NodeRef, + node_ref: FilteredNodeRef, capabilities: Vec, ) -> EyreResult>> { let self_node_id = self.node_id(crypto_kind); @@ -1083,7 +1079,7 @@ impl RoutingTable { pub async fn find_nodes_close_to_node_ref( &self, crypto_kind: CryptoKind, - node_ref: NodeRef, + node_ref: FilteredNodeRef, capabilities: Vec, ) -> EyreResult>> { let Some(target_node_id) = node_ref.node_ids().get(crypto_kind) else { @@ -1104,7 +1100,7 @@ impl RoutingTable { capabilities: Vec, ) { // Ask node for nodes closest to our own node - let closest_nodes = network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, node_ref.clone(), capabilities.clone())).await { + let closest_nodes = network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, node_ref.sequencing_filtered(Sequencing::PreferOrdered), capabilities.clone())).await { Err(e) => { veilid_log!(self 
error "find_self failed for {:?}: {:?}", @@ -1120,7 +1116,7 @@ impl RoutingTable { // Ask each node near us to find us as well if wide { for closest_nr in closest_nodes { - network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, closest_nr.clone(), capabilities.clone())).await { + network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, closest_nr.sequencing_filtered(Sequencing::PreferOrdered), capabilities.clone())).await { Err(e) => { veilid_log!(self error "find_self failed for {:?}: {:?}", diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs index 45623c11..ad3b39b2 100644 --- a/veilid-core/src/routing_table/route_spec_store/mod.rs +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -716,7 +716,7 @@ impl RouteSpecStore { }; let Some(rsid) = inner.content.get_id_by_key(&public_key.value) else { - veilid_log!(self debug "route id does not exist: {:?}", public_key.value); + veilid_log!(self debug target: "network_result", "route id does not exist: {:?}", public_key.value); return None; }; let Some(rssd) = inner.content.get_detail(&rsid) else { @@ -753,7 +753,7 @@ impl RouteSpecStore { return None; } Err(e) => { - veilid_log!(self debug "errir verifying signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e); + veilid_log!(self debug "error verifying signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e); return None; } } diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index 8ab20079..31dc5089 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -289,7 +289,7 @@ impl RoutingTable { // Get what contact method would be used for contacting the bootstrap let bsdi = match network_manager - 
.get_node_contact_method(nr.default_filtered()) + .get_node_contact_method(nr.sequencing_filtered(Sequencing::PreferOrdered)) { Ok(Some(ncm)) if ncm.is_direct() => ncm.direct_dial_info().unwrap(), Ok(v) => { @@ -307,7 +307,7 @@ impl RoutingTable { // Need VALID signed peer info, so ask bootstrap to find_node of itself // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = routing_table.find_nodes_close_to_node_ref(crypto_kind, nr.clone(), vec![]).await; + let _ = routing_table.find_nodes_close_to_node_ref(crypto_kind, nr.sequencing_filtered(Sequencing::PreferOrdered), vec![]).await; // Ensure we got the signed peer info if !nr.signed_node_info_has_valid_signature(routing_domain) { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index b623f5ba..68bab229 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1471,11 +1471,24 @@ impl RPCProcessor { let operation = match self.decode_rpc_operation(&encoded_msg) { Ok(v) => v, Err(e) => { - // Debug on error - veilid_log!(self debug "Dropping routed RPC: {}", e); + match e { + // Invalid messages that should be punished + RPCError::Protocol(_) | RPCError::InvalidFormat(_) => { + veilid_log!(self debug "Invalid routed RPC Operation: {}", e); + + // XXX: Punish routes that send routed undecodable crap + // self.network_manager().address_filter().punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage); + } + // Ignored messages that should be dropped + RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => { + veilid_log!(self trace "Dropping routed RPC Operation: {}", e); + } + // Internal errors that deserve louder logging + RPCError::Unimplemented(_) | RPCError::Internal(_) => { + veilid_log!(self error "Error decoding routed RPC operation: {}", e); + } + }; - // XXX: Punish routes that send routed undecodable crap - // 
self.network_manager().address_filter().punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage); return Ok(NetworkResult::invalid_message(e)); } }; @@ -1593,16 +1606,16 @@ impl RPCProcessor { if let Err(e) = self.waiting_rpc_table.complete_op_waiter(op_id, msg) { match e { RPCError::Unimplemented(_) | RPCError::Internal(_) => { - veilid_log!(self error "Could not complete rpc operation: id = {}: {}", op_id, e); + veilid_log!(self error "Error in RPC operation: id = {}: {}", op_id, e); } RPCError::InvalidFormat(_) | RPCError::Protocol(_) | RPCError::Network(_) | RPCError::TryAgain(_) => { - veilid_log!(self debug "Could not complete rpc operation: id = {}: {}", op_id, e); + veilid_log!(self debug "Could not complete RPC operation: id = {}: {}", op_id, e); } - RPCError::Ignore(_) => { - veilid_log!(self debug "Answer late: id = {}", op_id); + RPCError::Ignore(e) => { + veilid_log!(self debug "RPC operation ignored: id = {}: {}", op_id, e); } }; // Don't throw an error here because it's okay if the original operation timed out diff --git a/veilid-core/src/rpc_processor/operation_waiter.rs b/veilid-core/src/rpc_processor/operation_waiter.rs index 205fa339..7e4ee05c 100644 --- a/veilid-core/src/rpc_processor/operation_waiter.rs +++ b/veilid-core/src/rpc_processor/operation_waiter.rs @@ -8,7 +8,7 @@ where { waiter: OperationWaiter, op_id: OperationId, - result_receiver: Option>, + result_receiver: flume::Receiver<(Span, T)>, } impl OperationWaitHandle @@ -27,9 +27,7 @@ where C: Unpin + Clone, { fn drop(&mut self) { - if self.result_receiver.is_some() { - self.waiter.cancel_op_waiter(self.op_id); - } + self.waiter.cancel_op_waiter(self.op_id); } } @@ -106,7 +104,7 @@ where OperationWaitHandle { waiter: self.clone(), op_id, - result_receiver: Some(result_receiver), + result_receiver, } } @@ -125,65 +123,69 @@ where /// Get operation context pub fn get_op_context(&self, op_id: OperationId) -> Result { let inner = self.inner.lock(); - let Some(waiting_op) = 
inner.waiting_op_table.get(&op_id) else { - return Err(RPCError::ignore(format!( - "Missing operation id getting op context: id={}", - op_id - ))); + let res = { + let Some(waiting_op) = inner.waiting_op_table.get(&op_id) else { + return Err(RPCError::ignore(format!( + "Missing operation id getting op context: id={}", + op_id + ))); + }; + Ok(waiting_op.context.clone()) }; - Ok(waiting_op.context.clone()) + drop(inner); + res } /// Remove wait for op #[instrument(level = "trace", target = "rpc", skip_all)] fn cancel_op_waiter(&self, op_id: OperationId) { let mut inner = self.inner.lock(); - inner.waiting_op_table.remove(&op_id); + { + let waiting_op = inner.waiting_op_table.remove(&op_id); + drop(waiting_op); + } + drop(inner); } /// Complete the waiting op #[instrument(level = "trace", target = "rpc", skip_all)] pub fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> { - let waiting_op = { - let mut inner = self.inner.lock(); - inner - .waiting_op_table - .remove(&op_id) - .ok_or_else(RPCError::else_ignore(format!( - "Unmatched operation id: {}", - op_id - )))? 
+ let mut inner = self.inner.lock(); + let res = { + let waiting_op = + inner + .waiting_op_table + .remove(&op_id) + .ok_or_else(RPCError::else_ignore(format!( + "Unmatched operation id: {}", + op_id + )))?; + waiting_op + .result_sender + .send((Span::current(), message)) + .map_err(RPCError::ignore) }; - waiting_op - .result_sender - .send((Span::current(), message)) - .map_err(RPCError::ignore) + drop(inner); + res } /// Wait for operation to complete #[instrument(level = "trace", target = "rpc", skip_all)] pub async fn wait_for_op( &self, - mut handle: OperationWaitHandle, + handle: OperationWaitHandle, timeout_us: TimestampDuration, ) -> Result, RPCError> { let timeout_ms = us_to_ms(timeout_us.as_u64()).map_err(RPCError::internal)?; - // Take the receiver - // After this, we must manually cancel since the cancel on handle drop is disabled - let result_receiver = handle.result_receiver.take().unwrap(); - - let result_fut = result_receiver.recv_async().in_current_span(); + let result_fut = handle.result_receiver.recv_async().in_current_span(); // wait for eventualvalue let start_ts = Timestamp::now(); let res = timeout(timeout_ms, result_fut).await.into_timeout_or(); match res { - TimeoutOr::Timeout => { - self.cancel_op_waiter(handle.op_id); - Ok(TimeoutOr::Timeout) - } + TimeoutOr::Timeout => Ok(TimeoutOr::Timeout), TimeoutOr::Value(Ok((_span_id, ret))) => { let end_ts = Timestamp::now(); @@ -192,7 +194,10 @@ where Ok(TimeoutOr::Value((ret, end_ts.saturating_sub(start_ts)))) } - TimeoutOr::Value(Err(e)) => Err(RPCError::ignore(e)), + TimeoutOr::Value(Err(e)) => { + // + Err(RPCError::ignore(e)) + } } } } diff --git a/veilid-core/src/rpc_processor/rpc_worker.rs b/veilid-core/src/rpc_processor/rpc_worker.rs index 8141f965..194a0f2f 100644 --- a/veilid-core/src/rpc_processor/rpc_worker.rs +++ b/veilid-core/src/rpc_processor/rpc_worker.rs @@ -70,7 +70,7 @@ impl RPCProcessor { match request.kind { // Process RPC Message RPCWorkerRequestKind::Message { 
message_encoded } => { - network_result_value_or_log!(self match self + network_result_value_or_log!(self target:"network_result", match self .process_rpc_message(message_encoded).instrument(rpc_request_span) .await { diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 73ea7ebf..722c41b9 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1120,7 +1120,7 @@ impl StorageManager { let dest = rpc_processor .resolve_target_to_destination( vc.target, - SafetySelection::Unsafe(Sequencing::NoPreference), + SafetySelection::Unsafe(Sequencing::PreferOrdered), ) .await .map_err(VeilidAPIError::from)?; diff --git a/veilid-tools/src/network_result.rs b/veilid-tools/src/network_result.rs index 88201b60..89ccf1f8 100644 --- a/veilid-tools/src/network_result.rs +++ b/veilid-tools/src/network_result.rs @@ -278,85 +278,3 @@ macro_rules! network_result_try { } }; } - -#[macro_export] -macro_rules! network_result_value_or_log { - ($self:ident $r:expr => $f:expr) => { - network_result_value_or_log!($self $r => [ "" ] $f ) - }; - ($self:ident $r:expr => [ $d:expr ] $f:expr) => { { - let __extra_message = if debug_target_enabled!("network_result") { - $d.to_string() - } else { - "".to_string() - }; - match $r { - NetworkResult::Timeout => { - veilid_log!($self debug - "{} at {}@{}:{} in {}{}", - "Timeout", - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::ServiceUnavailable(ref s) => { - veilid_log!($self debug - "{}({}) at {}@{}:{} in {}{}", - "ServiceUnavailable", - s, - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::NoConnection(ref e) => { - veilid_log!($self debug - "{}({}) at {}@{}:{} in {}{}", - "No connection", - e.to_string(), - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::AlreadyExists(ref 
e) => { - veilid_log!($self debug - "{}({}) at {}@{}:{} in {}{}", - "Already exists", - e.to_string(), - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::InvalidMessage(ref s) => { - veilid_log!($self debug - "{}({}) at {}@{}:{} in {}{}", - "Invalid message", - s, - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::Value(v) => v, - } - } }; - -}