[ci skip] connection table debugging

This commit is contained in:
Christien Rioux 2025-03-03 21:02:20 -05:00
parent 500547cfa8
commit 909fea721a
4 changed files with 25 additions and 37 deletions

View File

@ -415,7 +415,8 @@ impl ConnectionManager {
let best_port = preferred_local_address.map(|pla| pla.port());
// Async lock on the remote address for atomicity per remote
// Use the initial timeout here as the 'close' timeout as well
// Use the initial connection timeout here because multiple calls to get_or_create_connection
// can be performed simultaneously and we want to wait for the first one to succeed or not
let Ok(_lock_guard) = timeout(
self.arc.connection_initial_timeout_ms,
self.arc.address_lock_table.lock_tag(remote_addr),
@ -461,6 +462,7 @@ impl ConnectionManager {
let network_manager = self.network_manager();
let prot_conn = network_result_try!(loop {
veilid_log!(self debug "get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info);
let result_net_res = ProtocolNetworkConnection::connect(
self.registry(),
preferred_local_address,
@ -485,11 +487,10 @@ impl ConnectionManager {
}
}
};
veilid_log!(self debug "get_or_create_connection retries left: {}", retry_count);
retry_count -= 1;
// XXX: This should not be necessary
// Release the preferred local address if things can't connect due to a low-level collision we dont have a record of
// // XXX: This should not be necessary
// // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of
// preferred_local_address = None;
sleep(NEW_CONNECTION_RETRY_DELAY_MS).await;
});
@ -610,7 +611,7 @@ impl ConnectionManager {
// Callback from network connection receive loop when it exits
// cleans up the entry in the connection table
pub(super) async fn report_connection_finished(&self, connection_id: NetworkConnectionId) {
pub(super) fn report_connection_finished(&self, connection_id: NetworkConnectionId) {
// Get channel sender
let sender = {
let mut inner = self.arc.inner.lock();
@ -680,7 +681,7 @@ impl ConnectionManager {
}
}
}
let _ = sender.send_async(ConnectionManagerEvent::Dead(conn)).await;
let _ = sender.send(ConnectionManagerEvent::Dead(conn));
}
}

View File

@ -636,10 +636,6 @@ impl Network {
) -> EyreResult<NetworkResult<UniqueFlow>> {
let _guard = self.startup_lock.enter()?;
let connection_initial_timeout_us = self
.config()
.with(|c| c.network.connection_initial_timeout_ms as u64 * 1000);
self.record_dial_info_failure(
dial_info.clone(),
async move {
@ -656,12 +652,10 @@ impl Network {
));
}
};
let flow = network_result_try!(debug_duration(
|| { ph.send_message(data, peer_socket_addr) },
Some(connection_initial_timeout_us * 2)
)
.await
.wrap_err("failed to send data to dial info")?);
let flow = network_result_try!(ph
.send_message(data, peer_socket_addr)
.await
.wrap_err("failed to send data to dial info")?);
unique_flow = UniqueFlow {
flow,
connection_id: None,
@ -670,19 +664,10 @@ impl Network {
// Handle connection-oriented protocols
let connmgr = self.network_manager().connection_manager();
let conn = network_result_try!(
debug_duration(
|| { connmgr.get_or_create_connection(dial_info.clone()) },
Some(connection_initial_timeout_us * 2)
)
.await?
connmgr.get_or_create_connection(dial_info.clone()).await?
);
if let ConnectionHandleSendResult::NotSent(_) = debug_duration(
|| conn.send_async(data),
Some(connection_initial_timeout_us * 2),
)
.await
{
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
return Ok(NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset,
"failed to send",

View File

@ -480,20 +480,21 @@ impl NetworkConnection {
}
}
veilid_log!(registry trace
"Connection loop finished flow={:?}",
flow
);
// Let the connection manager know the receive loop exited
connection_manager
.report_connection_finished(connection_id)
.await;
.report_connection_finished(connection_id);
// Close the low level socket
if let Err(e) = protocol_connection.close().await {
veilid_log!(registry debug "Protocol connection close error: {}", e);
}
veilid_log!(registry trace
"Connection loop exited flow={:?}",
flow
);
}.in_current_span())
}

View File

@ -10,7 +10,7 @@ where
{
table: AsyncTagLockTable<T>,
tag: T,
_guard: AsyncMutexGuardArc<()>,
guard: Option<AsyncMutexGuardArc<()>>,
}
impl<T> AsyncTagLockGuard<T>
@ -21,7 +21,7 @@ where
Self {
table,
tag,
_guard: guard,
guard: Some(guard),
}
}
}
@ -45,7 +45,8 @@ where
if guards == 0 {
inner.table.remove(&self.tag).unwrap();
}
// Proceed with releasing _guard, which may cause some concurrent tag lock to acquire
// Proceed with releasing guard, which may cause some concurrent tag lock to acquire
drop(self.guard.take());
}
}
@ -153,7 +154,7 @@ where
}
std::collections::hash_map::Entry::Vacant(v) => {
let mutex = Arc::new(AsyncMutex::new(()));
let guard = asyncmutex_try_lock_arc!(mutex)?;
let guard = asyncmutex_try_lock_arc!(mutex).unwrap();
v.insert(AsyncTagLockTableEntry { mutex, guards: 1 });
guard
}