connection table cleanup

This commit is contained in:
John Smith 2022-09-14 14:36:29 -04:00
parent 72b03939ef
commit 8878817961
13 changed files with 401 additions and 352 deletions

View File

@ -2,6 +2,7 @@ use super::*;
#[derive(Clone, Debug)]
pub struct ConnectionHandle {
id: u64,
descriptor: ConnectionDescriptor,
channel: flume::Sender<Vec<u8>>,
}
@ -13,13 +14,22 @@ pub enum ConnectionHandleSendResult {
}
impl ConnectionHandle {
pub(super) fn new(descriptor: ConnectionDescriptor, channel: flume::Sender<Vec<u8>>) -> Self {
pub(super) fn new(
id: u64,
descriptor: ConnectionDescriptor,
channel: flume::Sender<Vec<u8>>,
) -> Self {
Self {
id,
descriptor,
channel,
}
}
pub fn connection_id(&self) -> u64 {
self.id
}
pub fn connection_descriptor(&self) -> ConnectionDescriptor {
self.descriptor.clone()
}

View File

@ -11,12 +11,11 @@ use stop_token::future::FutureExt;
enum ConnectionManagerEvent {
Accepted(ProtocolNetworkConnection),
Dead(NetworkConnection),
Finished(ConnectionDescriptor),
}
#[derive(Debug)]
struct ConnectionManagerInner {
connection_table: ConnectionTable,
next_id: NetworkConnectionId,
sender: flume::Sender<ConnectionManagerEvent>,
async_processor_jh: Option<MustJoinHandle<()>>,
stop_source: Option<StopSource>,
@ -24,6 +23,9 @@ struct ConnectionManagerInner {
struct ConnectionManagerArc {
network_manager: NetworkManager,
connection_initial_timeout_ms: u32,
connection_inactivity_timeout_ms: u32,
connection_table: ConnectionTable,
inner: Mutex<Option<ConnectionManagerInner>>,
}
impl core::fmt::Debug for ConnectionManagerArc {
@ -41,21 +43,32 @@ pub struct ConnectionManager {
impl ConnectionManager {
fn new_inner(
config: VeilidConfig,
stop_source: StopSource,
sender: flume::Sender<ConnectionManagerEvent>,
async_processor_jh: MustJoinHandle<()>,
) -> ConnectionManagerInner {
ConnectionManagerInner {
next_id: 0,
stop_source: Some(stop_source),
sender: sender,
async_processor_jh: Some(async_processor_jh),
connection_table: ConnectionTable::new(config),
}
}
fn new_arc(network_manager: NetworkManager) -> ConnectionManagerArc {
let config = network_manager.config();
let (connection_initial_timeout_ms, connection_inactivity_timeout_ms) = {
let c = config.get();
(
c.network.connection_initial_timeout_ms,
c.network.connection_inactivity_timeout_ms,
)
};
ConnectionManagerArc {
network_manager,
connection_initial_timeout_ms,
connection_inactivity_timeout_ms,
connection_table: ConnectionTable::new(config),
inner: Mutex::new(None),
}
}
@ -69,6 +82,14 @@ impl ConnectionManager {
self.arc.network_manager.clone()
}
pub fn connection_initial_timeout_ms(&self) -> u32 {
self.arc.connection_initial_timeout_ms
}
pub fn connection_inactivity_timeout_ms(&self) -> u32 {
self.arc.connection_inactivity_timeout_ms
}
pub async fn startup(&self) {
trace!("startup connection manager");
let mut inner = self.arc.inner.lock();
@ -86,12 +107,7 @@ impl ConnectionManager {
let async_processor = spawn(self.clone().async_processor(stop_source.token(), receiver));
// Store in the inner object
*inner = Some(Self::new_inner(
self.network_manager().config(),
stop_source,
sender,
async_processor,
));
*inner = Some(Self::new_inner(stop_source, sender, async_processor));
}
pub async fn shutdown(&self) {
@ -117,22 +133,10 @@ impl ConnectionManager {
async_processor_jh.await;
// Wait for the connections to complete
debug!("waiting for connection handlers to complete");
inner.connection_table.join().await;
self.arc.connection_table.join().await;
debug!("finished connection manager shutdown");
}
// Returns a network connection if one already is established
pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> {
let mut inner = self.arc.inner.lock();
let inner = match &mut *inner {
Some(v) => v,
None => {
panic!("not started");
}
};
inner.connection_table.get_connection(descriptor)
}
// Internal routine to register new connection atomically.
// Registers connection in the connection table for later access
// and spawns a message processing loop for the connection
@ -141,7 +145,14 @@ impl ConnectionManager {
inner: &mut ConnectionManagerInner,
prot_conn: ProtocolNetworkConnection,
) -> EyreResult<NetworkResult<ConnectionHandle>> {
log_net!("on_new_protocol_network_connection: {:?}", prot_conn);
// Get next connection id to use
let id = inner.next_id;
inner.next_id += 1;
log_net!(
"on_new_protocol_network_connection: id={} prot_conn={:?}",
id,
prot_conn
);
// Wrap with NetworkConnection object to start the connection processing loop
let stop_token = match &inner.stop_source {
@ -149,143 +160,137 @@ impl ConnectionManager {
None => bail!("not creating connection because we are stopping"),
};
let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn);
let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id);
let handle = conn.get_handle();
// Add to the connection table
match inner.connection_table.add_connection(conn) {
match self.arc.connection_table.add_connection(conn) {
Ok(None) => {
// Connection added
}
Ok(Some(conn)) => {
// Connection added and a different one LRU'd out
// Send it to be terminated
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
}
Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
// Connection filtered
let desc = conn.connection_descriptor();
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Err(eyre!("connection filtered: {:?} ({})", desc, e));
return Ok(NetworkResult::no_connection_other(format!(
"connection filtered: {:?} ({})",
desc, e
)));
}
Err(ConnectionTableAddError::AlreadyExists(conn)) => {
// Connection already exists
let desc = conn.connection_descriptor();
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Err(eyre!("connection already exists: {:?}", desc));
// xxx remove this
panic!(
"connection already exists: {:?} connection_table: {:#?}",
desc, self.arc.connection_table
);
return Ok(NetworkResult::no_connection_other(format!(
"connection already exists: {:?}",
desc
)));
}
};
Ok(NetworkResult::Value(handle))
}
// Returns a network connection if one already is established
pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> {
self.arc
.connection_table
.get_connection_by_descriptor(descriptor)
}
// Terminate any connections that would collide with a new connection
// using different protocols to the same remote address and port. Used to ensure
// that we can switch quickly between TCP and WS if necessary to the same node
// Returns true if we killed off colliding connections
async fn kill_off_colliding_connections(&self, dial_info: &DialInfo) -> bool {
let protocol_type = dial_info.protocol_type();
let socket_address = dial_info.socket_address();
let killed = self.arc.connection_table.drain_filter(|prior_descriptor| {
// If the protocol types aren't the same, then this is a candidate to be killed off
// If they are the same, then we would just return the exact same connection from get_or_create_connection()
if prior_descriptor.protocol_type() == protocol_type {
return false;
}
// If the prior remote is not the same address, then we're not going to collide
if *prior_descriptor.remote().socket_address() != socket_address {
return false;
}
log_net!(debug
">< Terminating connection prior_descriptor={:?}",
prior_descriptor
);
true
});
// Wait for the killed connections to end their recv loops
let did_kill = !killed.is_empty();
for k in killed {
k.await;
}
did_kill
}
// Called when we want to create a new connection or get the current one that already exists
// This will kill off any connections that are in conflict with the new connection to be made
// in order to make room for the new connection in the system's connection table
// This routine needs to be atomic, or connections may exist in the table that are not established
pub async fn get_or_create_connection(
&self,
local_addr: Option<SocketAddr>,
dial_info: DialInfo,
) -> EyreResult<NetworkResult<ConnectionHandle>> {
let killed = {
let mut inner = self.arc.inner.lock();
let inner = match &mut *inner {
Some(v) => v,
None => {
panic!("not started");
}
};
log_net!(
"== get_or_create_connection local_addr={:?} dial_info={:?}",
local_addr.green(),
dial_info.green()
);
// Kill off any possibly conflicting connections
let did_kill = self.kill_off_colliding_connections(&dial_info).await;
let mut retry_count = if did_kill { 2 } else { 0 };
// Make a connection descriptor for this dialinfo
let peer_address = dial_info.to_peer_address();
let descriptor = match local_addr {
Some(la) => {
ConnectionDescriptor::new(peer_address, SocketAddress::from_socket_addr(la))
}
None => ConnectionDescriptor::new_no_local(peer_address),
};
// If any connection to this remote exists that has the same protocol, return it
// Any connection will do, we don't have to match the local address
if let Some(conn) = self
.arc
.connection_table
.get_last_connection_by_remote(descriptor.remote())
{
log_net!(
"== get_or_create_connection local_addr={:?} dial_info={:?}",
"== Returning existing connection local_addr={:?} peer_address={:?}",
local_addr.green(),
dial_info.green()
peer_address.green()
);
let peer_address = dial_info.to_peer_address();
// Make a connection to the address
// reject connections to addresses with an unknown or unsupported peer scope
let descriptor = match local_addr {
Some(la) => {
ConnectionDescriptor::new(peer_address, SocketAddress::from_socket_addr(la))
}
None => ConnectionDescriptor::new_no_local(peer_address),
}?;
// If any connection to this remote exists that has the same protocol, return it
// Any connection will do, we don't have to match the local address
if let Some(conn) = inner
.connection_table
.get_last_connection_by_remote(descriptor.remote())
{
log_net!(
"== Returning existing connection local_addr={:?} peer_address={:?}",
local_addr.green(),
peer_address.green()
);
return Ok(NetworkResult::Value(conn));
}
// Drop any other protocols connections to this remote that have the same local addr
// otherwise this connection won't succeed due to binding
let mut killed = Vec::<NetworkConnection>::new();
if let Some(local_addr) = local_addr {
if local_addr.port() != 0 {
for pt in [ProtocolType::TCP, ProtocolType::WS, ProtocolType::WSS] {
let pa = PeerAddress::new(descriptor.remote_address().clone(), pt);
for prior_descriptor in inner
.connection_table
.get_connection_descriptors_by_remote(pa)
{
let mut kill = false;
// See if the local address would collide
if let Some(prior_local) = prior_descriptor.local() {
if (local_addr.ip().is_unspecified()
|| prior_local.to_ip_addr().is_unspecified()
|| (local_addr.ip() == prior_local.to_ip_addr()))
&& prior_local.port() == local_addr.port()
{
kill = true;
}
}
if kill {
log_net!(debug
">< Terminating connection prior_descriptor={:?}",
prior_descriptor
);
let mut conn = inner
.connection_table
.remove_connection(prior_descriptor)
.expect("connection not in table");
conn.close();
killed.push(conn);
}
}
}
}
}
killed
};
// Wait for the killed connections to end their recv loops
let mut retry_count = if !killed.is_empty() { 2 } else { 0 };
for k in killed {
k.await;
return Ok(NetworkResult::Value(conn));
}
// Get connection timeout
let timeout_ms = {
let config = self.network_manager().config();
let c = config.get();
c.network.connection_initial_timeout_ms
};
// Attempt new connection
let conn = network_result_try!(loop {
let result_net_res =
ProtocolNetworkConnection::connect(local_addr, &dial_info, timeout_ms).await;
let prot_conn = network_result_try!(loop {
let result_net_res = ProtocolNetworkConnection::connect(
local_addr,
&dial_info,
self.arc.connection_initial_timeout_ms,
)
.await;
match result_net_res {
Ok(net_res) => {
if net_res.is_value() || retry_count == 0 {
@ -311,7 +316,8 @@ impl ConnectionManager {
bail!("shutting down");
}
};
self.on_new_protocol_network_connection(inner, conn)
self.on_new_protocol_network_connection(inner, prot_conn)
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
@ -344,28 +350,6 @@ impl ConnectionManager {
conn.close();
conn.await;
}
ConnectionManagerEvent::Finished(desc) => {
let conn = {
let mut inner_lock = self.arc.inner.lock();
match &mut *inner_lock {
Some(inner) => {
// Remove the connection and wait for the connection loop to terminate
if let Ok(conn) = inner.connection_table.remove_connection(desc) {
// Must close and wait to ensure things join
Some(conn)
} else {
None
}
}
None => None,
}
};
if let Some(mut conn) = conn {
conn.close();
conn.await;
}
}
}
}
}
@ -375,7 +359,7 @@ impl ConnectionManager {
#[cfg_attr(target_os = "wasm32", allow(dead_code))]
pub(super) async fn on_accepted_protocol_network_connection(
&self,
conn: ProtocolNetworkConnection,
protocol_connection: ProtocolNetworkConnection,
) -> EyreResult<()> {
// Get channel sender
let sender = {
@ -392,14 +376,14 @@ impl ConnectionManager {
// Inform the processor of the event
let _ = sender
.send_async(ConnectionManagerEvent::Accepted(conn))
.send_async(ConnectionManagerEvent::Accepted(protocol_connection))
.await;
Ok(())
}
// Callback from network connection receive loop when it exits
// cleans up the entry in the connection table
pub(super) async fn report_connection_finished(&self, descriptor: ConnectionDescriptor) {
pub(super) async fn report_connection_finished(&self, connection_id: u64) {
// Get channel sender
let sender = {
let mut inner = self.arc.inner.lock();
@ -413,9 +397,15 @@ impl ConnectionManager {
inner.sender.clone()
};
// Remove the connection
let conn = self
.arc
.connection_table
.remove_connection_by_id(connection_id);
// Inform the processor of the event
let _ = sender
.send_async(ConnectionManagerEvent::Finished(descriptor))
.await;
if let Some(conn) = conn {
let _ = sender.send_async(ConnectionManagerEvent::Dead(conn)).await;
}
}
}

View File

@ -1,5 +1,4 @@
use super::*;
use alloc::collections::btree_map::Entry;
use futures_util::StreamExt;
use hashlink::LruCache;
@ -21,36 +20,21 @@ impl ConnectionTableAddError {
}
}
///////////////////////////////////////////////////////////////////////////////
#[derive(ThisError, Debug)]
pub enum ConnectionTableRemoveError {
#[error("Connection not in table")]
NotInTable,
}
impl ConnectionTableRemoveError {
pub fn not_in_table() -> Self {
ConnectionTableRemoveError::NotInTable
}
}
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct ConnectionTable {
pub struct ConnectionTableInner {
max_connections: Vec<usize>,
conn_by_descriptor: Vec<LruCache<ConnectionDescriptor, NetworkConnection>>,
descriptors_by_remote: BTreeMap<PeerAddress, Vec<ConnectionDescriptor>>,
conn_by_id: Vec<LruCache<NetworkConnectionId, NetworkConnection>>,
protocol_index_by_id: BTreeMap<NetworkConnectionId, usize>,
id_by_descriptor: BTreeMap<ConnectionDescriptor, NetworkConnectionId>,
ids_by_remote: BTreeMap<PeerAddress, Vec<NetworkConnectionId>>,
address_filter: ConnectionLimits,
}
fn protocol_to_index(protocol: ProtocolType) -> usize {
match protocol {
ProtocolType::TCP => 0,
ProtocolType::WS => 1,
ProtocolType::WSS => 2,
ProtocolType::UDP => panic!("not a connection-oriented protocol"),
}
#[derive(Debug)]
pub struct ConnectionTable {
inner: Arc<Mutex<ConnectionTableInner>>,
}
impl ConnectionTable {
@ -64,154 +48,217 @@ impl ConnectionTable {
]
};
Self {
max_connections,
conn_by_descriptor: vec![
LruCache::new_unbounded(),
LruCache::new_unbounded(),
LruCache::new_unbounded(),
],
descriptors_by_remote: BTreeMap::new(),
address_filter: ConnectionLimits::new(config),
inner: Arc::new(Mutex::new(ConnectionTableInner {
max_connections,
conn_by_id: vec![
LruCache::new_unbounded(),
LruCache::new_unbounded(),
LruCache::new_unbounded(),
],
protocol_index_by_id: BTreeMap::new(),
id_by_descriptor: BTreeMap::new(),
ids_by_remote: BTreeMap::new(),
address_filter: ConnectionLimits::new(config),
})),
}
}
pub async fn join(&mut self) {
let mut unord = FuturesUnordered::new();
for table in &mut self.conn_by_descriptor {
for (_, v) in table.drain() {
trace!("connection table join: {:?}", v);
unord.push(v);
}
fn protocol_to_index(protocol: ProtocolType) -> usize {
match protocol {
ProtocolType::TCP => 0,
ProtocolType::WS => 1,
ProtocolType::WSS => 2,
ProtocolType::UDP => panic!("not a connection-oriented protocol"),
}
}
pub async fn join(&self) {
let mut unord = {
let mut inner = self.inner.lock();
let unord = FuturesUnordered::new();
for table in &mut inner.conn_by_id {
for (_, v) in table.drain() {
trace!("connection table join: {:?}", v);
unord.push(v);
}
}
inner.id_by_descriptor.clear();
inner.ids_by_remote.clear();
unord
};
while unord.next().await.is_some() {}
}
pub fn add_connection(
&mut self,
conn: NetworkConnection,
&self,
network_connection: NetworkConnection,
) -> Result<Option<NetworkConnection>, ConnectionTableAddError> {
let descriptor = conn.connection_descriptor();
let ip_addr = descriptor.remote_address().to_ip_addr();
// Get indices for network connection table
let id = network_connection.connection_id();
let descriptor = network_connection.connection_descriptor();
let protocol_index = Self::protocol_to_index(descriptor.protocol_type());
let remote = descriptor.remote();
let index = protocol_to_index(descriptor.protocol_type());
if self.conn_by_descriptor[index].contains_key(&descriptor) {
return Err(ConnectionTableAddError::already_exists(conn));
let mut inner = self.inner.lock();
// Two connections to the same descriptor should be rejected (soft rejection)
if inner.id_by_descriptor.contains_key(&descriptor) {
return Err(ConnectionTableAddError::already_exists(network_connection));
}
// Sanity checking this implementation (hard fails that would invalidate the representation)
if inner.conn_by_id[protocol_index].contains_key(&id) {
panic!("duplicate connection id: {:#?}", network_connection);
}
if inner.protocol_index_by_id.get(&id).is_some() {
panic!("duplicate id to protocol index: {:#?}", network_connection);
}
if let Some(ids) = inner.ids_by_remote.get(&descriptor.remote()) {
if ids.contains(&id) {
panic!("duplicate id by remote: {:#?}", network_connection);
}
}
// Filter by ip for connection limits
match self.address_filter.add(ip_addr) {
let ip_addr = descriptor.remote_address().to_ip_addr();
match inner.address_filter.add(ip_addr) {
Ok(()) => {}
Err(e) => {
// send connection to get cleaned up cleanly
return Err(ConnectionTableAddError::address_filter(conn, e));
// Return the connection in the error to be disposed of
return Err(ConnectionTableAddError::address_filter(
network_connection,
e,
));
}
};
// Add the connection to the table
let res = self.conn_by_descriptor[index].insert(descriptor.clone(), conn);
let res = inner.conn_by_id[protocol_index].insert(id, network_connection);
assert!(res.is_none());
// if we have reached the maximum number of connections per protocol type
// then drop the least recently used connection
let mut out_conn = None;
if self.conn_by_descriptor[index].len() > self.max_connections[index] {
if let Some((lruk, lru_conn)) = self.conn_by_descriptor[index].remove_lru() {
debug!("connection lru out: {:?}", lruk);
if inner.conn_by_id[protocol_index].len() > inner.max_connections[protocol_index] {
if let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].remove_lru() {
debug!("connection lru out: {:?}", lru_conn);
out_conn = Some(lru_conn);
self.remove_connection_records(lruk);
Self::remove_connection_records(&mut *inner, lruk);
}
}
// add connection records
let descriptors = self
.descriptors_by_remote
.entry(descriptor.remote())
.or_default();
descriptors.push(descriptor);
inner.protocol_index_by_id.insert(id, protocol_index);
inner.id_by_descriptor.insert(descriptor, id);
inner.ids_by_remote.entry(remote).or_default().push(id);
Ok(out_conn)
}
pub fn get_connection(&mut self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> {
let index = protocol_to_index(descriptor.protocol_type());
let out = self.conn_by_descriptor[index].get(&descriptor);
out.map(|c| c.get_handle())
pub fn get_connection_by_id(&self, id: NetworkConnectionId) -> Option<ConnectionHandle> {
let mut inner = self.inner.lock();
let protocol_index = *inner.protocol_index_by_id.get(&id)?;
let out = inner.conn_by_id[protocol_index].get(&id).unwrap();
Some(out.get_handle())
}
pub fn get_last_connection_by_remote(
&mut self,
remote: PeerAddress,
pub fn get_connection_by_descriptor(
&self,
descriptor: ConnectionDescriptor,
) -> Option<ConnectionHandle> {
let descriptor = self
.descriptors_by_remote
.get(&remote)
.map(|v| v[(v.len() - 1)].clone());
if let Some(descriptor) = descriptor {
// lru bump
let index = protocol_to_index(descriptor.protocol_type());
let handle = self.conn_by_descriptor[index]
.get(&descriptor)
.map(|c| c.get_handle());
handle
} else {
None
}
let mut inner = self.inner.lock();
let id = *inner.id_by_descriptor.get(&descriptor)?;
let protocol_index = Self::protocol_to_index(descriptor.protocol_type());
let out = inner.conn_by_id[protocol_index].get(&id).unwrap();
Some(out.get_handle())
}
pub fn get_connection_descriptors_by_remote(
&mut self,
remote: PeerAddress,
) -> Vec<ConnectionDescriptor> {
self.descriptors_by_remote
pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option<ConnectionHandle> {
let mut inner = self.inner.lock();
let id = inner.ids_by_remote.get(&remote).map(|v| v[(v.len() - 1)])?;
let protocol_index = Self::protocol_to_index(remote.protocol_type());
let out = inner.conn_by_id[protocol_index].get(&id).unwrap();
Some(out.get_handle())
}
pub fn get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec<NetworkConnectionId> {
let inner = self.inner.lock();
inner
.ids_by_remote
.get(&remote)
.cloned()
.unwrap_or_default()
}
pub fn connection_count(&self) -> usize {
self.conn_by_descriptor.iter().fold(0, |b, c| b + c.len())
}
fn remove_connection_records(&mut self, descriptor: ConnectionDescriptor) {
let ip_addr = descriptor.remote_address().to_ip_addr();
// conns_by_remote
match self.descriptors_by_remote.entry(descriptor.remote()) {
Entry::Vacant(_) => {
panic!("inconsistency in connection table")
}
Entry::Occupied(mut o) => {
let v = o.get_mut();
// Remove one matching connection from the list
for (n, elem) in v.iter().enumerate() {
if *elem == descriptor {
v.remove(n);
break;
}
}
// No connections left for this remote, remove the entry from conns_by_remote
if v.is_empty() {
o.remove_entry();
pub fn drain_filter<F>(&self, mut filter: F) -> Vec<NetworkConnection>
where
F: FnMut(ConnectionDescriptor) -> bool,
{
let mut inner = self.inner.lock();
let mut filtered_ids = Vec::new();
for cbi in &mut inner.conn_by_id {
for (id, conn) in cbi {
if filter(conn.connection_descriptor()) {
filtered_ids.push(*id);
}
}
}
self.address_filter
let mut filtered_connections = Vec::new();
for id in filtered_ids {
let conn = Self::remove_connection_records(&mut *inner, id);
filtered_connections.push(conn)
}
filtered_connections
}
pub fn connection_count(&self) -> usize {
let inner = self.inner.lock();
inner.conn_by_id.iter().fold(0, |acc, c| acc + c.len())
}
fn remove_connection_records(
inner: &mut ConnectionTableInner,
id: NetworkConnectionId,
) -> NetworkConnection {
// protocol_index_by_id
let protocol_index = inner.protocol_index_by_id.remove(&id).unwrap();
// conn_by_id
let conn = inner.conn_by_id[protocol_index].remove(&id).unwrap();
// id_by_descriptor
let descriptor = conn.connection_descriptor();
inner.id_by_descriptor.remove(&descriptor).unwrap();
// ids_by_remote
let remote = descriptor.remote();
let ids = inner.ids_by_remote.get_mut(&remote).unwrap();
for (n, elem) in ids.iter().enumerate() {
if *elem == id {
ids.remove(n);
if ids.is_empty() {
inner.ids_by_remote.remove(&remote).unwrap();
}
break;
}
}
// address_filter
let ip_addr = remote.to_socket_addr().ip();
inner
.address_filter
.remove(ip_addr)
.expect("Inconsistency in connection table");
conn
}
pub fn remove_connection(
&mut self,
descriptor: ConnectionDescriptor,
) -> Result<NetworkConnection, ConnectionTableRemoveError> {
let index = protocol_to_index(descriptor.protocol_type());
let conn = self.conn_by_descriptor[index]
.remove(&descriptor)
.ok_or_else(|| ConnectionTableRemoveError::not_in_table())?;
pub fn remove_connection_by_id(&self, id: NetworkConnectionId) -> Option<NetworkConnection> {
let mut inner = self.inner.lock();
self.remove_connection_records(descriptor);
Ok(conn)
let protocol_index = *inner.protocol_index_by_id.get(&id)?;
if !inner.conn_by_id[protocol_index].contains_key(&id) {
return None;
}
let conn = Self::remove_connection_records(&mut *inner, id);
Some(conn)
}
}

View File

@ -1314,14 +1314,27 @@ impl NetworkManager {
) -> SendPinBoxFuture<EyreResult<NetworkResult<SendDataKind>>> {
let this = self.clone();
Box::pin(async move {
info!("{}", format!("send_data to: {:?}", node_ref).red());
// First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = node_ref.last_connection() {
info!(
"{}",
format!("last_connection to: {:?}", connection_descriptor).red()
);
match this
.net()
.send_data_to_existing_connection(connection_descriptor, data)
.await?
{
None => {
info!(
"{}",
format!("sent to existing connection: {:?}", connection_descriptor)
.red()
);
// Update timestamp for this last connection since we just sent to it
node_ref.set_last_connection(connection_descriptor, intf::get_timestamp());
@ -1335,6 +1348,8 @@ impl NetworkManager {
data
};
info!("{}", "no existing connection".red());
// If we don't have last_connection, try to reach out to the peer via its dial info
let contact_method = this.get_contact_method(node_ref.clone());
log_net!(

View File

@ -149,8 +149,7 @@ impl RawTcpProtocolHandler {
);
let local_address = self.inner.lock().local_address;
let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(
ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address))
.map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?,
ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address)),
ps,
));
@ -190,8 +189,7 @@ impl RawTcpProtocolHandler {
ProtocolType::TCP,
),
SocketAddress::from_socket_addr(actual_local_address),
)
.map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?,
),
ps,
));

View File

@ -25,16 +25,10 @@ impl RawUdpProtocolHandler {
ProtocolType::UDP,
);
let local_socket_addr = self.socket.local_addr()?;
let descriptor = match ConnectionDescriptor::new(
let descriptor = ConnectionDescriptor::new(
peer_addr,
SocketAddress::from_socket_addr(local_socket_addr),
) {
Ok(d) => d,
Err(_) => {
log_net!(debug "{}({}) at {}@{}:{}: {:?}", "Invalid peer scope".green(), "received message from invalid peer scope", file!(), line!(), column!(), peer_addr);
continue;
}
};
);
break (size, descriptor);
};
@ -62,8 +56,7 @@ impl RawUdpProtocolHandler {
let descriptor = ConnectionDescriptor::new(
peer_addr,
SocketAddress::from_socket_addr(local_socket_addr),
)
.map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?;
);
let len = network_result_try!(self
.socket

View File

@ -212,8 +212,7 @@ impl WebsocketProtocolHandler {
ConnectionDescriptor::new(
peer_addr,
SocketAddress::from_socket_addr(self.arc.local_address),
)
.map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?,
),
ws_stream,
));
@ -268,8 +267,7 @@ impl WebsocketProtocolHandler {
let descriptor = ConnectionDescriptor::new(
dial_info.to_peer_address(),
SocketAddress::from_socket_addr(actual_local_addr),
)
.map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?;
);
// Negotiate TLS if this is WSS
if tls {

View File

@ -1,6 +1,6 @@
use super::*;
use futures_util::{FutureExt, StreamExt};
use std::io;
use std::{io, sync::Arc};
use stop_token::prelude::*;
cfg_if::cfg_if! {
@ -81,8 +81,12 @@ pub struct NetworkConnectionStats {
last_message_recv_time: Option<u64>,
}
pub type NetworkConnectionId = u64;
#[derive(Debug)]
pub struct NetworkConnection {
connection_id: NetworkConnectionId,
descriptor: ConnectionDescriptor,
processor: Option<MustJoinHandle<()>>,
established_time: u64,
@ -92,11 +96,12 @@ pub struct NetworkConnection {
}
impl NetworkConnection {
pub(super) fn dummy(descriptor: ConnectionDescriptor) -> Self {
pub(super) fn dummy(id: NetworkConnectionId, descriptor: ConnectionDescriptor) -> Self {
// Create handle for sending (dummy is immediately disconnected)
let (sender, _receiver) = flume::bounded(intf::get_concurrency() as usize);
Self {
connection_id: id,
descriptor,
processor: None,
established_time: intf::get_timestamp(),
@ -113,14 +118,10 @@ impl NetworkConnection {
connection_manager: ConnectionManager,
manager_stop_token: StopToken,
protocol_connection: ProtocolNetworkConnection,
connection_id: NetworkConnectionId,
) -> Self {
// Get timeout
let network_manager = connection_manager.network_manager();
let inactivity_timeout = network_manager
.config()
.get()
.network
.connection_inactivity_timeout_ms;
// Get descriptor
let descriptor = protocol_connection.descriptor();
@ -142,15 +143,16 @@ impl NetworkConnection {
connection_manager,
local_stop_token,
manager_stop_token,
connection_id,
descriptor.clone(),
receiver,
protocol_connection,
inactivity_timeout,
stats.clone(),
));
// Return the connection
Self {
connection_id,
descriptor,
processor: Some(processor),
established_time: intf::get_timestamp(),
@ -160,12 +162,16 @@ impl NetworkConnection {
}
}
pub fn connection_id(&self) -> NetworkConnectionId {
self.connection_id
}
pub fn connection_descriptor(&self) -> ConnectionDescriptor {
self.descriptor.clone()
}
pub fn get_handle(&self) -> ConnectionHandle {
ConnectionHandle::new(self.descriptor.clone(), self.sender.clone())
ConnectionHandle::new(self.connection_id, self.descriptor.clone(), self.sender.clone())
}
pub fn close(&mut self) {
@ -215,15 +221,15 @@ impl NetworkConnection {
connection_manager: ConnectionManager,
local_stop_token: StopToken,
manager_stop_token: StopToken,
connection_id: NetworkConnectionId,
descriptor: ConnectionDescriptor,
receiver: flume::Receiver<Vec<u8>>,
protocol_connection: ProtocolNetworkConnection,
connection_inactivity_timeout_ms: u32,
stats: Arc<Mutex<NetworkConnectionStats>>,
) -> SendPinBoxFuture<()> {
Box::pin(async move {
log_net!(
"== Starting process_connection loop for {:?}",
"== Starting process_connection loop for id={}, {:?}", connection_id,
descriptor.green()
);
@ -235,7 +241,7 @@ impl NetworkConnection {
// Push mutable timer so we can reset it
// Normally we would use an io::timeout here, but WASM won't support that, so we use a mutable sleep future
let new_timer = || {
intf::sleep(connection_inactivity_timeout_ms).then(|_| async {
intf::sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async {
// timeout
log_net!("== Connection timeout on {:?}", descriptor.green());
RecvLoopAction::Timeout
@ -317,7 +323,7 @@ impl NetworkConnection {
.timeout_at(local_stop_token.clone())
.timeout_at(manager_stop_token.clone())
.await
.and_then(std::convert::identity) // flatten
.and_then(std::convert::identity) // flatten stoptoken timeouts
{
Ok(Some(RecvLoopAction::Send)) => {
// Don't reset inactivity timer if we're only sending
@ -350,7 +356,7 @@ impl NetworkConnection {
// Let the connection manager know the receive loop exited
connection_manager
.report_connection_finished(descriptor)
.report_connection_finished(connection_id)
.await;
})
}

View File

@ -7,13 +7,12 @@ use crate::*;
pub async fn test_add_get_remove() {
let config = get_config();
let mut table = ConnectionTable::new(config);
let table = ConnectionTable::new(config);
let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new(
SocketAddress::new(Address::IPV4(Ipv4Addr::new(192, 168, 0, 1)), 8080),
ProtocolType::TCP,
))
.unwrap();
));
let a2 = a1;
let a3 = ConnectionDescriptor::new(
PeerAddress::new(
@ -26,8 +25,7 @@ pub async fn test_add_get_remove() {
0,
0,
))),
)
.unwrap();
);
let a4 = ConnectionDescriptor::new(
PeerAddress::new(
SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090),
@ -39,8 +37,7 @@ pub async fn test_add_get_remove() {
0,
0,
))),
)
.unwrap();
);
let a5 = ConnectionDescriptor::new(
PeerAddress::new(
SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090),
@ -52,79 +49,72 @@ pub async fn test_add_get_remove() {
0,
0,
))),
)
.unwrap();
);
let c1 = NetworkConnection::dummy(a1);
let c1 = NetworkConnection::dummy(1, a1);
let c1h = c1.get_handle();
let c2 = NetworkConnection::dummy(a2);
//let c2h = c2.get_handle();
let c3 = NetworkConnection::dummy(a3);
//let c3h = c3.get_handle();
let c4 = NetworkConnection::dummy(a4);
//let c4h = c4.get_handle();
let c5 = NetworkConnection::dummy(a5);
//let c5h = c5.get_handle();
let c2 = NetworkConnection::dummy(2, a2);
let c3 = NetworkConnection::dummy(3, a3);
let c4 = NetworkConnection::dummy(4, a4);
let c5 = NetworkConnection::dummy(5, a5);
assert_eq!(a1, c2.connection_descriptor());
assert_ne!(a3, c4.connection_descriptor());
assert_ne!(a4, c5.connection_descriptor());
assert_eq!(table.connection_count(), 0);
assert_eq!(table.get_connection(a1), None);
assert_eq!(table.get_connection_by_descriptor(a1), None);
table.add_connection(c1).unwrap();
assert_eq!(table.connection_count(), 1);
assert_err!(table.remove_connection(a3));
assert_err!(table.remove_connection(a4));
assert!(table.remove_connection_by_id(4).is_none());
assert!(table.remove_connection_by_id(5).is_none());
assert_eq!(table.connection_count(), 1);
assert_eq!(table.get_connection(a1), Some(c1h.clone()));
assert_eq!(table.get_connection(a1), Some(c1h.clone()));
assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.connection_count(), 1);
assert_err!(table.add_connection(c2));
assert_eq!(table.connection_count(), 1);
assert_eq!(table.get_connection(a1), Some(c1h.clone()));
assert_eq!(table.get_connection(a1), Some(c1h.clone()));
assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone()));
assert_eq!(table.connection_count(), 1);
assert_eq!(
table
.remove_connection(a2)
.remove_connection_by_id(1)
.map(|c| c.connection_descriptor())
.unwrap(),
a1
);
assert_eq!(table.connection_count(), 0);
assert_err!(table.remove_connection(a2));
assert!(table.remove_connection_by_id(2).is_none());
assert_eq!(table.connection_count(), 0);
assert_eq!(table.get_connection(a2), None);
assert_eq!(table.get_connection(a1), None);
assert_eq!(table.get_connection_by_descriptor(a2), None);
assert_eq!(table.get_connection_by_descriptor(a1), None);
assert_eq!(table.connection_count(), 0);
let c1 = NetworkConnection::dummy(a1);
//let c1h = c1.get_handle();
let c1 = NetworkConnection::dummy(6, a1);
table.add_connection(c1).unwrap();
let c2 = NetworkConnection::dummy(a2);
//let c2h = c2.get_handle();
let c2 = NetworkConnection::dummy(7, a2);
assert_err!(table.add_connection(c2));
table.add_connection(c3).unwrap();
table.add_connection(c4).unwrap();
assert_eq!(table.connection_count(), 3);
assert_eq!(
table
.remove_connection(a2)
.remove_connection_by_id(6)
.map(|c| c.connection_descriptor())
.unwrap(),
a2
);
assert_eq!(
table
.remove_connection(a3)
.remove_connection_by_id(3)
.map(|c| c.connection_descriptor())
.unwrap(),
a3
);
assert_eq!(
table
.remove_connection(a4)
.remove_connection_by_id(4)
.map(|c| c.connection_descriptor())
.unwrap(),
a4

View File

@ -134,8 +134,7 @@ impl WebsocketProtocolHandler {
// Make our connection descriptor
let wnc = WebsocketNetworkConnection::new(
ConnectionDescriptor::new_no_local(dial_info.to_peer_address())
.map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?,
ConnectionDescriptor::new_no_local(dial_info.to_peer_address()),
wsmeta,
wsio,
);

View File

@ -741,7 +741,7 @@ impl RoutingTable {
signed_node_info: SignedNodeInfo,
allow_invalid: bool,
) -> Option<NodeRef> {
log_rtab!("register_node_with_signed_node_info: routing_domain: {:?}, node_id: {:?}, signed_node_info: {:?}, allow_invalid: {:?}", routing_domain, node_id, signed_node_info, allow_invalid );
//log_rtab!("register_node_with_signed_node_info: routing_domain: {:?}, node_id: {:?}, signed_node_info: {:?}, allow_invalid: {:?}", routing_domain, node_id, signed_node_info, allow_invalid );
// validate signed node info is not something malicious
if node_id == self.node_id() {
@ -858,6 +858,9 @@ impl RoutingTable {
let mut dead = true;
if let Some(nr) = self.lookup_node_ref(*e) {
if let Some(last_connection) = nr.last_connection() {
out.push((*e, RecentPeersEntry { last_connection }));
dead = false;
}

View File

@ -1507,17 +1507,17 @@ pub struct ConnectionDescriptor {
}
impl ConnectionDescriptor {
pub fn new(remote: PeerAddress, local: SocketAddress) -> Result<Self, VeilidAPIError> {
Ok(Self {
pub fn new(remote: PeerAddress, local: SocketAddress) -> Self {
Self {
remote,
local: Some(local),
})
}
}
pub fn new_no_local(remote: PeerAddress) -> Result<Self, VeilidAPIError> {
Ok(Self {
pub fn new_no_local(remote: PeerAddress) -> Self {
Self {
remote,
local: None,
})
}
}
pub fn remote(&self) -> PeerAddress {
self.remote

View File

@ -19,7 +19,7 @@ EXTERNAL SOURCES:
:path: Flutter/ephemeral/.symlinks/plugins/veilid/macos
SPEC CHECKSUMS:
FlutterMacOS: 57701585bf7de1b3fc2bb61f6378d73bbdea8424
FlutterMacOS: ae6af50a8ea7d6103d888583d46bd8328a7e9811
path_provider_macos: 3c0c3b4b0d4a76d2bf989a913c2de869c5641a19
veilid: 6bed3adec63fd8708a2ace498e0e17941c9fc32b