diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index c47cf32e..c76cb74a 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -204,7 +204,7 @@ impl ConnectionManager { Ok(Some(conn)) => { // Connection added and a different one LRU'd out // Send it to be terminated - // log_net!(debug "== LRU kill connection due to limit: {:?}", conn); + log_net!(debug "== LRU kill connection due to limit: {:?}", conn.debug_print(get_aligned_timestamp())); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); } Err(ConnectionTableAddError::AddressFilter(conn, e)) => { @@ -219,6 +219,7 @@ impl ConnectionManager { Err(ConnectionTableAddError::AlreadyExists(conn)) => { // Connection already exists let desc = conn.connection_descriptor(); + log_net!(debug "== Connection already exists: {:?}", conn.debug_print(get_aligned_timestamp())); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); return Ok(NetworkResult::no_connection_other(format!( "connection already exists: {:?}", @@ -228,6 +229,7 @@ impl ConnectionManager { Err(ConnectionTableAddError::TableFull(conn)) => { // Connection table is full let desc = conn.connection_descriptor(); + log_net!(debug "== Connection table full: {:?}", conn.debug_print(get_aligned_timestamp())); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); return Ok(NetworkResult::no_connection_other(format!( "connection table is full: {:?}", @@ -242,7 +244,12 @@ impl ConnectionManager { pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option { self.arc .connection_table - .get_connection_by_descriptor(descriptor) + .peek_connection_by_descriptor(descriptor) + } + + // Returns a network connection if one already is established + pub(super) fn touch_connection_by_id(&self, id: NetworkConnectionId) { + self.arc.connection_table.touch_connection_by_id(id) } // Protects a network connection if one already is established diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index eeda548e..ef04af41 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -25,6 +25,7 @@ impl ConnectionTableAddError { } } +#[derive(Clone, Copy, Debug)] pub(crate) enum ConnectionRefKind { AddRef, RemoveRef, @@ -97,8 +98,9 @@ impl ConnectionTable { let mut inner = self.inner.lock(); let unord = FuturesUnordered::new(); for table in &mut inner.conn_by_id { - for (_, v) in table.drain() { + for (_, mut v) in table.drain() { trace!("connection table join: {:?}", v); + v.close(); unord.push(v); } } @@ -206,8 +208,6 @@ impl ConnectionTable { }; let dead_conn = Self::remove_connection_records(&mut inner, dead_k); - log_net!(debug "== LRU Connection Killed: {} -> {}", dead_k, dead_conn.debug_print(get_aligned_timestamp())); - out_conn = Some(dead_conn); } @@ -224,24 +224,28 @@ impl ConnectionTable { } //#[instrument(level = "trace", skip(self), ret)] - #[allow(dead_code)] - pub fn get_connection_by_id(&self, id: NetworkConnectionId) -> Option { + pub fn touch_connection_by_id(&self, id: NetworkConnectionId) { let mut inner = self.inner.lock(); - let protocol_index = *inner.protocol_index_by_id.get(&id)?; - let out = inner.conn_by_id[protocol_index].get(&id).unwrap(); - Some(out.get_handle()) + let Some(protocol_index) = inner.protocol_index_by_id.get(&id).copied() else { + return; + }; + let _ = inner.conn_by_id[protocol_index].get(&id).unwrap(); } //#[instrument(level = "trace", skip(self), ret)] - pub fn get_connection_by_descriptor( + pub fn peek_connection_by_descriptor( &self, descriptor: ConnectionDescriptor, ) -> Option { - let mut inner = self.inner.lock(); + if descriptor.protocol_type() == ProtocolType::UDP { + return None; + } + + let inner = self.inner.lock(); let id = *inner.id_by_descriptor.get(&descriptor)?; let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); - let out = inner.conn_by_id[protocol_index].get(&id).unwrap(); + let out = inner.conn_by_id[protocol_index].peek(&id).unwrap(); Some(out.get_handle()) } @@ -252,9 +256,14 @@ impl ConnectionTable { ref_type: ConnectionRefKind, protect: bool, ) -> bool { + if descriptor.protocol_type() == ProtocolType::UDP { + return false; + } + let mut inner = self.inner.lock(); let Some(id) = inner.id_by_descriptor.get(&descriptor).copied() else { + log_net!(error "failed to ref descriptor: {:?} ({:?})", descriptor, ref_type); return false; }; let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index bf2973e0..27e6a0fc 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -1,21 +1,25 @@ use super::*; use async_tls::TlsConnector; +use async_tungstenite::tungstenite::error::ProtocolError; use async_tungstenite::tungstenite::handshake::server::{ Callback, ErrorResponse, Request, Response, }; use async_tungstenite::tungstenite::http::StatusCode; -use async_tungstenite::tungstenite::protocol::Message; +use async_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message}; +use async_tungstenite::tungstenite::Error; use async_tungstenite::{accept_hdr_async, client_async, WebSocketStream}; use futures_util::{AsyncRead, AsyncWrite, SinkExt}; use sockets::*; -/// Maximum number of websocket request headers to permit +// Maximum number of websocket request headers to permit const MAX_WS_HEADERS: usize = 24; -/// Maximum size of any one specific websocket header +// Maximum size of any one specific websocket header const MAX_WS_HEADER_LENGTH: usize = 512; -/// Maximum total size of headers and request including newlines +// Maximum total size of headers and request including newlines const MAX_WS_BEFORE_BODY: usize = 2048; +// Wait time for connection close +// const MAX_CONNECTION_CLOSE_WAIT_US: u64 = 5_000_000; cfg_if! { if #[cfg(feature="rt-async-std")] { @@ -31,14 +35,14 @@ cfg_if! { } } -fn err_to_network_result(err: async_tungstenite::tungstenite::Error) -> NetworkResult { +fn err_to_network_result(err: Error) -> NetworkResult { match err { - async_tungstenite::tungstenite::Error::ConnectionClosed - | async_tungstenite::tungstenite::Error::AlreadyClosed - | async_tungstenite::tungstenite::Error::Io(_) - | async_tungstenite::tungstenite::Error::Protocol( - async_tungstenite::tungstenite::error::ProtocolError::ResetWithoutClosingHandshake, - ) => NetworkResult::NoConnection(to_io_error_other(err)), + Error::ConnectionClosed + | Error::AlreadyClosed + | Error::Io(_) + | Error::Protocol(ProtocolError::ResetWithoutClosingHandshake) => { + NetworkResult::NoConnection(to_io_error_other(err)) + } _ => NetworkResult::InvalidMessage(err.to_string()), } } @@ -82,38 +86,58 @@ where instrument(level = "trace", err, skip(self)) )] pub async fn close(&self) -> io::Result> { - // Make an attempt to flush the stream - self.stream - .clone() - .close() + // Make an attempt to close the stream normally + let mut stream = self.stream.clone(); + stream + .send(Message::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: "".into(), + }))) .await .map_err(to_io_error_other)?; - // // Then forcibly close the socket - // self.tcp_stream - // .shutdown(Shutdown::Both) - // .map_err(to_io_error_other) + // match stream.flush().await { + // Ok(()) => Ok(NetworkResult::value(())), + // Err(Error::Io(ioerr)) => Err(ioerr).into_network_result(), + // Err(Error::ConnectionClosed) => Ok(NetworkResult::value(())), + // Err(e) => Err(to_io_error_other(e)), + // } + + stream.close().await.map_err(to_io_error_other)?; Ok(NetworkResult::value(())) + + // Drive connection to close + /* + let cur_ts = get_timestamp(); + loop { + match stream.flush().await { + Ok(()) => {} + Err(Error::Io(ioerr)) => { + break Err(ioerr).into_network_result(); + } + Err(Error::ConnectionClosed) => { + break Ok(NetworkResult::value(())); + } + Err(e) => { + break Err(to_io_error_other(e)); + } + } + if get_timestamp().saturating_sub(cur_ts) >= MAX_CONNECTION_CLOSE_WAIT_US { + return Ok(NetworkResult::Timeout); + } + } + */ } #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))] pub async fn send(&self, message: Vec) -> io::Result> { if message.len() > MAX_MESSAGE_SIZE { - bail_io_error_other!("received too large WS message"); + bail_io_error_other!("sending too large WS message"); } let out = match self.stream.clone().send(Message::binary(message)).await { Ok(v) => NetworkResult::value(v), Err(e) => err_to_network_result(e), }; - if !out.is_value() { - #[cfg(feature = "verbose-tracing")] - tracing::Span::current().record("network_result", &tracing::field::display(&out)); - return Ok(out); - } - let out = match self.stream.clone().flush().await { - Ok(v) => NetworkResult::value(v), - Err(e) => err_to_network_result(e), - }; #[cfg(feature = "verbose-tracing")] tracing::Span::current().record("network_result", &tracing::field::display(&out)); diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 8a782bdc..bd4f49ea 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -101,7 +101,7 @@ pub struct NetworkConnection { impl Drop for NetworkConnection { fn drop(&mut self) { if self.ref_count != 0 && self.stop_source.is_some() { - log_net!(error "ref_count for network connection should be zero: {:?}", self.ref_count); + log_net!(error "ref_count for network connection should be zero: {:?}", self); } } } @@ -302,6 +302,9 @@ impl NetworkConnection { // xxx: causes crash (Missing otel data span extensions) // recv_span.follows_from(span_id); + // Touch the LRU for this connection + connection_manager.touch_connection_by_id(connection_id); + // send the packet if let Err(e) = Self::send_internal( &protocol_connection, @@ -366,6 +369,11 @@ impl NetworkConnection { log_net!(debug "failed to process received envelope: {}", e); RecvLoopAction::Finish } else { + + // Touch the LRU for this connection + connection_manager.touch_connection_by_id(connection_id); + + RecvLoopAction::Recv } } diff --git a/veilid-core/src/network_manager/tests/test_connection_table.rs b/veilid-core/src/network_manager/tests/test_connection_table.rs index c683b6d7..05d679d1 100644 --- a/veilid-core/src/network_manager/tests/test_connection_table.rs +++ b/veilid-core/src/network_manager/tests/test_connection_table.rs @@ -64,7 +64,7 @@ pub async fn test_add_get_remove() { assert_ne!(a4, c5.connection_descriptor()); assert_eq!(table.connection_count(), 0); - assert_eq!(table.get_connection_by_descriptor(a1), None); + assert_eq!(table.peek_connection_by_descriptor(a1), None); table.add_connection(c1).unwrap(); assert!(table.add_connection(c1b).is_err()); @@ -72,13 +72,13 @@ pub async fn test_add_get_remove() { assert!(table.remove_connection_by_id(4.into()).is_none()); assert!(table.remove_connection_by_id(5.into()).is_none()); assert_eq!(table.connection_count(), 1); - assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); - assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); + assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); + assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); assert_eq!(table.connection_count(), 1); assert_err!(table.add_connection(c2)); assert_eq!(table.connection_count(), 1); - assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); - assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); + assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); + assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); assert_eq!(table.connection_count(), 1); assert_eq!( table @@ -90,8 +90,8 @@ pub async fn test_add_get_remove() { assert_eq!(table.connection_count(), 0); assert!(table.remove_connection_by_id(2.into()).is_none()); assert_eq!(table.connection_count(), 0); - assert_eq!(table.get_connection_by_descriptor(a2), None); - assert_eq!(table.get_connection_by_descriptor(a1), None); + assert_eq!(table.peek_connection_by_descriptor(a2), None); + assert_eq!(table.peek_connection_by_descriptor(a1), None); assert_eq!(table.connection_count(), 0); let c1 = NetworkConnection::dummy(6.into(), a1); table.add_connection(c1).unwrap(); diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index 85fbb46d..0fb0168b 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -64,7 +64,8 @@ impl WebsocketNetworkConnection { instrument(level = "trace", err, skip(self)) )] pub async fn close(&self) -> io::Result> { - let _ = self.inner.ws_meta.close().await.map_err(to_io)?; + let x = self.inner.ws_meta.close().await.map_err(to_io); + info!("close result: {:?}", x); Ok(NetworkResult::value(())) }