From e32984a5aae534a881e7cbe8e53b75fd0dfecb1e Mon Sep 17 00:00:00 2001 From: John Smith Date: Mon, 3 Jan 2022 23:58:26 -0500 Subject: [PATCH] more refactor, not quite done. --- Cargo.lock | 9 +- veilid-core/Cargo.toml | 19 +- veilid-core/src/connection_manager.rs | 175 ++++++------------ veilid-core/src/connection_table.rs | 35 +--- veilid-core/src/intf/native/network/mod.rs | 2 +- .../src/intf/native/network/network_tcp.rs | 2 +- .../src/intf/native/network/protocol/mod.rs | 32 +--- .../src/intf/native/network/protocol/tcp.rs | 66 ++----- .../src/intf/native/network/protocol/udp.rs | 25 +-- .../src/intf/native/network/protocol/ws.rs | 84 +++------ .../src/intf/wasm/network/protocol/mod.rs | 19 +- .../src/intf/wasm/network/protocol/ws.rs | 23 +-- veilid-core/src/lib.rs | 1 + veilid-core/src/network_connection.rs | 139 ++++++++++++++ .../src/tests/common/test_connection_table.rs | 13 +- veilid-core/src/veilid_api/mod.rs | 4 +- veilid-core/src/xx/mod.rs | 4 + veilid-core/src/xx/single_future.rs | 2 + veilid-core/src/xx/tick_task.rs | 3 + 19 files changed, 318 insertions(+), 339 deletions(-) create mode 100644 veilid-core/src/network_connection.rs diff --git a/Cargo.lock b/Cargo.lock index 859234a7..6c35aadd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,6 +225,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "async-lock" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab86ee898bb6d5d0118270acaa8536c59885231ca6f653b4c35a148f8ee6235" + [[package]] name = "async-lock" version = "2.4.0" @@ -270,7 +276,7 @@ dependencies = [ "async-channel", "async-global-executor", "async-io", - "async-lock", + "async-lock 2.4.0", "async-process", "crossbeam-utils", "futures-channel", @@ -3743,6 +3749,7 @@ version = "0.1.0" dependencies = [ "android_logger", "anyhow", + "async-lock 0.1.0", "async-std", "async-tls", "async-tungstenite 0.16.0", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index cad8b881..3ab96f97 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -86,20 +86,21 @@ serde_cbor = { version = "^0", default-features = false, features = ["alloc"] } getrandom = { version = "^0", features = ["js"] } ws_stream_wasm = "^0" async_executors = { version = "^0", features = [ "bindgen" ]} +async-lock = "^0" # Configuration for WASM32 'web-sys' crate [target.'cfg(target_arch = "wasm32")'.dependencies.web-sys] version = "^0" features = [ -# 'Document', -# 'Element', -# 'HtmlElement', -# 'Node', - 'IdbFactory', - 'IdbOpenDbRequest', - 'Storage', - 'Location', - 'Window', + # 'Document', + # 'Element', + # 'HtmlElement', + # 'Node', + 'IdbFactory', + 'IdbOpenDbRequest', + 'Storage', + 'Location', + 'Window', ] # Dependencies for Android diff --git a/veilid-core/src/connection_manager.rs b/veilid-core/src/connection_manager.rs index e98f5bea..f8dc964f 100644 --- a/veilid-core/src/connection_manager.rs +++ b/veilid-core/src/connection_manager.rs @@ -1,5 +1,6 @@ use crate::connection_table::*; use crate::intf::*; +use crate::network_connection::*; use crate::network_manager::*; use crate::xx::*; use crate::*; @@ -8,73 +9,10 @@ use futures_util::stream::{FuturesUnordered, StreamExt}; const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize; -/////////////////////////////////////////////////////////// -// Accept - -cfg_if! { - if #[cfg(not(target_arch = "wasm32"))] { - use async_std::net::*; - use utils::async_peek_stream::*; - - pub trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync { - fn on_accept( - &self, - stream: AsyncPeekStream, - peer_addr: SocketAddr, - ) -> SystemPinBoxFuture, String>>; - } - - pub trait ProtocolAcceptHandlerClone { - fn clone_box(&self) -> Box; - } - - impl ProtocolAcceptHandlerClone for T - where - T: 'static + ProtocolAcceptHandler + Clone, - { - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } - } - impl Clone for Box { - fn clone(&self) -> Box { - self.clone_box() - } - } - - pub type NewProtocolAcceptHandler = - dyn Fn(VeilidConfig, bool, SocketAddr) -> Box + Send; - } -} - -/////////////////////////////////////////////////////////// -// Dummy network connection for testing - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DummyNetworkConnection { - descriptor: ConnectionDescriptor, -} - -impl DummyNetworkConnection { - pub fn new(descriptor: ConnectionDescriptor) -> Self { - Self { descriptor } - } - pub fn connection_descriptor(&self) -> ConnectionDescriptor { - self.descriptor.clone() - } - pub async fn send(&self, _message: Vec) -> Result<(), String> { - Ok(()) - } - pub async fn recv(&self) -> Result, String> { - Ok(Vec::new()) - } -} - /////////////////////////////////////////////////////////// // Connection manager -pub struct ConnectionManagerInner { - network_manager: NetworkManager, +struct ConnectionManagerInner { connection_table: ConnectionTable, connection_processor_jh: Option>, connection_add_channel_tx: Option>>, @@ -88,75 +26,98 @@ impl core::fmt::Debug for ConnectionManagerInner { } } -#[derive(Clone)] -pub struct ConnectionManager { - inner: Arc>, +struct ConnectionManagerArc { + network_manager: NetworkManager, + inner: AsyncMutex, } -impl core::fmt::Debug for ConnectionManager { +impl core::fmt::Debug for ConnectionManagerArc { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("ConnectionManager") - .field("inner", &*self.inner.lock()) + f.debug_struct("ConnectionManagerArc") + .field("inner", &self.inner) .finish() } } +#[derive(Debug, Clone)] +pub struct ConnectionManager { + arc: Arc, +} + impl ConnectionManager { - fn new_inner(network_manager: NetworkManager) -> ConnectionManagerInner { + fn new_inner() -> ConnectionManagerInner { ConnectionManagerInner { - network_manager, connection_table: ConnectionTable::new(), connection_processor_jh: None, connection_add_channel_tx: None, } } + fn new_arc(network_manager: NetworkManager) -> ConnectionManagerArc { + ConnectionManagerArc { + network_manager, + inner: AsyncMutex::new(Self::new_inner()), + } + } pub fn new(network_manager: NetworkManager) -> Self { Self { - inner: Arc::new(Mutex::new(Self::new_inner(network_manager))), + arc: Arc::new(Self::new_arc(network_manager)), } } pub fn network_manager(&self) -> NetworkManager { - self.inner.lock().network_manager.clone() + self.arc.network_manager.clone() } pub async fn startup(&self) { + let mut inner = self.arc.inner.lock().await; let cac = utils::channel::channel(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config - self.inner.lock().connection_add_channel_tx = Some(cac.0); + inner.connection_add_channel_tx = Some(cac.0); let rx = cac.1.clone(); let this = self.clone(); - self.inner.lock().connection_processor_jh = Some(spawn(this.connection_processor(rx))); + inner.connection_processor_jh = Some(spawn(this.connection_processor(rx))); } pub async fn shutdown(&self) { - *self.inner.lock() = Self::new_inner(self.network_manager()); + *self.arc.inner.lock().await = Self::new_inner(); } // Returns a network connection if one already is established - pub fn get_connection(&self, descriptor: &ConnectionDescriptor) -> Option { - self.inner - .lock() - .connection_table - .get_connection(descriptor) - .map(|e| e.conn) + pub async fn get_connection( + &self, + descriptor: &ConnectionDescriptor, + ) -> Option { + let inner = self.arc.inner.lock().await; + inner.connection_table.get_connection(descriptor) + } + + // Internal routine to register new connection + async fn on_new_connection_internal( + &self, + mut inner: AsyncMutexGuard<'_, ConnectionManagerInner>, + conn: NetworkConnection, + ) -> Result<(), String> { + let tx = inner + .connection_add_channel_tx + .as_ref() + .ok_or_else(fn_string!("connection channel isn't open yet"))? + .clone(); + + let receiver_loop_future = Self::process_connection(self.clone(), conn.clone()); + tx.try_send(receiver_loop_future) + .await + .map_err(map_to_string) + .map_err(logthru_net!(error "failed to start receiver loop"))?; + + // If the receiver loop started successfully, + // add the new connection to the table + inner.connection_table.add_connection(conn) } // Called by low-level network when any connection-oriented protocol connection appears // either from incoming or outgoing connections. Registers connection in the connection table for later access // and spawns a message processing loop for the connection pub async fn on_new_connection(&self, conn: NetworkConnection) -> Result<(), String> { - let tx = self - .inner - .lock() - .connection_add_channel_tx - .as_ref() - .ok_or_else(fn_string!("connection channel isn't open yet"))? - .clone(); - - let receiver_loop_future = Self::process_connection(self.clone(), conn); - tx.try_send(receiver_loop_future) - .await - .map_err(map_to_string) - .map_err(logthru_net!(error "failed to start receiver loop")) + let inner = self.arc.inner.lock().await; + self.on_new_connection_internal(inner, conn).await } pub async fn get_or_create_connection( @@ -173,9 +134,9 @@ impl ConnectionManager { }; // If connection exists, then return it - if let Some(conn) = self - .inner - .lock() + let inner = self.arc.inner.lock().await; + + if let Some(conn) = inner .connection_table .get_connection(&descriptor) .map(|e| e.conn) @@ -186,7 +147,7 @@ impl ConnectionManager { // If not, attempt new connection let conn = NetworkConnection::connect(local_addr, dial_info).await?; - self.on_new_connection(conn.clone()).await?; + self.on_new_connection_internal(inner, conn).await; Ok(conn) } @@ -198,20 +159,6 @@ impl ConnectionManager { ) -> SystemPinBoxFuture<()> { let network_manager = this.network_manager(); Box::pin(async move { - // Add new connections to the table - let entry = match this - .inner - .lock() - .connection_table - .add_connection(conn.clone()) - { - Ok(e) => e, - Err(err) => { - error!(target: "net", "{}", err); - return; - } - }; - // let exit_value: Result, ()> = Err(()); let descriptor = conn.connection_descriptor(); diff --git a/veilid-core/src/connection_table.rs b/veilid-core/src/connection_table.rs index 27576ff8..5d18ecae 100644 --- a/veilid-core/src/connection_table.rs +++ b/veilid-core/src/connection_table.rs @@ -1,37 +1,11 @@ use crate::intf::*; +use crate::network_connection::*; use crate::xx::*; use crate::*; -#[derive(Clone, Debug)] -pub struct ConnectionTableEntry { - pub conn: NetworkConnection, - pub established_time: u64, - pub last_message_sent_time: Option, - pub last_message_recv_time: Option, - pub stopper: Eventual, -} - -impl PartialEq for ConnectionTableEntry { - fn eq(&self, other: &ConnectionTableEntry) -> bool { - if self.conn != other.conn { - return false; - } - if self.established_time != other.established_time { - return false; - } - if self.last_message_sent_time != other.last_message_sent_time { - return false; - } - if self.last_message_recv_time != other.last_message_recv_time { - return false; - } - true - } -} - #[derive(Debug)] pub struct ConnectionTable { - conn_by_addr: BTreeMap, + conn_by_addr: BTreeMap, } impl ConnectionTable { @@ -41,10 +15,7 @@ impl ConnectionTable { } } - pub fn add_connection( - &mut self, - conn: NetworkConnection, - ) -> Result { + pub fn add_connection(&mut self, conn: NetworkConnection) -> Result<(), String> { let descriptor = conn.connection_descriptor(); assert_ne!( diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index bdb8f450..83643db2 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -301,7 +301,7 @@ impl Network { } // Handle connection-oriented protocols - if let Some(conn) = self.connection_manager().get_connection(descriptor) { + if let Some(conn) = self.connection_manager().get_connection(descriptor).await { // connection exists, send over it conn.send(data).await.map_err(logthru_net!())?; diff --git a/veilid-core/src/intf/native/network/network_tcp.rs b/veilid-core/src/intf/native/network/network_tcp.rs index b30d547a..ae051864 100644 --- a/veilid-core/src/intf/native/network/network_tcp.rs +++ b/veilid-core/src/intf/native/network/network_tcp.rs @@ -1,6 +1,6 @@ use super::*; -use crate::connection_manager::*; use crate::intf::*; +use crate::network_connection::*; use utils::clone_stream::*; use async_tls::TlsAcceptor; diff --git a/veilid-core/src/intf/native/network/protocol/mod.rs b/veilid-core/src/intf/native/network/protocol/mod.rs index 22e689a4..e9b71684 100644 --- a/veilid-core/src/intf/native/network/protocol/mod.rs +++ b/veilid-core/src/intf/native/network/protocol/mod.rs @@ -3,13 +3,13 @@ pub mod udp; pub mod wrtc; pub mod ws; -use crate::connection_manager::*; +use crate::network_connection::*; use crate::xx::*; use crate::*; use socket2::{Domain, Protocol, Socket, Type}; -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum NetworkConnection { +#[derive(Debug)] +pub enum ProtocolNetworkConnection { Dummy(DummyNetworkConnection), RawTcp(tcp::RawTcpNetworkConnection), WsAccepted(ws::WebSocketNetworkConnectionAccepted), @@ -18,7 +18,7 @@ pub enum NetworkConnection { //WebRTC(wrtc::WebRTCNetworkConnection), } -impl NetworkConnection { +impl ProtocolNetworkConnection { pub async fn connect( local_address: Option, dial_info: DialInfo, @@ -35,11 +35,8 @@ impl NetworkConnection { } } } - pub async fn send_unbound_message( - &self, - dial_info: &DialInfo, - data: Vec, - ) -> Result<(), String> { + + pub async fn send_unbound_message(dial_info: &DialInfo, data: Vec) -> Result<(), String> { match dial_info.protocol_type() { ProtocolType::UDP => { let peer_socket_addr = dial_info.to_socket_addr(); @@ -59,27 +56,18 @@ impl NetworkConnection { } } - pub fn connection_descriptor(&self) -> ConnectionDescriptor { + pub async fn send(&mut self, message: Vec) -> Result<(), String> { match self { - Self::Dummy(d) => d.connection_descriptor(), - Self::RawTcp(t) => t.connection_descriptor(), - Self::WsAccepted(w) => w.connection_descriptor(), - Self::Ws(w) => w.connection_descriptor(), - Self::Wss(w) => w.connection_descriptor(), - } - } - pub async fn send(&self, message: Vec) -> Result<(), String> { - match self { - Self::Dummy(d) => d.send(message).await, + Self::Dummy(d) => d.send(message), Self::RawTcp(t) => t.send(message).await, Self::WsAccepted(w) => w.send(message).await, Self::Ws(w) => w.send(message).await, Self::Wss(w) => w.send(message).await, } } - pub async fn recv(&self) -> Result, String> { + pub async fn recv(&mut self) -> Result, String> { match self { - Self::Dummy(d) => d.recv().await, + Self::Dummy(d) => d.recv(), Self::RawTcp(t) => t.recv().await, Self::WsAccepted(w) => w.recv().await, Self::Ws(w) => w.recv().await, diff --git a/veilid-core/src/intf/native/network/protocol/tcp.rs b/veilid-core/src/intf/native/network/protocol/tcp.rs index 9121deeb..db5c77d6 100644 --- a/veilid-core/src/intf/native/network/protocol/tcp.rs +++ b/veilid-core/src/intf/native/network/protocol/tcp.rs @@ -5,79 +5,46 @@ use crate::network_manager::MAX_MESSAGE_SIZE; use crate::*; use async_std::net::*; use async_std::prelude::*; -use async_std::sync::Mutex as AsyncMutex; use std::fmt; -struct RawTcpNetworkConnectionInner { - stream: AsyncPeekStream, -} - -#[derive(Clone)] pub struct RawTcpNetworkConnection { - inner: Arc>, - connection_descriptor: ConnectionDescriptor, + stream: AsyncPeekStream, } impl fmt::Debug for RawTcpNetworkConnection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RawTCPNetworkConnection") - .field("connection_descriptor", &self.connection_descriptor) - .finish() + f.debug_struct("RawTCPNetworkConnection").finish() } } -impl PartialEq for RawTcpNetworkConnection { - fn eq(&self, other: &Self) -> bool { - Arc::as_ptr(&self.inner) == Arc::as_ptr(&other.inner) - } -} - -impl Eq for RawTcpNetworkConnection {} - impl RawTcpNetworkConnection { - fn new_inner(stream: AsyncPeekStream) -> RawTcpNetworkConnectionInner { - RawTcpNetworkConnectionInner { stream } + pub fn new(stream: AsyncPeekStream) -> Self { + Self { stream } } - pub fn new(stream: AsyncPeekStream, connection_descriptor: ConnectionDescriptor) -> Self { - Self { - inner: Arc::new(AsyncMutex::new(Self::new_inner(stream))), - connection_descriptor, - } - } - - pub fn connection_descriptor(&self) -> ConnectionDescriptor { - self.connection_descriptor.clone() - } - - pub async fn send(&self, message: Vec) -> Result<(), String> { + pub async fn send(&mut self, message: Vec) -> Result<(), String> { if message.len() > MAX_MESSAGE_SIZE { return Err("sending too large TCP message".to_owned()); } let len = message.len() as u16; let header = [b'V', b'L', len as u8, (len >> 8) as u8]; - let mut inner = self.inner.lock().await; - inner - .stream + self.stream .write_all(&header) .await .map_err(map_to_string) .map_err(logthru_net!())?; - inner - .stream + self.stream .write_all(&message) .await .map_err(map_to_string) .map_err(logthru_net!()) } - pub async fn recv(&self) -> Result, String> { + pub async fn recv(&mut self) -> Result, String> { let mut header = [0u8; 4]; - let mut inner = self.inner.lock().await; - inner - .stream + self.stream .read_exact(&mut header) .await .map_err(|e| format!("TCP recv error: {}", e))?; @@ -90,8 +57,7 @@ impl RawTcpNetworkConnection { } let mut out: Vec = vec![0u8; len]; - inner - .stream + self.stream .read_exact(&mut out) .await .map_err(map_to_string)?; @@ -143,10 +109,10 @@ impl RawTcpProtocolHandler { ProtocolType::TCP, ); let local_address = self.inner.lock().local_address; - let conn = NetworkConnection::RawTcp(RawTcpNetworkConnection::new( - stream, + let conn = NetworkConnection::from_protocol( ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address)), - )); + ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)), + ); Ok(Some(conn)) } @@ -182,13 +148,13 @@ impl RawTcpProtocolHandler { let ps = AsyncPeekStream::new(ts); // Wrap the stream in a network connection and return it - let conn = NetworkConnection::RawTcp(RawTcpNetworkConnection::new( - ps, + let conn = NetworkConnection::from_protocol( ConnectionDescriptor { local: Some(SocketAddress::from_socket_addr(actual_local_address)), remote: dial_info.to_peer_address(), }, - )); + ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(ps)), + ); Ok(conn) } diff --git a/veilid-core/src/intf/native/network/protocol/udp.rs b/veilid-core/src/intf/native/network/protocol/udp.rs index aaf44695..efaddfc0 100644 --- a/veilid-core/src/intf/native/network/protocol/udp.rs +++ b/veilid-core/src/intf/native/network/protocol/udp.rs @@ -3,32 +3,21 @@ use crate::network_manager::MAX_MESSAGE_SIZE; use crate::*; use async_std::net::*; -struct RawUdpProtocolHandlerInner { +#[derive(Clone)] +pub struct RawUdpProtocolHandler { socket: Arc, } -#[derive(Clone)] -pub struct RawUdpProtocolHandler { - inner: Arc>, -} - impl RawUdpProtocolHandler { - fn new_inner(socket: Arc) -> RawUdpProtocolHandlerInner { - RawUdpProtocolHandlerInner { socket } - } - pub fn new(socket: Arc) -> Self { - Self { - inner: Arc::new(Mutex::new(Self::new_inner(socket))), - } + Self { socket } } pub async fn recv_message( &self, data: &mut [u8], ) -> Result<(usize, ConnectionDescriptor), String> { - let socket = self.inner.lock().socket.clone(); - let (size, remote_addr) = socket.recv_from(data).await.map_err(map_to_string)?; + let (size, remote_addr) = self.socket.recv_from(data).await.map_err(map_to_string)?; if size > MAX_MESSAGE_SIZE { return Err("received too large UDP message".to_owned()); @@ -45,7 +34,7 @@ impl RawUdpProtocolHandler { SocketAddress::from_socket_addr(remote_addr), ProtocolType::UDP, ); - let local_socket_addr = socket.local_addr().map_err(map_to_string)?; + let local_socket_addr = self.socket.local_addr().map_err(map_to_string)?; let descriptor = ConnectionDescriptor::new( peer_addr, SocketAddress::from_socket_addr(local_socket_addr), @@ -64,8 +53,8 @@ impl RawUdpProtocolHandler { socket_addr ); - let socket = self.inner.lock().socket.clone(); - let len = socket + let len = self + .socket .send_to(&data, socket_addr) .await .map_err(map_to_string) diff --git a/veilid-core/src/intf/native/network/protocol/ws.rs b/veilid-core/src/intf/native/network/protocol/ws.rs index c339c297..d43bd7a0 100644 --- a/veilid-core/src/intf/native/network/protocol/ws.rs +++ b/veilid-core/src/intf/native/network/protocol/ws.rs @@ -5,7 +5,6 @@ use crate::network_manager::MAX_MESSAGE_SIZE; use crate::*; use async_std::io; use async_std::net::*; -use async_std::sync::Mutex as AsyncMutex; use async_tls::TlsConnector; use async_tungstenite::tungstenite::protocol::Message; use async_tungstenite::{accept_async, client_async, WebSocketStream}; @@ -32,7 +31,6 @@ where T: io::Read + io::Write + Send + Unpin + 'static, { tls: bool, - connection_descriptor: ConnectionDescriptor, inner: Arc>>, } @@ -43,7 +41,6 @@ where fn clone(&self) -> Self { Self { tls: self.tls, - connection_descriptor: self.connection_descriptor.clone(), inner: self.inner.clone(), } } @@ -58,41 +55,19 @@ where } } -impl PartialEq for WebsocketNetworkConnection -where - T: io::Read + io::Write + Send + Unpin + 'static, -{ - fn eq(&self, other: &Self) -> bool { - self.tls == other.tls - && self.connection_descriptor == other.connection_descriptor - && Arc::as_ptr(&self.inner) == Arc::as_ptr(&other.inner) - } -} - -impl Eq for WebsocketNetworkConnection where T: io::Read + io::Write + Send + Unpin + 'static {} - impl WebsocketNetworkConnection where T: io::Read + io::Write + Send + Unpin + 'static, { - pub fn new( - tls: bool, - connection_descriptor: ConnectionDescriptor, - ws_stream: WebSocketStream, - ) -> Self { + pub fn new(tls: bool, ws_stream: WebSocketStream) -> Self { Self { tls, - connection_descriptor, inner: Arc::new(AsyncMutex::new(WebSocketNetworkConnectionInner { ws_stream, })), } } - pub fn connection_descriptor(&self) -> ConnectionDescriptor { - self.connection_descriptor.clone() - } - pub async fn send(&self, message: Vec) -> Result<(), String> { if message.len() > MAX_MESSAGE_SIZE { return Err("received too large WS message".to_owned()); @@ -130,7 +105,7 @@ where /////////////////////////////////////////////////////////// /// -struct WebsocketProtocolHandlerInner { +struct WebsocketProtocolHandlerArc { tls: bool, local_address: SocketAddr, request_path: Vec, @@ -142,7 +117,7 @@ pub struct WebsocketProtocolHandler where Self: ProtocolAcceptHandler, { - inner: Arc, + arc: Arc, } impl WebsocketProtocolHandler { pub fn new(config: VeilidConfig, tls: bool, local_address: SocketAddr) -> Self { @@ -158,14 +133,13 @@ impl WebsocketProtocolHandler { c.network.connection_initial_timeout }; - let inner = WebsocketProtocolHandlerInner { - tls, - local_address, - request_path: path.as_bytes().to_vec(), - connection_initial_timeout, - }; Self { - inner: Arc::new(inner), + arc: Arc::new(WebsocketProtocolHandlerArc { + tls, + local_address, + request_path: path.as_bytes().to_vec(), + connection_initial_timeout, + }), } } @@ -174,10 +148,10 @@ impl WebsocketProtocolHandler { ps: AsyncPeekStream, socket_addr: SocketAddr, ) -> Result, String> { - let request_path_len = self.inner.request_path.len() + 2; + let request_path_len = self.arc.request_path.len() + 2; let mut peekbuf: Vec = vec![0u8; request_path_len]; match io::timeout( - Duration::from_micros(self.inner.connection_initial_timeout), + Duration::from_micros(self.arc.connection_initial_timeout), ps.peek_exact(&mut peekbuf), ) .await @@ -191,7 +165,7 @@ impl WebsocketProtocolHandler { } } // Check for websocket path - let matches_path = &peekbuf[0..request_path_len - 2] == self.inner.request_path.as_slice() + let matches_path = &peekbuf[0..request_path_len - 2] == self.arc.request_path.as_slice() && (peekbuf[request_path_len - 2] == b' ' || (peekbuf[request_path_len - 2] == b'/' && peekbuf[request_path_len - 1] == b' ')); @@ -208,7 +182,7 @@ impl WebsocketProtocolHandler { .map_err(logthru_net!("failed websockets handshake"))?; // Wrap the websocket in a NetworkConnection and register it - let protocol_type = if self.inner.tls { + let protocol_type = if self.arc.tls { ProtocolType::WSS } else { ProtocolType::WS @@ -217,14 +191,16 @@ impl WebsocketProtocolHandler { let peer_addr = PeerAddress::new(SocketAddress::from_socket_addr(socket_addr), protocol_type); - let conn = NetworkConnection::WsAccepted(WebsocketNetworkConnection::new( - self.inner.tls, + let conn = NetworkConnection::from_protocol( ConnectionDescriptor::new( peer_addr, - SocketAddress::from_socket_addr(self.inner.local_address), + SocketAddress::from_socket_addr(self.arc.local_address), ), - ws_stream, - )); + ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new( + self.arc.tls, + ws_stream, + )), + ); Ok(Some(conn)) } @@ -271,7 +247,7 @@ impl WebsocketProtocolHandler { .map_err(logthru_net!())?; // Make our connection descriptor - let connection_descriptor = ConnectionDescriptor { + let descriptor = ConnectionDescriptor { local: Some(SocketAddress::from_socket_addr(actual_local_addr)), remote: dial_info.to_peer_address(), }; @@ -288,21 +264,19 @@ impl WebsocketProtocolHandler { .map_err(map_to_string) .map_err(logthru_net!(error))?; - Ok(NetworkConnection::Wss(WebsocketNetworkConnection::new( - tls, - connection_descriptor, - ws_stream, - ))) + Ok(NetworkConnection::from_protocol( + descriptor, + ProtocolNetworkConnection::Wss(WebsocketNetworkConnection::new(tls, ws_stream)), + )) } else { let (ws_stream, _response) = client_async(request, tcp_stream) .await .map_err(map_to_string) .map_err(logthru_net!(error))?; - Ok(NetworkConnection::Ws(WebsocketNetworkConnection::new( - tls, - connection_descriptor, - ws_stream, - ))) + Ok(NetworkConnection::from_protocol( + descriptor, + ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(tls, ws_stream)), + )) } } diff --git a/veilid-core/src/intf/wasm/network/protocol/mod.rs b/veilid-core/src/intf/wasm/network/protocol/mod.rs index 8f9ee3f3..2dc3d3bd 100644 --- a/veilid-core/src/intf/wasm/network/protocol/mod.rs +++ b/veilid-core/src/intf/wasm/network/protocol/mod.rs @@ -5,14 +5,14 @@ use crate::connection_manager::*; use crate::veilid_api::ProtocolType; use crate::xx::*; -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum NetworkConnection { +#[derive(Debug)] +pub enum ProtocolNetworkConnection { Dummy(DummyNetworkConnection), WS(ws::WebsocketNetworkConnection), //WebRTC(wrtc::WebRTCNetworkConnection), } -impl NetworkConnection { +impl ProtocolNetworkConnection { pub async fn connect( local_address: Option, dial_info: DialInfo, @@ -31,7 +31,6 @@ impl NetworkConnection { } pub async fn send_unbound_message( - &self, dial_info: &DialInfo, data: Vec, ) -> Result<(), String> { @@ -48,17 +47,17 @@ impl NetworkConnection { } } - pub async fn send(&self, message: Vec) -> Result<(), String> { + pub async fn send(&mut self, message: Vec) -> Result<(), String> { match self { - Self::Dummy(d) => d.send(message).await, - Self::WS(w) => w.send(message).await, + Self::Dummy(d) => d.send(message), + Self::WS(w) => w.send(message), } } - pub async fn recv(&self) -> Result, String> { + pub async fn recv(&mut self) -> Result, String> { match self { - Self::Dummy(d) => d.recv().await, - Self::WS(w) => w.recv().await, + Self::Dummy(d) => d.recv(), + Self::WS(w) => w.recv(), } } } diff --git a/veilid-core/src/intf/wasm/network/protocol/ws.rs b/veilid-core/src/intf/wasm/network/protocol/ws.rs index 782a4828..de2f1cd9 100644 --- a/veilid-core/src/intf/wasm/network/protocol/ws.rs +++ b/veilid-core/src/intf/wasm/network/protocol/ws.rs @@ -14,7 +14,6 @@ struct WebsocketNetworkConnectionInner { #[derive(Clone)] pub struct WebsocketNetworkConnection { tls: bool, - connection_descriptor: ConnectionDescriptor, inner: Arc>, } @@ -24,20 +23,11 @@ impl fmt::Debug for WebsocketNetworkConnection { } } -impl PartialEq for WebsocketNetworkConnection { - fn eq(&self, other: &Self) -> bool { - self.tls == other.tls && Arc::as_ptr(&self.inner) == Arc::as_ptr(&other.inner) - } -} - -impl Eq for WebsocketNetworkConnection {} - impl WebsocketNetworkConnection { - pub fn new(tls: bool, connection_descriptor: ConnectionDescriptor, ws_stream: WsStream) -> Self { + pub fn new(tls: bool, ws_stream: WsStream) -> Self { let ws = ws_stream.wrapped().clone(); Self { tls, - connection_descriptor, inner: Arc::new(Mutex::new(WebsocketNetworkConnectionInner { ws_stream, ws, @@ -45,11 +35,10 @@ impl WebsocketNetworkConnection { } } - pub fn connection_descriptor(&self) -> ConnectionDescriptor { - self.connection_descriptor.clone() - } +xxx convert this to async and use stream api not low level websocket +xxx implement close() everywhere and skip using eventual for loop shutdown - pub async fn send(&self, message: Vec) -> Result<(), String> { + pub fn send(&self, message: Vec) -> Result<(), String> { if message.len() > MAX_MESSAGE_SIZE { return Err("sending too large WS message".to_owned()).map_err(logthru_net!(error)); } @@ -60,7 +49,7 @@ impl WebsocketNetworkConnection { .map_err(|_| "failed to send to websocket".to_owned()) .map_err(logthru_net!(error)) } - pub async fn recv(&self) -> Result, String> { + pub fn recv(&self) -> Result, String> { let out = match self.inner.lock().ws_stream.next().await { Some(WsMessage::Binary(v)) => v, Some(_) => { @@ -123,7 +112,7 @@ impl WebsocketProtocolHandler { remote: dial_info.to_peer_address(), }; - Ok(NetworkConnection::WS(WebsocketNetworkConnection::new(tls, connection_descriptor, wsio))) + Ok(NetworkConnection::from_protocol(descriptor,ProtocolNetworkConnection::WS(WebsocketNetworkConnection::new(tls, wsio)))) } pub async fn send_unbound_message(dial_info: &DialInfo, data: Vec) -> Result<(), String> { diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index 729cb511..88476cd1 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -12,6 +12,7 @@ mod connection_table; mod dht; mod intf; mod lease_manager; +mod network_connection; mod network_manager; mod receipt_manager; mod routing_table; diff --git a/veilid-core/src/network_connection.rs b/veilid-core/src/network_connection.rs new file mode 100644 index 00000000..5c8cc4ab --- /dev/null +++ b/veilid-core/src/network_connection.rs @@ -0,0 +1,139 @@ +use crate::intf::*; +use crate::xx::*; +use crate::*; + +/////////////////////////////////////////////////////////// +// Accept + +cfg_if! { + if #[cfg(not(target_arch = "wasm32"))] { + use async_std::net::*; + use utils::async_peek_stream::*; + + pub trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync { + fn on_accept( + &self, + stream: AsyncPeekStream, + peer_addr: SocketAddr, + ) -> SystemPinBoxFuture, String>>; + } + + pub trait ProtocolAcceptHandlerClone { + fn clone_box(&self) -> Box; + } + + impl ProtocolAcceptHandlerClone for T + where + T: 'static + ProtocolAcceptHandler + Clone, + { + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } + } + impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } + } + + pub type NewProtocolAcceptHandler = + dyn Fn(VeilidConfig, bool, SocketAddr) -> Box + Send; + } +} + +/////////////////////////////////////////////////////////// +// Dummy protocol network connection for testing + +#[derive(Debug)] +pub struct DummyNetworkConnection {} + +impl DummyNetworkConnection { + pub fn new(descriptor: ConnectionDescriptor) -> NetworkConnection { + NetworkConnection::from_protocol(descriptor, ProtocolNetworkConnection::Dummy(Self {})) + } + pub fn send(&self, _message: Vec) -> Result<(), String> { + Ok(()) + } + pub fn recv(&self) -> Result, String> { + Ok(Vec::new()) + } +} + +/////////////////////////////////////////////////////////// +// Top-level protocol independent network connection object + +#[derive(Debug)] +struct NetworkConnectionInner { + protocol_connection: ProtocolNetworkConnection, + last_message_sent_time: Option, + last_message_recv_time: Option, +} + +#[derive(Debug)] +struct NetworkConnectionArc { + descriptor: ConnectionDescriptor, + established_time: u64, + inner: AsyncMutex, +} + +#[derive(Clone, Debug)] +pub struct NetworkConnection { + arc: Arc, +} + +impl NetworkConnection { + fn new_inner(protocol_connection: ProtocolNetworkConnection) -> NetworkConnectionInner { + NetworkConnectionInner { + protocol_connection, + last_message_sent_time: None, + last_message_recv_time: None, + } + } + fn new_arc( + descriptor: ConnectionDescriptor, + protocol_connection: ProtocolNetworkConnection, + ) -> NetworkConnectionArc { + NetworkConnectionArc { + descriptor, + established_time: intf::get_timestamp(), + inner: AsyncMutex::new(Self::new_inner(protocol_connection)), + } + } + + pub fn from_protocol( + descriptor: ConnectionDescriptor, + protocol_connection: ProtocolNetworkConnection, + ) -> Self { + Self { + arc: Arc::new(Self::new_arc(descriptor, protocol_connection)), + } + } + + pub async fn connect( + local_address: Option, + dial_info: DialInfo, + ) -> Result { + ProtocolNetworkConnection::connect(local_address, dial_info).await + } + + pub fn connection_descriptor(&self) -> ConnectionDescriptor { + self.arc.descriptor + } + + pub async fn send(&self, message: Vec) -> Result<(), String> { + let mut inner = self.arc.inner.lock().await; + let out = inner.protocol_connection.send(message).await; + if out.is_ok() { + inner.last_message_sent_time = Some(intf::get_timestamp()); + } + out + } + pub async fn recv(&self) -> Result, String> { + let mut inner = self.arc.inner.lock().await; + let out = inner.protocol_connection.recv().await; + if out.is_ok() { + inner.last_message_recv_time = Some(intf::get_timestamp()); + } + out + } +} diff --git a/veilid-core/src/tests/common/test_connection_table.rs b/veilid-core/src/tests/common/test_connection_table.rs index 242726e3..deda9616 100644 --- a/veilid-core/src/tests/common/test_connection_table.rs +++ b/veilid-core/src/tests/common/test_connection_table.rs @@ -1,6 +1,5 @@ -use crate::connection_manager::*; use crate::connection_table::*; -use crate::intf::*; +use crate::network_connection::*; use crate::xx::*; use crate::*; @@ -49,11 +48,11 @@ pub async fn test_add_get_remove() { ))), ); - let c1 = NetworkConnection::Dummy(DummyNetworkConnection::new(a1.clone())); - let c2 = NetworkConnection::Dummy(DummyNetworkConnection::new(a2.clone())); - let c3 = NetworkConnection::Dummy(DummyNetworkConnection::new(a3.clone())); - let c4 = NetworkConnection::Dummy(DummyNetworkConnection::new(a4.clone())); - let c5 = NetworkConnection::Dummy(DummyNetworkConnection::new(a5)); + let c1 = DummyNetworkConnection::new(a1.clone()); + let c2 = DummyNetworkConnection::new(a2.clone()); + let c3 = DummyNetworkConnection::new(a3.clone()); + let c4 = DummyNetworkConnection::new(a4.clone()); + let c5 = DummyNetworkConnection::new(a5); assert_eq!(a1, c2.connection_descriptor()); assert_ne!(a3, c4.connection_descriptor()); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index b682abe3..06ea707c 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -686,7 +686,7 @@ pub struct PeerInfo { pub dial_infos: Vec, } -#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)] pub struct PeerAddress { pub socket_address: SocketAddress, pub protocol_type: ProtocolType, @@ -709,7 +709,7 @@ impl PeerAddress { } } -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ConnectionDescriptor { pub remote: PeerAddress, pub local: Option, diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs index da3988c6..e7e7ad46 100644 --- a/veilid-core/src/xx/mod.rs +++ b/veilid-core/src/xx/mod.rs @@ -45,6 +45,8 @@ cfg_if! { pub use core::sync::atomic::{Ordering, AtomicBool}; pub use alloc::sync::{Arc, Weak}; pub use core::ops::{FnOnce, FnMut, Fn}; + pub use async_lock::Mutex as AsyncMutex; + pub use async_lock::MutexGuard as AsyncMutexGuard; pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub type SystemPinBoxFuture = PinBox + 'static>; pub type SystemPinBoxFutureLifetime<'a, T> = PinBox + 'a>; @@ -65,6 +67,8 @@ cfg_if! { pub use std::ops::{FnOnce, FnMut, Fn}; pub use async_std::future::Future; pub use async_std::pin::Pin; + pub use async_std::sync::Mutex as AsyncMutex; + pub use async_std::sync::MutexGuard as AsyncMutexGuard; pub use std::net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub type SystemPinBoxFuture = PinBox + Send + 'static>; pub type SystemPinBoxFutureLifetime<'a, T> = PinBox + Send + 'a>; diff --git a/veilid-core/src/xx/single_future.rs b/veilid-core/src/xx/single_future.rs index 072fb179..9e6a3222 100644 --- a/veilid-core/src/xx/single_future.rs +++ b/veilid-core/src/xx/single_future.rs @@ -13,6 +13,8 @@ where join_handle: Option>, } +/// Spawns a single background processing task idempotently, possibly returning the return value of the previously executed background task +/// This does not queue, just ensures that no more than a single copy of the task is running at a time, but allowing tasks to be retriggered #[derive(Debug, Clone)] pub struct SingleFuture where diff --git a/veilid-core/src/xx/tick_task.rs b/veilid-core/src/xx/tick_task.rs index f51acde3..53f1fd90 100644 --- a/veilid-core/src/xx/tick_task.rs +++ b/veilid-core/src/xx/tick_task.rs @@ -13,6 +13,9 @@ cfg_if! { } } +/// Runs a single-future background processing task, attempting to run it once every 'tick period' microseconds. +/// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again. +/// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity. pub struct TickTask { last_timestamp_us: AtomicU64, tick_period_us: u64,