diff --git a/veilid-core/src/network_manager/connection_handle.rs b/veilid-core/src/network_manager/connection_handle.rs index bbc7dff5..119e1535 100644 --- a/veilid-core/src/network_manager/connection_handle.rs +++ b/veilid-core/src/network_manager/connection_handle.rs @@ -2,8 +2,8 @@ use super::*; #[derive(Clone, Debug)] pub struct ConnectionHandle { - _id: NetworkConnectionId, - descriptor: ConnectionDescriptor, + connection_id: NetworkConnectionId, + flow: Flow, channel: flume::Sender<(Option, Vec)>, } @@ -15,23 +15,32 @@ pub enum ConnectionHandleSendResult { impl ConnectionHandle { pub(super) fn new( - id: NetworkConnectionId, - descriptor: ConnectionDescriptor, + connection_id: NetworkConnectionId, + flow: Flow, channel: flume::Sender<(Option, Vec)>, ) -> Self { Self { - _id: id, - descriptor, + connection_id, + flow, channel, } } - // pub fn connection_id(&self) -> NetworkConnectionId { - // self.id - // } + #[allow(dead_code)] + pub fn connection_id(&self) -> NetworkConnectionId { + self.connection_id + } - pub fn connection_descriptor(&self) -> ConnectionDescriptor { - self.descriptor + #[allow(dead_code)] + pub fn flow(&self) -> Flow { + self.flow + } + + pub fn unique_flow(&self) -> UniqueFlow { + UniqueFlow { + flow: self.flow, + connection_id: Some(self.connection_id), + } } // #[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(self, message), fields(message.len = message.len())))] @@ -57,7 +66,7 @@ impl ConnectionHandle { impl PartialEq for ConnectionHandle { fn eq(&self, other: &Self) -> bool { - self.descriptor == other.descriptor + self.connection_id == other.connection_id && self.flow == other.flow } } diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index c2c53eb6..a8bec2f7 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -16,23 +16,25 @@ enum ConnectionManagerEvent { #[derive(Debug)] pub(crate) struct ConnectionRefScope { connection_manager: ConnectionManager, - descriptor: ConnectionDescriptor, + id: NetworkConnectionId, } impl ConnectionRefScope { - pub fn new(connection_manager: ConnectionManager, descriptor: ConnectionDescriptor) -> Self { - connection_manager.connection_ref(descriptor, ConnectionRefKind::AddRef); - Self { - connection_manager, - descriptor, + pub fn try_new(connection_manager: ConnectionManager, id: NetworkConnectionId) -> Option { + if !connection_manager.connection_ref(id, ConnectionRefKind::AddRef) { + return None; } + Some(Self { + connection_manager, + id, + }) } } impl Drop for ConnectionRefScope { fn drop(&mut self) { self.connection_manager - .connection_ref(self.descriptor, ConnectionRefKind::RemoveRef); + .connection_ref(self.id, ConnectionRefKind::RemoveRef); } } @@ -163,7 +165,7 @@ impl ConnectionManager { fn should_protect_connection(&self, conn: &NetworkConnection) -> Option { let netman = self.network_manager(); let routing_table = netman.routing_table(); - let remote_address = conn.connection_descriptor().remote_address().address(); + let remote_address = conn.flow().remote_address().address(); let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else { return None; }; @@ -173,8 +175,8 @@ impl ConnectionManager { let relay_nr = rn.filtered_clone( NodeRefFilter::new() .with_routing_domain(routing_domain) - .with_address_type(conn.connection_descriptor().address_type()) - .with_protocol_type(conn.connection_descriptor().protocol_type()), + .with_address_type(conn.flow().address_type()) + .with_protocol_type(conn.flow().protocol_type()), ); let dids = relay_nr.all_filtered_dial_info_details(); for did in dids { @@ -231,7 +233,7 @@ impl ConnectionManager { } Err(ConnectionTableAddError::AddressFilter(conn, e)) => { // Connection filtered - let desc = conn.connection_descriptor(); + let desc = conn.flow(); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); return Ok(NetworkResult::no_connection_other(format!( "connection filtered: {:?} ({})", @@ -240,7 +242,7 @@ impl ConnectionManager { } Err(ConnectionTableAddError::AlreadyExists(conn)) => { // Connection already exists - let desc = conn.connection_descriptor(); + let desc = conn.flow(); log_net!(debug "== Connection already exists: {:?}", conn.debug_print(get_aligned_timestamp())); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); return Ok(NetworkResult::no_connection_other(format!( @@ -250,7 +252,7 @@ impl ConnectionManager { } Err(ConnectionTableAddError::TableFull(conn)) => { // Connection table is full - let desc = conn.connection_descriptor(); + let desc = conn.flow(); log_net!(debug "== Connection table full: {:?}", conn.debug_print(get_aligned_timestamp())); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); return Ok(NetworkResult::no_connection_other(format!( @@ -263,10 +265,8 @@ impl ConnectionManager { } // Returns a network connection if one already is established - pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option { - self.arc - .connection_table - .peek_connection_by_descriptor(descriptor) + pub fn get_connection(&self, flow: Flow) -> Option { + self.arc.connection_table.peek_connection_by_flow(flow) } // Returns a network connection if one already is established @@ -275,13 +275,11 @@ impl ConnectionManager { } // Protects a network connection if one already is established - fn connection_ref(&self, descriptor: ConnectionDescriptor, kind: ConnectionRefKind) { - self.arc - .connection_table - .ref_connection_by_descriptor(descriptor, kind); + fn connection_ref(&self, id: NetworkConnectionId, kind: ConnectionRefKind) -> bool { + self.arc.connection_table.ref_connection_by_id(id, kind) } - pub fn connection_ref_scope(&self, descriptor: ConnectionDescriptor) -> ConnectionRefScope { - ConnectionRefScope::new(self.clone(), descriptor) + pub fn try_connection_ref_scope(&self, id: NetworkConnectionId) -> Option { + ConnectionRefScope::try_new(self.clone(), id) } /// Called when we want to create a new connection or get the current one that already exists @@ -385,14 +383,22 @@ impl ConnectionManager { // Process async commands while let Ok(Ok(event)) = receiver.recv_async().timeout_at(stop_token.clone()).await { match event { - ConnectionManagerEvent::Accepted(conn) => { + ConnectionManagerEvent::Accepted(prot_conn) => { + // Async lock on the remote address for atomicity per remote + let _lock_guard = self + .arc + .address_lock_table + .lock_tag(prot_conn.flow().remote_address().socket_addr()) + .await; + let mut inner = self.arc.inner.lock(); match &mut *inner { Some(inner) => { // Register the connection // We don't care if this fails, since nobody here asked for the inbound connection. // If it does, we just drop the connection - let _ = self.on_new_protocol_network_connection(inner, conn); + + let _ = self.on_new_protocol_network_connection(inner, prot_conn); } None => { // If this somehow happens, we're shutting down @@ -400,6 +406,12 @@ impl ConnectionManager { }; } ConnectionManagerEvent::Dead(mut conn) => { + let _lock_guard = self + .arc + .address_lock_table + .lock_tag(conn.flow().remote_address().socket_addr()) + .await; + conn.close(); conn.await; } diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index 88aa8bc8..f934f4ce 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -38,7 +38,7 @@ struct ConnectionTableInner { max_connections: Vec, conn_by_id: Vec>, protocol_index_by_id: BTreeMap, - id_by_descriptor: BTreeMap, + id_by_flow: BTreeMap, ids_by_remote: BTreeMap>, address_filter: AddressFilter, } @@ -67,7 +67,7 @@ impl ConnectionTable { LruCache::new_unbounded(), ], protocol_index_by_id: BTreeMap::new(), - id_by_descriptor: BTreeMap::new(), + id_by_flow: BTreeMap::new(), ids_by_remote: BTreeMap::new(), address_filter, })), @@ -105,7 +105,7 @@ impl ConnectionTable { } } inner.protocol_index_by_id.clear(); - inner.id_by_descriptor.clear(); + inner.id_by_flow.clear(); inner.ids_by_remote.clear(); unord }; @@ -151,14 +151,14 @@ impl ConnectionTable { ) -> Result, ConnectionTableAddError> { // Get indices for network connection table let id = network_connection.connection_id(); - let descriptor = network_connection.connection_descriptor(); - let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); - let remote = descriptor.remote(); + let flow = network_connection.flow(); + let protocol_index = Self::protocol_to_index(flow.protocol_type()); + let remote = flow.remote(); let mut inner = self.inner.lock(); - // Two connections to the same descriptor should be rejected (soft rejection) - if inner.id_by_descriptor.contains_key(&descriptor) { + // Two connections to the same flow should be rejected (soft rejection) + if inner.id_by_flow.contains_key(&flow) { return Err(ConnectionTableAddError::already_exists(network_connection)); } @@ -169,14 +169,14 @@ impl ConnectionTable { if inner.protocol_index_by_id.get(&id).is_some() { panic!("duplicate id to protocol index: {:#?}", network_connection); } - if let Some(ids) = inner.ids_by_remote.get(&descriptor.remote()) { + if let Some(ids) = inner.ids_by_remote.get(&flow.remote()) { if ids.contains(&id) { panic!("duplicate id by remote: {:#?}", network_connection); } } // Filter by ip for connection limits - let ip_addr = descriptor.remote_address().ip_addr(); + let ip_addr = flow.remote_address().ip_addr(); match inner.address_filter.add_connection(ip_addr) { Ok(()) => {} Err(e) => { @@ -217,12 +217,26 @@ impl ConnectionTable { // add connection records inner.protocol_index_by_id.insert(id, protocol_index); - inner.id_by_descriptor.insert(descriptor, id); + inner.id_by_flow.insert(flow, id); inner.ids_by_remote.entry(remote).or_default().push(id); Ok(out_conn) } + //#[instrument(level = "trace", skip(self), ret)] + pub fn peek_connection_by_flow(&self, flow: Flow) -> Option { + if flow.protocol_type() == ProtocolType::UDP { + return None; + } + + let inner = self.inner.lock(); + + let id = *inner.id_by_flow.get(&flow)?; + let protocol_index = Self::protocol_to_index(flow.protocol_type()); + let out = inner.conn_by_id[protocol_index].peek(&id).unwrap(); + Some(out.get_handle()) + } + //#[instrument(level = "trace", skip(self), ret)] pub fn touch_connection_by_id(&self, id: NetworkConnectionId) { let mut inner = self.inner.lock(); @@ -233,45 +247,21 @@ impl ConnectionTable { } //#[instrument(level = "trace", skip(self), ret)] - pub fn peek_connection_by_descriptor( + pub fn ref_connection_by_id( &self, - descriptor: ConnectionDescriptor, - ) -> Option { - if descriptor.protocol_type() == ProtocolType::UDP { - return None; - } - - let inner = self.inner.lock(); - - let id = *inner.id_by_descriptor.get(&descriptor)?; - let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); - let out = inner.conn_by_id[protocol_index].peek(&id).unwrap(); - Some(out.get_handle()) - } - - //#[instrument(level = "trace", skip(self), ret)] - pub fn ref_connection_by_descriptor( - &self, - descriptor: ConnectionDescriptor, + id: NetworkConnectionId, ref_type: ConnectionRefKind, ) -> bool { - if descriptor.protocol_type() == ProtocolType::UDP { - return false; - } - let mut inner = self.inner.lock(); - - let Some(id) = inner.id_by_descriptor.get(&descriptor).copied() else { - log_net!(error "failed to ref descriptor: {:?} ({:?})", descriptor, ref_type); + let Some(protocol_index) = inner.protocol_index_by_id.get(&id).copied() else { + // Sometimes network connections die before we can ref/unref them return false; }; - let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); let out = inner.conn_by_id[protocol_index].get_mut(&id).unwrap(); match ref_type { ConnectionRefKind::AddRef => out.add_ref(), ConnectionRefKind::RemoveRef => out.remove_ref(), } - true } @@ -299,7 +289,7 @@ impl ConnectionTable { if let Some(best_port) = best_port { for id in all_ids_by_remote { let nc = inner.conn_by_id[protocol_index].peek(id).unwrap(); - if let Some(local_addr) = nc.connection_descriptor().local() { + if let Some(local_addr) = nc.flow().local() { if local_addr.port() == best_port { let nc = inner.conn_by_id[protocol_index].get(id).unwrap(); return Some(nc.get_handle()); @@ -326,13 +316,13 @@ impl ConnectionTable { // pub fn drain_filter(&self, mut filter: F) -> Vec // where - // F: FnMut(ConnectionDescriptor) -> bool, + // F: FnMut(Flow) -> bool, // { // let mut inner = self.inner.lock(); // let mut filtered_ids = Vec::new(); // for cbi in &mut inner.conn_by_id { // for (id, conn) in cbi { - // if filter(conn.connection_descriptor()) { + // if filter(conn.flow()) { // filtered_ids.push(*id); // } // } @@ -359,11 +349,11 @@ impl ConnectionTable { let protocol_index = inner.protocol_index_by_id.remove(&id).unwrap(); // conn_by_id let conn = inner.conn_by_id[protocol_index].remove(&id).unwrap(); - // id_by_descriptor - let descriptor = conn.connection_descriptor(); - inner.id_by_descriptor.remove(&descriptor).unwrap(); + // id_by_flow + let flow = conn.flow(); + inner.id_by_flow.remove(&flow).unwrap(); // ids_by_remote - let remote = descriptor.remote(); + let remote = flow.remote(); let ids = inner.ids_by_remote.get_mut(&remote).unwrap(); for (n, elem) in ids.iter().enumerate() { if *elem == id { diff --git a/veilid-core/src/network_manager/direct_boot.rs b/veilid-core/src/network_manager/direct_boot.rs index d739c54d..71f87e89 100644 --- a/veilid-core/src/network_manager/direct_boot.rs +++ b/veilid-core/src/network_manager/direct_boot.rs @@ -3,10 +3,7 @@ use super::*; impl NetworkManager { // Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism) #[instrument(level = "trace", skip(self), ret, err)] - pub(crate) async fn handle_boot_request( - &self, - descriptor: ConnectionDescriptor, - ) -> EyreResult> { + pub(crate) async fn handle_boot_request(&self, flow: Flow) -> EyreResult> { let routing_table = self.routing_table(); // Get a bunch of nodes with the various @@ -22,14 +19,14 @@ impl NetworkManager { // Reply with a chunk of signed routing table match self .net() - .send_data_to_existing_connection(descriptor, json_bytes) + .send_data_to_existing_flow(flow, json_bytes) .await? { - None => { + SendDataToExistingFlowResult::Sent(_) => { // Bootstrap reply was sent Ok(NetworkResult::value(())) } - Some(_) => Ok(NetworkResult::no_connection_other( + SendDataToExistingFlowResult::NotSent(_) => Ok(NetworkResult::no_connection_other( "bootstrap reply could not be sent", )), } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index d0a5497a..8a277e31 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -95,8 +95,8 @@ pub(crate) struct SendDataMethod { pub contact_method: NodeContactMethod, /// Pre-relayed contact method pub opt_relayed_contact_method: Option, - /// The connection used to send the data - pub connection_descriptor: ConnectionDescriptor, + /// The specific flow used to send the data + pub unique_flow: UniqueFlow, } /// Mechanism required to contact another node @@ -128,6 +128,11 @@ struct NodeContactMethodCacheKey { #[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] struct PublicAddressCheckCacheKey(ProtocolType, AddressType); +enum SendDataToExistingFlowResult { + Sent(UniqueFlow), + NotSent(Vec), +} + // The mutable state of the network manager struct NetworkManagerInner { stats: NetworkManagerStats, @@ -661,7 +666,7 @@ impl NetworkManager { #[instrument(level = "trace", skip(self), err)] pub async fn handle_signal( &self, - signal_connection_descriptor: ConnectionDescriptor, + signal_flow: Flow, signal_info: SignalInfo, ) -> EyreResult> { match signal_info { @@ -685,7 +690,7 @@ impl NetworkManager { }; // Restrict reverse connection to same sequencing requirement as inbound signal - if signal_connection_descriptor.protocol_type().is_ordered() { + if signal_flow.protocol_type().is_ordered() { peer_nr.set_sequencing(Sequencing::EnsureOrdered); } @@ -730,7 +735,7 @@ impl NetworkManager { // Do our half of the hole punch by sending an empty packet // Both sides will do this and then the receipt will get sent over the punched hole - let connection_descriptor = network_result_try!( + let unique_flow = network_result_try!( self.net() .send_data_to_dial_info( hole_punch_dial_info_detail.dial_info.clone(), @@ -742,7 +747,7 @@ impl NetworkManager { // XXX: do we need a delay here? or another hole punch packet? // Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch - peer_nr.set_last_connection(connection_descriptor, get_aligned_timestamp()); + peer_nr.set_last_flow(unique_flow.flow, get_aligned_timestamp()); // Return the receipt using the same dial info send the receipt to it rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt) @@ -867,28 +872,20 @@ impl NetworkManager { // network protocol handler. Processes the envelope, authenticates and decrypts the RPC message // and passes it to the RPC handler #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len())))] - async fn on_recv_envelope( - &self, - data: &mut [u8], - connection_descriptor: ConnectionDescriptor, - ) -> EyreResult { + async fn on_recv_envelope(&self, data: &mut [u8], flow: Flow) -> EyreResult { #[cfg(feature = "verbose-tracing")] let root = span!( parent: None, Level::TRACE, "on_recv_envelope", "data.len" = data.len(), - "descriptor" = ?connection_descriptor + "flow" = ?flow ); #[cfg(feature = "verbose-tracing")] let _root_enter = root.enter(); - log_net!( - "envelope of {} bytes received from {:?}", - data.len(), - connection_descriptor - ); - let remote_addr = connection_descriptor.remote_address().ip_addr(); + log_net!("envelope of {} bytes received from {:?}", data.len(), flow); + let remote_addr = flow.remote_address().ip_addr(); // Network accounting self.stats_packet_rcvd(remote_addr, ByteCount::new(data.len() as u64)); @@ -910,18 +907,18 @@ impl NetworkManager { // Get the routing domain for this data let routing_domain = match self .routing_table() - .routing_domain_for_address(connection_descriptor.remote_address().address()) + .routing_domain_for_address(flow.remote_address().address()) { Some(rd) => rd, None => { - log_net!(debug "no routing domain for envelope received from {:?}", connection_descriptor); + log_net!(debug "no routing domain for envelope received from {:?}", flow); return Ok(false); } }; // Is this a direct bootstrap request instead of an envelope? if data[0..4] == *BOOT_MAGIC { - network_result_value_or_log!(self.handle_boot_request(connection_descriptor).await? => [ format!(": connection_descriptor={:?}", connection_descriptor) ] {}); + network_result_value_or_log!(self.handle_boot_request(flow).await? => [ format!(": flow={:?}", flow) ] {}); return Ok(true); } @@ -968,7 +965,7 @@ impl NetworkManager { log_net!(debug "Timestamp behind: {}ms ({})", timestamp_to_secs(ts.saturating_sub(ets).as_u64()) * 1000f64, - connection_descriptor.remote() + flow.remote() ); return Ok(false); } @@ -978,7 +975,7 @@ impl NetworkManager { log_net!(debug "Timestamp ahead: {}ms ({})", timestamp_to_secs(ets.saturating_sub(ts).as_u64()) * 1000f64, - connection_descriptor.remote() + flow.remote() ); return Ok(false); } @@ -1033,7 +1030,7 @@ impl NetworkManager { if let Some(mut relay_nr) = some_relay_nr { // Ensure the protocol used to forward is of the same sequencing requirement // Address type is allowed to change if connectivity is better - if connection_descriptor.protocol_type().is_ordered() { + if flow.protocol_type().is_ordered() { relay_nr.set_sequencing(Sequencing::EnsureOrdered); }; @@ -1080,7 +1077,7 @@ impl NetworkManager { // Cache the envelope information in the routing table let source_noderef = match routing_table.register_node_with_existing_connection( envelope.get_sender_typed_id(), - connection_descriptor, + flow, ts, ) { Ok(v) => v, @@ -1093,13 +1090,7 @@ impl NetworkManager { source_noderef.add_envelope_version(envelope.get_version()); // Pass message to RPC system - rpc.enqueue_direct_message( - envelope, - source_noderef, - connection_descriptor, - routing_domain, - body, - )?; + rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)?; // Inform caller that we dealt with the envelope locally Ok(true) diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 5b2a07bb..ecedbddb 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -578,75 +578,79 @@ impl Network { } #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] - pub async fn send_data_to_existing_connection( + pub async fn send_data_to_existing_flow( &self, - descriptor: ConnectionDescriptor, + flow: Flow, data: Vec, - ) -> EyreResult>> { + ) -> EyreResult { let data_len = data.len(); // Handle connectionless protocol - if descriptor.protocol_type() == ProtocolType::UDP { + if flow.protocol_type() == ProtocolType::UDP { // send over the best udp socket we have bound since UDP is not connection oriented - let peer_socket_addr = descriptor.remote().socket_addr(); + let peer_socket_addr = flow.remote().socket_addr(); if let Some(ph) = self.find_best_udp_protocol_handler( &peer_socket_addr, - &descriptor.local().map(|sa| sa.socket_addr()), + &flow.local().map(|sa| sa.socket_addr()), ) { network_result_value_or_log!(ph.clone() .send_message(data.clone(), peer_socket_addr) .await - .wrap_err("sending data to existing connection")? => [ format!(": data.len={}, descriptor={:?}", data.len(), descriptor) ] - { return Ok(Some(data)); } ); + .wrap_err("sending data to existing connection")? => [ format!(": data.len={}, flow={:?}", data.len(), flow) ] + { return Ok(SendDataToExistingFlowResult::NotSent(data)); } ); // Network accounting self.network_manager() .stats_packet_sent(peer_socket_addr.ip(), ByteCount::new(data_len as u64)); // Data was consumed - return Ok(None); + let unique_flow = UniqueFlow { + flow, + connection_id: None, + }; + return Ok(SendDataToExistingFlowResult::Sent(unique_flow)); } } // Handle connection-oriented protocols // Try to send to the exact existing connection if one exists - if let Some(conn) = self.connection_manager().get_connection(descriptor) { + if let Some(conn) = self.connection_manager().get_connection(flow) { // connection exists, send over it match conn.send_async(data).await { ConnectionHandleSendResult::Sent => { // Network accounting self.network_manager().stats_packet_sent( - descriptor.remote().socket_addr().ip(), + flow.remote().socket_addr().ip(), ByteCount::new(data_len as u64), ); // Data was consumed - return Ok(None); + return Ok(SendDataToExistingFlowResult::Sent(conn.unique_flow())); } ConnectionHandleSendResult::NotSent(data) => { // Couldn't send // Pass the data back out so we don't own it any more - return Ok(Some(data)); + return Ok(SendDataToExistingFlowResult::NotSent(data)); } } } // Connection didn't exist // Pass the data back out so we don't own it any more - Ok(Some(data)) + Ok(SendDataToExistingFlowResult::NotSent(data)) } // Send data directly to a dial info, possibly without knowing which node it is going to - // Returns a descriptor for the connection used to send the data + // Returns a flow for the connection used to send the data #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] pub async fn send_data_to_dial_info( &self, dial_info: DialInfo, data: Vec, - ) -> EyreResult> { + ) -> EyreResult> { self.record_dial_info_failure(dial_info.clone(), async move { let data_len = data.len(); - let connection_descriptor; + let unique_flow; if dial_info.protocol_type() == ProtocolType::UDP { // Handle connectionless protocol let peer_socket_addr = dial_info.to_socket_addr(); @@ -658,10 +662,14 @@ impl Network { )); } }; - connection_descriptor = network_result_try!(ph + let flow = network_result_try!(ph .send_message(data, peer_socket_addr) .await .wrap_err("failed to send data to dial info")?); + unique_flow = UniqueFlow { + flow, + connection_id: None, + }; } else { // Handle connection-oriented protocols let conn = network_result_try!( @@ -676,14 +684,14 @@ impl Network { "failed to send", ))); } - connection_descriptor = conn.connection_descriptor(); + unique_flow = conn.unique_flow(); } // Network accounting self.network_manager() .stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64)); - Ok(NetworkResult::value(connection_descriptor)) + Ok(NetworkResult::value(unique_flow)) }) .await } diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 2c06a378..9f015ed3 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -65,16 +65,16 @@ impl Network { .timeout_at(stop_token.clone()) .await { - Ok(Ok((size, descriptor))) => { + Ok(Ok((size, flow))) => { // Network accounting network_manager.stats_packet_rcvd( - descriptor.remote_address().ip_addr(), + flow.remote_address().ip_addr(), ByteCount::new(size as u64), ); // Pass it up for processing if let Err(e) = network_manager - .on_recv_envelope(&mut data[..size], descriptor) + .on_recv_envelope(&mut data[..size], flow) .await { log_net!(debug "failed to process received udp envelope: {}", e); diff --git a/veilid-core/src/network_manager/native/protocol/mod.rs b/veilid-core/src/network_manager/native/protocol/mod.rs index c59d87de..493d1987 100644 --- a/veilid-core/src/network_manager/native/protocol/mod.rs +++ b/veilid-core/src/network_manager/native/protocol/mod.rs @@ -45,13 +45,13 @@ impl ProtocolNetworkConnection { } } - pub fn descriptor(&self) -> ConnectionDescriptor { + pub fn flow(&self) -> Flow { match self { - // Self::Dummy(d) => d.descriptor(), - Self::RawTcp(t) => t.descriptor(), - Self::WsAccepted(w) => w.descriptor(), - Self::Ws(w) => w.descriptor(), - Self::Wss(w) => w.descriptor(), + // Self::Dummy(d) => d.flow(), + Self::RawTcp(t) => t.flow(), + Self::WsAccepted(w) => w.flow(), + Self::Ws(w) => w.flow(), + Self::Wss(w) => w.flow(), } } diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 52e1e27f..de7f28f0 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -3,7 +3,7 @@ use futures_util::{AsyncReadExt, AsyncWriteExt}; use sockets::*; pub struct RawTcpNetworkConnection { - descriptor: ConnectionDescriptor, + flow: Flow, stream: AsyncPeekStream, } @@ -14,12 +14,12 @@ impl fmt::Debug for RawTcpNetworkConnection { } impl RawTcpNetworkConnection { - pub fn new(descriptor: ConnectionDescriptor, stream: AsyncPeekStream) -> Self { - Self { descriptor, stream } + pub fn new(flow: Flow, stream: AsyncPeekStream) -> Self { + Self { flow, stream } } - pub fn descriptor(&self) -> ConnectionDescriptor { - self.descriptor + pub fn flow(&self) -> Flow { + self.flow } #[cfg_attr( @@ -152,7 +152,7 @@ impl RawTcpProtocolHandler { ProtocolType::TCP, ); let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( - ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_addr)), + Flow::new(peer_addr, SocketAddress::from_socket_addr(local_addr)), ps, )); @@ -186,7 +186,7 @@ impl RawTcpProtocolHandler { // Wrap the stream in a network connection and return it let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( - ConnectionDescriptor::new( + Flow::new( PeerAddress::new( SocketAddress::from_socket_addr(socket_addr), ProtocolType::TCP, diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index ea40379d..0a955046 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -17,9 +17,9 @@ impl RawUdpProtocolHandler { } } - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor)))] - pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, ConnectionDescriptor)> { - let (message_len, descriptor) = loop { + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.flow)))] + pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, Flow)> { + let (message_len, flow) = loop { // Get a packet let (size, remote_addr) = network_result_value_or_log!(self.socket.recv_from(data).await.into_network_result()? => continue); @@ -64,33 +64,33 @@ impl RawUdpProtocolHandler { // Copy assemble message out if we got one data[0..message.len()].copy_from_slice(&message); - // Return a connection descriptor and the amount of data in the message + // Return a flow and the amount of data in the message let peer_addr = PeerAddress::new( SocketAddress::from_socket_addr(remote_addr), ProtocolType::UDP, ); let local_socket_addr = self.socket.local_addr()?; - let descriptor = ConnectionDescriptor::new( + let flow = Flow::new( peer_addr, SocketAddress::from_socket_addr(local_socket_addr), ); - break (message.len(), descriptor); + break (message.len(), flow); }; #[cfg(feature = "verbose-tracing")] tracing::Span::current().record("ret.len", message_len); #[cfg(feature = "verbose-tracing")] - tracing::Span::current().record("ret.descriptor", format!("{:?}", descriptor).as_str()); - Ok((message_len, descriptor)) + tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str()); + Ok((message_len, flow)) } - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.descriptor)))] + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.flow)))] pub async fn send_message( &self, data: Vec, remote_addr: SocketAddr, - ) -> io::Result> { + ) -> io::Result> { if data.len() > MAX_MESSAGE_SIZE { bail_io_error_other!("sending too large UDP message"); } @@ -121,21 +121,21 @@ impl RawUdpProtocolHandler { .await? ); - // Return a connection descriptor for the sent message + // Return a flow for the sent message let peer_addr = PeerAddress::new( SocketAddress::from_socket_addr(remote_addr), ProtocolType::UDP, ); let local_socket_addr = self.socket.local_addr()?; - let descriptor = ConnectionDescriptor::new( + let flow = Flow::new( peer_addr, SocketAddress::from_socket_addr(local_socket_addr), ); #[cfg(feature = "verbose-tracing")] - tracing::Span::current().record("ret.descriptor", format!("{:?}", descriptor).as_str()); - Ok(NetworkResult::value(descriptor)) + tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str()); + Ok(NetworkResult::value(flow)) } #[instrument(level = "trace", err)] diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 57d8a79d..4a35b35b 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -54,7 +54,7 @@ pub struct WebsocketNetworkConnection where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - descriptor: ConnectionDescriptor, + flow: Flow, stream: CloneStream>, } @@ -71,15 +71,15 @@ impl WebsocketNetworkConnection where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - pub fn new(descriptor: ConnectionDescriptor, stream: WebSocketStream) -> Self { + pub fn new(flow: Flow, stream: WebSocketStream) -> Self { Self { - descriptor, + flow, stream: CloneStream::new(stream), } } - pub fn descriptor(&self) -> ConnectionDescriptor { - self.descriptor + pub fn flow(&self) -> Flow { + self.flow } #[cfg_attr( @@ -286,7 +286,7 @@ impl WebsocketProtocolHandler { PeerAddress::new(SocketAddress::from_socket_addr(socket_addr), protocol_type); let conn = ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new( - ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_addr)), + Flow::new(peer_addr, SocketAddress::from_socket_addr(local_addr)), ws_stream, )); @@ -335,8 +335,8 @@ impl WebsocketProtocolHandler { #[cfg(feature = "rt-tokio")] let tcp_stream = tcp_stream.compat(); - // Make our connection descriptor - let descriptor = ConnectionDescriptor::new( + // Make our flow + let flow = Flow::new( dial_info.peer_address(), SocketAddress::from_socket_addr(actual_local_addr), ); @@ -350,14 +350,14 @@ impl WebsocketProtocolHandler { .map_err(to_io_error_other)?; Ok(NetworkResult::Value(ProtocolNetworkConnection::Wss( - WebsocketNetworkConnection::new(descriptor, ws_stream), + WebsocketNetworkConnection::new(flow, ws_stream), ))) } else { let (ws_stream, _response) = client_async(request, tcp_stream) .await .map_err(to_io_error_other)?; Ok(NetworkResult::Value(ProtocolNetworkConnection::Ws( - WebsocketNetworkConnection::new(descriptor, ws_stream), + WebsocketNetworkConnection::new(flow, ws_stream), ))) } } diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index fe99179a..df4f03e4 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -47,12 +47,12 @@ cfg_if::cfg_if! { // #[derive(Debug)] // pub struct DummyNetworkConnection { -// descriptor: ConnectionDescriptor, +// flow: Flow, // } // impl DummyNetworkConnection { -// pub fn descriptor(&self) -> ConnectionDescriptor { -// self.descriptor +// pub fn flow(&self) -> Flow { +// self.flow // } // pub fn close(&self) -> io::Result> { // Ok(NetworkResult::Value(())) @@ -83,12 +83,10 @@ pub struct NetworkConnectionStats { } -pub type NetworkConnectionId = AlignedU64; - #[derive(Debug)] pub(in crate::network_manager) struct NetworkConnection { connection_id: NetworkConnectionId, - descriptor: ConnectionDescriptor, + flow: Flow, processor: Option>, established_time: Timestamp, stats: Arc>, @@ -108,13 +106,13 @@ impl Drop for NetworkConnection { impl NetworkConnection { - pub(super) fn dummy(id: NetworkConnectionId, descriptor: ConnectionDescriptor) -> Self { + pub(super) fn dummy(id: NetworkConnectionId, flow: Flow) -> Self { // Create handle for sending (dummy is immediately disconnected) let (sender, _receiver) = flume::bounded(get_concurrency() as usize); Self { connection_id: id, - descriptor, + flow, processor: None, established_time: get_aligned_timestamp(), stats: Arc::new(Mutex::new(NetworkConnectionStats { @@ -134,8 +132,8 @@ impl NetworkConnection { protocol_connection: ProtocolNetworkConnection, connection_id: NetworkConnectionId, ) -> Self { - // Get descriptor - let descriptor = protocol_connection.descriptor(); + // Get flow + let flow = protocol_connection.flow(); // Create handle for sending let (sender, receiver) = flume::bounded(get_concurrency() as usize); @@ -155,7 +153,7 @@ impl NetworkConnection { local_stop_token, manager_stop_token, connection_id, - descriptor, + flow, receiver, protocol_connection, stats.clone(), @@ -164,7 +162,7 @@ impl NetworkConnection { // Return the connection Self { connection_id, - descriptor, + flow, processor: Some(processor), established_time: get_aligned_timestamp(), stats, @@ -179,12 +177,20 @@ impl NetworkConnection { self.connection_id } - pub fn connection_descriptor(&self) -> ConnectionDescriptor { - self.descriptor + pub fn flow(&self) -> Flow { + self.flow + } + + #[allow(dead_code)] + pub fn unique_flow(&self) -> UniqueFlow { + UniqueFlow { + flow: self.flow, + connection_id: Some(self.connection_id), + } } pub fn get_handle(&self) -> ConnectionHandle { - ConnectionHandle::new(self.connection_id, self.descriptor, self.sender.clone()) + ConnectionHandle::new(self.connection_id, self.flow, self.sender.clone()) } pub fn is_in_use(&self) -> bool { @@ -264,7 +270,7 @@ impl NetworkConnection { local_stop_token: StopToken, manager_stop_token: StopToken, connection_id: NetworkConnectionId, - descriptor: ConnectionDescriptor, + flow: Flow, receiver: flume::Receiver<(Option, Vec)>, protocol_connection: ProtocolNetworkConnection, stats: Arc>, @@ -272,7 +278,7 @@ impl NetworkConnection { Box::pin(async move { log_net!( "== Starting process_connection loop for id={}, {:?}", connection_id, - descriptor + flow ); let network_manager = connection_manager.network_manager(); @@ -286,7 +292,7 @@ impl NetworkConnection { let new_timer = || { sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async { // timeout - log_net!("== Connection timeout on {:?}", descriptor); + log_net!("== Connection timeout on {:?}", flow); RecvLoopAction::Timeout }) }; @@ -341,7 +347,7 @@ impl NetworkConnection { .then(|res| async { match res { Ok(v) => { - let peer_address = protocol_connection.descriptor().remote(); + let peer_address = protocol_connection.flow().remote(); // Check to see if it is punished if address_filter.is_ip_addr_punished(peer_address.socket_addr().ip()) { @@ -367,7 +373,7 @@ impl NetworkConnection { // Pass received messages up to the network manager for processing if let Err(e) = network_manager - .on_recv_envelope(message.as_mut_slice(), descriptor) + .on_recv_envelope(message.as_mut_slice(), flow) .await { log_net!(debug "failed to process received envelope: {}", e); @@ -424,8 +430,8 @@ impl NetworkConnection { } log_net!( - "== Connection loop finished descriptor={:?}", - descriptor + "== Connection loop finished flow={:?}", + flow ); // Let the connection manager know the receive loop exited @@ -443,8 +449,8 @@ impl NetworkConnection { pub fn debug_print(&self, cur_ts: Timestamp) -> String { format!("{} <- {} | {} | est {} sent {} rcvd {} refcount {}{}", - self.descriptor.remote_address(), - self.descriptor.local().map(|x| x.to_string()).unwrap_or("---".to_owned()), + self.flow.remote_address(), + self.flow.local().map(|x| x.to_string()).unwrap_or("---".to_owned()), self.connection_id.as_u64(), debug_duration(cur_ts.as_u64().saturating_sub(self.established_time.as_u64())), self.stats().last_message_sent_time.map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64())) ).unwrap_or("---".to_owned()), diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index a00c06ec..cc3d492f 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -3,10 +3,10 @@ use super::*; impl NetworkManager { /// Send raw data to a node /// - /// We may not have dial info for a node, but have an existing connection for it - /// because an inbound connection happened first, and no FindNodeQ has happened to that - /// node yet to discover its dial info. The existing connection should be tried first - /// in this case, if it matches the node ref's filters and no more permissive connection + /// We may not have dial info for a node, but have an existing flow for it + /// because an inbound flow happened first, and no FindNodeQ has happened to that + /// node yet to discover its dial info. The existing flow should be tried first + /// in this case, if it matches the node ref's filters and no more permissive flow /// could be established. /// /// Sending to a node requires determining a NetworkClass compatible contact method @@ -19,27 +19,26 @@ impl NetworkManager { let this = self.clone(); Box::pin( async move { - - // First try to send data to the last socket we've seen this peer on - let data = if let Some(connection_descriptor) = destination_node_ref.last_connection() { + // First try to send data to the last flow we've seen this peer on + let data = if let Some(flow) = destination_node_ref.last_flow() { match this .net() - .send_data_to_existing_connection(connection_descriptor, data) + .send_data_to_existing_flow(flow, data) .await? { - None => { - // Update timestamp for this last connection since we just sent to it + SendDataToExistingFlowResult::Sent(unique_flow) => { + // Update timestamp for this last flow since we just sent to it destination_node_ref - .set_last_connection(connection_descriptor, get_aligned_timestamp()); + .set_last_flow(unique_flow.flow, get_aligned_timestamp()); return Ok(NetworkResult::value(SendDataMethod { opt_relayed_contact_method: None, contact_method: NodeContactMethod::Existing, - connection_descriptor, + unique_flow, })); } - Some(data) => { - // Couldn't send data to existing connection + SendDataToExistingFlowResult::NotSent(data) => { + // Couldn't send data to existing flow // so pass the data back out data } @@ -135,30 +134,32 @@ impl NetworkManager { data: Vec, ) -> EyreResult> { // First try to send data to the last connection we've seen this peer on - let Some(connection_descriptor) = target_node_ref.last_connection() else { + let Some(flow) = target_node_ref.last_flow() else { return Ok(NetworkResult::no_connection_other( format!("should have found an existing connection: {}", target_node_ref) )); }; - if self + let unique_flow = match self .net() - .send_data_to_existing_connection(connection_descriptor, data) + .send_data_to_existing_flow(flow, data) .await? - .is_some() { - return Ok(NetworkResult::no_connection_other( - "failed to send to existing connection", - )); - } + SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, + SendDataToExistingFlowResult::NotSent(_) => { + return Ok(NetworkResult::no_connection_other( + "failed to send to existing flow", + )); + } + }; // Update timestamp for this last connection since we just sent to it - target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); + target_node_ref.set_last_flow(flow, get_aligned_timestamp()); Ok(NetworkResult::value(SendDataMethod{ contact_method: NodeContactMethod::Existing, opt_relayed_contact_method: None, - connection_descriptor + unique_flow })) } @@ -169,30 +170,32 @@ impl NetworkManager { data: Vec, ) -> EyreResult> { // Try to send data to the last socket we've seen this peer on - let Some(connection_descriptor) = target_node_ref.last_connection() else { + let Some(flow) = target_node_ref.last_flow() else { return Ok(NetworkResult::no_connection_other( format!("Node is not reachable and has no existing connection: {}", target_node_ref) )); }; - if self + let unique_flow = match self .net() - .send_data_to_existing_connection(connection_descriptor, data) - .await? - .is_some() + .send_data_to_existing_flow(flow, data) + .await? { - return Ok(NetworkResult::no_connection_other( - format!("failed to send to unreachable node over existing connection: {:?}", connection_descriptor) - )); - } + SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, + SendDataToExistingFlowResult::NotSent(_) => { + return Ok(NetworkResult::no_connection_other( + format!("failed to send to unreachable node over existing connection: {:?}", flow) + )); + } + }; // Update timestamp for this last connection since we just sent to it - target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); + target_node_ref.set_last_flow(flow, get_aligned_timestamp()); Ok(NetworkResult::value(SendDataMethod { - connection_descriptor, contact_method: NodeContactMethod::Existing, opt_relayed_contact_method: None, + unique_flow, })) } @@ -204,24 +207,24 @@ impl NetworkManager { data: Vec, ) -> EyreResult> { // First try to send data to the last socket we've seen this peer on - let data = if let Some(connection_descriptor) = target_node_ref.last_connection() { + let data = if let Some(flow) = target_node_ref.last_flow() { match self .net() - .send_data_to_existing_connection(connection_descriptor, data) + .send_data_to_existing_flow(flow, data) .await? { - None => { + SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it target_node_ref - .set_last_connection(connection_descriptor, get_aligned_timestamp()); + .set_last_flow(flow, get_aligned_timestamp()); return Ok(NetworkResult::value(SendDataMethod{ contact_method: NodeContactMethod::Existing, opt_relayed_contact_method: None, - connection_descriptor + unique_flow })); } - Some(data) => { + SendDataToExistingFlowResult::NotSent(data) => { // Couldn't send data to existing connection // so pass the data back out data @@ -232,14 +235,14 @@ impl NetworkManager { data }; - let connection_descriptor = network_result_try!( + let unique_flow = network_result_try!( self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data) .await? ); Ok(NetworkResult::value(SendDataMethod { - connection_descriptor, contact_method: NodeContactMethod::SignalReverse(relay_nr, target_node_ref), opt_relayed_contact_method: None, + unique_flow, })) } @@ -251,24 +254,24 @@ impl NetworkManager { data: Vec, ) -> EyreResult> { // First try to send data to the last socket we've seen this peer on - let data = if let Some(connection_descriptor) = target_node_ref.last_connection() { + let data = if let Some(flow) = target_node_ref.last_flow() { match self .net() - .send_data_to_existing_connection(connection_descriptor, data) + .send_data_to_existing_flow(flow, data) .await? { - None => { + SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it target_node_ref - .set_last_connection(connection_descriptor, get_aligned_timestamp()); + .set_last_flow(flow, get_aligned_timestamp()); return Ok(NetworkResult::value(SendDataMethod{ contact_method: NodeContactMethod::Existing, opt_relayed_contact_method: None, - connection_descriptor + unique_flow })); } - Some(data) => { + SendDataToExistingFlowResult::NotSent(data) => { // Couldn't send data to existing connection // so pass the data back out data @@ -279,12 +282,12 @@ impl NetworkManager { data }; - let connection_descriptor = + let unique_flow = network_result_try!(self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data).await?); Ok(NetworkResult::value(SendDataMethod { - connection_descriptor, contact_method: NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref), opt_relayed_contact_method: None, + unique_flow, })) } @@ -299,31 +302,31 @@ impl NetworkManager { let node_ref = node_ref.filtered_clone(NodeRefFilter::from(dial_info.make_filter())); // First try to send data to the last socket we've seen this peer on - let data = if let Some(connection_descriptor) = node_ref.last_connection() { + let data = if let Some(flow) = node_ref.last_flow() { #[cfg(feature = "verbose-tracing")] debug!( "ExistingConnection: {:?} for {:?}", - connection_descriptor, node_ref + flow, node_ref ); match self .net() - .send_data_to_existing_connection(connection_descriptor, data) + .send_data_to_existing_flow(flow, data) .await? { - None => { + SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it - node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); + node_ref.set_last_flow(flow, get_aligned_timestamp()); return Ok(NetworkResult::value(SendDataMethod{ contact_method: NodeContactMethod::Existing, opt_relayed_contact_method: None, - connection_descriptor + unique_flow })); } - Some(d) => { + SendDataToExistingFlowResult::NotSent(d) => { // Connection couldn't send, kill it - node_ref.clear_last_connection(connection_descriptor); + node_ref.clear_last_connection(flow); d } } @@ -332,16 +335,16 @@ impl NetworkManager { }; // New direct connection was necessary for this dial info - let connection_descriptor = + let unique_flow = network_result_try!(self.net().send_data_to_dial_info(dial_info.clone(), data).await?); // If we connected to this node directly, save off the last connection so we can use it again - node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); + node_ref.set_last_flow(unique_flow.flow, get_aligned_timestamp()); Ok(NetworkResult::value(SendDataMethod { - connection_descriptor, contact_method: NodeContactMethod::Direct(dial_info), opt_relayed_contact_method: None, + unique_flow, })) } @@ -528,7 +531,7 @@ impl NetworkManager { relay_nr: NodeRef, target_nr: NodeRef, data: Vec, - ) -> EyreResult> { + ) -> EyreResult> { // Build a return receipt for the signal let receipt_timeout = ms_to_us( self.unlocked_inner @@ -592,14 +595,14 @@ impl NetworkManager { } // And now use the existing connection to send over - if let Some(descriptor) = inbound_nr.last_connection() { + if let Some(flow) = inbound_nr.last_flow() { match self .net() - .send_data_to_existing_connection(descriptor, data) + .send_data_to_existing_flow(flow, data) .await? { - None => Ok(NetworkResult::value(descriptor)), - Some(_) => Ok(NetworkResult::no_connection_other( + SendDataToExistingFlowResult::Sent(unique_flow) => Ok(NetworkResult::value(unique_flow)), + SendDataToExistingFlowResult::NotSent(_) => Ok(NetworkResult::no_connection_other( "unable to send over reverse connection", )), } @@ -620,7 +623,7 @@ impl NetworkManager { relay_nr: NodeRef, target_nr: NodeRef, data: Vec, - ) -> EyreResult> { + ) -> EyreResult> { // Ensure we are filtered down to UDP (the only hole punch protocol supported today) assert!(target_nr .filter_ref() @@ -660,7 +663,7 @@ impl NetworkManager { // Do our half of the hole punch by sending an empty packet // Both sides will do this and then the receipt will get sent over the punched hole - // Don't bother storing the returned connection descriptor as the 'last connection' because the other side of the hole + // Don't bother storing the returned flow as the 'last flow' because the other side of the hole // punch should come through and create a real 'last connection' for us if this succeeds network_result_try!( self.net() @@ -710,14 +713,14 @@ impl NetworkManager { } // And now use the existing connection to send over - if let Some(descriptor) = inbound_nr.last_connection() { + if let Some(flow) = inbound_nr.last_flow() { match self .net() - .send_data_to_existing_connection(descriptor, data) + .send_data_to_existing_flow(flow, data) .await? { - None => Ok(NetworkResult::value(descriptor)), - Some(_) => Ok(NetworkResult::no_connection_other( + SendDataToExistingFlowResult::Sent(unique_flow) => Ok(NetworkResult::value(unique_flow)), + SendDataToExistingFlowResult::NotSent(_) => Ok(NetworkResult::no_connection_other( "unable to send over hole punch", )), } diff --git a/veilid-core/src/network_manager/tasks/public_address_check.rs b/veilid-core/src/network_manager/tasks/public_address_check.rs index e214e27a..faa6b69a 100644 --- a/veilid-core/src/network_manager/tasks/public_address_check.rs +++ b/veilid-core/src/network_manager/tasks/public_address_check.rs @@ -31,7 +31,7 @@ impl NetworkManager { pub fn report_local_network_socket_address( &self, _socket_address: SocketAddress, - _connection_descriptor: ConnectionDescriptor, + _flow: Flow, _reporting_peer: NodeRef, ) { // XXX: Nothing here yet. @@ -43,11 +43,11 @@ impl NetworkManager { pub fn report_public_internet_socket_address( &self, socket_address: SocketAddress, // the socket address as seen by the remote peer - connection_descriptor: ConnectionDescriptor, // the connection descriptor used + flow: Flow, // the flow used reporting_peer: NodeRef, // the peer's noderef reporting the socket address ) { #[cfg(feature = "network-result-extra")] - debug!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); + debug!("report_global_socket_address\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer); // Ignore these reports if we are currently detecting public dial info let net = self.net(); @@ -77,10 +77,7 @@ impl NetworkManager { }); // Get the ip(block) this report is coming from - let reporting_ipblock = ip_to_ipblock( - ip6_prefix_size, - connection_descriptor.remote_address().ip_addr(), - ); + let reporting_ipblock = ip_to_ipblock(ip6_prefix_size, flow.remote_address().ip_addr()); // Reject public address reports from nodes that we know are behind symmetric nat or // nodes that must be using a relay for everything @@ -105,10 +102,8 @@ impl NetworkManager { let mut inner = self.inner.lock(); let inner = &mut *inner; - let addr_proto_type_key = PublicAddressCheckCacheKey( - connection_descriptor.protocol_type(), - connection_descriptor.address_type(), - ); + let addr_proto_type_key = + PublicAddressCheckCacheKey(flow.protocol_type(), flow.address_type()); if inner .public_address_inconsistencies_table .get(&addr_proto_type_key) @@ -136,7 +131,7 @@ impl NetworkManager { NetworkClass::InboundCapable ) { // Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed - let dial_info_filter = connection_descriptor.make_dial_info_filter(); + let dial_info_filter = flow.make_dial_info_filter(); // Get current external ip/port from registered global dialinfo let current_addresses: BTreeSet = routing_table @@ -267,7 +262,7 @@ impl NetworkManager { net.set_needs_public_dial_info_check(bad_public_address_detection_punishment); } else { warn!("Public address may have changed. Restarting the server may be required."); - warn!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); + warn!("report_global_socket_address\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer); warn!( "public_address_check_cache: {:#?}", inner.public_address_check_cache diff --git a/veilid-core/src/network_manager/tests/test_connection_table.rs b/veilid-core/src/network_manager/tests/test_connection_table.rs index 05d679d1..1f20df65 100644 --- a/veilid-core/src/network_manager/tests/test_connection_table.rs +++ b/veilid-core/src/network_manager/tests/test_connection_table.rs @@ -9,12 +9,12 @@ pub async fn test_add_get_remove() { let address_filter = AddressFilter::new(config.clone(), mock_routing_table()); let table = ConnectionTable::new(config, address_filter); - let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new( + let a1 = Flow::new_no_local(PeerAddress::new( SocketAddress::new(Address::IPV4(Ipv4Addr::new(192, 168, 0, 1)), 8080), ProtocolType::TCP, )); let a2 = a1; - let a3 = ConnectionDescriptor::new( + let a3 = Flow::new( PeerAddress::new( SocketAddress::new(Address::IPV6(Ipv6Addr::new(191, 0, 0, 0, 0, 0, 0, 1)), 8090), ProtocolType::TCP, @@ -26,7 +26,7 @@ pub async fn test_add_get_remove() { 0, ))), ); - let a4 = ConnectionDescriptor::new( + let a4 = Flow::new( PeerAddress::new( SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090), ProtocolType::TCP, @@ -38,7 +38,7 @@ pub async fn test_add_get_remove() { 0, ))), ); - let a5 = ConnectionDescriptor::new( + let a5 = Flow::new( PeerAddress::new( SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090), ProtocolType::WSS, @@ -59,12 +59,12 @@ pub async fn test_add_get_remove() { let c4 = NetworkConnection::dummy(4.into(), a4); let c5 = NetworkConnection::dummy(5.into(), a5); - assert_eq!(a1, c2.connection_descriptor()); - assert_ne!(a3, c4.connection_descriptor()); - assert_ne!(a4, c5.connection_descriptor()); + assert_eq!(a1, c2.flow()); + assert_ne!(a3, c4.flow()); + assert_ne!(a4, c5.flow()); assert_eq!(table.connection_count(), 0); - assert_eq!(table.peek_connection_by_descriptor(a1), None); + assert_eq!(table.peek_connection_by_flow(a1), None); table.add_connection(c1).unwrap(); assert!(table.add_connection(c1b).is_err()); @@ -72,26 +72,26 @@ pub async fn test_add_get_remove() { assert!(table.remove_connection_by_id(4.into()).is_none()); assert!(table.remove_connection_by_id(5.into()).is_none()); assert_eq!(table.connection_count(), 1); - assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); - assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); + assert_eq!(table.peek_connection_by_flow(a1), Some(c1h.clone())); + assert_eq!(table.peek_connection_by_flow(a1), Some(c1h.clone())); assert_eq!(table.connection_count(), 1); assert_err!(table.add_connection(c2)); assert_eq!(table.connection_count(), 1); - assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); - assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); + assert_eq!(table.peek_connection_by_flow(a1), Some(c1h.clone())); + assert_eq!(table.peek_connection_by_flow(a1), Some(c1h.clone())); assert_eq!(table.connection_count(), 1); assert_eq!( table .remove_connection_by_id(1.into()) - .map(|c| c.connection_descriptor()) + .map(|c| c.flow()) .unwrap(), a1 ); assert_eq!(table.connection_count(), 0); assert!(table.remove_connection_by_id(2.into()).is_none()); assert_eq!(table.connection_count(), 0); - assert_eq!(table.peek_connection_by_descriptor(a2), None); - assert_eq!(table.peek_connection_by_descriptor(a1), None); + assert_eq!(table.peek_connection_by_flow(a2), None); + assert_eq!(table.peek_connection_by_flow(a1), None); assert_eq!(table.connection_count(), 0); let c1 = NetworkConnection::dummy(6.into(), a1); table.add_connection(c1).unwrap(); @@ -103,21 +103,21 @@ pub async fn test_add_get_remove() { assert_eq!( table .remove_connection_by_id(6.into()) - .map(|c| c.connection_descriptor()) + .map(|c| c.flow()) .unwrap(), a2 ); assert_eq!( table .remove_connection_by_id(3.into()) - .map(|c| c.connection_descriptor()) + .map(|c| c.flow()) .unwrap(), a3 ); assert_eq!( table .remove_connection_by_id(4.into()) - .map(|c| c.connection_descriptor()) + .map(|c| c.flow()) .unwrap(), a4 ); diff --git a/veilid-core/src/network_manager/types/dial_info_filter.rs b/veilid-core/src/network_manager/types/dial_info_filter.rs index 14c6d7e5..ad64fbed 100644 --- a/veilid-core/src/network_manager/types/dial_info_filter.rs +++ b/veilid-core/src/network_manager/types/dial_info_filter.rs @@ -97,8 +97,8 @@ impl From for DialInfoFilter { } } -impl From for DialInfoFilter { - fn from(other: ConnectionDescriptor) -> Self { +impl From for DialInfoFilter { + fn from(other: Flow) -> Self { Self { protocol_type_set: ProtocolTypeSet::from(other.protocol_type()), address_type_set: AddressTypeSet::from(other.address_type()), diff --git a/veilid-core/src/network_manager/types/connection_descriptor.rs b/veilid-core/src/network_manager/types/flow.rs similarity index 71% rename from veilid-core/src/network_manager/types/connection_descriptor.rs rename to veilid-core/src/network_manager/types/flow.rs index b57fcc91..5722493c 100644 --- a/veilid-core/src/network_manager/types/connection_descriptor.rs +++ b/veilid-core/src/network_manager/types/flow.rs @@ -3,17 +3,21 @@ use super::*; /// Represents the 5-tuple of an established connection /// Not used to specify connections to create, that is reserved for DialInfo /// -/// ConnectionDescriptors should never be from unspecified local addresses for connection oriented protocols +/// Abstracts both connections to 'connection oriented' protocols (TCP/WS/WSS), but also datagram protocols (UDP) +/// +/// Flows should never be from UNSPECIFIED local addresses for connection oriented protocols /// If the medium does not allow local addresses, None should have been used or 'new_no_local' /// If we are specifying only a port, then the socket's 'local_address()' should have been used, since an /// established connection is always from a real address to another real address. +/// + #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] -pub struct ConnectionDescriptor { +pub struct Flow { remote: PeerAddress, local: Option, } -impl ConnectionDescriptor { +impl Flow { pub fn new(remote: PeerAddress, local: SocketAddress) -> Self { assert!(!remote.protocol_type().is_ordered() || !local.address().is_unspecified()); @@ -50,7 +54,7 @@ impl ConnectionDescriptor { } } -impl MatchesDialInfoFilter for ConnectionDescriptor { +impl MatchesDialInfoFilter for Flow { fn matches_filter(&self, filter: &DialInfoFilter) -> bool { if !filter.protocol_type_set.contains(self.protocol_type()) { return false; @@ -61,3 +65,14 @@ impl MatchesDialInfoFilter for ConnectionDescriptor { true } } + +/// UniqueFlow is a record a specific flow that may or may not currently exist +/// The NetworkConnectionId associated with each flow may represent a low level network connection +/// and will be unique with high probability per low-level connection +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct UniqueFlow { + pub flow: Flow, + pub connection_id: Option, +} + +pub type NetworkConnectionId = AlignedU64; diff --git a/veilid-core/src/network_manager/types/mod.rs b/veilid-core/src/network_manager/types/mod.rs index 3c1c1e9d..2c054254 100644 --- a/veilid-core/src/network_manager/types/mod.rs +++ b/veilid-core/src/network_manager/types/mod.rs @@ -1,9 +1,9 @@ mod address; mod address_type; -mod connection_descriptor; mod dial_info; mod dial_info_class; mod dial_info_filter; +mod flow; mod low_level_protocol_type; mod network_class; mod peer_address; @@ -15,10 +15,10 @@ use super::*; pub use address::*; pub use address_type::*; -pub use connection_descriptor::*; pub use dial_info::*; pub use dial_info_class::*; pub use dial_info_filter::*; +pub use flow::*; pub use low_level_protocol_type::*; pub use network_class::*; pub use peer_address::*; diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index c6ce03da..a27b3ed9 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -248,11 +248,11 @@ impl Network { #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] pub async fn send_data_to_existing_connection( &self, - descriptor: ConnectionDescriptor, + flow: Flow, data: Vec, ) -> EyreResult>> { let data_len = data.len(); - match descriptor.protocol_type() { + match flow.protocol_type() { ProtocolType::UDP => { bail!("no support for UDP protocol") } @@ -265,13 +265,13 @@ impl Network { // Handle connection-oriented protocols // Try to send to the exact existing connection if one exists - if let Some(conn) = self.connection_manager().get_connection(descriptor) { + if let Some(conn) = self.connection_manager().get_connection(flow) { // connection exists, send over it match conn.send_async(data).await { ConnectionHandleSendResult::Sent => { // Network accounting self.network_manager().stats_packet_sent( - descriptor.remote().socket_addr().ip(), + flow.remote().socket_addr().ip(), ByteCount::new(data_len as u64), ); @@ -295,7 +295,7 @@ impl Network { &self, dial_info: DialInfo, data: Vec, - ) -> EyreResult> { + ) -> EyreResult> { self.record_dial_info_failure(dial_info.clone(), async move { let data_len = data.len(); if dial_info.protocol_type() == ProtocolType::UDP { @@ -318,13 +318,13 @@ impl Network { "failed to send", ))); } - let connection_descriptor = conn.connection_descriptor(); + let flow = conn.flow(); // Network accounting self.network_manager() .stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64)); - Ok(NetworkResult::value(connection_descriptor)) + Ok(NetworkResult::value(flow)) }) .await } diff --git a/veilid-core/src/network_manager/wasm/protocol/mod.rs b/veilid-core/src/network_manager/wasm/protocol/mod.rs index 44f77d20..6b1ae16b 100644 --- a/veilid-core/src/network_manager/wasm/protocol/mod.rs +++ b/veilid-core/src/network_manager/wasm/protocol/mod.rs @@ -35,10 +35,10 @@ impl ProtocolNetworkConnection { } } - pub fn descriptor(&self) -> ConnectionDescriptor { + pub fn flow(&self) -> Flow { match self { - // Self::Dummy(d) => d.descriptor(), - Self::Ws(w) => w.descriptor(), + // Self::Dummy(d) => d.flow(), + Self::Ws(w) => w.flow(), } } pub async fn close(&self) -> io::Result> { diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index 6d7e1b5d..c9306fb1 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -34,7 +34,7 @@ fn to_io(err: WsErr) -> io::Error { #[derive(Clone)] pub struct WebsocketNetworkConnection { - descriptor: ConnectionDescriptor, + flow: Flow, inner: Arc, } @@ -45,9 +45,9 @@ impl fmt::Debug for WebsocketNetworkConnection { } impl WebsocketNetworkConnection { - pub fn new(descriptor: ConnectionDescriptor, ws_meta: WsMeta, ws_stream: WsStream) -> Self { + pub fn new(flow: Flow, ws_meta: WsMeta, ws_stream: WsStream) -> Self { Self { - descriptor, + flow, inner: Arc::new(WebsocketNetworkConnectionInner { ws_meta, ws_stream: CloneStream::new(ws_stream), @@ -55,8 +55,8 @@ impl WebsocketNetworkConnection { } } - pub fn descriptor(&self) -> ConnectionDescriptor { - self.descriptor + pub fn flow(&self) -> Flow { + self.flow } #[cfg_attr( @@ -147,9 +147,9 @@ impl WebsocketProtocolHandler { .into_network_result()) .into_network_result()?); - // Make our connection descriptor + // Make our flow let wnc = WebsocketNetworkConnection::new( - ConnectionDescriptor::new_no_local(dial_info.peer_address()), + Flow::new_no_local(dial_info.peer_address()), wsmeta, wsio, ); diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index c6fbcb47..28a666c6 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -36,7 +36,7 @@ pub(crate) enum BucketEntryState { } #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] -pub(crate) struct LastConnectionKey(ProtocolType, AddressType); +pub(crate) struct LastFlowKey(ProtocolType, AddressType); /// Bucket entry information specific to the LocalNetwork RoutingDomain #[derive(Debug, Serialize, Deserialize)] @@ -75,9 +75,9 @@ pub(crate) struct BucketEntryInner { /// has the same timestamp, because if we change our own IP address or network class it may be possible for nodes that were /// unreachable may now be reachable with the same SignedNodeInfo/DialInfo updated_since_last_network_change: bool, - /// The last connection descriptors used to contact this node, per protocol type + /// The last flows used to contact this node, per protocol type #[serde(skip)] - last_connections: BTreeMap, + last_flows: BTreeMap, /// The node info for this entry on the publicinternet routing domain public_internet: BucketEntryPublicInternet, /// The node info for this entry on the localnetwork routing domain @@ -304,7 +304,7 @@ impl BucketEntryInner { // The latest connection would have been the one we got the new node info // over so that connection is still valid. if node_info_changed { - self.clear_last_connections_except_latest(); + self.clear_last_flows_except_latest(); } } @@ -333,7 +333,7 @@ impl BucketEntryInner { } // Check connections - let last_connections = self.last_connections( + let last_connections = self.last_flows( rti, true, NodeRefFilter::from(routing_domain), @@ -387,7 +387,7 @@ impl BucketEntryInner { } // Check connections let mut best_routing_domain: Option = None; - let last_connections = self.last_connections( + let last_connections = self.last_flows( rti, true, NodeRefFilter::from(routing_domain_set), @@ -408,77 +408,77 @@ impl BucketEntryInner { best_routing_domain } - fn descriptor_to_key(&self, last_connection: ConnectionDescriptor) -> LastConnectionKey { - LastConnectionKey( - last_connection.protocol_type(), - last_connection.address_type(), + fn flow_to_key(&self, last_flow: Flow) -> LastFlowKey { + LastFlowKey( + last_flow.protocol_type(), + last_flow.address_type(), ) } - // Stores a connection descriptor in this entry's table of last connections - pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: Timestamp) { + // Stores a flow in this entry's table of last flows + pub fn set_last_flow(&mut self, last_flow: Flow, timestamp: Timestamp) { if self.is_punished { // Don't record connection if this entry is currently punished return; } - let key = self.descriptor_to_key(last_connection); - self.last_connections - .insert(key, (last_connection, timestamp)); + let key = self.flow_to_key(last_flow); + self.last_flows + .insert(key, (last_flow, timestamp)); } - // Removes a connection descriptor in this entry's table of last connections - pub fn clear_last_connection(&mut self, last_connection: ConnectionDescriptor) { - let key = self.descriptor_to_key(last_connection); - self.last_connections + // Removes a flow in this entry's table of last flows + pub fn remove_last_flow(&mut self, last_flow: Flow) { + let key = self.flow_to_key(last_flow); + self.last_flows .remove(&key); } - // Clears the table of last connections to ensure we create new ones and drop any existing ones - pub fn clear_last_connections(&mut self) { - self.last_connections.clear(); + // Clears the table of last flows to ensure we create new ones and drop any existing ones + pub fn clear_last_flows(&mut self) { + self.last_flows.clear(); } - // Clears the table of last connections except the most recent one - pub fn clear_last_connections_except_latest(&mut self) { - if self.last_connections.is_empty() { + // Clears the table of last flows except the most recent one + pub fn clear_last_flows_except_latest(&mut self) { + if self.last_flows.is_empty() { // No last_connections return; } - let mut dead_keys = Vec::with_capacity(self.last_connections.len()-1); - let mut most_recent_connection = None; - let mut most_recent_connection_time = 0u64; - for (k, v) in &self.last_connections { + let mut dead_keys = Vec::with_capacity(self.last_flows.len()-1); + let mut most_recent_flow = None; + let mut most_recent_flow_time = 0u64; + for (k, v) in &self.last_flows { let lct = v.1.as_u64(); - if lct > most_recent_connection_time { - most_recent_connection = Some(k); - most_recent_connection_time = lct; + if lct > most_recent_flow_time { + most_recent_flow = Some(k); + most_recent_flow_time = lct; } } - let Some(most_recent_connection) = most_recent_connection else { + let Some(most_recent_flow) = most_recent_flow else { return; }; - for k in self.last_connections.keys() { - if k != most_recent_connection { + for k in self.last_flows.keys() { + if k != most_recent_flow { dead_keys.push(k.clone()); } } for dk in dead_keys { - self.last_connections.remove(&dk); + self.last_flows.remove(&dk); } } - // Gets all the 'last connections' that match a particular filter, and their accompanying timestamps of last use - pub(super) fn last_connections( + // Gets all the 'last flows' that match a particular filter, and their accompanying timestamps of last use + pub(super) fn last_flows( &self, rti: &RoutingTableInner, only_live: bool, filter: NodeRefFilter, - ) -> Vec<(ConnectionDescriptor, Timestamp)> { + ) -> Vec<(Flow, Timestamp)> { let connection_manager = rti.unlocked_inner.network_manager.connection_manager(); - let mut out: Vec<(ConnectionDescriptor, Timestamp)> = self - .last_connections + let mut out: Vec<(Flow, Timestamp)> = self + .last_flows .iter() .filter_map(|(k, v)| { let include = { @@ -564,7 +564,7 @@ impl BucketEntryInner { pub fn set_punished(&mut self, punished: bool) { self.is_punished = punished; if punished { - self.clear_last_connections(); + self.clear_last_flows(); } } @@ -845,7 +845,7 @@ impl BucketEntry { unsupported_node_ids: TypedKeyGroup::new(), envelope_support: Vec::new(), updated_since_last_network_change: false, - last_connections: BTreeMap::new(), + last_flows: BTreeMap::new(), local_network: BucketEntryLocalNetwork { last_seen_our_node_info_ts: Timestamp::new(0u64), signed_node_info: None, diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 9770d51b..8d0a9f00 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -90,7 +90,7 @@ pub type BucketIndex = (CryptoKind, usize); #[derive(Debug, Clone, Copy)] pub(crate) struct RecentPeersEntry { - pub last_connection: ConnectionDescriptor, + pub last_connection: Flow, } pub(crate) struct RoutingTableUnlockedInner { @@ -662,13 +662,13 @@ impl RoutingTable { pub fn register_node_with_existing_connection( &self, node_id: TypedKey, - descriptor: ConnectionDescriptor, + flow: Flow, timestamp: Timestamp, ) -> EyreResult { self.inner.write().register_node_with_existing_connection( self.clone(), node_id, - descriptor, + flow, timestamp, ) } @@ -698,7 +698,7 @@ impl RoutingTable { for e in &recent_peers { let mut dead = true; if let Ok(Some(nr)) = self.lookup_node_ref(*e) { - if let Some(last_connection) = nr.last_connection() { + if let Some(last_connection) = nr.last_flow() { out.push((*e, RecentPeersEntry { last_connection })); dead = false; } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 1acaef96..49f56e95 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -273,13 +273,13 @@ pub(crate) trait NodeRefBase: Sized { /// Get the most recent 'last connection' to this node /// Filtered first and then sorted by ordering preference and then by most recent - fn last_connection(&self) -> Option { + fn last_flow(&self) -> Option { self.operate(|rti, e| { // apply sequencing to filter and get sort let sequencing = self.common().sequencing; let filter = self.common().filter.unwrap_or_default(); let (ordered, filter) = filter.with_sequencing(sequencing); - let mut last_connections = e.last_connections(rti, true, filter); + let mut last_connections = e.last_flows(rti, true, filter); if ordered { last_connections.sort_by(|a, b| { @@ -292,19 +292,19 @@ pub(crate) trait NodeRefBase: Sized { } fn clear_last_connections(&self) { - self.operate_mut(|_rti, e| e.clear_last_connections()) + self.operate_mut(|_rti, e| e.clear_last_flows()) } - fn set_last_connection(&self, connection_descriptor: ConnectionDescriptor, ts: Timestamp) { + fn set_last_flow(&self, flow: Flow, ts: Timestamp) { self.operate_mut(|rti, e| { - e.set_last_connection(connection_descriptor, ts); - rti.touch_recent_peer(e.best_node_id(), connection_descriptor); + e.set_last_flow(flow, ts); + rti.touch_recent_peer(e.best_node_id(), flow); }) } - fn clear_last_connection(&self, connection_descriptor: ConnectionDescriptor) { + fn clear_last_connection(&self, flow: Flow) { self.operate_mut(|_rti, e| { - e.clear_last_connection(connection_descriptor); + e.remove_last_flow(flow); }) } @@ -325,6 +325,10 @@ pub(crate) trait NodeRefBase: Sized { self.stats_failed_to_send(get_aligned_timestamp(), false); } + fn report_failed_route_test(&self) { + self.stats_failed_to_send(get_aligned_timestamp(), false); + } + fn stats_question_sent(&self, ts: Timestamp, bytes: Timestamp, expects_answer: bool) { self.operate_mut(|rti, e| { rti.transfer_stats_accounting().add_up(bytes); diff --git a/veilid-core/src/routing_table/node_ref_filter.rs b/veilid-core/src/routing_table/node_ref_filter.rs index dfa48208..68509832 100644 --- a/veilid-core/src/routing_table/node_ref_filter.rs +++ b/veilid-core/src/routing_table/node_ref_filter.rs @@ -112,8 +112,8 @@ impl From for NodeRefFilter { } } -impl From for NodeRefFilter { - fn from(other: ConnectionDescriptor) -> Self { +impl From for NodeRefFilter { + fn from(other: Flow) -> Self { Self { routing_domain_set: RoutingDomainSet::all(), dial_info_filter: DialInfoFilter::from(other), diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs index 23f87b30..0ddc59c3 100644 --- a/veilid-core/src/routing_table/route_spec_store/mod.rs +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -661,9 +661,9 @@ impl RouteSpecStore { )] async fn test_allocated_route(&self, private_route_id: RouteId) -> VeilidAPIResult { // Make loopback route to test with - let dest = { - // Get the best private route for this id - let (key, hop_count) = { + let (dest, hops) = { + // Get the best allocated route for this id + let (key, hops) = { let inner = &mut *self.inner.lock(); let Some(rssd) = inner.content.get_detail(&private_route_id) else { apibail_invalid_argument!( @@ -675,9 +675,10 @@ impl RouteSpecStore { let Some(key) = rssd.get_best_route_set_key() else { apibail_internal!("no best key to test allocated route"); }; - // Match the private route's hop length for safety route length - let hop_count = rssd.hop_count(); - (key, hop_count) + // Get the hops so we can match the route's hop length for safety + // route length as well as marking nodes as unreliable if this fails + let hops = rssd.hops_node_refs(); + (key, hops) }; // Get the private route to send to @@ -686,6 +687,8 @@ impl RouteSpecStore { let stability = Stability::Reliable; // Routes should test with the most likely to succeed sequencing they are capable of let sequencing = Sequencing::PreferOrdered; + // Hop count for safety spec should match the private route spec + let hop_count = hops.len(); let safety_spec = SafetySpec { preferred_route: Some(private_route_id), @@ -695,10 +698,13 @@ impl RouteSpecStore { }; let safety_selection = SafetySelection::Safe(safety_spec); - Destination::PrivateRoute { - private_route, - safety_selection, - } + ( + Destination::PrivateRoute { + private_route, + safety_selection, + }, + hops, + ) }; // Test with double-round trip ping to self @@ -706,7 +712,12 @@ impl RouteSpecStore { let _res = match rpc_processor.rpc_call_status(dest).await? { NetworkResult::Value(v) => v, _ => { - // Did not error, but did not come back, just return false + // Did not error, but did not come back, mark the nodes as failed to send, and then return false + // This will prevent those node from immediately being included in the next allocated route, + // avoiding the same route being constructed to replace this one when it is removed. + for hop in hops { + hop.report_failed_route_test(); + } return Ok(false); } }; diff --git a/veilid-core/src/routing_table/route_spec_store/route_set_spec_detail.rs b/veilid-core/src/routing_table/route_spec_store/route_set_spec_detail.rs index 11887352..bedf4efc 100644 --- a/veilid-core/src/routing_table/route_spec_store/route_set_spec_detail.rs +++ b/veilid-core/src/routing_table/route_spec_store/route_set_spec_detail.rs @@ -86,6 +86,9 @@ impl RouteSetSpecDetail { pub fn hop_count(&self) -> usize { self.hop_node_refs.len() } + pub fn hops_node_refs(&self) -> Vec { + self.hop_node_refs.clone() + } pub fn hop_node_ref(&self, idx: usize) -> Option { self.hop_node_refs.get(idx).cloned() } diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index e8663703..855e1321 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -378,7 +378,7 @@ impl RoutingTableInner { for bucket in &self.buckets[&ck] { for entry in bucket.entries() { entry.1.with_mut_inner(|e| { - e.clear_last_connections(); + e.clear_last_flows(); }); } } @@ -853,7 +853,7 @@ impl RoutingTableInner { &mut self, outer_self: RoutingTable, node_id: TypedKey, - descriptor: ConnectionDescriptor, + flow: Flow, timestamp: Timestamp, ) -> EyreResult { let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| { @@ -861,8 +861,7 @@ impl RoutingTableInner { e.touch_last_seen(timestamp); })?; // set the most recent node address for connection finding and udp replies - nr.locked_mut(self) - .set_last_connection(descriptor, timestamp); + nr.locked_mut(self).set_last_flow(flow, timestamp); Ok(nr) } @@ -912,7 +911,7 @@ impl RoutingTableInner { } } - pub fn touch_recent_peer(&mut self, node_id: TypedKey, last_connection: ConnectionDescriptor) { + pub fn touch_recent_peer(&mut self, node_id: TypedKey, last_connection: Flow) { self.recent_peers .insert(node_id, RecentPeersEntry { last_connection }); } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index e9c4cc19..009559ed 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -54,8 +54,8 @@ struct RPCMessageHeaderDetailDirect { envelope: Envelope, /// The noderef of the peer that sent the message (not the original sender). Ensures node doesn't get evicted from routing table until we're done with it peer_noderef: NodeRef, - /// The connection from the peer sent the message (not the original sender) - connection_descriptor: ConnectionDescriptor, + /// The flow from the peer sent the message (not the original sender) + flow: Flow, /// The routing domain the message was sent through routing_domain: RoutingDomain, } @@ -189,7 +189,7 @@ struct WaitableReply { safety_route: Option, remote_private_route: Option, reply_private_route: Option, - _connection_ref_scope: ConnectionRefScope, + _opt_connection_ref_scope: Option, } ///////////////////////////////////////////////////////////////////// @@ -1158,7 +1158,7 @@ impl RPCProcessor { // Log rpc send #[cfg(feature = "verbose-tracing")] - debug!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest, protect); + debug!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest); // Produce rendered operation let RenderedOperation { @@ -1224,10 +1224,10 @@ impl RPCProcessor { // Ref the connection so it doesn't go away until we're done with the waitable reply - let connection_ref_scope = self + let opt_connection_ref_scope = send_data_method.unique_flow.connection_id.and_then(|id| self .network_manager() .connection_manager() - .connection_ref_scope(send_data_method.connection_descriptor); + .try_connection_ref_scope(id)); // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { @@ -1239,7 +1239,7 @@ impl RPCProcessor { safety_route, remote_private_route, reply_private_route, - _connection_ref_scope: connection_ref_scope, + _opt_connection_ref_scope: opt_connection_ref_scope, })) } @@ -1660,7 +1660,7 @@ impl RPCProcessor { &self, envelope: Envelope, peer_noderef: NodeRef, - connection_descriptor: ConnectionDescriptor, + flow: Flow, routing_domain: RoutingDomain, body: Vec, ) -> EyreResult<()> { @@ -1668,7 +1668,7 @@ impl RPCProcessor { detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect { envelope, peer_noderef, - connection_descriptor, + flow, routing_domain, }), timestamp: get_aligned_timestamp(), diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 056770ee..aa36d37f 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -49,8 +49,8 @@ impl RPCProcessor { // Can't allow anything other than direct packets here, as handling reverse connections // or anything like via signals over private routes would deanonymize the route - let connection_descriptor = match &msg.header.detail { - RPCMessageHeaderDetail::Direct(d) => d.connection_descriptor, + let flow = match &msg.header.detail { + RPCMessageHeaderDetail::Direct(d) => d.flow, RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { return Ok(NetworkResult::invalid_message("signal must be direct")); } @@ -70,7 +70,7 @@ impl RPCProcessor { let network_manager = self.network_manager(); let signal_info = signal.destructure(); network_manager - .handle_signal(connection_descriptor, signal_info) + .handle_signal(flow, signal_info) .await .map_err(RPCError::network) } diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 4126d7a9..ae03778d 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -162,13 +162,13 @@ impl RPCProcessor { .network_manager() .report_public_internet_socket_address( sender_info.socket_address, - send_data_method.connection_descriptor, + send_data_method.unique_flow.flow, target, ), RoutingDomain::LocalNetwork => { self.network_manager().report_local_network_socket_address( sender_info.socket_address, - send_data_method.connection_descriptor, + send_data_method.unique_flow.flow, target, ) } @@ -208,7 +208,7 @@ impl RPCProcessor { let (node_status, sender_info) = match &msg.header.detail { RPCMessageHeaderDetail::Direct(detail) => { - let connection_descriptor = detail.connection_descriptor; + let flow = detail.flow; let routing_domain = detail.routing_domain; // Ensure the node status from the question is the kind for the routing domain we received the request in @@ -222,7 +222,7 @@ impl RPCProcessor { // Get the peer address in the returned sender info let sender_info = SenderInfo { - socket_address: *connection_descriptor.remote_address(), + socket_address: *flow.remote_address(), }; // Make status answer diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index 56d2eba4..a5bd52f6 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -34,6 +34,7 @@ rt-tokio = [ ] tracking = ["veilid-core/tracking"] network-result-extra = ["veilid-core/network-result-extra"] +verbose-tracing = ["veilid-core/verbose-tracing"] [dependencies] veilid-core = { path = "../veilid-core", default-features = false }