diff --git a/veilid-core/src/network_manager/connection_handle.rs b/veilid-core/src/network_manager/connection_handle.rs index 228f44a0..aa37070a 100644 --- a/veilid-core/src/network_manager/connection_handle.rs +++ b/veilid-core/src/network_manager/connection_handle.rs @@ -2,6 +2,7 @@ use super::*; #[derive(Clone, Debug)] pub struct ConnectionHandle { + id: u64, descriptor: ConnectionDescriptor, channel: flume::Sender>, } @@ -13,13 +14,22 @@ pub enum ConnectionHandleSendResult { } impl ConnectionHandle { - pub(super) fn new(descriptor: ConnectionDescriptor, channel: flume::Sender>) -> Self { + pub(super) fn new( + id: u64, + descriptor: ConnectionDescriptor, + channel: flume::Sender>, + ) -> Self { Self { + id, descriptor, channel, } } + pub fn connection_id(&self) -> u64 { + self.id + } + pub fn connection_descriptor(&self) -> ConnectionDescriptor { self.descriptor.clone() } diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index f181dfe9..191bb8b4 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -11,12 +11,11 @@ use stop_token::future::FutureExt; enum ConnectionManagerEvent { Accepted(ProtocolNetworkConnection), Dead(NetworkConnection), - Finished(ConnectionDescriptor), } #[derive(Debug)] struct ConnectionManagerInner { - connection_table: ConnectionTable, + next_id: NetworkConnectionId, sender: flume::Sender, async_processor_jh: Option>, stop_source: Option, @@ -24,6 +23,9 @@ struct ConnectionManagerInner { struct ConnectionManagerArc { network_manager: NetworkManager, + connection_initial_timeout_ms: u32, + connection_inactivity_timeout_ms: u32, + connection_table: ConnectionTable, inner: Mutex>, } impl core::fmt::Debug for ConnectionManagerArc { @@ -41,21 +43,32 @@ pub struct ConnectionManager { impl ConnectionManager { fn new_inner( - config: VeilidConfig, stop_source: StopSource, sender: flume::Sender, async_processor_jh: MustJoinHandle<()>, ) -> ConnectionManagerInner { ConnectionManagerInner { + next_id: 0, stop_source: Some(stop_source), sender: sender, async_processor_jh: Some(async_processor_jh), - connection_table: ConnectionTable::new(config), } } fn new_arc(network_manager: NetworkManager) -> ConnectionManagerArc { + let config = network_manager.config(); + let (connection_initial_timeout_ms, connection_inactivity_timeout_ms) = { + let c = config.get(); + ( + c.network.connection_initial_timeout_ms, + c.network.connection_inactivity_timeout_ms, + ) + }; + ConnectionManagerArc { network_manager, + connection_initial_timeout_ms, + connection_inactivity_timeout_ms, + connection_table: ConnectionTable::new(config), inner: Mutex::new(None), } } @@ -69,6 +82,14 @@ impl ConnectionManager { self.arc.network_manager.clone() } + pub fn connection_initial_timeout_ms(&self) -> u32 { + self.arc.connection_initial_timeout_ms + } + + pub fn connection_inactivity_timeout_ms(&self) -> u32 { + self.arc.connection_inactivity_timeout_ms + } + pub async fn startup(&self) { trace!("startup connection manager"); let mut inner = self.arc.inner.lock(); @@ -86,12 +107,7 @@ impl ConnectionManager { let async_processor = spawn(self.clone().async_processor(stop_source.token(), receiver)); // Store in the inner object - *inner = Some(Self::new_inner( - self.network_manager().config(), - stop_source, - sender, - async_processor, - )); + *inner = Some(Self::new_inner(stop_source, sender, async_processor)); } pub async fn shutdown(&self) { @@ -117,22 +133,10 @@ impl ConnectionManager { async_processor_jh.await; // Wait for the connections to complete debug!("waiting for connection handlers to complete"); - inner.connection_table.join().await; + self.arc.connection_table.join().await; debug!("finished connection manager shutdown"); } - // Returns a network connection if one already is established - pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option { - let mut inner = self.arc.inner.lock(); - let inner = match &mut *inner { - Some(v) => v, - None => { - panic!("not started"); - } - }; - inner.connection_table.get_connection(descriptor) - } - // Internal routine to register new connection atomically. // Registers connection in the connection table for later access // and spawns a message processing loop for the connection @@ -141,7 +145,14 @@ impl ConnectionManager { inner: &mut ConnectionManagerInner, prot_conn: ProtocolNetworkConnection, ) -> EyreResult> { - log_net!("on_new_protocol_network_connection: {:?}", prot_conn); + // Get next connection id to use + let id = inner.next_id; + inner.next_id += 1; + log_net!( + "on_new_protocol_network_connection: id={} prot_conn={:?}", + id, + prot_conn + ); // Wrap with NetworkConnection object to start the connection processing loop let stop_token = match &inner.stop_source { @@ -149,143 +160,137 @@ impl ConnectionManager { None => bail!("not creating connection because we are stopping"), }; - let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn); + let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id); let handle = conn.get_handle(); // Add to the connection table - match inner.connection_table.add_connection(conn) { + match self.arc.connection_table.add_connection(conn) { Ok(None) => { // Connection added } Ok(Some(conn)) => { // Connection added and a different one LRU'd out + // Send it to be terminated let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); } Err(ConnectionTableAddError::AddressFilter(conn, e)) => { // Connection filtered let desc = conn.connection_descriptor(); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); - return Err(eyre!("connection filtered: {:?} ({})", desc, e)); + return Ok(NetworkResult::no_connection_other(format!( + "connection filtered: {:?} ({})", + desc, e + ))); } Err(ConnectionTableAddError::AlreadyExists(conn)) => { // Connection already exists let desc = conn.connection_descriptor(); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); - return Err(eyre!("connection already exists: {:?}", desc)); + // xxx remove this + panic!( + "connection already exists: {:?} connection_table: {:#?}", + desc, self.arc.connection_table + ); + return Ok(NetworkResult::no_connection_other(format!( + "connection already exists: {:?}", + desc + ))); } }; Ok(NetworkResult::Value(handle)) } + // Returns a network connection if one already is established + pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option { + self.arc + .connection_table + .get_connection_by_descriptor(descriptor) + } + + // Terminate any connections that would collide with a new connection + // using different protocols to the same remote address and port. Used to ensure + // that we can switch quickly between TCP and WS if necessary to the same node + // Returns true if we killed off colliding connections + async fn kill_off_colliding_connections(&self, dial_info: &DialInfo) -> bool { + let protocol_type = dial_info.protocol_type(); + let socket_address = dial_info.socket_address(); + + let killed = self.arc.connection_table.drain_filter(|prior_descriptor| { + // If the protocol types aren't the same, then this is a candidate to be killed off + // If they are the same, then we would just return the exact same connection from get_or_create_connection() + if prior_descriptor.protocol_type() == protocol_type { + return false; + } + // If the prior remote is not the same address, then we're not going to collide + if *prior_descriptor.remote().socket_address() != socket_address { + return false; + } + + log_net!(debug + ">< Terminating connection prior_descriptor={:?}", + prior_descriptor + ); + true + }); + // Wait for the killed connections to end their recv loops + let did_kill = !killed.is_empty(); + for k in killed { + k.await; + } + did_kill + } + // Called when we want to create a new connection or get the current one that already exists // This will kill off any connections that are in conflict with the new connection to be made // in order to make room for the new connection in the system's connection table + // This routine needs to be atomic, or connections may exist in the table that are not established pub async fn get_or_create_connection( &self, local_addr: Option, dial_info: DialInfo, ) -> EyreResult> { - let killed = { - let mut inner = self.arc.inner.lock(); - let inner = match &mut *inner { - Some(v) => v, - None => { - panic!("not started"); - } - }; + log_net!( + "== get_or_create_connection local_addr={:?} dial_info={:?}", + local_addr.green(), + dial_info.green() + ); + // Kill off any possibly conflicting connections + let did_kill = self.kill_off_colliding_connections(&dial_info).await; + let mut retry_count = if did_kill { 2 } else { 0 }; + + // Make a connection descriptor for this dialinfo + let peer_address = dial_info.to_peer_address(); + let descriptor = match local_addr { + Some(la) => { + ConnectionDescriptor::new(peer_address, SocketAddress::from_socket_addr(la)) + } + None => ConnectionDescriptor::new_no_local(peer_address), + }; + + // If any connection to this remote exists that has the same protocol, return it + // Any connection will do, we don't have to match the local address + if let Some(conn) = self + .arc + .connection_table + .get_last_connection_by_remote(descriptor.remote()) + { log_net!( - "== get_or_create_connection local_addr={:?} dial_info={:?}", + "== Returning existing connection local_addr={:?} peer_address={:?}", local_addr.green(), - dial_info.green() + peer_address.green() ); - let peer_address = dial_info.to_peer_address(); - - // Make a connection to the address - // reject connections to addresses with an unknown or unsupported peer scope - let descriptor = match local_addr { - Some(la) => { - ConnectionDescriptor::new(peer_address, SocketAddress::from_socket_addr(la)) - } - None => ConnectionDescriptor::new_no_local(peer_address), - }?; - - // If any connection to this remote exists that has the same protocol, return it - // Any connection will do, we don't have to match the local address - - if let Some(conn) = inner - .connection_table - .get_last_connection_by_remote(descriptor.remote()) - { - log_net!( - "== Returning existing connection local_addr={:?} peer_address={:?}", - local_addr.green(), - peer_address.green() - ); - - return Ok(NetworkResult::Value(conn)); - } - - // Drop any other protocols connections to this remote that have the same local addr - // otherwise this connection won't succeed due to binding - let mut killed = Vec::::new(); - if let Some(local_addr) = local_addr { - if local_addr.port() != 0 { - for pt in [ProtocolType::TCP, ProtocolType::WS, ProtocolType::WSS] { - let pa = PeerAddress::new(descriptor.remote_address().clone(), pt); - for prior_descriptor in inner - .connection_table - .get_connection_descriptors_by_remote(pa) - { - let mut kill = false; - // See if the local address would collide - if let Some(prior_local) = prior_descriptor.local() { - if (local_addr.ip().is_unspecified() - || prior_local.to_ip_addr().is_unspecified() - || (local_addr.ip() == prior_local.to_ip_addr())) - && prior_local.port() == local_addr.port() - { - kill = true; - } - } - if kill { - log_net!(debug - ">< Terminating connection prior_descriptor={:?}", - prior_descriptor - ); - let mut conn = inner - .connection_table - .remove_connection(prior_descriptor) - .expect("connection not in table"); - - conn.close(); - - killed.push(conn); - } - } - } - } - } - killed - }; - - // Wait for the killed connections to end their recv loops - let mut retry_count = if !killed.is_empty() { 2 } else { 0 }; - for k in killed { - k.await; + return Ok(NetworkResult::Value(conn)); } - // Get connection timeout - let timeout_ms = { - let config = self.network_manager().config(); - let c = config.get(); - c.network.connection_initial_timeout_ms - }; - // Attempt new connection - let conn = network_result_try!(loop { - let result_net_res = - ProtocolNetworkConnection::connect(local_addr, &dial_info, timeout_ms).await; + let prot_conn = network_result_try!(loop { + let result_net_res = ProtocolNetworkConnection::connect( + local_addr, + &dial_info, + self.arc.connection_initial_timeout_ms, + ) + .await; match result_net_res { Ok(net_res) => { if net_res.is_value() || retry_count == 0 { @@ -311,7 +316,8 @@ impl ConnectionManager { bail!("shutting down"); } }; - self.on_new_protocol_network_connection(inner, conn) + + self.on_new_protocol_network_connection(inner, prot_conn) } /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -344,28 +350,6 @@ impl ConnectionManager { conn.close(); conn.await; } - ConnectionManagerEvent::Finished(desc) => { - let conn = { - let mut inner_lock = self.arc.inner.lock(); - match &mut *inner_lock { - Some(inner) => { - // Remove the connection and wait for the connection loop to terminate - if let Ok(conn) = inner.connection_table.remove_connection(desc) { - // Must close and wait to ensure things join - Some(conn) - } else { - None - } - } - None => None, - } - }; - - if let Some(mut conn) = conn { - conn.close(); - conn.await; - } - } } } } @@ -375,7 +359,7 @@ impl ConnectionManager { #[cfg_attr(target_os = "wasm32", allow(dead_code))] pub(super) async fn on_accepted_protocol_network_connection( &self, - conn: ProtocolNetworkConnection, + protocol_connection: ProtocolNetworkConnection, ) -> EyreResult<()> { // Get channel sender let sender = { @@ -392,14 +376,14 @@ impl ConnectionManager { // Inform the processor of the event let _ = sender - .send_async(ConnectionManagerEvent::Accepted(conn)) + .send_async(ConnectionManagerEvent::Accepted(protocol_connection)) .await; Ok(()) } // Callback from network connection receive loop when it exits // cleans up the entry in the connection table - pub(super) async fn report_connection_finished(&self, descriptor: ConnectionDescriptor) { + pub(super) async fn report_connection_finished(&self, connection_id: u64) { // Get channel sender let sender = { let mut inner = self.arc.inner.lock(); @@ -413,9 +397,15 @@ impl ConnectionManager { inner.sender.clone() }; + // Remove the connection + let conn = self + .arc + .connection_table + .remove_connection_by_id(connection_id); + // Inform the processor of the event - let _ = sender - .send_async(ConnectionManagerEvent::Finished(descriptor)) - .await; + if let Some(conn) = conn { + let _ = sender.send_async(ConnectionManagerEvent::Dead(conn)).await; + } } } diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index b8b9db93..b2e8f223 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -1,5 +1,4 @@ use super::*; -use alloc::collections::btree_map::Entry; use futures_util::StreamExt; use hashlink::LruCache; @@ -21,36 +20,21 @@ impl ConnectionTableAddError { } } -/////////////////////////////////////////////////////////////////////////////// -#[derive(ThisError, Debug)] -pub enum ConnectionTableRemoveError { - #[error("Connection not in table")] - NotInTable, -} - -impl ConnectionTableRemoveError { - pub fn not_in_table() -> Self { - ConnectionTableRemoveError::NotInTable - } -} - /////////////////////////////////////////////////////////////////////////////// #[derive(Debug)] -pub struct ConnectionTable { +pub struct ConnectionTableInner { max_connections: Vec, - conn_by_descriptor: Vec>, - descriptors_by_remote: BTreeMap>, + conn_by_id: Vec>, + protocol_index_by_id: BTreeMap, + id_by_descriptor: BTreeMap, + ids_by_remote: BTreeMap>, address_filter: ConnectionLimits, } -fn protocol_to_index(protocol: ProtocolType) -> usize { - match protocol { - ProtocolType::TCP => 0, - ProtocolType::WS => 1, - ProtocolType::WSS => 2, - ProtocolType::UDP => panic!("not a connection-oriented protocol"), - } +#[derive(Debug)] +pub struct ConnectionTable { + inner: Arc>, } impl ConnectionTable { @@ -64,154 +48,217 @@ impl ConnectionTable { ] }; Self { - max_connections, - conn_by_descriptor: vec![ - LruCache::new_unbounded(), - LruCache::new_unbounded(), - LruCache::new_unbounded(), - ], - descriptors_by_remote: BTreeMap::new(), - address_filter: ConnectionLimits::new(config), + inner: Arc::new(Mutex::new(ConnectionTableInner { + max_connections, + conn_by_id: vec![ + LruCache::new_unbounded(), + LruCache::new_unbounded(), + LruCache::new_unbounded(), + ], + protocol_index_by_id: BTreeMap::new(), + id_by_descriptor: BTreeMap::new(), + ids_by_remote: BTreeMap::new(), + address_filter: ConnectionLimits::new(config), + })), } } - pub async fn join(&mut self) { - let mut unord = FuturesUnordered::new(); - for table in &mut self.conn_by_descriptor { - for (_, v) in table.drain() { - trace!("connection table join: {:?}", v); - unord.push(v); - } + fn protocol_to_index(protocol: ProtocolType) -> usize { + match protocol { + ProtocolType::TCP => 0, + ProtocolType::WS => 1, + ProtocolType::WSS => 2, + ProtocolType::UDP => panic!("not a connection-oriented protocol"), } + } + + pub async fn join(&self) { + let mut unord = { + let mut inner = self.inner.lock(); + let unord = FuturesUnordered::new(); + for table in &mut inner.conn_by_id { + for (_, v) in table.drain() { + trace!("connection table join: {:?}", v); + unord.push(v); + } + } + inner.id_by_descriptor.clear(); + inner.ids_by_remote.clear(); + unord + }; + while unord.next().await.is_some() {} } pub fn add_connection( - &mut self, - conn: NetworkConnection, + &self, + network_connection: NetworkConnection, ) -> Result, ConnectionTableAddError> { - let descriptor = conn.connection_descriptor(); - let ip_addr = descriptor.remote_address().to_ip_addr(); + // Get indices for network connection table + let id = network_connection.connection_id(); + let descriptor = network_connection.connection_descriptor(); + let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); + let remote = descriptor.remote(); - let index = protocol_to_index(descriptor.protocol_type()); - if self.conn_by_descriptor[index].contains_key(&descriptor) { - return Err(ConnectionTableAddError::already_exists(conn)); + let mut inner = self.inner.lock(); + + // Two connections to the same descriptor should be rejected (soft rejection) + if inner.id_by_descriptor.contains_key(&descriptor) { + return Err(ConnectionTableAddError::already_exists(network_connection)); + } + + // Sanity checking this implementation (hard fails that would invalidate the representation) + if inner.conn_by_id[protocol_index].contains_key(&id) { + panic!("duplicate connection id: {:#?}", network_connection); + } + if inner.protocol_index_by_id.get(&id).is_some() { + panic!("duplicate id to protocol index: {:#?}", network_connection); + } + if let Some(ids) = inner.ids_by_remote.get(&descriptor.remote()) { + if ids.contains(&id) { + panic!("duplicate id by remote: {:#?}", network_connection); + } } // Filter by ip for connection limits - match self.address_filter.add(ip_addr) { + let ip_addr = descriptor.remote_address().to_ip_addr(); + match inner.address_filter.add(ip_addr) { Ok(()) => {} Err(e) => { - // send connection to get cleaned up cleanly - return Err(ConnectionTableAddError::address_filter(conn, e)); + // Return the connection in the error to be disposed of + return Err(ConnectionTableAddError::address_filter( + network_connection, + e, + )); } }; // Add the connection to the table - let res = self.conn_by_descriptor[index].insert(descriptor.clone(), conn); + let res = inner.conn_by_id[protocol_index].insert(id, network_connection); assert!(res.is_none()); // if we have reached the maximum number of connections per protocol type // then drop the least recently used connection let mut out_conn = None; - if self.conn_by_descriptor[index].len() > self.max_connections[index] { - if let Some((lruk, lru_conn)) = self.conn_by_descriptor[index].remove_lru() { - debug!("connection lru out: {:?}", lruk); + if inner.conn_by_id[protocol_index].len() > inner.max_connections[protocol_index] { + if let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].remove_lru() { + debug!("connection lru out: {:?}", lru_conn); out_conn = Some(lru_conn); - self.remove_connection_records(lruk); + Self::remove_connection_records(&mut *inner, lruk); } } // add connection records - let descriptors = self - .descriptors_by_remote - .entry(descriptor.remote()) - .or_default(); - - descriptors.push(descriptor); + inner.protocol_index_by_id.insert(id, protocol_index); + inner.id_by_descriptor.insert(descriptor, id); + inner.ids_by_remote.entry(remote).or_default().push(id); Ok(out_conn) } - pub fn get_connection(&mut self, descriptor: ConnectionDescriptor) -> Option { - let index = protocol_to_index(descriptor.protocol_type()); - let out = self.conn_by_descriptor[index].get(&descriptor); - out.map(|c| c.get_handle()) + pub fn get_connection_by_id(&self, id: NetworkConnectionId) -> Option { + let mut inner = self.inner.lock(); + let protocol_index = *inner.protocol_index_by_id.get(&id)?; + let out = inner.conn_by_id[protocol_index].get(&id).unwrap(); + Some(out.get_handle()) } - pub fn get_last_connection_by_remote( - &mut self, - remote: PeerAddress, + pub fn get_connection_by_descriptor( + &self, + descriptor: ConnectionDescriptor, ) -> Option { - let descriptor = self - .descriptors_by_remote - .get(&remote) - .map(|v| v[(v.len() - 1)].clone()); - if let Some(descriptor) = descriptor { - // lru bump - let index = protocol_to_index(descriptor.protocol_type()); - let handle = self.conn_by_descriptor[index] - .get(&descriptor) - .map(|c| c.get_handle()); - handle - } else { - None - } + let mut inner = self.inner.lock(); + + let id = *inner.id_by_descriptor.get(&descriptor)?; + let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); + let out = inner.conn_by_id[protocol_index].get(&id).unwrap(); + Some(out.get_handle()) } - pub fn get_connection_descriptors_by_remote( - &mut self, - remote: PeerAddress, - ) -> Vec { - self.descriptors_by_remote + pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option { + let mut inner = self.inner.lock(); + + let id = inner.ids_by_remote.get(&remote).map(|v| v[(v.len() - 1)])?; + let protocol_index = Self::protocol_to_index(remote.protocol_type()); + let out = inner.conn_by_id[protocol_index].get(&id).unwrap(); + Some(out.get_handle()) + } + + pub fn get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec { + let inner = self.inner.lock(); + inner + .ids_by_remote .get(&remote) .cloned() .unwrap_or_default() } - pub fn connection_count(&self) -> usize { - self.conn_by_descriptor.iter().fold(0, |b, c| b + c.len()) - } - - fn remove_connection_records(&mut self, descriptor: ConnectionDescriptor) { - let ip_addr = descriptor.remote_address().to_ip_addr(); - - // conns_by_remote - match self.descriptors_by_remote.entry(descriptor.remote()) { - Entry::Vacant(_) => { - panic!("inconsistency in connection table") - } - Entry::Occupied(mut o) => { - let v = o.get_mut(); - - // Remove one matching connection from the list - for (n, elem) in v.iter().enumerate() { - if *elem == descriptor { - v.remove(n); - break; - } - } - // No connections left for this remote, remove the entry from conns_by_remote - if v.is_empty() { - o.remove_entry(); + pub fn drain_filter(&self, mut filter: F) -> Vec + where + F: FnMut(ConnectionDescriptor) -> bool, + { + let mut inner = self.inner.lock(); + let mut filtered_ids = Vec::new(); + for cbi in &mut inner.conn_by_id { + for (id, conn) in cbi { + if filter(conn.connection_descriptor()) { + filtered_ids.push(*id); } } } - self.address_filter + let mut filtered_connections = Vec::new(); + for id in filtered_ids { + let conn = Self::remove_connection_records(&mut *inner, id); + filtered_connections.push(conn) + } + filtered_connections + } + + pub fn connection_count(&self) -> usize { + let inner = self.inner.lock(); + inner.conn_by_id.iter().fold(0, |acc, c| acc + c.len()) + } + + fn remove_connection_records( + inner: &mut ConnectionTableInner, + id: NetworkConnectionId, + ) -> NetworkConnection { + // protocol_index_by_id + let protocol_index = inner.protocol_index_by_id.remove(&id).unwrap(); + // conn_by_id + let conn = inner.conn_by_id[protocol_index].remove(&id).unwrap(); + // id_by_descriptor + let descriptor = conn.connection_descriptor(); + inner.id_by_descriptor.remove(&descriptor).unwrap(); + // ids_by_remote + let remote = descriptor.remote(); + let ids = inner.ids_by_remote.get_mut(&remote).unwrap(); + for (n, elem) in ids.iter().enumerate() { + if *elem == id { + ids.remove(n); + if ids.is_empty() { + inner.ids_by_remote.remove(&remote).unwrap(); + } + break; + } + } + // address_filter + let ip_addr = remote.to_socket_addr().ip(); + inner + .address_filter .remove(ip_addr) .expect("Inconsistency in connection table"); + conn } - pub fn remove_connection( - &mut self, - descriptor: ConnectionDescriptor, - ) -> Result { - let index = protocol_to_index(descriptor.protocol_type()); - let conn = self.conn_by_descriptor[index] - .remove(&descriptor) - .ok_or_else(|| ConnectionTableRemoveError::not_in_table())?; + pub fn remove_connection_by_id(&self, id: NetworkConnectionId) -> Option { + let mut inner = self.inner.lock(); - self.remove_connection_records(descriptor); - Ok(conn) + let protocol_index = *inner.protocol_index_by_id.get(&id)?; + if !inner.conn_by_id[protocol_index].contains_key(&id) { + return None; + } + let conn = Self::remove_connection_records(&mut *inner, id); + Some(conn) } } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 36fadf67..3e97582e 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1314,14 +1314,27 @@ impl NetworkManager { ) -> SendPinBoxFuture>> { let this = self.clone(); Box::pin(async move { + info!("{}", format!("send_data to: {:?}", node_ref).red()); + // First try to send data to the last socket we've seen this peer on let data = if let Some(connection_descriptor) = node_ref.last_connection() { + info!( + "{}", + format!("last_connection to: {:?}", connection_descriptor).red() + ); + match this .net() .send_data_to_existing_connection(connection_descriptor, data) .await? { None => { + info!( + "{}", + format!("sent to existing connection: {:?}", connection_descriptor) + .red() + ); + // Update timestamp for this last connection since we just sent to it node_ref.set_last_connection(connection_descriptor, intf::get_timestamp()); @@ -1335,6 +1348,8 @@ impl NetworkManager { data }; + info!("{}", "no existing connection".red()); + // If we don't have last_connection, try to reach out to the peer via its dial info let contact_method = this.get_contact_method(node_ref.clone()); log_net!( diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 1ea67296..d0f843e0 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -149,8 +149,7 @@ impl RawTcpProtocolHandler { ); let local_address = self.inner.lock().local_address; let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( - ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address)) - .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?, + ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address)), ps, )); @@ -190,8 +189,7 @@ impl RawTcpProtocolHandler { ProtocolType::TCP, ), SocketAddress::from_socket_addr(actual_local_address), - ) - .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?, + ), ps, )); diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index 22a3c7ae..8dc39b2e 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -25,16 +25,10 @@ impl RawUdpProtocolHandler { ProtocolType::UDP, ); let local_socket_addr = self.socket.local_addr()?; - let descriptor = match ConnectionDescriptor::new( + let descriptor = ConnectionDescriptor::new( peer_addr, SocketAddress::from_socket_addr(local_socket_addr), - ) { - Ok(d) => d, - Err(_) => { - log_net!(debug "{}({}) at {}@{}:{}: {:?}", "Invalid peer scope".green(), "received message from invalid peer scope", file!(), line!(), column!(), peer_addr); - continue; - } - }; + ); break (size, descriptor); }; @@ -62,8 +56,7 @@ impl RawUdpProtocolHandler { let descriptor = ConnectionDescriptor::new( peer_addr, SocketAddress::from_socket_addr(local_socket_addr), - ) - .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?; + ); let len = network_result_try!(self .socket diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 8a6d8909..a0b48b42 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -212,8 +212,7 @@ impl WebsocketProtocolHandler { ConnectionDescriptor::new( peer_addr, SocketAddress::from_socket_addr(self.arc.local_address), - ) - .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?, + ), ws_stream, )); @@ -268,8 +267,7 @@ impl WebsocketProtocolHandler { let descriptor = ConnectionDescriptor::new( dial_info.to_peer_address(), SocketAddress::from_socket_addr(actual_local_addr), - ) - .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?; + ); // Negotiate TLS if this is WSS if tls { diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 770d7ef8..98d007e2 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -1,6 +1,6 @@ use super::*; use futures_util::{FutureExt, StreamExt}; -use std::io; +use std::{io, sync::Arc}; use stop_token::prelude::*; cfg_if::cfg_if! { @@ -81,8 +81,12 @@ pub struct NetworkConnectionStats { last_message_recv_time: Option, } + +pub type NetworkConnectionId = u64; + #[derive(Debug)] pub struct NetworkConnection { + connection_id: NetworkConnectionId, descriptor: ConnectionDescriptor, processor: Option>, established_time: u64, @@ -92,11 +96,12 @@ pub struct NetworkConnection { } impl NetworkConnection { - pub(super) fn dummy(descriptor: ConnectionDescriptor) -> Self { + pub(super) fn dummy(id: NetworkConnectionId, descriptor: ConnectionDescriptor) -> Self { // Create handle for sending (dummy is immediately disconnected) let (sender, _receiver) = flume::bounded(intf::get_concurrency() as usize); Self { + connection_id: id, descriptor, processor: None, established_time: intf::get_timestamp(), @@ -113,14 +118,10 @@ impl NetworkConnection { connection_manager: ConnectionManager, manager_stop_token: StopToken, protocol_connection: ProtocolNetworkConnection, + connection_id: NetworkConnectionId, ) -> Self { // Get timeout let network_manager = connection_manager.network_manager(); - let inactivity_timeout = network_manager - .config() - .get() - .network - .connection_inactivity_timeout_ms; // Get descriptor let descriptor = protocol_connection.descriptor(); @@ -142,15 +143,16 @@ impl NetworkConnection { connection_manager, local_stop_token, manager_stop_token, + connection_id, descriptor.clone(), receiver, protocol_connection, - inactivity_timeout, stats.clone(), )); // Return the connection Self { + connection_id, descriptor, processor: Some(processor), established_time: intf::get_timestamp(), @@ -160,12 +162,16 @@ impl NetworkConnection { } } + pub fn connection_id(&self) -> NetworkConnectionId { + self.connection_id + } + pub fn connection_descriptor(&self) -> ConnectionDescriptor { self.descriptor.clone() } pub fn get_handle(&self) -> ConnectionHandle { - ConnectionHandle::new(self.descriptor.clone(), self.sender.clone()) + ConnectionHandle::new(self.connection_id, self.descriptor.clone(), self.sender.clone()) } pub fn close(&mut self) { @@ -215,15 +221,15 @@ impl NetworkConnection { connection_manager: ConnectionManager, local_stop_token: StopToken, manager_stop_token: StopToken, + connection_id: NetworkConnectionId, descriptor: ConnectionDescriptor, receiver: flume::Receiver>, protocol_connection: ProtocolNetworkConnection, - connection_inactivity_timeout_ms: u32, stats: Arc>, ) -> SendPinBoxFuture<()> { Box::pin(async move { log_net!( - "== Starting process_connection loop for {:?}", + "== Starting process_connection loop for id={}, {:?}", connection_id, descriptor.green() ); @@ -235,7 +241,7 @@ impl NetworkConnection { // Push mutable timer so we can reset it // Normally we would use an io::timeout here, but WASM won't support that, so we use a mutable sleep future let new_timer = || { - intf::sleep(connection_inactivity_timeout_ms).then(|_| async { + intf::sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async { // timeout log_net!("== Connection timeout on {:?}", descriptor.green()); RecvLoopAction::Timeout @@ -317,7 +323,7 @@ impl NetworkConnection { .timeout_at(local_stop_token.clone()) .timeout_at(manager_stop_token.clone()) .await - .and_then(std::convert::identity) // flatten + .and_then(std::convert::identity) // flatten stoptoken timeouts { Ok(Some(RecvLoopAction::Send)) => { // Don't reset inactivity timer if we're only sending @@ -350,7 +356,7 @@ impl NetworkConnection { // Let the connection manager know the receive loop exited connection_manager - .report_connection_finished(descriptor) + .report_connection_finished(connection_id) .await; }) } diff --git a/veilid-core/src/network_manager/tests/test_connection_table.rs b/veilid-core/src/network_manager/tests/test_connection_table.rs index 6b9c5953..06991423 100644 --- a/veilid-core/src/network_manager/tests/test_connection_table.rs +++ b/veilid-core/src/network_manager/tests/test_connection_table.rs @@ -7,13 +7,12 @@ use crate::*; pub async fn test_add_get_remove() { let config = get_config(); - let mut table = ConnectionTable::new(config); + let table = ConnectionTable::new(config); let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new( SocketAddress::new(Address::IPV4(Ipv4Addr::new(192, 168, 0, 1)), 8080), ProtocolType::TCP, - )) - .unwrap(); + )); let a2 = a1; let a3 = ConnectionDescriptor::new( PeerAddress::new( @@ -26,8 +25,7 @@ pub async fn test_add_get_remove() { 0, 0, ))), - ) - .unwrap(); + ); let a4 = ConnectionDescriptor::new( PeerAddress::new( SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090), @@ -39,8 +37,7 @@ pub async fn test_add_get_remove() { 0, 0, ))), - ) - .unwrap(); + ); let a5 = ConnectionDescriptor::new( PeerAddress::new( SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090), @@ -52,79 +49,72 @@ pub async fn test_add_get_remove() { 0, 0, ))), - ) - .unwrap(); + ); - let c1 = NetworkConnection::dummy(a1); + let c1 = NetworkConnection::dummy(1, a1); let c1h = c1.get_handle(); - let c2 = NetworkConnection::dummy(a2); - //let c2h = c2.get_handle(); - let c3 = NetworkConnection::dummy(a3); - //let c3h = c3.get_handle(); - let c4 = NetworkConnection::dummy(a4); - //let c4h = c4.get_handle(); - let c5 = NetworkConnection::dummy(a5); - //let c5h = c5.get_handle(); + let c2 = NetworkConnection::dummy(2, a2); + let c3 = NetworkConnection::dummy(3, a3); + let c4 = NetworkConnection::dummy(4, a4); + let c5 = NetworkConnection::dummy(5, a5); assert_eq!(a1, c2.connection_descriptor()); assert_ne!(a3, c4.connection_descriptor()); assert_ne!(a4, c5.connection_descriptor()); assert_eq!(table.connection_count(), 0); - assert_eq!(table.get_connection(a1), None); + assert_eq!(table.get_connection_by_descriptor(a1), None); table.add_connection(c1).unwrap(); assert_eq!(table.connection_count(), 1); - assert_err!(table.remove_connection(a3)); - assert_err!(table.remove_connection(a4)); + assert!(table.remove_connection_by_id(4).is_none()); + assert!(table.remove_connection_by_id(5).is_none()); assert_eq!(table.connection_count(), 1); - assert_eq!(table.get_connection(a1), Some(c1h.clone())); - assert_eq!(table.get_connection(a1), Some(c1h.clone())); + assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); + assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); assert_eq!(table.connection_count(), 1); assert_err!(table.add_connection(c2)); assert_eq!(table.connection_count(), 1); - assert_eq!(table.get_connection(a1), Some(c1h.clone())); - assert_eq!(table.get_connection(a1), Some(c1h.clone())); + assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); + assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); assert_eq!(table.connection_count(), 1); assert_eq!( table - .remove_connection(a2) + .remove_connection_by_id(1) .map(|c| c.connection_descriptor()) .unwrap(), a1 ); assert_eq!(table.connection_count(), 0); - assert_err!(table.remove_connection(a2)); + assert!(table.remove_connection_by_id(2).is_none()); assert_eq!(table.connection_count(), 0); - assert_eq!(table.get_connection(a2), None); - assert_eq!(table.get_connection(a1), None); + assert_eq!(table.get_connection_by_descriptor(a2), None); + assert_eq!(table.get_connection_by_descriptor(a1), None); assert_eq!(table.connection_count(), 0); - let c1 = NetworkConnection::dummy(a1); - //let c1h = c1.get_handle(); + let c1 = NetworkConnection::dummy(6, a1); table.add_connection(c1).unwrap(); - let c2 = NetworkConnection::dummy(a2); - //let c2h = c2.get_handle(); + let c2 = NetworkConnection::dummy(7, a2); assert_err!(table.add_connection(c2)); table.add_connection(c3).unwrap(); table.add_connection(c4).unwrap(); assert_eq!(table.connection_count(), 3); assert_eq!( table - .remove_connection(a2) + .remove_connection_by_id(6) .map(|c| c.connection_descriptor()) .unwrap(), a2 ); assert_eq!( table - .remove_connection(a3) + .remove_connection_by_id(3) .map(|c| c.connection_descriptor()) .unwrap(), a3 ); assert_eq!( table - .remove_connection(a4) + .remove_connection_by_id(4) .map(|c| c.connection_descriptor()) .unwrap(), a4 diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index 0915e764..1c9df3c3 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -134,8 +134,7 @@ impl WebsocketProtocolHandler { // Make our connection descriptor let wnc = WebsocketNetworkConnection::new( - ConnectionDescriptor::new_no_local(dial_info.to_peer_address()) - .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?, + ConnectionDescriptor::new_no_local(dial_info.to_peer_address()), wsmeta, wsio, ); diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 1db34935..1bffb04b 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -741,7 +741,7 @@ impl RoutingTable { signed_node_info: SignedNodeInfo, allow_invalid: bool, ) -> Option { - log_rtab!("register_node_with_signed_node_info: routing_domain: {:?}, node_id: {:?}, signed_node_info: {:?}, allow_invalid: {:?}", routing_domain, node_id, signed_node_info, allow_invalid ); + //log_rtab!("register_node_with_signed_node_info: routing_domain: {:?}, node_id: {:?}, signed_node_info: {:?}, allow_invalid: {:?}", routing_domain, node_id, signed_node_info, allow_invalid ); // validate signed node info is not something malicious if node_id == self.node_id() { @@ -858,6 +858,9 @@ impl RoutingTable { let mut dead = true; if let Some(nr) = self.lookup_node_ref(*e) { if let Some(last_connection) = nr.last_connection() { + + + out.push((*e, RecentPeersEntry { last_connection })); dead = false; } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index bd8fbfd3..79d68d3f 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1507,17 +1507,17 @@ pub struct ConnectionDescriptor { } impl ConnectionDescriptor { - pub fn new(remote: PeerAddress, local: SocketAddress) -> Result { - Ok(Self { + pub fn new(remote: PeerAddress, local: SocketAddress) -> Self { + Self { remote, local: Some(local), - }) + } } - pub fn new_no_local(remote: PeerAddress) -> Result { - Ok(Self { + pub fn new_no_local(remote: PeerAddress) -> Self { + Self { remote, local: None, - }) + } } pub fn remote(&self) -> PeerAddress { self.remote diff --git a/veilid-flutter/example/macos/Podfile.lock b/veilid-flutter/example/macos/Podfile.lock index b4a53fb0..b63b0c0e 100644 --- a/veilid-flutter/example/macos/Podfile.lock +++ b/veilid-flutter/example/macos/Podfile.lock @@ -19,7 +19,7 @@ EXTERNAL SOURCES: :path: Flutter/ephemeral/.symlinks/plugins/veilid/macos SPEC CHECKSUMS: - FlutterMacOS: 57701585bf7de1b3fc2bb61f6378d73bbdea8424 + FlutterMacOS: ae6af50a8ea7d6103d888583d46bd8328a7e9811 path_provider_macos: 3c0c3b4b0d4a76d2bf989a913c2de869c5641a19 veilid: 6bed3adec63fd8708a2ace498e0e17941c9fc32b