From 909fea721afa0bf22addc01756db3f814e3d6007 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 3 Mar 2025 21:02:20 -0500 Subject: [PATCH] [ci skip] connection table debugging --- .../src/network_manager/connection_manager.rs | 13 ++++----- veilid-core/src/network_manager/native/mod.rs | 27 +++++-------------- .../src/network_manager/network_connection.rs | 13 ++++----- veilid-tools/src/async_tag_lock.rs | 9 ++++--- 4 files changed, 25 insertions(+), 37 deletions(-) diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 304eaaf8..8cb1bdb7 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -415,7 +415,8 @@ impl ConnectionManager { let best_port = preferred_local_address.map(|pla| pla.port()); // Async lock on the remote address for atomicity per remote - // Use the initial timeout here as the 'close' timeout as well + // Use the initial connection timeout here because multiple calls to get_or_create_connection + // can be performed simultaneously and we want to wait for the first one to succeed or not let Ok(_lock_guard) = timeout( self.arc.connection_initial_timeout_ms, self.arc.address_lock_table.lock_tag(remote_addr), @@ -461,6 +462,7 @@ impl ConnectionManager { let network_manager = self.network_manager(); let prot_conn = network_result_try!(loop { + veilid_log!(self debug "get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info); let result_net_res = ProtocolNetworkConnection::connect( self.registry(), preferred_local_address, @@ -485,11 +487,10 @@ impl ConnectionManager { } } }; - veilid_log!(self debug "get_or_create_connection retries left: {}", retry_count); retry_count -= 1; - // XXX: This should not be necessary - // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of + // // XXX: This should not be necessary + // // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of // preferred_local_address = None; sleep(NEW_CONNECTION_RETRY_DELAY_MS).await; }); @@ -610,7 +611,7 @@ impl ConnectionManager { // Callback from network connection receive loop when it exits // cleans up the entry in the connection table - pub(super) async fn report_connection_finished(&self, connection_id: NetworkConnectionId) { + pub(super) fn report_connection_finished(&self, connection_id: NetworkConnectionId) { // Get channel sender let sender = { let mut inner = self.arc.inner.lock(); @@ -680,7 +681,7 @@ impl ConnectionManager { } } } - let _ = sender.send_async(ConnectionManagerEvent::Dead(conn)).await; + let _ = sender.send(ConnectionManagerEvent::Dead(conn)); } } diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 13ed2a9c..2facc059 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -636,10 +636,6 @@ impl Network { ) -> EyreResult> { let _guard = self.startup_lock.enter()?; - let connection_initial_timeout_us = self - .config() - .with(|c| c.network.connection_initial_timeout_ms as u64 * 1000); - self.record_dial_info_failure( dial_info.clone(), async move { @@ -656,12 +652,10 @@ impl Network { )); } }; - let flow = network_result_try!(debug_duration( - || { ph.send_message(data, peer_socket_addr) }, - Some(connection_initial_timeout_us * 2) - ) - .await - .wrap_err("failed to send data to dial info")?); + let flow = network_result_try!(ph + .send_message(data, peer_socket_addr) + .await + .wrap_err("failed to send data to dial info")?); unique_flow = UniqueFlow { flow, connection_id: None, @@ -670,19 +664,10 @@ impl Network { // Handle connection-oriented protocols let connmgr = self.network_manager().connection_manager(); let conn = network_result_try!( - debug_duration( - || { connmgr.get_or_create_connection(dial_info.clone()) }, - Some(connection_initial_timeout_us * 2) - ) - .await? + connmgr.get_or_create_connection(dial_info.clone()).await? ); - if let ConnectionHandleSendResult::NotSent(_) = debug_duration( - || conn.send_async(data), - Some(connection_initial_timeout_us * 2), - ) - .await - { + if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { return Ok(NetworkResult::NoConnection(io::Error::new( io::ErrorKind::ConnectionReset, "failed to send", diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index d52a43a2..552571f7 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -480,20 +480,21 @@ impl NetworkConnection { } } - veilid_log!(registry trace - "Connection loop finished flow={:?}", - flow - ); // Let the connection manager know the receive loop exited connection_manager - .report_connection_finished(connection_id) - .await; + .report_connection_finished(connection_id); // Close the low level socket if let Err(e) = protocol_connection.close().await { veilid_log!(registry debug "Protocol connection close error: {}", e); } + + veilid_log!(registry trace + "Connection loop exited flow={:?}", + flow + ); + }.in_current_span()) } diff --git a/veilid-tools/src/async_tag_lock.rs b/veilid-tools/src/async_tag_lock.rs index 0b8306ea..ff892c95 100644 --- a/veilid-tools/src/async_tag_lock.rs +++ b/veilid-tools/src/async_tag_lock.rs @@ -10,7 +10,7 @@ where { table: AsyncTagLockTable, tag: T, - _guard: AsyncMutexGuardArc<()>, + guard: Option>, } impl AsyncTagLockGuard @@ -21,7 +21,7 @@ where Self { table, tag, - _guard: guard, + guard: Some(guard), } } } @@ -45,7 +45,8 @@ where if guards == 0 { inner.table.remove(&self.tag).unwrap(); } - // Proceed with releasing _guard, which may cause some concurrent tag lock to acquire + // Proceed with releasing guard, which may cause some concurrent tag lock to acquire + drop(self.guard.take()); } } @@ -153,7 +154,7 @@ where } std::collections::hash_map::Entry::Vacant(v) => { let mutex = Arc::new(AsyncMutex::new(())); - let guard = asyncmutex_try_lock_arc!(mutex)?; + let guard = asyncmutex_try_lock_arc!(mutex).unwrap(); v.insert(AsyncTagLockTableEntry { mutex, guards: 1 }); guard }