This commit is contained in:
Christien Rioux 2024-11-08 16:49:17 -05:00
parent 8b8341bbb6
commit 9fe8f0f102
7 changed files with 346 additions and 28 deletions

52
Cargo.lock generated
View File

@ -530,6 +530,19 @@ dependencies = [
"tungstenite 0.23.0",
]
[[package]]
name = "async-tungstenite"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e661b6cb0a6eb34d02c520b052daa3aa9ac0cc02495c9d066bbce13ead132b"
dependencies = [
"futures-io",
"futures-util",
"log",
"pin-project-lite",
"tungstenite 0.24.0",
]
[[package]]
name = "async_executors"
version = "0.7.0"
@ -6106,6 +6119,24 @@ dependencies = [
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
dependencies = [
"byteorder",
"bytes 1.8.0",
"data-encoding",
"http 1.1.0",
"httparse",
"log",
"rand",
"sha1",
"thiserror",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -6519,6 +6550,7 @@ dependencies = [
"async-io 1.13.0",
"async-lock 3.4.0",
"async-std",
"async-tungstenite 0.28.0",
"async_executors",
"backtrace",
"cfg-if 1.0.0",
@ -6573,6 +6605,7 @@ dependencies = [
"wasm-logger",
"wee_alloc",
"winapi",
"ws_stream_tungstenite",
"ws_stream_wasm",
]
@ -7185,6 +7218,25 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
[[package]]
name = "ws_stream_tungstenite"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed39ff9f8b2eda91bf6390f9f49eee93d655489e15708e3bb638c1c4f07cecb4"
dependencies = [
"async-tungstenite 0.28.0",
"async_io_stream",
"bitflags 2.6.0",
"futures-core",
"futures-io",
"futures-sink",
"futures-util",
"pharos",
"rustc_version",
"tracing",
"tungstenite 0.24.0",
]
[[package]]
name = "ws_stream_wasm"
version = "0.7.4"

View File

@ -56,7 +56,7 @@ veilid_core_ios_tests = ["dep:tracing-oslog"]
debug-locks = ["veilid-tools/debug-locks"]
unstable-blockstore = []
unstable-tunnels = []
virtual-network = []
virtual-network = ["veilid-tools/virtual-network"]
# GeoIP
geolocation = ["maxminddb", "reqwest"]

View File

@ -17,7 +17,7 @@ crate-type = ["cdylib", "staticlib", "rlib"]
path = "src/lib.rs"
[features]
default = ["rt-tokio"]
default = ["rt-tokio", "virtual-network", "virtual-network-server"]
rt-async-std = [
"async-std",
"async_executors/async_std",
@ -43,6 +43,9 @@ veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"]
tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"]
debug-locks = []
virtual-network = []
virtual-network-server = ["dep:ws_stream_tungstenite", "dep:async-tungstenite"]
[dependencies]
tracing = { version = "0.1.40", features = [
"log",
@ -92,6 +95,8 @@ socket2 = { version = "0.5.7", features = ["all"] }
tokio = { version = "1.38.1", features = ["full"], optional = true }
tokio-util = { version = "0.7.11", features = ["compat"], optional = true }
tokio-stream = { version = "0.1.15", features = ["net"], optional = true }
ws_stream_tungstenite = { version = "0.14.0", optional = true }
async-tungstenite = { version = "0.28.0", optional = true }
# Dependencies for WASM builds only
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]

View File

@ -40,10 +40,14 @@
//! * This crate's `network_interfaces` module
//! * This crate's `dns_lookup` module
mod commands;
mod machine;
mod router_client;
mod router_op_table;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
#[cfg(all(
feature = "virtual-network-server",
not(all(target_arch = "wasm32", target_os = "unknown"))
))]
mod router_server;
mod serde_io_error;
mod virtual_gateway;
@ -54,10 +58,14 @@ mod virtual_tcp_stream;
mod virtual_udp_socket;
use super::*;
use commands::*;
pub use machine::*;
pub use router_client::*;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
#[cfg(all(
feature = "virtual-network-server",
not(all(target_arch = "wasm32", target_os = "unknown"))
))]
pub use router_server::*;
pub use virtual_gateway::*;
pub use virtual_network_error::*;

View File

@ -618,14 +618,14 @@ impl RouterClient {
.into_stream()
.map(io::Result::<ServerProcessorEvent>::Ok);
let framed_reader_fut = system_boxed(async move {
let receiver_fut = system_boxed(async move {
let fut =
receiver.try_for_each(|evt| Self::process_event(evt, router_op_waiter.clone()));
if let Err(e) = fut.await {
error!("{}", e);
}
});
unord.push(framed_reader_fut);
unord.push(receiver_fut);
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
}

View File

@ -1,64 +1,317 @@
mod commands;
pub(super) use commands::*;
use super::*;
use async_tungstenite::accept_async;
use futures_codec::{Bytes, BytesCodec, FramedRead, FramedWrite};
use futures_util::{stream::FuturesUnordered, AsyncReadExt, StreamExt, TryStreamExt};
use postcard::{from_bytes, to_stdvec};
use std::io;
use stop_token::future::FutureExt as _;
use ws_stream_tungstenite::*;
#[derive(ThisError, Debug, Clone, PartialEq, Eq)]
pub enum RouterServerError {
#[error("Serialization Error: {0}")]
SerializationError(postcard::Error),
#[error("IO Error: {0}")]
IoError(io::ErrorKind),
}
pub type RouterServerResult<T> = Result<T, RouterServerError>;
pub const DEFAULT_VIRTUAL_ROUTER_PORT_TCP: u16 = 5149u16;
pub const DEFAULT_VIRTUAL_ROUTER_PORT_WS: u16 = 5148u16;
enum RunLoopEvent {
AddClient(SendPinBoxFuture<RunLoopEvent>),
Done,
}
#[derive(Debug)]
struct RouterServerUnlockedInner {
new_client_sender: flume::Sender<SendPinBoxFuture<RunLoopEvent>>,
new_client_receiver: flume::Receiver<SendPinBoxFuture<RunLoopEvent>>,
}
#[derive(Debug)]
struct RouterServerInner {
//tcp_connections: HashMap<
}
/// Router server for virtual networking
///
/// Connect to this with a `RouterClient`. Simulates machines, allocates sockets
/// and gateways, manages a virtual simulated Internet and routes packets
/// virtually between `Machines` associated with `RouterClient`s.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RouterServer {
//tcp_connections: HashMap<
client_inbound_sender: flume::Sender<Bytes>,
client_inbound_receiver: flume::Receiver<Bytes>,
local_inbound_sender: flume::Sender<ServerProcessorCommand>,
local_inbound_receiver: flume::Receiver<ServerProcessorCommand>,
unlocked_inner: Arc<RouterServerUnlockedInner>,
inner: Arc<Mutex<RouterServerInner>>,
}
impl RouterServer {
////////////////////////////////////////////////////////////////////
// Public Interface
/// Create a router server for virtual networking
pub fn new() -> Self {
Self {}
// Make a channel to receive new clients
let (new_client_sender, new_client_receiver) = flume::unbounded();
Self {
unlocked_inner: Arc::new(RouterServerUnlockedInner {
new_client_sender,
new_client_receiver,
}),
inner: Arc::new(Mutex::new(RouterServerInner {})),
}
}
async fn process_connection<R, W>(self, reader: R, writer: W) -> RunLoopEvent
where
R: AsyncRead + Send + Unpin,
W: AsyncWrite + Send + Unpin,
{
let framed_reader = FramedRead::new(reader, BytesCodec);
let framed_writer = FramedWrite::new(writer, BytesCodec);
let (outbound_sender, outbound_receiver) = flume::unbounded();
let outbound_fut = system_boxed(
outbound_receiver
.into_stream()
.map(|command| {
to_stdvec(&command)
.map_err(io::Error::other)
.map(Bytes::from)
})
.forward(framed_writer),
);
let inbound_fut = system_boxed(framed_reader.try_for_each(|x| async {
let x = x;
let cmd = from_bytes::<ServerProcessorCommand>(&x).map_err(io::Error::other)?;
self.clone()
.process_command(cmd, outbound_sender.clone())
.await
.map_err(io::Error::other)
}));
let mut unord = FuturesUnordered::new();
unord.push(outbound_fut);
unord.push(inbound_fut);
if let Some(Err(e)) = unord.next().await {
error!("{}", e);
}
RunLoopEvent::Done
}
/// Accept RouterClient connections on a TCP socket
pub fn listen_tcp(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
Ok(())
pub async fn listen_tcp(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
let listener = TcpListener::bind(addr.unwrap_or(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::UNSPECIFIED,
DEFAULT_VIRTUAL_ROUTER_PORT_TCP,
0,
0,
))))
.await
.map_err(|e| RouterServerError::IoError(e.kind()))?;
let stop_source = StopSource::new();
let stop_token = stop_source.token();
let this = self.clone();
let listener_fut = system_boxed(async move {
loop {
// Wait for a new connection
match listener.accept().timeout_at(stop_token.clone()).await {
Ok(Ok((conn, _addr))) => {
let conn = conn.compat();
// Register a connection processing inbound receiver
let this2 = this.clone();
let inbound_receiver_fut = system_boxed(async move {
let (reader, writer) = conn.split();
this2.process_connection(reader, writer).await
});
if let Err(e) = this
.unlocked_inner
.new_client_sender
.send(inbound_receiver_fut)
{
// Error register connection processor
error!("{}", e);
break;
}
}
Ok(Err(e)) => {
// Error processing an accept
error!("{}", e);
break;
}
Err(_) => {
// Stop requested
break;
}
}
}
RunLoopEvent::Done
});
self.unlocked_inner
.new_client_sender
.send(listener_fut)
.expect("should be able to send client");
Ok(stop_source)
}
/// Accept RouterClient connections on a WebSocket
pub fn listen_ws(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
Ok(())
pub async fn listen_ws(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
let listener = TcpListener::bind(addr.unwrap_or(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::UNSPECIFIED,
DEFAULT_VIRTUAL_ROUTER_PORT_WS,
0,
0,
))))
.await
.map_err(|e| RouterServerError::IoError(e.kind()))?;
let stop_source = StopSource::new();
let stop_token = stop_source.token();
let this = self.clone();
let listener_fut = system_boxed(async move {
loop {
// Wait for a new connection
match listener.accept().timeout_at(stop_token.clone()).await {
Ok(Ok((conn, _addr))) => {
let conn = conn.compat();
if let Ok(s) = accept_async(conn).await {
let ws = WsStream::new(s);
// Register a connection processing inbound receiver
let this2 = this.clone();
let inbound_receiver_fut = system_boxed(async move {
let (reader, writer) = ws.split();
this2.process_connection(reader, writer).await
});
if let Err(e) = this
.unlocked_inner
.new_client_sender
.send(inbound_receiver_fut)
{
// Error register connection processor
error!("{}", e);
break;
}
}
}
Ok(Err(e)) => {
// Error processing an accept
error!("{}", e);
break;
}
Err(_) => {
// Stop requested
break;
}
}
}
RunLoopEvent::Done
});
self.unlocked_inner
.new_client_sender
.send(listener_fut)
.expect("should be able to send client");
Ok(stop_source)
}
/// Return a local RouterClient
pub fn router_client(&self) -> RouterClient {
// Create the outbound channel
xxxx get these channels right
// Create the inbound/outbound channels
let (local_inbound_sender, local_inbound_receiver) = flume::unbounded();
let (local_outbound_sender, local_outbound_receiver) = flume::unbounded();
let this = self.clone();
let inbound_receiver_fut = system_boxed(async move {
let fut = local_inbound_receiver
.into_stream()
.map(Ok)
.try_for_each(|cmd| {
this.clone()
.process_command(cmd, local_outbound_sender.clone())
});
if let Err(e) = fut.await {
error!("{}", e);
}
RunLoopEvent::Done
});
// Send the new client to the run loop
self.unlocked_inner
.new_client_sender
.send(inbound_receiver_fut)
.expect("should be able to send client");
// Create a RouterClient directly connected to this RouterServer
RouterClient::local_router_client(
self.local_inbound_sender.clone(),
local_outbound_receiver,
)
RouterClient::local_router_client(local_inbound_sender, local_outbound_receiver)
}
/// Run the router server until a stop is requested
pub async fn run(&self, stop_token: StopToken) -> RouterServerResult<()> {
let mut unord = FuturesUnordered::<SendPinBoxFuture<RunLoopEvent>>::new();
let mut need_new_client_fut = true;
loop {
if need_new_client_fut {
let new_client_receiver = self.unlocked_inner.new_client_receiver.clone();
unord.push(Box::pin(async move {
if let Ok(res) = new_client_receiver.into_recv_async().await {
return RunLoopEvent::AddClient(res);
}
RunLoopEvent::Done
}));
}
match unord.next().timeout_at(stop_token.clone()).await {
Ok(Some(RunLoopEvent::AddClient(client_fut))) => {
// Add new client
unord.push(client_fut);
// Wait for next new client
need_new_client_fut = true;
}
Ok(Some(RunLoopEvent::Done)) => {
// Do nothing
}
Ok(None) => {
// Finished normally
break;
}
Err(_) => {
// Stop requested
break;
}
}
}
Ok(())
}
////////////////////////////////////////////////////////////////////
// Private Implementation
async fn process_command(
self,
cmd: ServerProcessorCommand,
outbound_sender: flume::Sender<ServerProcessorEvent>,
) -> RouterServerResult<()> {
//
Ok(())
}
}