virtual network work

This commit is contained in:
Christien Rioux 2024-11-04 22:40:10 -05:00
parent 212205a5ee
commit 507c5d492e
24 changed files with 1933 additions and 496 deletions

1198
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -133,8 +133,8 @@ hickory-resolver = { version = "0.24.1", optional = true }
# Serialization
capnp = { version = "0.19.6", default-features = false, features = ["alloc"] }
serde = { version = "1.0.204", features = ["derive", "rc"] }
serde_json = { version = "1.0.120" }
serde = { version = "1.0.214", features = ["derive", "rc"] }
serde_json = { version = "1.0.132" }
serde-big-array = "0.5.1"
json = "0.12.4"
data-encoding = { version = "2.6.0" }

View File

@ -151,42 +151,6 @@ macro_rules! log_client_api {
}
}
#[macro_export]
macro_rules! log_network_result {
(error $text:expr) => {error!(
target: "network_result",
"{}",
$text,
)};
(error $fmt:literal, $($arg:expr),+) => {
error!(target: "network_result", $fmt, $($arg),+);
};
(warn $text:expr) => {warn!(
target: "network_result",
"{}",
$text,
)};
(warn $fmt:literal, $($arg:expr),+) => {
warn!(target:"network_result", $fmt, $($arg),+);
};
(debug $text:expr) => {debug!(
target: "network_result",
"{}",
$text,
)};
(debug $fmt:literal, $($arg:expr),+) => {
debug!(target:"network_result", $fmt, $($arg),+);
};
($text:expr) => {trace!(
target: "network_result",
"{}",
$text,
)};
($fmt:literal, $($arg:expr),+) => {
trace!(target:"network_result", $fmt, $($arg),+);
}
}
#[macro_export]
macro_rules! log_rpc {
(error $text:expr) => { error!(

View File

@ -450,7 +450,7 @@ packages:
path: ".."
relative: true
source: path
version: "0.3.4"
version: "0.4.1"
veilid_test:
dependency: "direct dev"
description:

View File

@ -54,11 +54,14 @@ tracing-subscriber = { version = "0.3.18", features = [
log = { version = "0.4.22" }
eyre = "0.6.12"
static_assertions = "1.1.0"
serde = { version = "1.0.214", features = ["derive", "rc"] }
postcard = { version = "1.0.10", features = ["use-std"] }
cfg-if = "1.0.0"
thiserror = "1.0.63"
futures-util = { version = "0.3.30", default-features = false, features = [
"alloc",
] }
futures_codec = "0.4.1"
parking_lot = "0.12.3"
async-lock = "3.4.0"
once_cell = "1.19.0"

View File

@ -46,7 +46,6 @@ pub mod mutable_future;
#[cfg(not(target_arch = "wasm32"))]
pub mod network_interfaces;
pub mod network_result;
pub mod network_shim;
pub mod random;
pub mod single_shot_eventual;
pub mod sleep;
@ -58,6 +57,7 @@ pub mod timeout;
pub mod timeout_or;
pub mod timestamp;
pub mod tools;
pub mod virtual_network;
#[cfg(target_arch = "wasm32")]
pub mod wasm;

View File

@ -1,6 +1,7 @@
mod tools;
use crate::*;
use serde::*;
cfg_if::cfg_if! {
if #[cfg(any(target_os = "linux", target_os = "android"))] {
@ -23,7 +24,7 @@ cfg_if::cfg_if! {
}
}
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Hash, Clone)]
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Hash, Clone, Serialize, Deserialize)]
pub enum IfAddr {
V4(Ifv4Addr),
V6(Ifv6Addr),
@ -51,7 +52,7 @@ impl IfAddr {
}
/// Details about the ipv4 address of an interface on this host.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Hash, Clone)]
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Hash, Clone, Serialize, Deserialize)]
pub struct Ifv4Addr {
/// The IP address of the interface.
pub ip: Ipv4Addr,
@ -62,7 +63,7 @@ pub struct Ifv4Addr {
}
/// Details about the ipv6 address of an interface on this host.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Hash, Clone)]
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Hash, Clone, Serialize, Deserialize)]
pub struct Ifv6Addr {
/// The IP address of the interface.
pub ip: Ipv6Addr,
@ -73,7 +74,9 @@ pub struct Ifv6Addr {
}
/// Some of the flags associated with an interface.
#[derive(Debug, Default, PartialEq, Eq, Ord, PartialOrd, Hash, Clone, Copy)]
#[derive(
Debug, Default, PartialEq, Eq, Ord, PartialOrd, Hash, Clone, Copy, Serialize, Deserialize,
)]
pub struct InterfaceFlags {
pub is_loopback: bool,
pub is_running: bool,
@ -82,7 +85,9 @@ pub struct InterfaceFlags {
}
/// Some of the flags associated with an address.
#[derive(Debug, Default, PartialEq, Eq, Ord, PartialOrd, Hash, Clone, Copy)]
#[derive(
Debug, Default, PartialEq, Eq, Ord, PartialOrd, Hash, Clone, Copy, Serialize, Deserialize,
)]
pub struct AddressFlags {
// common flags
pub is_dynamic: bool,
@ -91,7 +96,7 @@ pub struct AddressFlags {
pub is_preferred: bool,
}
#[derive(PartialEq, Eq, Clone, Debug)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct InterfaceAddress {
if_addr: IfAddr,
flags: AddressFlags,
@ -226,7 +231,7 @@ impl InterfaceAddress {
// Wired, // Wired is usually free or cheap and high speed
// }
#[derive(PartialEq, Eq, Clone)]
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct NetworkInterface {
pub name: String,
pub flags: InterfaceFlags,

View File

@ -308,6 +308,42 @@ macro_rules! network_result_try {
};
}
#[macro_export]
macro_rules! log_network_result {
(error $text:expr) => {error!(
target: "network_result",
"{}",
$text,
)};
(error $fmt:literal, $($arg:expr),+) => {
error!(target: "network_result", $fmt, $($arg),+);
};
(warn $text:expr) => {warn!(
target: "network_result",
"{}",
$text,
)};
(warn $fmt:literal, $($arg:expr),+) => {
warn!(target:"network_result", $fmt, $($arg),+);
};
(debug $text:expr) => {debug!(
target: "network_result",
"{}",
$text,
)};
(debug $fmt:literal, $($arg:expr),+) => {
debug!(target:"network_result", $fmt, $($arg),+);
};
($text:expr) => {trace!(
target: "network_result",
"{}",
$text,
)};
($fmt:literal, $($arg:expr),+) => {
trace!(target:"network_result", $fmt, $($arg),+);
}
}
#[macro_export]
macro_rules! network_result_value_or_log {
($r:expr => $f:expr) => {

View File

@ -1,63 +0,0 @@
//! # Network Shim
//!
//! ## Networking abstraction layer
//!
//! Support for mocking and virtualizing network connections, as well as passing through to supported
//! networking functionality.
//!
//! The following structs are available that allow connecting to a centralized virtual
//! router to emulate a large scale network.
//!
//! * RouterClient
//! * RouterServer
//!
//! Additional traits are is implemented for all shimmed APIs that have static methods
//! like `new()`, `default()`, `connect()` and `bind()` to allow optional namespacing
//! such that the structs they produce are new network router clients with their own
//! distinct IP addresses, network segments, and network characteristics as allocated
//! by the [RouterServer].
//!
//! A singleton RouterClient is created by this module that is used by default unless the
//! extension traits on the shimmed APIs are used to override it with another RouterClient instance.
//!
//! ## Available shims
//!
//! API-compatible shims for:
//! * Tokio (Native)
//! - [tokio::net::TcpListener]
//! - [tokio::net::TcpStream]
//! - [tokio::net::UdpSocket]
//! - [tokio_stream::wrappers::TcpListenerStream]
//! * Async-std (Native)
//! - [async_std::net::TcpListener]
//! - [async_std::net::TcpStream]
//! - [async_std::net::UdpSocket]
//! - [async_std::net::Incoming]
//! * std::net (Native)
//! - [std::net::TcpListener]
//! - [std::net::TcpStream]
//! - [std::net::UdpSocket]
//! - [std::net::Incoming]
//! * ws_stream_wasm (browser WASM)
//! - [ws_stream_wasm::WsMeta]
//! - [ws_stream_wasm::WsStream]
//! - [ws_stream_wasm::WsStreamIo]
//!
//! ## Other modules leveraging this module
//!
//! * `veilid-core`'s network `native` and `wasm` modules
//! * This crate's `network_interfaces` module
//! * This crate's `dns_lookup` module
mod router_client;
mod router_server;
#[cfg(feature = "async-std")]
mod async_std;
mod std;
#[cfg(feature = "tokio")]
mod tokio;
#[cfg(feature = "tokio-stream")]
mod tokio_stream;
#[cfg(feature = "ws_stream_wasm")]
mod ws_stream_wasm;

View File

@ -0,0 +1,23 @@
use super::*;
pub type MachineId = u64;
#[derive(Clone)]
pub struct Machine {
pub router_client: RouterClient,
pub id: MachineId,
}
pub fn set_default_machine(machine: Machine) {
*DEFAULT_MACHINE.lock() = Some(machine);
}
pub fn take_default_machine() -> Option<Machine> {
DEFAULT_MACHINE.lock().take()
}
pub fn default_machine() -> Option<Machine> {
(*DEFAULT_MACHINE.lock()).clone()
}
static DEFAULT_MACHINE: Mutex<Option<Machine>> = Mutex::new(None);

View File

@ -0,0 +1,57 @@
//! # Virtual Network
//!
//! ## Networking abstraction layer
//!
//! Support for mocking and virtualizing network connections, as well as passing through to supported
//! networking functionality.
//!
//! The following structs are available that allow connecting to a centralized virtual
//! router to emulate a large scale network.
//!
//! * RouterClient
//! * RouterServer
//! * Machine
//!
//! Additional traits are is implemented for all shimmed APIs that have static methods
//! like `new()`, `default()`, `connect()` and `bind()` to allow optional namespacing
//! such that the structs they produce are new network router clients with their own
//! distinct IP addresses, network segments, and network characteristics as allocated
//! by the [RouterServer].
//!
//! A singleton RouterClient can be registered with this module that is used by default unless the
//! `*_with_machine` API are used to override it with another Machine instance.
//!
//! ## Available APIs
//!
//! [VirtualTcpStream]
//! [VirtualUdpSocket]
//! [VirtualTcpListenerStream]
//! [VirtualWsMeta]
//! [VirtualWsStream]
//!
//! Traits are implemented for [futures_util::AsyncRead] and [futures_util::AsyncWrite]
//! Conversion traits are available for use with Tokio
//!
//! ## Other modules leveraging this module
//!
//! * `veilid-core`'s network `native` and `wasm` modules
//! * This crate's `network_interfaces` module
//! * This crate's `dns_lookup` module
mod machine;
mod router_client;
mod router_op_table;
mod router_server;
mod serde_io_error;
mod virtual_network_error;
mod virtual_tcp_stream;
mod virtual_udp_socket;
use super::*;
pub use machine::*;
pub use router_client::*;
pub use router_server::*;
pub use virtual_network_error::*;
pub use virtual_tcp_stream::*;
pub use virtual_udp_socket::*;

View File

@ -0,0 +1,518 @@
use super::*;
use core::sync::atomic::AtomicU64;
use futures_codec::{Bytes, BytesCodec, FramedRead, FramedWrite};
use futures_util::{
io::BufReader, stream::FuturesUnordered, AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt,
StreamExt, TryStreamExt,
};
use postcard::{from_bytes, to_stdvec};
use router_op_table::*;
use serde::*;
use std::io;
use stop_token::future::FutureExt as _;
struct RouterClientInner {
jh_handler: Option<MustJoinHandle<()>>,
stop_source: Option<StopSource>,
}
struct RouterClientUnlockedInner {
receiver: flume::Receiver<Bytes>,
sender: flume::Sender<Bytes>,
next_message_id: AtomicU64,
router_op_waiter: RouterOpWaiter<ServerProcessorResponseStatus, ()>,
}
pub type MessageId = u64;
pub type SocketId = u64;
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
enum ServerProcessorRequest {
AllocateMachine,
ReleaseMachine {
machine_id: MachineId,
},
GetInterfaces {
machine_id: MachineId,
},
TcpConnect {
machine_id: MachineId,
local_address: Option<SocketAddr>,
remote_address: SocketAddr,
timeout_ms: u32,
options: VirtualTcpOptions,
},
TcpBind {
machine_id: MachineId,
local_address: Option<SocketAddr>,
options: VirtualTcpOptions,
},
TcpAccept {
machine_id: MachineId,
socket_id: SocketId,
},
Close {
machine_id: MachineId,
socket_id: SocketId,
},
UdpBind {
machine_id: MachineId,
local_address: Option<SocketAddr>,
options: VirtualUdpOptions,
},
Send {
machine_id: MachineId,
socket_id: SocketId,
data: Vec<u8>,
},
SendTo {
machine_id: MachineId,
socket_id: SocketId,
remote_address: SocketAddr,
data: Vec<u8>,
},
Recv {
machine_id: MachineId,
socket_id: u64,
len: u32,
},
RecvFrom {
machine_id: MachineId,
socket_id: u64,
len: u32,
},
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
struct ServerProcessorRequestMessage {
message_id: MessageId,
request: ServerProcessorRequest,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
enum ServerProcessorResponse {
AllocateMachine {
machine_id: MachineId,
},
ReleaseMachine,
GetInterfaces {
interfaces: BTreeMap<String, NetworkInterface>,
},
TcpConnect {
socket_id: SocketId,
},
TcpBind {
socket_id: SocketId,
},
TcpAccept {
child_socket_id: SocketId,
},
UdpBind {
socket_id: SocketId,
},
Close,
Send {
len: u32,
},
SendTo {
len: u32,
},
Recv {
data: Vec<u8>,
},
RecvFrom {
remote_address: SocketAddr,
data: Vec<u8>,
},
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
enum ServerProcessorResponseStatus {
Success(ServerProcessorResponse),
InvalidMachineId,
InvalidSocketId,
IoError(#[serde(with = "serde_io_error::SerdeIoErrorKindDef")] io::ErrorKind),
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
struct ServerProcessorResponseMessage {
message_id: MessageId,
status: ServerProcessorResponseStatus,
}
#[derive(Clone)]
pub struct RouterClient {
unlocked_inner: Arc<RouterClientUnlockedInner>,
inner: Arc<Mutex<RouterClientInner>>,
}
impl RouterClient {
//////////////////////////////////////////////////////////////////////////
// Public interface
#[cfg(not(target_arch = "wasm32"))]
pub async fn router_connect_tcp<H: ToSocketAddrs>(host: H) -> ::std::io::Result<RouterClient> {
let addrs = host.to_socket_addrs()?.collect::<Vec<_>>();
// Connect to RouterServer
let ts_reader;
let ts_writer;
cfg_if! {
if #[cfg(feature="rt-tokio")] {
let ts = ::tokio::net::TcpStream::connect(addrs.as_slice()).await?;
let (reader, writer) = ts.into_split();
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
ts_reader = reader.compat();
ts_writer = writer.compat_write();
} else if #[cfg(feature="rt-async-std")] {
use futures_util::io::AsyncReadExt;
ts = ::async_std::net::TcpStream::connect(addrs.as_slice()).await?;
(ts_reader, ts_writer) = ts.split();
} else {
compile_error!("must choose an executor");
}
}
let ts_buf_reader = BufReader::new(ts_reader);
// Create channels
let (client_sender, server_receiver) = flume::unbounded::<Bytes>();
let (server_sender, client_receiver) = flume::unbounded::<Bytes>();
// Create stopper
let stop_source = StopSource::new();
// Spawn a client connection handler
let jh_handler = spawn(
"RouterClient server processor",
Self::run_server_processor(
ts_buf_reader,
ts_writer,
server_receiver,
server_sender,
stop_source.token(),
),
);
Ok(Self::new(
client_receiver,
client_sender,
jh_handler,
stop_source,
))
}
#[cfg(target_arch = "wasm32")]
pub async fn router_connect_ws<H: AsRef<str>>(host: H) -> ::std::io::Result<RouterClient> {
let host = host.as_ref();
Ok(RouterClient {})
}
pub async fn disconnect(self) {
drop(self.inner.lock().stop_source.take());
let jh_handler = self.inner.lock().jh_handler.take();
if let Some(jh_handler) = jh_handler {
jh_handler.await;
}
}
pub async fn allocate_machine(self) -> VirtualNetworkResult<MachineId> {
let request = ServerProcessorRequest::AllocateMachine;
let ServerProcessorResponse::AllocateMachine { machine_id } =
self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(machine_id)
}
pub async fn release_machine(self, machine_id: MachineId) -> VirtualNetworkResult<()> {
let request = ServerProcessorRequest::ReleaseMachine { machine_id };
let ServerProcessorResponse::ReleaseMachine = self.perform_request(request).await? else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(())
}
pub async fn get_interfaces(
self,
machine_id: MachineId,
) -> VirtualNetworkResult<BTreeMap<String, NetworkInterface>> {
let request = ServerProcessorRequest::GetInterfaces { machine_id };
let ServerProcessorResponse::GetInterfaces { interfaces } =
self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(interfaces)
}
pub async fn tcp_connect(
self,
machine_id: MachineId,
remote_address: SocketAddr,
local_address: Option<SocketAddr>,
timeout_ms: u32,
options: VirtualTcpOptions,
) -> VirtualNetworkResult<SocketId> {
let request = ServerProcessorRequest::TcpConnect {
machine_id,
local_address,
remote_address,
timeout_ms,
options,
};
let ServerProcessorResponse::TcpConnect { socket_id } =
self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(socket_id)
}
pub async fn tcp_bind(
self,
machine_id: MachineId,
local_address: Option<SocketAddr>,
options: VirtualTcpOptions,
) -> VirtualNetworkResult<SocketId> {
let request = ServerProcessorRequest::TcpBind {
machine_id,
local_address,
options,
};
let ServerProcessorResponse::TcpBind { socket_id } = self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(socket_id)
}
pub async fn tcp_accept(
self,
machine_id: MachineId,
socket_id: SocketId,
) -> VirtualNetworkResult<SocketId> {
let request = ServerProcessorRequest::TcpAccept {
machine_id,
socket_id,
};
let ServerProcessorResponse::TcpAccept { child_socket_id } =
self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(child_socket_id)
}
pub async fn udp_bind(
self,
machine_id: MachineId,
local_address: Option<SocketAddr>,
options: VirtualUdpOptions,
) -> VirtualNetworkResult<SocketId> {
let request = ServerProcessorRequest::UdpBind {
machine_id,
local_address,
options,
};
let ServerProcessorResponse::UdpBind { socket_id } = self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(socket_id)
}
pub async fn close(
self,
machine_id: MachineId,
socket_id: SocketId,
) -> VirtualNetworkResult<()> {
let request = ServerProcessorRequest::Close {
machine_id,
socket_id,
};
let ServerProcessorResponse::Close = self.perform_request(request).await? else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(())
}
pub async fn send(
self,
machine_id: MachineId,
socket_id: SocketId,
data: Vec<u8>,
) -> VirtualNetworkResult<usize> {
let request = ServerProcessorRequest::Send {
machine_id,
socket_id,
data,
};
let ServerProcessorResponse::Send { len } = self.perform_request(request).await? else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(len as usize)
}
pub async fn send_to(
self,
machine_id: MachineId,
socket_id: SocketId,
remote_address: SocketAddr,
data: Vec<u8>,
) -> VirtualNetworkResult<usize> {
let request = ServerProcessorRequest::SendTo {
machine_id,
socket_id,
data,
remote_address,
};
let ServerProcessorResponse::SendTo { len } = self.perform_request(request).await? else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(len as usize)
}
pub async fn recv(
self,
machine_id: MachineId,
socket_id: u64,
len: u32,
) -> VirtualNetworkResult<Vec<u8>> {
let request = ServerProcessorRequest::Recv {
machine_id,
socket_id,
len,
};
let ServerProcessorResponse::Recv { data } = self.perform_request(request).await? else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(data)
}
pub async fn recv_from(
self,
machine_id: MachineId,
socket_id: u64,
len: u32,
) -> VirtualNetworkResult<(Vec<u8>, SocketAddr)> {
let request = ServerProcessorRequest::RecvFrom {
machine_id,
socket_id,
len,
};
let ServerProcessorResponse::RecvFrom {
data,
remote_address,
} = self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok((data, remote_address))
}
//////////////////////////////////////////////////////////////////////////
// Private implementation
fn new(
receiver: flume::Receiver<Bytes>,
sender: flume::Sender<Bytes>,
jh_handler: MustJoinHandle<()>,
stop_source: StopSource,
) -> RouterClient {
RouterClient {
unlocked_inner: Arc::new(RouterClientUnlockedInner {
receiver,
sender,
next_message_id: AtomicU64::new(0),
router_op_waiter: RouterOpWaiter::new(),
}),
inner: Arc::new(Mutex::new(RouterClientInner {
jh_handler: Some(jh_handler),
stop_source: Some(stop_source),
})),
}
}
async fn perform_request(
&self,
request: ServerProcessorRequest,
) -> VirtualNetworkResult<ServerProcessorResponse> {
let message_id = self
.unlocked_inner
.next_message_id
.fetch_add(1, Ordering::AcqRel);
let msg = ServerProcessorRequestMessage {
message_id,
request,
};
let msg_vec =
Bytes::from(to_stdvec(&msg).map_err(VirtualNetworkError::SerializationError)?);
self.unlocked_inner
.sender
.send_async(msg_vec)
.await
.map_err(|_| VirtualNetworkError::IoError(io::ErrorKind::BrokenPipe))?;
let handle = self
.unlocked_inner
.router_op_waiter
.add_op_waiter(message_id, ());
let status = self
.unlocked_inner
.router_op_waiter
.wait_for_op(handle)
.await
.map_err(|_| VirtualNetworkError::WaitError)?;
match status {
ServerProcessorResponseStatus::Success(server_processor_response) => {
Ok(server_processor_response)
}
ServerProcessorResponseStatus::InvalidMachineId => {
Err(VirtualNetworkError::InvalidMachineId)
}
ServerProcessorResponseStatus::InvalidSocketId => {
Err(VirtualNetworkError::InvalidSocketId)
}
ServerProcessorResponseStatus::IoError(k) => Err(VirtualNetworkError::IoError(k)),
}
}
async fn run_server_processor<R, W>(
reader: R,
writer: W,
receiver: flume::Receiver<Bytes>,
sender: flume::Sender<Bytes>,
stop_token: StopToken,
) where
R: AsyncReadExt + Unpin + Send,
W: AsyncWriteExt + Unpin + Send,
{
let mut unord = FuturesUnordered::new();
let framed_reader = FramedRead::new(reader, BytesCodec);
let framed_writer = FramedWrite::new(writer, BytesCodec);
let framed_writer_fut = system_boxed(async move {
if let Err(e) = receiver.into_stream().map(Ok).forward(framed_writer).await {
error!("{}", e);
}
});
let framed_reader_fut = system_boxed(async move {
if let Err(e) = framed_reader
.forward(sender.into_sink().sink_map_err(::std::io::Error::other))
.await
{
error!("{}", e);
}
});
unord.push(framed_writer_fut);
unord.push(framed_reader_fut);
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
}
}

View File

@ -0,0 +1,175 @@
use super::*;
pub type RouterOpId = u64;
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum RouterOpWaitError<T> {
#[error("Send error: {0}")]
SendError(flume::SendError<T>),
#[error("Recv error: {0}")]
RecvError(flume::RecvError),
#[error("Unmatched operation id: {0}")]
UnmatchedOpId(RouterOpId),
#[error("Missing operation id: {0}")]
MissingOpId(RouterOpId),
}
#[derive(Debug)]
pub struct RouterOpWaitHandle<T, C>
where
T: Unpin,
C: Unpin + Clone,
{
waiter: RouterOpWaiter<T, C>,
op_id: RouterOpId,
result_receiver: Option<flume::Receiver<T>>,
}
impl<T, C> RouterOpWaitHandle<T, C>
where
T: Unpin,
C: Unpin + Clone,
{
pub fn id(&self) -> RouterOpId {
self.op_id
}
}
impl<T, C> Drop for RouterOpWaitHandle<T, C>
where
T: Unpin,
C: Unpin + Clone,
{
fn drop(&mut self) {
if self.result_receiver.is_some() {
self.waiter.cancel_op_waiter(self.op_id);
}
}
}
#[derive(Debug)]
struct RouterWaitingOp<T, C>
where
T: Unpin,
C: Unpin + Clone,
{
context: C,
result_sender: flume::Sender<T>,
}
#[derive(Debug)]
struct RouterOpWaiterInner<T, C>
where
T: Unpin,
C: Unpin + Clone,
{
waiting_op_table: HashMap<RouterOpId, RouterWaitingOp<T, C>>,
}
#[derive(Debug)]
pub(super) struct RouterOpWaiter<T, C>
where
T: Unpin,
C: Unpin + Clone,
{
inner: Arc<Mutex<RouterOpWaiterInner<T, C>>>,
}
impl<T, C> Clone for RouterOpWaiter<T, C>
where
T: Unpin,
C: Unpin + Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T, C> RouterOpWaiter<T, C>
where
T: Unpin,
C: Unpin + Clone,
{
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(RouterOpWaiterInner {
waiting_op_table: HashMap::new(),
})),
}
}
/// Set up wait for operation to complete
pub fn add_op_waiter(&self, op_id: RouterOpId, context: C) -> RouterOpWaitHandle<T, C> {
let mut inner = self.inner.lock();
let (result_sender, result_receiver) = flume::bounded(1);
let waiting_op = RouterWaitingOp {
context,
result_sender,
};
if inner.waiting_op_table.insert(op_id, waiting_op).is_some() {
error!(
"add_op_waiter collision should not happen for op_id {}",
op_id
);
}
RouterOpWaitHandle {
waiter: self.clone(),
op_id,
result_receiver: Some(result_receiver),
}
}
/// Get operation context
pub fn get_op_context(&self, op_id: RouterOpId) -> Result<C, RouterOpWaitError<T>> {
let inner = self.inner.lock();
let Some(waiting_op) = inner.waiting_op_table.get(&op_id) else {
return Err(RouterOpWaitError::MissingOpId(op_id));
};
Ok(waiting_op.context.clone())
}
/// Remove wait for op
#[instrument(level = "trace", target = "rpc", skip_all)]
fn cancel_op_waiter(&self, op_id: RouterOpId) {
let mut inner = self.inner.lock();
inner.waiting_op_table.remove(&op_id);
}
/// Complete the waiting op
#[instrument(level = "trace", target = "rpc", skip_all)]
pub fn complete_op_waiter(
&self,
op_id: RouterOpId,
message: T,
) -> Result<(), RouterOpWaitError<T>> {
let waiting_op = {
let mut inner = self.inner.lock();
inner
.waiting_op_table
.remove(&op_id)
.ok_or_else(|| RouterOpWaitError::UnmatchedOpId(op_id))?
};
waiting_op
.result_sender
.send(message)
.map_err(RouterOpWaitError::SendError)
}
/// Wait for operation to complete
#[instrument(level = "trace", target = "rpc", skip_all)]
pub async fn wait_for_op(
&self,
mut handle: RouterOpWaitHandle<T, C>,
) -> Result<T, RouterOpWaitError<T>> {
// Take the receiver
// After this, we must manually cancel since the cancel on handle drop is disabled
let result_receiver = handle.result_receiver.take().unwrap();
let result_fut = result_receiver.recv_async();
// wait for eventualvalue
result_fut.await.map_err(RouterOpWaitError::RecvError)
}
}

View File

@ -0,0 +1,70 @@
use serde::*;
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
#[allow(deprecated)]
#[non_exhaustive]
#[serde(remote = "std::io::ErrorKind")]
pub enum SerdeIoErrorKindDef {
NotFound,
PermissionDenied,
ConnectionRefused,
ConnectionReset,
// #[cfg(feature = "io_error_more")]
// HostUnreachable,
// #[cfg(feature = "io_error_more")]
// NetworkUnreachable,
ConnectionAborted,
NotConnected,
AddrInUse,
AddrNotAvailable,
// #[cfg(feature = "io_error_more")]
// NetworkDown,
BrokenPipe,
AlreadyExists,
WouldBlock,
// #[cfg(feature = "io_error_more")]
// NotADirectory,
// #[cfg(feature = "io_error_more")]
// IsADirectory,
// #[cfg(feature = "io_error_more")]
// DirectoryNotEmpty,
// #[cfg(feature = "io_error_more")]
// ReadOnlyFilesystem,
// #[cfg(feature = "io_error_more")]
// FilesystemLoop,
// #[cfg(feature = "io_error_more")]
// StaleNetworkFileHandle,
InvalidInput,
InvalidData,
TimedOut,
WriteZero,
// #[cfg(feature = "io_error_more")]
// StorageFull,
// #[cfg(feature = "io_error_more")]
// NotSeekable,
// #[cfg(feature = "io_error_more")]
// FilesystemQuotaExceeded,
// #[cfg(feature = "io_error_more")]
// FileTooLarge,
// #[cfg(feature = "io_error_more")]
// ResourceBusy,
// #[cfg(feature = "io_error_more")]
// ExecutableFileBusy,
// #[cfg(feature = "io_error_more")]
// Deadlock,
// #[cfg(feature = "io_error_more")]
// CrossesDevices,
// #[cfg(feature = "io_error_more")]
// TooManyLinks,
// #[cfg(feature = "io_error_more")]
// InvalidFilename,
// #[cfg(feature = "io_error_more")]
// ArgumentListTooLong,
Interrupted,
Unsupported,
UnexpectedEof,
OutOfMemory,
Other,
// #[cfg(feature = "io_error_uncategorized")]
// Uncategorized,
}

View File

@ -0,0 +1,29 @@
use super::*;
use std::io;
#[derive(ThisError, Clone, Debug, PartialEq, Eq)]
pub enum VirtualNetworkError {
#[error("Serialization Error: {0}")]
SerializationError(postcard::Error),
#[error("Response Mismatch")]
ResponseMismatch,
#[error("Wait error")]
WaitError,
#[error("Invalid machine id")]
InvalidMachineId,
#[error("Invalid socket id")]
InvalidSocketId,
#[error("Io error: {0}")]
IoError(io::ErrorKind),
}
impl From<VirtualNetworkError> for io::Error {
fn from(value: VirtualNetworkError) -> Self {
match value {
VirtualNetworkError::IoError(e) => io::Error::from(e),
e => io::Error::other(e),
}
}
}
pub type VirtualNetworkResult<T> = Result<T, VirtualNetworkError>;

View File

@ -0,0 +1,138 @@
use super::*;
use futures_util::FutureExt;
use serde::*;
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct VirtualTcpOptions {
linger: Option<Duration>,
no_delay: bool,
only_v6: bool,
reuse_address_port: bool,
}
pub struct VirtualTcpStream {
machine: Machine,
socket_id: SocketId,
current_recv_fut: Option<SendPinBoxFuture<Result<Vec<u8>, VirtualNetworkError>>>,
current_send_fut: Option<SendPinBoxFuture<Result<usize, VirtualNetworkError>>>,
current_close_fut: Option<SendPinBoxFuture<Result<(), VirtualNetworkError>>>,
}
impl VirtualTcpStream {
pub async fn connect(
remote_address: SocketAddr,
local_address: Option<SocketAddr>,
timeout_ms: u32,
options: VirtualTcpOptions,
) -> VirtualNetworkResult<Self> {
let machine = default_machine().unwrap();
Self::connect_with_machine(machine, remote_address, local_address, timeout_ms, options)
.await
}
pub async fn connect_with_machine(
machine: Machine,
remote_address: SocketAddr,
local_address: Option<SocketAddr>,
timeout_ms: u32,
options: VirtualTcpOptions,
) -> VirtualNetworkResult<Self> {
machine
.router_client
.clone()
.tcp_connect(
machine.id,
remote_address,
local_address,
timeout_ms,
options,
)
.await
.map(|socket_id| Self {
machine,
socket_id,
current_recv_fut: None,
current_send_fut: None,
current_close_fut: None,
})
}
}
impl futures_util::AsyncRead for VirtualTcpStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut [u8],
) -> task::Poll<std::io::Result<usize>> {
if self.current_recv_fut.is_none() {
self.current_recv_fut = Some(Box::pin(self.machine.router_client.clone().recv(
self.machine.id,
self.socket_id,
buf.len() as u32,
)));
}
let fut = self.current_recv_fut.as_mut().unwrap();
fut.poll_unpin(cx).map(|v| match v {
Ok(v) => {
let len = usize::min(buf.len(), v.len());
buf[0..len].copy_from_slice(&v[0..len]);
self.current_recv_fut = None;
Ok(len)
}
Err(e) => Err(e.into()),
})
}
}
impl futures_util::AsyncWrite for VirtualTcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &[u8],
) -> task::Poll<std::io::Result<usize>> {
if self.current_send_fut.is_none() {
self.current_send_fut = Some(Box::pin(self.machine.router_client.clone().send(
self.machine.id,
self.socket_id,
buf.to_vec(),
)));
}
let fut = self.current_send_fut.as_mut().unwrap();
fut.poll_unpin(cx).map(|v| match v {
Ok(v) => {
self.current_send_fut = None;
Ok(v)
}
Err(e) => Err(e.into()),
})
}
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
) -> task::Poll<std::io::Result<()>> {
task::Poll::Ready(Ok(()))
}
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<std::io::Result<()>> {
if self.current_close_fut.is_none() {
self.current_close_fut = Some(Box::pin(
self.machine
.router_client
.clone()
.close(self.machine.id, self.socket_id),
));
}
let fut = self.current_close_fut.as_mut().unwrap();
fut.poll_unpin(cx).map(|v| match v {
Ok(v) => {
self.current_close_fut = None;
Ok(v)
}
Err(e) => Err(e.into()),
})
}
}

View File

@ -0,0 +1,56 @@
use super::*;
use serde::*;
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct VirtualUdpOptions {
only_v6: bool,
reuse_address_port: bool,
}
pub struct VirtualUdpSocket {
machine: Machine,
socket_id: SocketId,
}
impl VirtualUdpSocket {
// pub async fn connect(
// remote_address: SocketAddr,
// local_address: Option<SocketAddr>,
// timeout_ms: u32,
// options: VirtualTcpOptions,
// ) -> VirtualNetworkResult<Self> {
// let machine = default_machine().unwrap();
// Self::connect_with_machine(machine, remote_address, local_address, timeout_ms, options)
// .await
// }
// pub async fn connect_with_machine(
// machine: Machine,
// remote_address: SocketAddr,
// local_address: Option<SocketAddr>,
// timeout_ms: u32,
// options: VirtualTcpOptions,
// ) -> VirtualNetworkResult<Self> {
// machine
// .router_client
// .tcp_connect(
// machine.id,
// remote_address,
// local_address,
// timeout_ms,
// options,
// )
// .await
// .map(|socket_id| Self { machine, socket_id })
// }
}
// impl futures_util::AsyncRead for VirtualUdpSocket {
// fn poll_read(
// self: Pin<&mut Self>,
// cx: &mut task::Context<'_>,
// buf: &mut [u8],
// ) -> task::Poll<std::io::Result<usize>> {
// todo!()
// }
// }