From 9fe8f0f1027ad36fc0f727abb1254b019ed06274 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 8 Nov 2024 16:49:17 -0500 Subject: [PATCH] listener --- Cargo.lock | 52 +++ veilid-core/Cargo.toml | 2 +- veilid-tools/Cargo.toml | 7 +- .../{router_server => }/commands.rs | 0 veilid-tools/src/virtual_network/mod.rs | 12 +- .../src/virtual_network/router_client.rs | 4 +- .../src/virtual_network/router_server/mod.rs | 297 ++++++++++++++++-- 7 files changed, 346 insertions(+), 28 deletions(-) rename veilid-tools/src/virtual_network/{router_server => }/commands.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index b3ffdc12..f579aade 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -530,6 +530,19 @@ dependencies = [ "tungstenite 0.23.0", ] +[[package]] +name = "async-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90e661b6cb0a6eb34d02c520b052daa3aa9ac0cc02495c9d066bbce13ead132b" +dependencies = [ + "futures-io", + "futures-util", + "log", + "pin-project-lite", + "tungstenite 0.24.0", +] + [[package]] name = "async_executors" version = "0.7.0" @@ -6106,6 +6119,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes 1.8.0", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -6519,6 +6550,7 @@ dependencies = [ "async-io 1.13.0", "async-lock 3.4.0", "async-std", + "async-tungstenite 0.28.0", "async_executors", "backtrace", "cfg-if 1.0.0", @@ -6573,6 +6605,7 @@ dependencies = [ "wasm-logger", "wee_alloc", "winapi", + "ws_stream_tungstenite", "ws_stream_wasm", ] @@ -7185,6 +7218,25 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "ws_stream_tungstenite" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed39ff9f8b2eda91bf6390f9f49eee93d655489e15708e3bb638c1c4f07cecb4" +dependencies = [ + "async-tungstenite 0.28.0", + "async_io_stream", + "bitflags 2.6.0", + "futures-core", + "futures-io", + "futures-sink", + "futures-util", + "pharos", + "rustc_version", + "tracing", + "tungstenite 0.24.0", +] + [[package]] name = "ws_stream_wasm" version = "0.7.4" diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index d0a218f2..661e43ff 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -56,7 +56,7 @@ veilid_core_ios_tests = ["dep:tracing-oslog"] debug-locks = ["veilid-tools/debug-locks"] unstable-blockstore = [] unstable-tunnels = [] -virtual-network = [] +virtual-network = ["veilid-tools/virtual-network"] # GeoIP geolocation = ["maxminddb", "reqwest"] diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index 2fe0d7cd..012ee07e 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -17,7 +17,7 @@ crate-type = ["cdylib", "staticlib", "rlib"] path = "src/lib.rs" [features] -default = ["rt-tokio"] +default = ["rt-tokio", "virtual-network", "virtual-network-server"] rt-async-std = [ "async-std", "async_executors/async_std", @@ -43,6 +43,9 @@ veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"] tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"] debug-locks = [] +virtual-network = [] +virtual-network-server = ["dep:ws_stream_tungstenite", "dep:async-tungstenite"] + [dependencies] tracing = { version = "0.1.40", features = [ "log", @@ -92,6 +95,8 @@ socket2 = { version = "0.5.7", features = ["all"] } tokio = { version = "1.38.1", features = ["full"], optional = true } tokio-util = { version = "0.7.11", features = ["compat"], optional = true } tokio-stream = { version = "0.1.15", features = ["net"], optional = true } +ws_stream_tungstenite = { version = "0.14.0", optional = true } +async-tungstenite = { version = "0.28.0", optional = true } # Dependencies for WASM builds only [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies] diff --git a/veilid-tools/src/virtual_network/router_server/commands.rs b/veilid-tools/src/virtual_network/commands.rs similarity index 100% rename from veilid-tools/src/virtual_network/router_server/commands.rs rename to veilid-tools/src/virtual_network/commands.rs diff --git a/veilid-tools/src/virtual_network/mod.rs b/veilid-tools/src/virtual_network/mod.rs index 04ee90d8..38e46d70 100644 --- a/veilid-tools/src/virtual_network/mod.rs +++ b/veilid-tools/src/virtual_network/mod.rs @@ -40,10 +40,14 @@ //! * This crate's `network_interfaces` module //! * This crate's `dns_lookup` module +mod commands; mod machine; mod router_client; mod router_op_table; -#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] +#[cfg(all( + feature = "virtual-network-server", + not(all(target_arch = "wasm32", target_os = "unknown")) +))] mod router_server; mod serde_io_error; mod virtual_gateway; @@ -54,10 +58,14 @@ mod virtual_tcp_stream; mod virtual_udp_socket; use super::*; +use commands::*; pub use machine::*; pub use router_client::*; -#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] +#[cfg(all( + feature = "virtual-network-server", + not(all(target_arch = "wasm32", target_os = "unknown")) +))] pub use router_server::*; pub use virtual_gateway::*; pub use virtual_network_error::*; diff --git a/veilid-tools/src/virtual_network/router_client.rs b/veilid-tools/src/virtual_network/router_client.rs index 6b6f2b09..bccd7ff4 100644 --- a/veilid-tools/src/virtual_network/router_client.rs +++ b/veilid-tools/src/virtual_network/router_client.rs @@ -618,14 +618,14 @@ impl RouterClient { .into_stream() .map(io::Result::::Ok); - let framed_reader_fut = system_boxed(async move { + let receiver_fut = system_boxed(async move { let fut = receiver.try_for_each(|evt| Self::process_event(evt, router_op_waiter.clone())); if let Err(e) = fut.await { error!("{}", e); } }); - unord.push(framed_reader_fut); + unord.push(receiver_fut); while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} } diff --git a/veilid-tools/src/virtual_network/router_server/mod.rs b/veilid-tools/src/virtual_network/router_server/mod.rs index e63f2410..130a698a 100644 --- a/veilid-tools/src/virtual_network/router_server/mod.rs +++ b/veilid-tools/src/virtual_network/router_server/mod.rs @@ -1,64 +1,317 @@ -mod commands; - -pub(super) use commands::*; - use super::*; +use async_tungstenite::accept_async; use futures_codec::{Bytes, BytesCodec, FramedRead, FramedWrite}; +use futures_util::{stream::FuturesUnordered, AsyncReadExt, StreamExt, TryStreamExt}; +use postcard::{from_bytes, to_stdvec}; +use std::io; +use stop_token::future::FutureExt as _; +use ws_stream_tungstenite::*; #[derive(ThisError, Debug, Clone, PartialEq, Eq)] pub enum RouterServerError { #[error("Serialization Error: {0}")] SerializationError(postcard::Error), + #[error("IO Error: {0}")] + IoError(io::ErrorKind), } pub type RouterServerResult = Result; +pub const DEFAULT_VIRTUAL_ROUTER_PORT_TCP: u16 = 5149u16; +pub const DEFAULT_VIRTUAL_ROUTER_PORT_WS: u16 = 5148u16; + +enum RunLoopEvent { + AddClient(SendPinBoxFuture), + Done, +} + +#[derive(Debug)] +struct RouterServerUnlockedInner { + new_client_sender: flume::Sender>, + new_client_receiver: flume::Receiver>, +} + +#[derive(Debug)] +struct RouterServerInner { + //tcp_connections: HashMap< +} + /// Router server for virtual networking /// /// Connect to this with a `RouterClient`. Simulates machines, allocates sockets /// and gateways, manages a virtual simulated Internet and routes packets /// virtually between `Machines` associated with `RouterClient`s. - -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RouterServer { - //tcp_connections: HashMap< - client_inbound_sender: flume::Sender, - client_inbound_receiver: flume::Receiver, - local_inbound_sender: flume::Sender, - local_inbound_receiver: flume::Receiver, + unlocked_inner: Arc, + inner: Arc>, } impl RouterServer { + //////////////////////////////////////////////////////////////////// + // Public Interface + /// Create a router server for virtual networking pub fn new() -> Self { - Self {} + // Make a channel to receive new clients + let (new_client_sender, new_client_receiver) = flume::unbounded(); + + Self { + unlocked_inner: Arc::new(RouterServerUnlockedInner { + new_client_sender, + new_client_receiver, + }), + inner: Arc::new(Mutex::new(RouterServerInner {})), + } + } + + async fn process_connection(self, reader: R, writer: W) -> RunLoopEvent + where + R: AsyncRead + Send + Unpin, + W: AsyncWrite + Send + Unpin, + { + let framed_reader = FramedRead::new(reader, BytesCodec); + let framed_writer = FramedWrite::new(writer, BytesCodec); + + let (outbound_sender, outbound_receiver) = flume::unbounded(); + let outbound_fut = system_boxed( + outbound_receiver + .into_stream() + .map(|command| { + to_stdvec(&command) + .map_err(io::Error::other) + .map(Bytes::from) + }) + .forward(framed_writer), + ); + + let inbound_fut = system_boxed(framed_reader.try_for_each(|x| async { + let x = x; + let cmd = from_bytes::(&x).map_err(io::Error::other)?; + + self.clone() + .process_command(cmd, outbound_sender.clone()) + .await + .map_err(io::Error::other) + })); + + let mut unord = FuturesUnordered::new(); + unord.push(outbound_fut); + unord.push(inbound_fut); + + if let Some(Err(e)) = unord.next().await { + error!("{}", e); + } + + RunLoopEvent::Done } /// Accept RouterClient connections on a TCP socket - pub fn listen_tcp(&self, addr: Option) -> RouterServerResult { - Ok(()) + pub async fn listen_tcp(&self, addr: Option) -> RouterServerResult { + let listener = TcpListener::bind(addr.unwrap_or(SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::UNSPECIFIED, + DEFAULT_VIRTUAL_ROUTER_PORT_TCP, + 0, + 0, + )))) + .await + .map_err(|e| RouterServerError::IoError(e.kind()))?; + + let stop_source = StopSource::new(); + let stop_token = stop_source.token(); + + let this = self.clone(); + let listener_fut = system_boxed(async move { + loop { + // Wait for a new connection + match listener.accept().timeout_at(stop_token.clone()).await { + Ok(Ok((conn, _addr))) => { + let conn = conn.compat(); + // Register a connection processing inbound receiver + let this2 = this.clone(); + let inbound_receiver_fut = system_boxed(async move { + let (reader, writer) = conn.split(); + + this2.process_connection(reader, writer).await + }); + if let Err(e) = this + .unlocked_inner + .new_client_sender + .send(inbound_receiver_fut) + { + // Error register connection processor + error!("{}", e); + break; + } + } + Ok(Err(e)) => { + // Error processing an accept + error!("{}", e); + break; + } + Err(_) => { + // Stop requested + break; + } + } + } + + RunLoopEvent::Done + }); + + self.unlocked_inner + .new_client_sender + .send(listener_fut) + .expect("should be able to send client"); + + Ok(stop_source) } /// Accept RouterClient connections on a WebSocket - pub fn listen_ws(&self, addr: Option) -> RouterServerResult { - Ok(()) + pub async fn listen_ws(&self, addr: Option) -> RouterServerResult { + let listener = TcpListener::bind(addr.unwrap_or(SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::UNSPECIFIED, + DEFAULT_VIRTUAL_ROUTER_PORT_WS, + 0, + 0, + )))) + .await + .map_err(|e| RouterServerError::IoError(e.kind()))?; + + let stop_source = StopSource::new(); + let stop_token = stop_source.token(); + + let this = self.clone(); + let listener_fut = system_boxed(async move { + loop { + // Wait for a new connection + match listener.accept().timeout_at(stop_token.clone()).await { + Ok(Ok((conn, _addr))) => { + let conn = conn.compat(); + if let Ok(s) = accept_async(conn).await { + let ws = WsStream::new(s); + // Register a connection processing inbound receiver + let this2 = this.clone(); + let inbound_receiver_fut = system_boxed(async move { + let (reader, writer) = ws.split(); + this2.process_connection(reader, writer).await + }); + if let Err(e) = this + .unlocked_inner + .new_client_sender + .send(inbound_receiver_fut) + { + // Error register connection processor + error!("{}", e); + break; + } + } + } + Ok(Err(e)) => { + // Error processing an accept + error!("{}", e); + break; + } + Err(_) => { + // Stop requested + break; + } + } + } + + RunLoopEvent::Done + }); + + self.unlocked_inner + .new_client_sender + .send(listener_fut) + .expect("should be able to send client"); + + Ok(stop_source) } /// Return a local RouterClient pub fn router_client(&self) -> RouterClient { - // Create the outbound channel - xxxx get these channels right + // Create the inbound/outbound channels + let (local_inbound_sender, local_inbound_receiver) = flume::unbounded(); let (local_outbound_sender, local_outbound_receiver) = flume::unbounded(); + let this = self.clone(); + let inbound_receiver_fut = system_boxed(async move { + let fut = local_inbound_receiver + .into_stream() + .map(Ok) + .try_for_each(|cmd| { + this.clone() + .process_command(cmd, local_outbound_sender.clone()) + }); + if let Err(e) = fut.await { + error!("{}", e); + } + RunLoopEvent::Done + }); + + // Send the new client to the run loop + self.unlocked_inner + .new_client_sender + .send(inbound_receiver_fut) + .expect("should be able to send client"); + // Create a RouterClient directly connected to this RouterServer - RouterClient::local_router_client( - self.local_inbound_sender.clone(), - local_outbound_receiver, - ) + RouterClient::local_router_client(local_inbound_sender, local_outbound_receiver) } /// Run the router server until a stop is requested pub async fn run(&self, stop_token: StopToken) -> RouterServerResult<()> { + let mut unord = FuturesUnordered::>::new(); + + let mut need_new_client_fut = true; + + loop { + if need_new_client_fut { + let new_client_receiver = self.unlocked_inner.new_client_receiver.clone(); + unord.push(Box::pin(async move { + if let Ok(res) = new_client_receiver.into_recv_async().await { + return RunLoopEvent::AddClient(res); + } + RunLoopEvent::Done + })); + } + + match unord.next().timeout_at(stop_token.clone()).await { + Ok(Some(RunLoopEvent::AddClient(client_fut))) => { + // Add new client + unord.push(client_fut); + + // Wait for next new client + need_new_client_fut = true; + } + Ok(Some(RunLoopEvent::Done)) => { + // Do nothing + } + Ok(None) => { + // Finished normally + break; + } + Err(_) => { + // Stop requested + break; + } + } + } + + Ok(()) + } + + //////////////////////////////////////////////////////////////////// + // Private Implementation + + async fn process_command( + self, + cmd: ServerProcessorCommand, + outbound_sender: flume::Sender, + ) -> RouterServerResult<()> { + // Ok(()) } }