mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-08-09 23:23:00 -04:00
more refactor, not quite done.
This commit is contained in:
parent
55a44e0c8f
commit
e32984a5aa
19 changed files with 318 additions and 339 deletions
|
@ -1,5 +1,6 @@
|
|||
use crate::connection_table::*;
|
||||
use crate::intf::*;
|
||||
use crate::network_connection::*;
|
||||
use crate::network_manager::*;
|
||||
use crate::xx::*;
|
||||
use crate::*;
|
||||
|
@ -8,73 +9,10 @@ use futures_util::stream::{FuturesUnordered, StreamExt};
|
|||
|
||||
const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize;
|
||||
|
||||
///////////////////////////////////////////////////////////
|
||||
// Accept
|
||||
|
||||
cfg_if! {
|
||||
if #[cfg(not(target_arch = "wasm32"))] {
|
||||
use async_std::net::*;
|
||||
use utils::async_peek_stream::*;
|
||||
|
||||
pub trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync {
|
||||
fn on_accept(
|
||||
&self,
|
||||
stream: AsyncPeekStream,
|
||||
peer_addr: SocketAddr,
|
||||
) -> SystemPinBoxFuture<Result<Option<NetworkConnection>, String>>;
|
||||
}
|
||||
|
||||
pub trait ProtocolAcceptHandlerClone {
|
||||
fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler>;
|
||||
}
|
||||
|
||||
impl<T> ProtocolAcceptHandlerClone for T
|
||||
where
|
||||
T: 'static + ProtocolAcceptHandler + Clone,
|
||||
{
|
||||
fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
impl Clone for Box<dyn ProtocolAcceptHandler> {
|
||||
fn clone(&self) -> Box<dyn ProtocolAcceptHandler> {
|
||||
self.clone_box()
|
||||
}
|
||||
}
|
||||
|
||||
pub type NewProtocolAcceptHandler =
|
||||
dyn Fn(VeilidConfig, bool, SocketAddr) -> Box<dyn ProtocolAcceptHandler> + Send;
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////
|
||||
// Dummy network connection for testing
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct DummyNetworkConnection {
|
||||
descriptor: ConnectionDescriptor,
|
||||
}
|
||||
|
||||
impl DummyNetworkConnection {
|
||||
pub fn new(descriptor: ConnectionDescriptor) -> Self {
|
||||
Self { descriptor }
|
||||
}
|
||||
pub fn connection_descriptor(&self) -> ConnectionDescriptor {
|
||||
self.descriptor.clone()
|
||||
}
|
||||
pub async fn send(&self, _message: Vec<u8>) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
pub async fn recv(&self) -> Result<Vec<u8>, String> {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////
|
||||
// Connection manager
|
||||
|
||||
pub struct ConnectionManagerInner {
|
||||
network_manager: NetworkManager,
|
||||
struct ConnectionManagerInner {
|
||||
connection_table: ConnectionTable,
|
||||
connection_processor_jh: Option<JoinHandle<()>>,
|
||||
connection_add_channel_tx: Option<utils::channel::Sender<SystemPinBoxFuture<()>>>,
|
||||
|
@ -88,75 +26,98 @@ impl core::fmt::Debug for ConnectionManagerInner {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectionManager {
|
||||
inner: Arc<Mutex<ConnectionManagerInner>>,
|
||||
struct ConnectionManagerArc {
|
||||
network_manager: NetworkManager,
|
||||
inner: AsyncMutex<ConnectionManagerInner>,
|
||||
}
|
||||
impl core::fmt::Debug for ConnectionManager {
|
||||
impl core::fmt::Debug for ConnectionManagerArc {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
f.debug_struct("ConnectionManager")
|
||||
.field("inner", &*self.inner.lock())
|
||||
f.debug_struct("ConnectionManagerArc")
|
||||
.field("inner", &self.inner)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ConnectionManager {
|
||||
arc: Arc<ConnectionManagerArc>,
|
||||
}
|
||||
|
||||
impl ConnectionManager {
|
||||
fn new_inner(network_manager: NetworkManager) -> ConnectionManagerInner {
|
||||
fn new_inner() -> ConnectionManagerInner {
|
||||
ConnectionManagerInner {
|
||||
network_manager,
|
||||
connection_table: ConnectionTable::new(),
|
||||
connection_processor_jh: None,
|
||||
connection_add_channel_tx: None,
|
||||
}
|
||||
}
|
||||
fn new_arc(network_manager: NetworkManager) -> ConnectionManagerArc {
|
||||
ConnectionManagerArc {
|
||||
network_manager,
|
||||
inner: AsyncMutex::new(Self::new_inner()),
|
||||
}
|
||||
}
|
||||
pub fn new(network_manager: NetworkManager) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(Self::new_inner(network_manager))),
|
||||
arc: Arc::new(Self::new_arc(network_manager)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn network_manager(&self) -> NetworkManager {
|
||||
self.inner.lock().network_manager.clone()
|
||||
self.arc.network_manager.clone()
|
||||
}
|
||||
|
||||
pub async fn startup(&self) {
|
||||
let mut inner = self.arc.inner.lock().await;
|
||||
let cac = utils::channel::channel(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config
|
||||
self.inner.lock().connection_add_channel_tx = Some(cac.0);
|
||||
inner.connection_add_channel_tx = Some(cac.0);
|
||||
let rx = cac.1.clone();
|
||||
let this = self.clone();
|
||||
self.inner.lock().connection_processor_jh = Some(spawn(this.connection_processor(rx)));
|
||||
inner.connection_processor_jh = Some(spawn(this.connection_processor(rx)));
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) {
|
||||
*self.inner.lock() = Self::new_inner(self.network_manager());
|
||||
*self.arc.inner.lock().await = Self::new_inner();
|
||||
}
|
||||
|
||||
// Returns a network connection if one already is established
|
||||
pub fn get_connection(&self, descriptor: &ConnectionDescriptor) -> Option<NetworkConnection> {
|
||||
self.inner
|
||||
.lock()
|
||||
.connection_table
|
||||
.get_connection(descriptor)
|
||||
.map(|e| e.conn)
|
||||
pub async fn get_connection(
|
||||
&self,
|
||||
descriptor: &ConnectionDescriptor,
|
||||
) -> Option<NetworkConnection> {
|
||||
let inner = self.arc.inner.lock().await;
|
||||
inner.connection_table.get_connection(descriptor)
|
||||
}
|
||||
|
||||
// Internal routine to register new connection
|
||||
async fn on_new_connection_internal(
|
||||
&self,
|
||||
mut inner: AsyncMutexGuard<'_, ConnectionManagerInner>,
|
||||
conn: NetworkConnection,
|
||||
) -> Result<(), String> {
|
||||
let tx = inner
|
||||
.connection_add_channel_tx
|
||||
.as_ref()
|
||||
.ok_or_else(fn_string!("connection channel isn't open yet"))?
|
||||
.clone();
|
||||
|
||||
let receiver_loop_future = Self::process_connection(self.clone(), conn.clone());
|
||||
tx.try_send(receiver_loop_future)
|
||||
.await
|
||||
.map_err(map_to_string)
|
||||
.map_err(logthru_net!(error "failed to start receiver loop"))?;
|
||||
|
||||
// If the receiver loop started successfully,
|
||||
// add the new connection to the table
|
||||
inner.connection_table.add_connection(conn)
|
||||
}
|
||||
|
||||
// Called by low-level network when any connection-oriented protocol connection appears
|
||||
// either from incoming or outgoing connections. Registers connection in the connection table for later access
|
||||
// and spawns a message processing loop for the connection
|
||||
pub async fn on_new_connection(&self, conn: NetworkConnection) -> Result<(), String> {
|
||||
let tx = self
|
||||
.inner
|
||||
.lock()
|
||||
.connection_add_channel_tx
|
||||
.as_ref()
|
||||
.ok_or_else(fn_string!("connection channel isn't open yet"))?
|
||||
.clone();
|
||||
|
||||
let receiver_loop_future = Self::process_connection(self.clone(), conn);
|
||||
tx.try_send(receiver_loop_future)
|
||||
.await
|
||||
.map_err(map_to_string)
|
||||
.map_err(logthru_net!(error "failed to start receiver loop"))
|
||||
let inner = self.arc.inner.lock().await;
|
||||
self.on_new_connection_internal(inner, conn).await
|
||||
}
|
||||
|
||||
pub async fn get_or_create_connection(
|
||||
|
@ -173,9 +134,9 @@ impl ConnectionManager {
|
|||
};
|
||||
|
||||
// If connection exists, then return it
|
||||
if let Some(conn) = self
|
||||
.inner
|
||||
.lock()
|
||||
let inner = self.arc.inner.lock().await;
|
||||
|
||||
if let Some(conn) = inner
|
||||
.connection_table
|
||||
.get_connection(&descriptor)
|
||||
.map(|e| e.conn)
|
||||
|
@ -186,7 +147,7 @@ impl ConnectionManager {
|
|||
// If not, attempt new connection
|
||||
let conn = NetworkConnection::connect(local_addr, dial_info).await?;
|
||||
|
||||
self.on_new_connection(conn.clone()).await?;
|
||||
self.on_new_connection_internal(inner, conn).await;
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
|
@ -198,20 +159,6 @@ impl ConnectionManager {
|
|||
) -> SystemPinBoxFuture<()> {
|
||||
let network_manager = this.network_manager();
|
||||
Box::pin(async move {
|
||||
// Add new connections to the table
|
||||
let entry = match this
|
||||
.inner
|
||||
.lock()
|
||||
.connection_table
|
||||
.add_connection(conn.clone())
|
||||
{
|
||||
Ok(e) => e,
|
||||
Err(err) => {
|
||||
error!(target: "net", "{}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
//
|
||||
let exit_value: Result<Vec<u8>, ()> = Err(());
|
||||
let descriptor = conn.connection_descriptor();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue