veilid/veilid-core/src/connection_manager.rs

258 lines
9.1 KiB
Rust
Raw Normal View History

use crate::connection_table::*;
use crate::intf::*;
2022-01-03 23:58:26 -05:00
use crate::network_connection::*;
use crate::network_manager::*;
use crate::xx::*;
use crate::*;
use futures_util::stream::{FuturesUnordered, StreamExt};
2022-03-19 18:19:40 -04:00
use futures_util::{select, FutureExt};
/// Capacity of the bounded channel used to hand newly created connection
/// receiver-loop futures to the background connection processor task.
const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128;
2022-01-03 16:29:04 -05:00
///////////////////////////////////////////////////////////
// Connection manager
2022-01-03 23:58:26 -05:00
struct ConnectionManagerInner {
connection_table: ConnectionTable,
connection_processor_jh: Option<JoinHandle<()>>,
2022-03-10 10:18:47 -05:00
connection_add_channel_tx: Option<flume::Sender<SystemPinBoxFuture<()>>>,
}
// Debug output lists only the connection table; the processor join handle and
// the channel sender are intentionally left out of the formatted struct.
impl core::fmt::Debug for ConnectionManagerInner {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut out = f.debug_struct("ConnectionManagerInner");
        out.field("connection_table", &self.connection_table);
        out.finish()
    }
}
2022-01-03 23:58:26 -05:00
/// Shared core of a `ConnectionManager`; the manager holds it through an `Arc`.
struct ConnectionManagerArc {
/// Network manager handle: received envelopes are handed to it and its
/// config supplies the connection inactivity timeout.
network_manager: NetworkManager,
/// Mutable manager state, serialized behind an async mutex.
inner: AsyncMutex<ConnectionManagerInner>,
}
2022-01-03 23:58:26 -05:00
impl core::fmt::Debug for ConnectionManagerArc {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
2022-01-03 23:58:26 -05:00
f.debug_struct("ConnectionManagerArc")
.field("inner", &self.inner)
.finish()
}
}
2022-01-03 23:58:26 -05:00
/// Cloneable handle to the connection manager. Cloning copies the `Arc`, so
/// all clones operate on the same `ConnectionManagerArc`.
#[derive(Debug, Clone)]
pub struct ConnectionManager {
// Reference-counted pointer to the shared manager state.
arc: Arc<ConnectionManagerArc>,
}
impl ConnectionManager {
2022-01-03 23:58:26 -05:00
/// Builds the not-yet-started inner state: an empty connection table, no
/// processor task and no channel sender.
fn new_inner() -> ConnectionManagerInner {
    let connection_table = ConnectionTable::new();
    ConnectionManagerInner {
        connection_table,
        connection_processor_jh: None,
        connection_add_channel_tx: None,
    }
}
2022-01-03 23:58:26 -05:00
/// Builds the shared core: the given network manager handle plus a fresh
/// inner state wrapped in an async mutex.
fn new_arc(network_manager: NetworkManager) -> ConnectionManagerArc {
    let inner = AsyncMutex::new(Self::new_inner());
    ConnectionManagerArc {
        network_manager,
        inner,
    }
}
pub fn new(network_manager: NetworkManager) -> Self {
Self {
2022-01-03 23:58:26 -05:00
arc: Arc::new(Self::new_arc(network_manager)),
}
}
/// Returns a clone of the network manager handle this connection manager
/// was created with.
pub fn network_manager(&self) -> NetworkManager {
    Clone::clone(&self.arc.network_manager)
}
pub async fn startup(&self) {
2022-03-10 09:51:53 -05:00
trace!("startup connection manager");
2022-01-03 23:58:26 -05:00
let mut inner = self.arc.inner.lock().await;
2022-03-24 10:14:50 -04:00
let cac = flume::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE);
2022-01-03 23:58:26 -05:00
inner.connection_add_channel_tx = Some(cac.0);
let rx = cac.1.clone();
let this = self.clone();
2022-01-03 23:58:26 -05:00
inner.connection_processor_jh = Some(spawn(this.connection_processor(rx)));
}
pub async fn shutdown(&self) {
2022-01-03 23:58:26 -05:00
*self.arc.inner.lock().await = Self::new_inner();
}
// Returns a network connection if one already is established
2022-04-17 19:10:10 -04:00
2022-01-03 23:58:26 -05:00
pub async fn get_connection(
&self,
2022-01-04 14:25:32 -05:00
descriptor: ConnectionDescriptor,
2022-01-03 23:58:26 -05:00
) -> Option<NetworkConnection> {
let inner = self.arc.inner.lock().await;
inner.connection_table.get_connection(descriptor)
}
2022-01-04 09:53:30 -05:00
// Internal routine to register new connection atomically
2022-01-05 12:01:02 -05:00
fn on_new_connection_internal(
2022-01-03 23:58:26 -05:00
&self,
2022-01-04 14:25:32 -05:00
inner: &mut ConnectionManagerInner,
2022-01-03 23:58:26 -05:00
conn: NetworkConnection,
) -> Result<(), String> {
let tx = inner
.connection_add_channel_tx
.as_ref()
.ok_or_else(fn_string!("connection channel isn't open yet"))?
.clone();
2022-01-03 23:58:26 -05:00
let receiver_loop_future = Self::process_connection(self.clone(), conn.clone());
tx.try_send(receiver_loop_future)
.map_err(map_to_string)
2022-01-03 23:58:26 -05:00
.map_err(logthru_net!(error "failed to start receiver loop"))?;
// If the receiver loop started successfully,
// add the new connection to the table
inner.connection_table.add_connection(conn)
}
// Called by low-level network when any connection-oriented protocol connection appears
// either from incoming or outgoing connections. Registers connection in the connection table for later access
// and spawns a message processing loop for the connection
pub async fn on_new_connection(&self, conn: NetworkConnection) -> Result<(), String> {
2022-01-04 14:25:32 -05:00
let mut inner = self.arc.inner.lock().await;
2022-01-05 12:01:02 -05:00
self.on_new_connection_internal(&mut *inner, conn)
}
2022-01-03 16:29:04 -05:00
pub async fn get_or_create_connection(
&self,
local_addr: Option<SocketAddr>,
dial_info: DialInfo,
) -> Result<NetworkConnection, String> {
let peer_address = dial_info.to_peer_address();
let descriptor = match local_addr {
Some(la) => {
ConnectionDescriptor::new(peer_address, SocketAddress::from_socket_addr(la))
}
None => ConnectionDescriptor::new_no_local(peer_address),
};
2022-01-05 12:01:02 -05:00
// If any connection to this remote exists that has the same protocol, return it
// Any connection will do, we don't have to match the local address
2022-01-04 14:25:32 -05:00
let mut inner = self.arc.inner.lock().await;
2022-01-03 23:58:26 -05:00
2022-01-05 12:01:02 -05:00
if let Some(conn) = inner
.connection_table
.get_last_connection_by_remote(descriptor.remote)
{
2022-01-03 16:29:04 -05:00
return Ok(conn);
}
// If not, attempt new connection
let conn = NetworkConnection::connect(local_addr, dial_info).await?;
2022-01-05 12:01:02 -05:00
self.on_new_connection_internal(&mut *inner, conn.clone())?;
2022-01-03 16:29:04 -05:00
Ok(conn)
}
// Connection receiver loop
fn process_connection(
this: ConnectionManager,
conn: NetworkConnection,
) -> SystemPinBoxFuture<()> {
2022-01-05 12:01:02 -05:00
log_net!("Starting process_connection loop for {:?}", conn);
let network_manager = this.network_manager();
Box::pin(async move {
//
let descriptor = conn.connection_descriptor();
2022-03-19 18:19:40 -04:00
let inactivity_timeout = this
.network_manager()
.config()
.get()
.network
.connection_inactivity_timeout_ms;
loop {
2022-03-19 18:19:40 -04:00
// process inactivity timeout on receives only
// if you want a keepalive, it has to be requested from the other side
let message = select! {
res = conn.recv().fuse() => {
match res {
Ok(v) => v,
Err(e) => {
log_net!(error e);
break;
}
}
}
_ = intf::sleep(inactivity_timeout).fuse()=> {
// timeout
log_net!("connection timeout on {:?}", descriptor);
2022-01-05 12:01:02 -05:00
break;
}
};
2022-01-04 09:53:30 -05:00
if let Err(e) = network_manager
.on_recv_envelope(message.as_slice(), descriptor)
.await
{
2022-01-04 09:53:30 -05:00
log_net!(error e);
break;
}
}
2022-01-04 09:53:30 -05:00
if let Err(e) = this
.arc
.inner
.lock()
2022-01-04 09:53:30 -05:00
.await
.connection_table
2022-01-04 14:25:32 -05:00
.remove_connection(descriptor)
{
2022-01-04 09:53:30 -05:00
log_net!(error e);
}
})
}
// Process connection oriented sockets in the background
// This never terminates and must have its task cancelled once started
// Task cancellation is performed by shutdown() by dropping the join handle
2022-03-10 10:18:47 -05:00
async fn connection_processor(self, rx: flume::Receiver<SystemPinBoxFuture<()>>) {
let mut connection_futures: FuturesUnordered<SystemPinBoxFuture<()>> =
FuturesUnordered::new();
loop {
// Either process an existing connection, or receive a new one to add to our list
2022-03-19 18:19:40 -04:00
select! {
x = connection_futures.next().fuse() => {
// Processed some connection to completion, or there are none left
match x {
Some(()) => {
// Processed some connection to completion
}
None => {
// No connections to process, wait for one
2022-03-10 10:18:47 -05:00
match rx.recv_async().await {
Ok(v) => {
connection_futures.push(v);
}
Err(e) => {
log_net!(error "connection processor error: {:?}", e);
// xxx: do something here?? should the network be restarted if this happens?
}
};
}
}
}
2022-03-19 18:19:40 -04:00
x = rx.recv_async().fuse() => {
// Got a new connection future
match x {
Ok(v) => {
connection_futures.push(v);
}
Err(e) => {
log_net!(error "connection processor error: {:?}", e);
// xxx: do something here?? should the network be restarted if this happens?
}
};
}
}
}
}
}