mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-02-05 01:15:32 -05:00
listener
This commit is contained in:
parent
51b78de530
commit
77fb665598
52
Cargo.lock
generated
52
Cargo.lock
generated
@ -530,6 +530,19 @@ dependencies = [
|
|||||||
"tungstenite 0.23.0",
|
"tungstenite 0.23.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-tungstenite"
|
||||||
|
version = "0.28.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "90e661b6cb0a6eb34d02c520b052daa3aa9ac0cc02495c9d066bbce13ead132b"
|
||||||
|
dependencies = [
|
||||||
|
"futures-io",
|
||||||
|
"futures-util",
|
||||||
|
"log",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tungstenite 0.24.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async_executors"
|
name = "async_executors"
|
||||||
version = "0.7.0"
|
version = "0.7.0"
|
||||||
@ -6106,6 +6119,24 @@ dependencies = [
|
|||||||
"utf-8",
|
"utf-8",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tungstenite"
|
||||||
|
version = "0.24.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
|
||||||
|
dependencies = [
|
||||||
|
"byteorder",
|
||||||
|
"bytes 1.8.0",
|
||||||
|
"data-encoding",
|
||||||
|
"http 1.1.0",
|
||||||
|
"httparse",
|
||||||
|
"log",
|
||||||
|
"rand",
|
||||||
|
"sha1",
|
||||||
|
"thiserror",
|
||||||
|
"utf-8",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "typenum"
|
name = "typenum"
|
||||||
version = "1.17.0"
|
version = "1.17.0"
|
||||||
@ -6519,6 +6550,7 @@ dependencies = [
|
|||||||
"async-io 1.13.0",
|
"async-io 1.13.0",
|
||||||
"async-lock 3.4.0",
|
"async-lock 3.4.0",
|
||||||
"async-std",
|
"async-std",
|
||||||
|
"async-tungstenite 0.28.0",
|
||||||
"async_executors",
|
"async_executors",
|
||||||
"backtrace",
|
"backtrace",
|
||||||
"cfg-if 1.0.0",
|
"cfg-if 1.0.0",
|
||||||
@ -6573,6 +6605,7 @@ dependencies = [
|
|||||||
"wasm-logger",
|
"wasm-logger",
|
||||||
"wee_alloc",
|
"wee_alloc",
|
||||||
"winapi",
|
"winapi",
|
||||||
|
"ws_stream_tungstenite",
|
||||||
"ws_stream_wasm",
|
"ws_stream_wasm",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -7185,6 +7218,25 @@ version = "0.5.5"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
|
checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ws_stream_tungstenite"
|
||||||
|
version = "0.14.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ed39ff9f8b2eda91bf6390f9f49eee93d655489e15708e3bb638c1c4f07cecb4"
|
||||||
|
dependencies = [
|
||||||
|
"async-tungstenite 0.28.0",
|
||||||
|
"async_io_stream",
|
||||||
|
"bitflags 2.6.0",
|
||||||
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
|
"futures-sink",
|
||||||
|
"futures-util",
|
||||||
|
"pharos",
|
||||||
|
"rustc_version",
|
||||||
|
"tracing",
|
||||||
|
"tungstenite 0.24.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ws_stream_wasm"
|
name = "ws_stream_wasm"
|
||||||
version = "0.7.4"
|
version = "0.7.4"
|
||||||
|
@ -56,7 +56,7 @@ veilid_core_ios_tests = ["dep:tracing-oslog"]
|
|||||||
debug-locks = ["veilid-tools/debug-locks"]
|
debug-locks = ["veilid-tools/debug-locks"]
|
||||||
unstable-blockstore = []
|
unstable-blockstore = []
|
||||||
unstable-tunnels = []
|
unstable-tunnels = []
|
||||||
virtual-network = []
|
virtual-network = ["veilid-tools/virtual-network"]
|
||||||
|
|
||||||
# GeoIP
|
# GeoIP
|
||||||
geolocation = ["maxminddb", "reqwest"]
|
geolocation = ["maxminddb", "reqwest"]
|
||||||
|
@ -17,7 +17,7 @@ crate-type = ["cdylib", "staticlib", "rlib"]
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["rt-tokio"]
|
default = ["rt-tokio", "virtual-network", "virtual-network-server"]
|
||||||
rt-async-std = [
|
rt-async-std = [
|
||||||
"async-std",
|
"async-std",
|
||||||
"async_executors/async_std",
|
"async_executors/async_std",
|
||||||
@ -43,6 +43,9 @@ veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"]
|
|||||||
tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"]
|
tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"]
|
||||||
debug-locks = []
|
debug-locks = []
|
||||||
|
|
||||||
|
virtual-network = []
|
||||||
|
virtual-network-server = ["dep:ws_stream_tungstenite", "dep:async-tungstenite"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tracing = { version = "0.1.40", features = [
|
tracing = { version = "0.1.40", features = [
|
||||||
"log",
|
"log",
|
||||||
@ -92,6 +95,8 @@ socket2 = { version = "0.5.7", features = ["all"] }
|
|||||||
tokio = { version = "1.38.1", features = ["full"], optional = true }
|
tokio = { version = "1.38.1", features = ["full"], optional = true }
|
||||||
tokio-util = { version = "0.7.11", features = ["compat"], optional = true }
|
tokio-util = { version = "0.7.11", features = ["compat"], optional = true }
|
||||||
tokio-stream = { version = "0.1.15", features = ["net"], optional = true }
|
tokio-stream = { version = "0.1.15", features = ["net"], optional = true }
|
||||||
|
ws_stream_tungstenite = { version = "0.14.0", optional = true }
|
||||||
|
async-tungstenite = { version = "0.28.0", optional = true }
|
||||||
|
|
||||||
# Dependencies for WASM builds only
|
# Dependencies for WASM builds only
|
||||||
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
|
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
|
||||||
|
@ -40,10 +40,14 @@
|
|||||||
//! * This crate's `network_interfaces` module
|
//! * This crate's `network_interfaces` module
|
||||||
//! * This crate's `dns_lookup` module
|
//! * This crate's `dns_lookup` module
|
||||||
|
|
||||||
|
mod commands;
|
||||||
mod machine;
|
mod machine;
|
||||||
mod router_client;
|
mod router_client;
|
||||||
mod router_op_table;
|
mod router_op_table;
|
||||||
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
|
#[cfg(all(
|
||||||
|
feature = "virtual-network-server",
|
||||||
|
not(all(target_arch = "wasm32", target_os = "unknown"))
|
||||||
|
))]
|
||||||
mod router_server;
|
mod router_server;
|
||||||
mod serde_io_error;
|
mod serde_io_error;
|
||||||
mod virtual_gateway;
|
mod virtual_gateway;
|
||||||
@ -54,10 +58,14 @@ mod virtual_tcp_stream;
|
|||||||
mod virtual_udp_socket;
|
mod virtual_udp_socket;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use commands::*;
|
||||||
|
|
||||||
pub use machine::*;
|
pub use machine::*;
|
||||||
pub use router_client::*;
|
pub use router_client::*;
|
||||||
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
|
#[cfg(all(
|
||||||
|
feature = "virtual-network-server",
|
||||||
|
not(all(target_arch = "wasm32", target_os = "unknown"))
|
||||||
|
))]
|
||||||
pub use router_server::*;
|
pub use router_server::*;
|
||||||
pub use virtual_gateway::*;
|
pub use virtual_gateway::*;
|
||||||
pub use virtual_network_error::*;
|
pub use virtual_network_error::*;
|
||||||
|
@ -618,14 +618,14 @@ impl RouterClient {
|
|||||||
.into_stream()
|
.into_stream()
|
||||||
.map(io::Result::<ServerProcessorEvent>::Ok);
|
.map(io::Result::<ServerProcessorEvent>::Ok);
|
||||||
|
|
||||||
let framed_reader_fut = system_boxed(async move {
|
let receiver_fut = system_boxed(async move {
|
||||||
let fut =
|
let fut =
|
||||||
receiver.try_for_each(|evt| Self::process_event(evt, router_op_waiter.clone()));
|
receiver.try_for_each(|evt| Self::process_event(evt, router_op_waiter.clone()));
|
||||||
if let Err(e) = fut.await {
|
if let Err(e) = fut.await {
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
unord.push(framed_reader_fut);
|
unord.push(receiver_fut);
|
||||||
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
|
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,64 +1,317 @@
|
|||||||
mod commands;
|
|
||||||
|
|
||||||
pub(super) use commands::*;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use async_tungstenite::accept_async;
|
||||||
use futures_codec::{Bytes, BytesCodec, FramedRead, FramedWrite};
|
use futures_codec::{Bytes, BytesCodec, FramedRead, FramedWrite};
|
||||||
|
use futures_util::{stream::FuturesUnordered, AsyncReadExt, StreamExt, TryStreamExt};
|
||||||
|
use postcard::{from_bytes, to_stdvec};
|
||||||
|
use std::io;
|
||||||
|
use stop_token::future::FutureExt as _;
|
||||||
|
use ws_stream_tungstenite::*;
|
||||||
|
|
||||||
#[derive(ThisError, Debug, Clone, PartialEq, Eq)]
|
#[derive(ThisError, Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum RouterServerError {
|
pub enum RouterServerError {
|
||||||
#[error("Serialization Error: {0}")]
|
#[error("Serialization Error: {0}")]
|
||||||
SerializationError(postcard::Error),
|
SerializationError(postcard::Error),
|
||||||
|
#[error("IO Error: {0}")]
|
||||||
|
IoError(io::ErrorKind),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type RouterServerResult<T> = Result<T, RouterServerError>;
|
pub type RouterServerResult<T> = Result<T, RouterServerError>;
|
||||||
|
|
||||||
|
pub const DEFAULT_VIRTUAL_ROUTER_PORT_TCP: u16 = 5149u16;
|
||||||
|
pub const DEFAULT_VIRTUAL_ROUTER_PORT_WS: u16 = 5148u16;
|
||||||
|
|
||||||
|
enum RunLoopEvent {
|
||||||
|
AddClient(SendPinBoxFuture<RunLoopEvent>),
|
||||||
|
Done,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct RouterServerUnlockedInner {
|
||||||
|
new_client_sender: flume::Sender<SendPinBoxFuture<RunLoopEvent>>,
|
||||||
|
new_client_receiver: flume::Receiver<SendPinBoxFuture<RunLoopEvent>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct RouterServerInner {
|
||||||
|
//tcp_connections: HashMap<
|
||||||
|
}
|
||||||
|
|
||||||
/// Router server for virtual networking
|
/// Router server for virtual networking
|
||||||
///
|
///
|
||||||
/// Connect to this with a `RouterClient`. Simulates machines, allocates sockets
|
/// Connect to this with a `RouterClient`. Simulates machines, allocates sockets
|
||||||
/// and gateways, manages a virtual simulated Internet and routes packets
|
/// and gateways, manages a virtual simulated Internet and routes packets
|
||||||
/// virtually between `Machines` associated with `RouterClient`s.
|
/// virtually between `Machines` associated with `RouterClient`s.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct RouterServer {
|
pub struct RouterServer {
|
||||||
//tcp_connections: HashMap<
|
unlocked_inner: Arc<RouterServerUnlockedInner>,
|
||||||
client_inbound_sender: flume::Sender<Bytes>,
|
inner: Arc<Mutex<RouterServerInner>>,
|
||||||
client_inbound_receiver: flume::Receiver<Bytes>,
|
|
||||||
local_inbound_sender: flume::Sender<ServerProcessorCommand>,
|
|
||||||
local_inbound_receiver: flume::Receiver<ServerProcessorCommand>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RouterServer {
|
impl RouterServer {
|
||||||
|
////////////////////////////////////////////////////////////////////
|
||||||
|
// Public Interface
|
||||||
|
|
||||||
/// Create a router server for virtual networking
|
/// Create a router server for virtual networking
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {}
|
// Make a channel to receive new clients
|
||||||
|
let (new_client_sender, new_client_receiver) = flume::unbounded();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
unlocked_inner: Arc::new(RouterServerUnlockedInner {
|
||||||
|
new_client_sender,
|
||||||
|
new_client_receiver,
|
||||||
|
}),
|
||||||
|
inner: Arc::new(Mutex::new(RouterServerInner {})),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_connection<R, W>(self, reader: R, writer: W) -> RunLoopEvent
|
||||||
|
where
|
||||||
|
R: AsyncRead + Send + Unpin,
|
||||||
|
W: AsyncWrite + Send + Unpin,
|
||||||
|
{
|
||||||
|
let framed_reader = FramedRead::new(reader, BytesCodec);
|
||||||
|
let framed_writer = FramedWrite::new(writer, BytesCodec);
|
||||||
|
|
||||||
|
let (outbound_sender, outbound_receiver) = flume::unbounded();
|
||||||
|
let outbound_fut = system_boxed(
|
||||||
|
outbound_receiver
|
||||||
|
.into_stream()
|
||||||
|
.map(|command| {
|
||||||
|
to_stdvec(&command)
|
||||||
|
.map_err(io::Error::other)
|
||||||
|
.map(Bytes::from)
|
||||||
|
})
|
||||||
|
.forward(framed_writer),
|
||||||
|
);
|
||||||
|
|
||||||
|
let inbound_fut = system_boxed(framed_reader.try_for_each(|x| async {
|
||||||
|
let x = x;
|
||||||
|
let cmd = from_bytes::<ServerProcessorCommand>(&x).map_err(io::Error::other)?;
|
||||||
|
|
||||||
|
self.clone()
|
||||||
|
.process_command(cmd, outbound_sender.clone())
|
||||||
|
.await
|
||||||
|
.map_err(io::Error::other)
|
||||||
|
}));
|
||||||
|
|
||||||
|
let mut unord = FuturesUnordered::new();
|
||||||
|
unord.push(outbound_fut);
|
||||||
|
unord.push(inbound_fut);
|
||||||
|
|
||||||
|
if let Some(Err(e)) = unord.next().await {
|
||||||
|
error!("{}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
RunLoopEvent::Done
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Accept RouterClient connections on a TCP socket
|
/// Accept RouterClient connections on a TCP socket
|
||||||
pub fn listen_tcp(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
|
pub async fn listen_tcp(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
|
||||||
Ok(())
|
let listener = TcpListener::bind(addr.unwrap_or(SocketAddr::V6(SocketAddrV6::new(
|
||||||
|
Ipv6Addr::UNSPECIFIED,
|
||||||
|
DEFAULT_VIRTUAL_ROUTER_PORT_TCP,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
))))
|
||||||
|
.await
|
||||||
|
.map_err(|e| RouterServerError::IoError(e.kind()))?;
|
||||||
|
|
||||||
|
let stop_source = StopSource::new();
|
||||||
|
let stop_token = stop_source.token();
|
||||||
|
|
||||||
|
let this = self.clone();
|
||||||
|
let listener_fut = system_boxed(async move {
|
||||||
|
loop {
|
||||||
|
// Wait for a new connection
|
||||||
|
match listener.accept().timeout_at(stop_token.clone()).await {
|
||||||
|
Ok(Ok((conn, _addr))) => {
|
||||||
|
let conn = conn.compat();
|
||||||
|
// Register a connection processing inbound receiver
|
||||||
|
let this2 = this.clone();
|
||||||
|
let inbound_receiver_fut = system_boxed(async move {
|
||||||
|
let (reader, writer) = conn.split();
|
||||||
|
|
||||||
|
this2.process_connection(reader, writer).await
|
||||||
|
});
|
||||||
|
if let Err(e) = this
|
||||||
|
.unlocked_inner
|
||||||
|
.new_client_sender
|
||||||
|
.send(inbound_receiver_fut)
|
||||||
|
{
|
||||||
|
// Error register connection processor
|
||||||
|
error!("{}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
// Error processing an accept
|
||||||
|
error!("{}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// Stop requested
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RunLoopEvent::Done
|
||||||
|
});
|
||||||
|
|
||||||
|
self.unlocked_inner
|
||||||
|
.new_client_sender
|
||||||
|
.send(listener_fut)
|
||||||
|
.expect("should be able to send client");
|
||||||
|
|
||||||
|
Ok(stop_source)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Accept RouterClient connections on a WebSocket
|
/// Accept RouterClient connections on a WebSocket
|
||||||
pub fn listen_ws(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
|
pub async fn listen_ws(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
|
||||||
Ok(())
|
let listener = TcpListener::bind(addr.unwrap_or(SocketAddr::V6(SocketAddrV6::new(
|
||||||
|
Ipv6Addr::UNSPECIFIED,
|
||||||
|
DEFAULT_VIRTUAL_ROUTER_PORT_WS,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
))))
|
||||||
|
.await
|
||||||
|
.map_err(|e| RouterServerError::IoError(e.kind()))?;
|
||||||
|
|
||||||
|
let stop_source = StopSource::new();
|
||||||
|
let stop_token = stop_source.token();
|
||||||
|
|
||||||
|
let this = self.clone();
|
||||||
|
let listener_fut = system_boxed(async move {
|
||||||
|
loop {
|
||||||
|
// Wait for a new connection
|
||||||
|
match listener.accept().timeout_at(stop_token.clone()).await {
|
||||||
|
Ok(Ok((conn, _addr))) => {
|
||||||
|
let conn = conn.compat();
|
||||||
|
if let Ok(s) = accept_async(conn).await {
|
||||||
|
let ws = WsStream::new(s);
|
||||||
|
// Register a connection processing inbound receiver
|
||||||
|
let this2 = this.clone();
|
||||||
|
let inbound_receiver_fut = system_boxed(async move {
|
||||||
|
let (reader, writer) = ws.split();
|
||||||
|
this2.process_connection(reader, writer).await
|
||||||
|
});
|
||||||
|
if let Err(e) = this
|
||||||
|
.unlocked_inner
|
||||||
|
.new_client_sender
|
||||||
|
.send(inbound_receiver_fut)
|
||||||
|
{
|
||||||
|
// Error register connection processor
|
||||||
|
error!("{}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
// Error processing an accept
|
||||||
|
error!("{}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// Stop requested
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RunLoopEvent::Done
|
||||||
|
});
|
||||||
|
|
||||||
|
self.unlocked_inner
|
||||||
|
.new_client_sender
|
||||||
|
.send(listener_fut)
|
||||||
|
.expect("should be able to send client");
|
||||||
|
|
||||||
|
Ok(stop_source)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a local RouterClient
|
/// Return a local RouterClient
|
||||||
pub fn router_client(&self) -> RouterClient {
|
pub fn router_client(&self) -> RouterClient {
|
||||||
// Create the outbound channel
|
// Create the inbound/outbound channels
|
||||||
xxxx get these channels right
|
let (local_inbound_sender, local_inbound_receiver) = flume::unbounded();
|
||||||
let (local_outbound_sender, local_outbound_receiver) = flume::unbounded();
|
let (local_outbound_sender, local_outbound_receiver) = flume::unbounded();
|
||||||
|
|
||||||
|
let this = self.clone();
|
||||||
|
let inbound_receiver_fut = system_boxed(async move {
|
||||||
|
let fut = local_inbound_receiver
|
||||||
|
.into_stream()
|
||||||
|
.map(Ok)
|
||||||
|
.try_for_each(|cmd| {
|
||||||
|
this.clone()
|
||||||
|
.process_command(cmd, local_outbound_sender.clone())
|
||||||
|
});
|
||||||
|
if let Err(e) = fut.await {
|
||||||
|
error!("{}", e);
|
||||||
|
}
|
||||||
|
RunLoopEvent::Done
|
||||||
|
});
|
||||||
|
|
||||||
|
// Send the new client to the run loop
|
||||||
|
self.unlocked_inner
|
||||||
|
.new_client_sender
|
||||||
|
.send(inbound_receiver_fut)
|
||||||
|
.expect("should be able to send client");
|
||||||
|
|
||||||
// Create a RouterClient directly connected to this RouterServer
|
// Create a RouterClient directly connected to this RouterServer
|
||||||
RouterClient::local_router_client(
|
RouterClient::local_router_client(local_inbound_sender, local_outbound_receiver)
|
||||||
self.local_inbound_sender.clone(),
|
|
||||||
local_outbound_receiver,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run the router server until a stop is requested
|
/// Run the router server until a stop is requested
|
||||||
pub async fn run(&self, stop_token: StopToken) -> RouterServerResult<()> {
|
pub async fn run(&self, stop_token: StopToken) -> RouterServerResult<()> {
|
||||||
|
let mut unord = FuturesUnordered::<SendPinBoxFuture<RunLoopEvent>>::new();
|
||||||
|
|
||||||
|
let mut need_new_client_fut = true;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if need_new_client_fut {
|
||||||
|
let new_client_receiver = self.unlocked_inner.new_client_receiver.clone();
|
||||||
|
unord.push(Box::pin(async move {
|
||||||
|
if let Ok(res) = new_client_receiver.into_recv_async().await {
|
||||||
|
return RunLoopEvent::AddClient(res);
|
||||||
|
}
|
||||||
|
RunLoopEvent::Done
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
match unord.next().timeout_at(stop_token.clone()).await {
|
||||||
|
Ok(Some(RunLoopEvent::AddClient(client_fut))) => {
|
||||||
|
// Add new client
|
||||||
|
unord.push(client_fut);
|
||||||
|
|
||||||
|
// Wait for next new client
|
||||||
|
need_new_client_fut = true;
|
||||||
|
}
|
||||||
|
Ok(Some(RunLoopEvent::Done)) => {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
// Finished normally
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// Stop requested
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////
|
||||||
|
// Private Implementation
|
||||||
|
|
||||||
|
async fn process_command(
|
||||||
|
self,
|
||||||
|
cmd: ServerProcessorCommand,
|
||||||
|
outbound_sender: flume::Sender<ServerProcessorEvent>,
|
||||||
|
) -> RouterServerResult<()> {
|
||||||
|
//
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user