connection debugging

This commit is contained in:
Christien Rioux 2023-11-01 10:31:31 -04:00
parent f47d6402c3
commit 520d8c43f7
6 changed files with 100 additions and 51 deletions

View File

@ -204,7 +204,7 @@ impl ConnectionManager {
Ok(Some(conn)) => {
// Connection added and a different one LRU'd out
// Send it to be terminated
// log_net!(debug "== LRU kill connection due to limit: {:?}", conn);
log_net!(debug "== LRU kill connection due to limit: {:?}", conn.debug_print(get_aligned_timestamp()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
}
Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
@ -219,6 +219,7 @@ impl ConnectionManager {
Err(ConnectionTableAddError::AlreadyExists(conn)) => {
// Connection already exists
let desc = conn.connection_descriptor();
log_net!(debug "== Connection already exists: {:?}", conn.debug_print(get_aligned_timestamp()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!(
"connection already exists: {:?}",
@ -228,6 +229,7 @@ impl ConnectionManager {
Err(ConnectionTableAddError::TableFull(conn)) => {
// Connection table is full
let desc = conn.connection_descriptor();
log_net!(debug "== Connection table full: {:?}", conn.debug_print(get_aligned_timestamp()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!(
"connection table is full: {:?}",
@ -242,7 +244,12 @@ impl ConnectionManager {
pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> {
self.arc
.connection_table
.get_connection_by_descriptor(descriptor)
.peek_connection_by_descriptor(descriptor)
}
// Returns a network connection if one already is established
pub(super) fn touch_connection_by_id(&self, id: NetworkConnectionId) {
self.arc.connection_table.touch_connection_by_id(id)
}
// Protects a network connection if one already is established

View File

@ -25,6 +25,7 @@ impl ConnectionTableAddError {
}
}
#[derive(Clone, Copy, Debug)]
pub(crate) enum ConnectionRefKind {
AddRef,
RemoveRef,
@ -97,8 +98,9 @@ impl ConnectionTable {
let mut inner = self.inner.lock();
let unord = FuturesUnordered::new();
for table in &mut inner.conn_by_id {
for (_, v) in table.drain() {
for (_, mut v) in table.drain() {
trace!("connection table join: {:?}", v);
v.close();
unord.push(v);
}
}
@ -206,8 +208,6 @@ impl ConnectionTable {
};
let dead_conn = Self::remove_connection_records(&mut inner, dead_k);
log_net!(debug "== LRU Connection Killed: {} -> {}", dead_k, dead_conn.debug_print(get_aligned_timestamp()));
out_conn = Some(dead_conn);
}
@ -224,24 +224,28 @@ impl ConnectionTable {
}
//#[instrument(level = "trace", skip(self), ret)]
#[allow(dead_code)]
pub fn get_connection_by_id(&self, id: NetworkConnectionId) -> Option<ConnectionHandle> {
pub fn touch_connection_by_id(&self, id: NetworkConnectionId) {
let mut inner = self.inner.lock();
let protocol_index = *inner.protocol_index_by_id.get(&id)?;
let out = inner.conn_by_id[protocol_index].get(&id).unwrap();
Some(out.get_handle())
let Some(protocol_index) = inner.protocol_index_by_id.get(&id).copied() else {
return;
};
let _ = inner.conn_by_id[protocol_index].get(&id).unwrap();
}
//#[instrument(level = "trace", skip(self), ret)]
pub fn get_connection_by_descriptor(
pub fn peek_connection_by_descriptor(
&self,
descriptor: ConnectionDescriptor,
) -> Option<ConnectionHandle> {
let mut inner = self.inner.lock();
if descriptor.protocol_type() == ProtocolType::UDP {
return None;
}
let inner = self.inner.lock();
let id = *inner.id_by_descriptor.get(&descriptor)?;
let protocol_index = Self::protocol_to_index(descriptor.protocol_type());
let out = inner.conn_by_id[protocol_index].get(&id).unwrap();
let out = inner.conn_by_id[protocol_index].peek(&id).unwrap();
Some(out.get_handle())
}
@ -252,9 +256,14 @@ impl ConnectionTable {
ref_type: ConnectionRefKind,
protect: bool,
) -> bool {
if descriptor.protocol_type() == ProtocolType::UDP {
return false;
}
let mut inner = self.inner.lock();
let Some(id) = inner.id_by_descriptor.get(&descriptor).copied() else {
log_net!(error "failed to ref descriptor: {:?} ({:?})", descriptor, ref_type);
return false;
};
let protocol_index = Self::protocol_to_index(descriptor.protocol_type());

View File

@ -1,21 +1,25 @@
use super::*;
use async_tls::TlsConnector;
use async_tungstenite::tungstenite::error::ProtocolError;
use async_tungstenite::tungstenite::handshake::server::{
Callback, ErrorResponse, Request, Response,
};
use async_tungstenite::tungstenite::http::StatusCode;
use async_tungstenite::tungstenite::protocol::Message;
use async_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message};
use async_tungstenite::tungstenite::Error;
use async_tungstenite::{accept_hdr_async, client_async, WebSocketStream};
use futures_util::{AsyncRead, AsyncWrite, SinkExt};
use sockets::*;
/// Maximum number of websocket request headers to permit
// Maximum number of websocket request headers to permit
const MAX_WS_HEADERS: usize = 24;
/// Maximum size of any one specific websocket header
// Maximum size of any one specific websocket header
const MAX_WS_HEADER_LENGTH: usize = 512;
/// Maximum total size of headers and request including newlines
// Maximum total size of headers and request including newlines
const MAX_WS_BEFORE_BODY: usize = 2048;
// Wait time for connection close
// const MAX_CONNECTION_CLOSE_WAIT_US: u64 = 5_000_000;
cfg_if! {
if #[cfg(feature="rt-async-std")] {
@ -31,14 +35,14 @@ cfg_if! {
}
}
fn err_to_network_result<T>(err: async_tungstenite::tungstenite::Error) -> NetworkResult<T> {
fn err_to_network_result<T>(err: Error) -> NetworkResult<T> {
match err {
async_tungstenite::tungstenite::Error::ConnectionClosed
| async_tungstenite::tungstenite::Error::AlreadyClosed
| async_tungstenite::tungstenite::Error::Io(_)
| async_tungstenite::tungstenite::Error::Protocol(
async_tungstenite::tungstenite::error::ProtocolError::ResetWithoutClosingHandshake,
) => NetworkResult::NoConnection(to_io_error_other(err)),
Error::ConnectionClosed
| Error::AlreadyClosed
| Error::Io(_)
| Error::Protocol(ProtocolError::ResetWithoutClosingHandshake) => {
NetworkResult::NoConnection(to_io_error_other(err))
}
_ => NetworkResult::InvalidMessage(err.to_string()),
}
}
@ -82,38 +86,58 @@ where
instrument(level = "trace", err, skip(self))
)]
pub async fn close(&self) -> io::Result<NetworkResult<()>> {
// Make an attempt to flush the stream
self.stream
.clone()
.close()
// Make an attempt to close the stream normally
let mut stream = self.stream.clone();
stream
.send(Message::Close(Some(CloseFrame {
code: CloseCode::Normal,
reason: "".into(),
})))
.await
.map_err(to_io_error_other)?;
// // Then forcibly close the socket
// self.tcp_stream
// .shutdown(Shutdown::Both)
// .map_err(to_io_error_other)
// match stream.flush().await {
// Ok(()) => Ok(NetworkResult::value(())),
// Err(Error::Io(ioerr)) => Err(ioerr).into_network_result(),
// Err(Error::ConnectionClosed) => Ok(NetworkResult::value(())),
// Err(e) => Err(to_io_error_other(e)),
// }
stream.close().await.map_err(to_io_error_other)?;
Ok(NetworkResult::value(()))
// Drive connection to close
/*
let cur_ts = get_timestamp();
loop {
match stream.flush().await {
Ok(()) => {}
Err(Error::Io(ioerr)) => {
break Err(ioerr).into_network_result();
}
Err(Error::ConnectionClosed) => {
break Ok(NetworkResult::value(()));
}
Err(e) => {
break Err(to_io_error_other(e));
}
}
if get_timestamp().saturating_sub(cur_ts) >= MAX_CONNECTION_CLOSE_WAIT_US {
return Ok(NetworkResult::Timeout);
}
}
*/
}
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
if message.len() > MAX_MESSAGE_SIZE {
bail_io_error_other!("received too large WS message");
bail_io_error_other!("sending too large WS message");
}
let out = match self.stream.clone().send(Message::binary(message)).await {
Ok(v) => NetworkResult::value(v),
Err(e) => err_to_network_result(e),
};
if !out.is_value() {
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out));
return Ok(out);
}
let out = match self.stream.clone().flush().await {
Ok(v) => NetworkResult::value(v),
Err(e) => err_to_network_result(e),
};
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out));

View File

@ -101,7 +101,7 @@ pub struct NetworkConnection {
impl Drop for NetworkConnection {
fn drop(&mut self) {
if self.ref_count != 0 && self.stop_source.is_some() {
log_net!(error "ref_count for network connection should be zero: {:?}", self.ref_count);
log_net!(error "ref_count for network connection should be zero: {:?}", self);
}
}
}
@ -302,6 +302,9 @@ impl NetworkConnection {
// xxx: causes crash (Missing otel data span extensions)
// recv_span.follows_from(span_id);
// Touch the LRU for this connection
connection_manager.touch_connection_by_id(connection_id);
// send the packet
if let Err(e) = Self::send_internal(
&protocol_connection,
@ -366,6 +369,11 @@ impl NetworkConnection {
log_net!(debug "failed to process received envelope: {}", e);
RecvLoopAction::Finish
} else {
// Touch the LRU for this connection
connection_manager.touch_connection_by_id(connection_id);
RecvLoopAction::Recv
}
}

View File

@ -64,7 +64,7 @@ pub async fn test_add_get_remove() {
assert_ne!(a4, c5.connection_descriptor());
assert_eq!(table.connection_count(), 0);
assert_eq!(table.get_connection_by_descriptor(a1), None);
assert_eq!(table.peek_connection_by_descriptor(a1), None);
table.add_connection(c1).unwrap();
assert!(table.add_connection(c1b).is_err());
@ -72,13 +72,13 @@ pub async fn test_add_get_remove() {
assert!(table.remove_connection_by_id(4.into()).is_none());
assert!(table.remove_connection_by_id(5.into()).is_none());
assert_eq!(table.connection_count(), 1);
assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.connection_count(), 1);
assert_err!(table.add_connection(c2));
assert_eq!(table.connection_count(), 1);
assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.connection_count(), 1);
assert_eq!(
table
@ -90,8 +90,8 @@ pub async fn test_add_get_remove() {
assert_eq!(table.connection_count(), 0);
assert!(table.remove_connection_by_id(2.into()).is_none());
assert_eq!(table.connection_count(), 0);
assert_eq!(table.get_connection_by_descriptor(a2), None);
assert_eq!(table.get_connection_by_descriptor(a1), None);
assert_eq!(table.peek_connection_by_descriptor(a2), None);
assert_eq!(table.peek_connection_by_descriptor(a1), None);
assert_eq!(table.connection_count(), 0);
let c1 = NetworkConnection::dummy(6.into(), a1);
table.add_connection(c1).unwrap();

View File

@ -64,7 +64,8 @@ impl WebsocketNetworkConnection {
instrument(level = "trace", err, skip(self))
)]
pub async fn close(&self) -> io::Result<NetworkResult<()>> {
let _ = self.inner.ws_meta.close().await.map_err(to_io)?;
let x = self.inner.ws_meta.close().await.map_err(to_io);
info!("close result: {:?}", x);
Ok(NetworkResult::value(()))
}