From 4906c5df788c4a3b327833f99a6ad53482968b35 Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 26 Apr 2024 15:23:38 -0400 Subject: [PATCH] more refactor and clean up low level networking --- .../native/discovery_context.rs | 90 +++++----- veilid-core/src/network_manager/native/mod.rs | 37 ++-- .../native/network_class_discovery.rs | 167 ++++++------------ .../src/network_manager/native/network_tcp.rs | 17 +- .../src/network_manager/native/network_udp.rs | 1 - .../native/protocol/sockets.rs | 19 +- .../network_manager/native/protocol/tcp.rs | 20 +-- .../network_manager/native/protocol/udp.rs | 2 +- .../src/network_manager/native/protocol/ws.rs | 3 +- .../network_manager/native/start_protocols.rs | 158 +++++++++-------- .../tasks/public_address_check.rs | 15 ++ 11 files changed, 257 insertions(+), 272 deletions(-) diff --git a/veilid-core/src/network_manager/native/discovery_context.rs b/veilid-core/src/network_manager/native/discovery_context.rs index c1b05c8b..9a5be337 100644 --- a/veilid-core/src/network_manager/native/discovery_context.rs +++ b/veilid-core/src/network_manager/native/discovery_context.rs @@ -19,6 +19,7 @@ pub enum DetectedDialInfo { pub struct DetectionResult { pub ddi: DetectedDialInfo, pub external_address_types: AddressTypeSet, + pub local_port: u16, } // Result of checking external address @@ -46,6 +47,7 @@ struct DiscoveryContextUnlockedInner { existing_external_address: Option, protocol_type: ProtocolType, address_type: AddressType, + port: u16, } #[derive(Clone)] @@ -62,6 +64,7 @@ impl DiscoveryContext { net: Network, protocol_type: ProtocolType, address_type: AddressType, + port: u16, clear_network_callback: ClearNetworkCallback, ) -> Self { let intf_addrs = @@ -95,6 +98,7 @@ impl DiscoveryContext { existing_external_address, protocol_type, address_type, + port, }), inner: Arc::new(Mutex::new(DiscoveryContextInner { external_1: None, @@ -327,11 +331,7 @@ impl DiscoveryContext { let protocol_type = self.unlocked_inner.protocol_type; let low_level_protocol_type = protocol_type.low_level_protocol_type(); let address_type = self.unlocked_inner.address_type; - let local_port = self - .unlocked_inner - .net - .get_local_port(protocol_type) - .unwrap(); + let local_port = self.unlocked_inner.port; let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone(); let igd_manager = self.unlocked_inner.net.unlocked_inner.igd_manager.clone(); @@ -372,7 +372,7 @@ impl DiscoveryContext { return Some(external_mapped_dial_info); } - if validate_tries == PORT_MAP_VALIDATE_TRY_COUNT { + if validate_tries != PORT_MAP_VALIDATE_TRY_COUNT { log_net!(debug "UPNP port mapping succeeded but port {}/{} is still unreachable.\nretrying\n", local_port, match low_level_protocol_type { LowLevelProtocolType::UDP => "udp", @@ -431,6 +431,7 @@ impl DiscoveryContext { class: DialInfoClass::Direct, }), external_address_types: AddressTypeSet::only(external_1.address.address_type()), + local_port: this.unlocked_inner.port, }) } else { // Add public dial info with Blocked dialinfo class @@ -440,6 +441,7 @@ impl DiscoveryContext { class: DialInfoClass::Blocked, }), external_address_types: AddressTypeSet::only(external_1.address.address_type()), + local_port: this.unlocked_inner.port, }) } }); @@ -463,6 +465,7 @@ impl DiscoveryContext { // If we have two different external addresses, then this is a symmetric NAT if external_2.address.address() != external_1.address.address() { + let this = self.clone(); let do_symmetric_nat_fut: SendPinBoxFuture> = Box::pin(async move { Some(DetectionResult { @@ -472,6 +475,7 @@ impl DiscoveryContext { ) | AddressTypeSet::only( external_2.address.address_type(), ), + local_port: this.unlocked_inner.port, }) }); unord.push(do_symmetric_nat_fut); @@ -481,45 +485,41 @@ impl DiscoveryContext { // Manual Mapping Detection /////////// let this = self.clone(); - if let Some(local_port) = self - .unlocked_inner - .net - .get_local_port(self.unlocked_inner.protocol_type) - { - if external_1.dial_info.port() != local_port { - let c_external_1 = external_1.clone(); - let do_manual_map_fut: SendPinBoxFuture> = - Box::pin(async move { - // Do a validate_dial_info on the external address, but with the same port as the local port of local interface, from a redirected node - // This test is to see if a node had manual port forwarding done with the same port number as the local listener - let mut external_1_dial_info_with_local_port = - c_external_1.dial_info.clone(); - external_1_dial_info_with_local_port.set_port(local_port); + let local_port = self.unlocked_inner.port; + if external_1.dial_info.port() != local_port { + let c_external_1 = external_1.clone(); + let c_this = this.clone(); + let do_manual_map_fut: SendPinBoxFuture> = + Box::pin(async move { + // Do a validate_dial_info on the external address, but with the same port as the local port of local interface, from a redirected node + // This test is to see if a node had manual port forwarding done with the same port number as the local listener + let mut external_1_dial_info_with_local_port = c_external_1.dial_info.clone(); + external_1_dial_info_with_local_port.set_port(local_port); - if this - .validate_dial_info( - c_external_1.node.clone(), - external_1_dial_info_with_local_port.clone(), - true, - ) - .await - { - // Add public dial info with Direct dialinfo class - return Some(DetectionResult { - ddi: DetectedDialInfo::Detected(DialInfoDetail { - dial_info: external_1_dial_info_with_local_port, - class: DialInfoClass::Direct, - }), - external_address_types: AddressTypeSet::only( - c_external_1.address.address_type(), - ), - }); - } + if this + .validate_dial_info( + c_external_1.node.clone(), + external_1_dial_info_with_local_port.clone(), + true, + ) + .await + { + // Add public dial info with Direct dialinfo class + return Some(DetectionResult { + ddi: DetectedDialInfo::Detected(DialInfoDetail { + dial_info: external_1_dial_info_with_local_port, + class: DialInfoClass::Direct, + }), + external_address_types: AddressTypeSet::only( + c_external_1.address.address_type(), + ), + local_port: c_this.unlocked_inner.port, + }); + } - None - }); - unord.push(do_manual_map_fut); - } + None + }); + unord.push(do_manual_map_fut); } // NAT Detection @@ -563,6 +563,7 @@ impl DiscoveryContext { external_address_types: AddressTypeSet::only( c_external_1.address.address_type(), ), + local_port: c_this.unlocked_inner.port, }); } None @@ -597,6 +598,7 @@ impl DiscoveryContext { external_address_types: AddressTypeSet::only( c_external_1.address.address_type(), ), + local_port: c_this.unlocked_inner.port, }); } // Didn't get a reply from a non-default port, which means we are also port restricted @@ -608,6 +610,7 @@ impl DiscoveryContext { external_address_types: AddressTypeSet::only( c_external_1.address.address_type(), ), + local_port: c_this.unlocked_inner.port, }) }); ord.push_back(do_restricted_cone_fut); @@ -699,6 +702,7 @@ impl DiscoveryContext { external_address_types: AddressTypeSet::only( external_mapped_dial_info.address_type(), ), + local_port: this.unlocked_inner.port, }); } None diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 9ff31171..2d33401f 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -85,14 +85,6 @@ struct NetworkInner { join_handles: Vec>, /// stop source for shutting down the low level network background tasks stop_source: Option, - /// port we are binding raw udp listen to - udp_port: u16, - /// port we are binding raw tcp listen to - tcp_port: u16, - /// port we are binding websocket listen to - ws_port: u16, - /// port we are binding secure websocket listen to - wss_port: u16, /// does our network have ipv4 on any network? enable_ipv4: bool, /// does our network have ipv6 on the global internet? @@ -115,6 +107,8 @@ struct NetworkInner { tls_acceptor: Option, /// Multiplexer record for protocols on low level TCP sockets listener_states: BTreeMap>>, + /// Preferred local addresses for protocols/address combinations for outgoing connections + preferred_local_addresses: BTreeMap<(ProtocolType, AddressType), SocketAddr>, } struct NetworkUnlockedInner { @@ -152,10 +146,6 @@ impl Network { static_public_dialinfo: ProtocolTypeSet::empty(), join_handles: Vec::new(), stop_source: None, - udp_port: 0u16, - tcp_port: 0u16, - ws_port: 0u16, - wss_port: 0u16, enable_ipv4: false, enable_ipv6_global: false, enable_ipv6_local: false, @@ -164,6 +154,7 @@ impl Network { default_udpv6_protocol_handler: None, tls_acceptor: None, listener_states: BTreeMap::new(), + preferred_local_addresses: BTreeMap::new(), } } @@ -327,18 +318,18 @@ impl Network { pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option { let inner = self.inner.lock(); + let key = (dial_info.protocol_type(), dial_info.address_type()); + inner.preferred_local_addresses.get(&key).copied() + } - let local_port = match dial_info.protocol_type() { - ProtocolType::UDP => inner.udp_port, - ProtocolType::TCP => inner.tcp_port, - ProtocolType::WS => inner.ws_port, - ProtocolType::WSS => inner.wss_port, - }; - - Some(match dial_info.address_type() { - AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), local_port), - AddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), local_port), - }) + pub fn get_preferred_local_address_by_key( + &self, + pt: ProtocolType, + at: AddressType, + ) -> Option { + let inner = self.inner.lock(); + let key = (pt, at); + inner.preferred_local_addresses.get(&key).copied() } pub fn is_stable_interface_address(&self, addr: IpAddr) -> bool { diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 3ed4ecff..6ed25ef4 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -103,21 +103,35 @@ impl Network { _t: u64, ) -> EyreResult<()> { // Figure out if we can optimize TCP/WS checking since they are often on the same port - let (protocol_config, tcp_same_port) = { + let (protocol_config, inbound_protocol_map) = { let mut inner = self.inner.lock(); let protocol_config = inner.protocol_config.clone(); - let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP) - && protocol_config.inbound.contains(ProtocolType::WS) - { - inner.tcp_port == inner.ws_port - } else { - false - }; + // Allow network to be cleared if external addresses change inner.network_already_cleared = false; - // - (protocol_config, tcp_same_port) + let mut inbound_protocol_map = HashMap::<(AddressType, u16), Vec>::new(); + for at in protocol_config.family_global { + for pt in protocol_config.inbound { + let key = (pt, at); + + // Skip things with static public dialinfo + // as they don't need to participate in discovery + if inner.static_public_dialinfo.contains(pt) { + continue; + } + + if let Some(pla) = inner.preferred_local_addresses.get(&key) { + let itmkey = (at, pla.port()); + inbound_protocol_map + .entry(itmkey) + .and_modify(|x| x.push(pt)) + .or_insert_with(|| vec![pt]); + } + } + } + + (protocol_config, inbound_protocol_map) }; // Save off existing public dial info for change detection later @@ -166,100 +180,19 @@ impl Network { // Process all protocol and address combinations let mut unord = FuturesUnordered::new(); - // Do UDPv4+v6 at the same time as everything else - if protocol_config.inbound.contains(ProtocolType::UDP) { - // UDPv4 - if protocol_config.family_global.contains(AddressType::IPV4) { - let udpv4_context = DiscoveryContext::new( - self.routing_table(), - self.clone(), - ProtocolType::UDP, - AddressType::IPV4, - clear_network_callback.clone(), - ); - udpv4_context - .discover(&mut unord) - .instrument(trace_span!("udpv4_context.discover")) - .await; - } - // UDPv6 - if protocol_config.family_global.contains(AddressType::IPV6) { - let udpv6_context = DiscoveryContext::new( - self.routing_table(), - self.clone(), - ProtocolType::UDP, - AddressType::IPV6, - clear_network_callback.clone(), - ); - udpv6_context - .discover(&mut unord) - .instrument(trace_span!("udpv6_context.discover")) - .await; - } - } + for ((at, port), protocols) in &inbound_protocol_map { + let first_pt = protocols.first().unwrap(); - // Do TCPv4. Possibly do WSv4 if it is on a different port - if protocol_config.family_global.contains(AddressType::IPV4) { - if protocol_config.inbound.contains(ProtocolType::TCP) { - let tcpv4_context = DiscoveryContext::new( - self.routing_table(), - self.clone(), - ProtocolType::TCP, - AddressType::IPV4, - clear_network_callback.clone(), - ); - tcpv4_context - .discover(&mut unord) - .instrument(trace_span!("tcpv4_context.discover")) - .await; - } - - if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port { - let wsv4_context = DiscoveryContext::new( - self.routing_table(), - self.clone(), - ProtocolType::WS, - AddressType::IPV4, - clear_network_callback.clone(), - ); - wsv4_context - .discover(&mut unord) - .instrument(trace_span!("wsv4_context.discover")) - .await; - } - } - - // Do TCPv6. Possibly do WSv6 if it is on a different port - if protocol_config.family_global.contains(AddressType::IPV6) { - if protocol_config.inbound.contains(ProtocolType::TCP) { - let tcpv6_context = DiscoveryContext::new( - self.routing_table(), - self.clone(), - ProtocolType::TCP, - AddressType::IPV6, - clear_network_callback.clone(), - ); - tcpv6_context - .discover(&mut unord) - .instrument(trace_span!("tcpv6_context.discover")) - .await; - } - - // WSv6 - if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port { - let wsv6_context = DiscoveryContext::new( - self.routing_table(), - self.clone(), - ProtocolType::WS, - AddressType::IPV6, - clear_network_callback.clone(), - ); - wsv6_context - .discover(&mut unord) - .instrument(trace_span!("wsv6_context.discover")) - .await; - } + let discovery_context = DiscoveryContext::new( + self.routing_table(), + self.clone(), + *first_pt, + *at, + *port, + clear_network_callback.clone(), + ); + discovery_context.discover(&mut unord).await; } // Wait for all discovery futures to complete and apply discoverycontexts @@ -273,19 +206,20 @@ impl Network { // Add the external address kinds to the set we've seen all_address_types |= dr.external_address_types; - // Add WS dialinfo as well if it is on the same port as TCP + // Add additional dialinfo for protocols on the same port if let DetectedDialInfo::Detected(did) = &dr.ddi { - if did.dial_info.protocol_type() == ProtocolType::TCP && tcp_same_port { - // Make WS dialinfo as well with same socket address as TCP - let ws_ddi = DetectedDialInfo::Detected(DialInfoDetail { - dial_info: self.make_dial_info( - did.dial_info.socket_address(), - ProtocolType::WS, - ), + let ipmkey = (did.dial_info.address_type(), dr.local_port); + for additional_pt in + inbound_protocol_map.get(&ipmkey).unwrap().iter().skip(1) + { + // Make dialinfo for additional protocol type + let additional_ddi = DetectedDialInfo::Detected(DialInfoDetail { + dial_info: self + .make_dial_info(did.dial_info.socket_address(), *additional_pt), class: did.class, }); - // Add additional WS dialinfo - self.update_with_detected_dial_info(ws_ddi).await?; + // Add additional dialinfo + self.update_with_detected_dial_info(additional_ddi).await?; } } } @@ -368,7 +302,14 @@ impl Network { ) .unwrap() } - ProtocolType::WSS => panic!("none of the discovery functions are used for wss"), + ProtocolType::WSS => { + let c = self.config.get(); + DialInfo::try_wss( + addr, + format!("wss://{}/{}", addr, c.network.protocol.wss.path), + ) + .unwrap() + } } } } diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index a94cac18..2eba6d3e 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -247,12 +247,23 @@ impl Network { ) }; - // Create a reusable socket with no linger time, and no delay - let Some(socket) = new_bound_shared_tcp_socket(addr) - .wrap_err("failed to create bound shared tcp socket")? + // Create a socket and bind it + let Some(socket) = new_bound_default_tcp_socket(addr) + .wrap_err("failed to create default socket listener")? else { return Ok(false); }; + + // Drop the socket + drop(socket); + + // Create a shared socket and bind it once we have determined the port is free + let Some(socket) = new_bound_shared_tcp_socket(addr) + .wrap_err("failed to create shared socket listener")? + else { + return Ok(false); + }; + // Listen on the socket if socket.listen(128).is_err() { return Ok(false); diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 416b501a..19dbbf3c 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -208,7 +208,6 @@ impl Network { } // otherwise find the first outbound udp protocol handler that matches the ip protocol version of the peer addr - let inner = self.inner.lock(); match peer_socket_addr { SocketAddr::V4(_) => inner.udp_protocol_handlers.iter().find_map(|x| { if x.0.is_ipv4() { diff --git a/veilid-core/src/network_manager/native/protocol/sockets.rs b/veilid-core/src/network_manager/native/protocol/sockets.rs index 5f758c40..5dcd3edb 100644 --- a/veilid-core/src/network_manager/native/protocol/sockets.rs +++ b/veilid-core/src/network_manager/native/protocol/sockets.rs @@ -77,7 +77,7 @@ pub fn new_bound_default_udp_socket(local_address: SocketAddr) -> io::Result io::Result { +pub fn new_default_tcp_socket(domain: Domain) -> io::Result { let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; if let Err(e) = socket.set_nodelay(true) { log_net!(error "Couldn't set TCP nodelay: {}", e); @@ -89,7 +89,7 @@ pub fn new_unbound_tcp_socket(domain: Domain) -> io::Result { } #[instrument(level = "trace", ret)] -pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result { +pub fn new_shared_tcp_socket(domain: Domain) -> io::Result { let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; // if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) { // log_net!(error "Couldn't set TCP linger: {}", e); @@ -109,11 +109,24 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result { Ok(socket) } +#[instrument(level = "trace", ret)] +pub fn new_bound_default_tcp_socket(local_address: SocketAddr) -> io::Result> { + let domain = Domain::for_address(local_address); + let socket = new_default_tcp_socket(domain)?; + let socket2_addr = SockAddr::from(local_address); + if socket.bind(&socket2_addr).is_err() { + return Ok(None); + } + + log_net!("created bound default tcp socket on {:?}", &local_address); + + Ok(Some(socket)) +} #[instrument(level = "trace", ret)] pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result> { let domain = Domain::for_address(local_address); - let socket = new_unbound_shared_tcp_socket(domain)?; + let socket = new_shared_tcp_socket(domain)?; let socket2_addr = SockAddr::from(local_address); if socket.bind(&socket2_addr).is_err() { return Ok(None); diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 7e52b87d..17bb53b6 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -172,7 +172,7 @@ impl RawTcpProtocolHandler { Some(a) => { new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))? } - None => new_unbound_tcp_socket(socket2::Domain::for_address(socket_addr))?, + None => new_default_tcp_socket(socket2::Domain::for_address(socket_addr))?, }; // Non-blocking connect to remote address @@ -187,16 +187,16 @@ impl RawTcpProtocolHandler { let ps = AsyncPeekStream::new(ts); // Wrap the stream in a network connection and return it - let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( - Flow::new( - PeerAddress::new( - SocketAddress::from_socket_addr(socket_addr), - ProtocolType::TCP, - ), - SocketAddress::from_socket_addr(actual_local_address), + let flow = Flow::new( + PeerAddress::new( + SocketAddress::from_socket_addr(socket_addr), + ProtocolType::TCP, ), - ps, - )); + SocketAddress::from_socket_addr(actual_local_address), + ); + log_net!("rawtcp::connect: {:?}", flow); + + let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(flow, ps)); Ok(NetworkResult::Value(conn)) } diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index 55b0dd1c..0faabc14 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -128,7 +128,7 @@ impl RawUdpProtocolHandler { SocketAddress::from_socket_addr(local_socket_addr), ); - eprintln!("udp::send_message: {:?}", flow); + log_net!("udp::send_message: {:?}", flow); #[cfg(feature = "verbose-tracing")] tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str()); diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 82ad9b0a..c9aad682 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -322,7 +322,7 @@ impl WebsocketProtocolHandler { Some(a) => { new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))? } - None => new_unbound_tcp_socket(socket2::Domain::for_address(remote_socket_addr))?, + None => new_default_tcp_socket(socket2::Domain::for_address(remote_socket_addr))?, }; // Non-blocking connect to remote address @@ -342,6 +342,7 @@ impl WebsocketProtocolHandler { dial_info.peer_address(), SocketAddress::from_socket_addr(actual_local_addr), ); + log_net!("{}::connect: {:?}", scheme, flow); // Negotiate TLS if this is WSS if tls { diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index bd73cad2..50af8087 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -121,6 +121,19 @@ impl Network { } } + // Add local dial info to preferred local address table + fn add_preferred_local_address(inner: &mut NetworkInner, pa: PeerAddress) { + let key = (pa.protocol_type(), pa.address_type()); + let sa = pa.socket_addr(); + let unspec_sa = match sa { + SocketAddr::V4(a) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, a.port())), + SocketAddr::V6(a) => { + SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, a.port(), 0, 0)) + } + }; + inner.preferred_local_addresses.insert(key, unspec_sa); + } + ///////////////////////////////////////////////////// pub(super) async fn bind_udp_protocol_handlers( @@ -156,7 +169,10 @@ impl Network { bind_set.port, bind_set.addrs ); } - let local_dial_info_list = self.create_udp_protocol_handlers(bind_set).await?; + + let mut local_dial_info_list = self.create_udp_protocol_handlers(bind_set).await?; + local_dial_info_list.sort(); + let mut static_public = false; log_net!( @@ -164,22 +180,6 @@ impl Network { local_dial_info_list ); - // Register local dial info - for di in &local_dial_info_list { - // If the local interface address is global, then register global dial info - // if no other public address is specified - if !detect_address_changes - && public_address.is_none() - && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, di) - { - editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?; - static_public = true; - } - - // Register interface dial info as well since the address is on the local interface - editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?; - } - // Add static public dialinfo if it's configured if let Some(public_address) = public_address.as_ref() { // Resolve statically configured public dialinfo @@ -192,11 +192,8 @@ impl Network { let pdi = DialInfo::udp_from_socketaddr(pdi_addr); // Register the public address - if !detect_address_changes { - editor_public_internet - .register_dial_info(pdi.clone(), DialInfoClass::Direct)?; - static_public = true; - } + editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?; + static_public = true; // See if this public address is also a local interface address we haven't registered yet let is_interface_address = (|| { @@ -217,14 +214,31 @@ impl Network { } } - if static_public { - self.inner - .lock() - .static_public_dialinfo - .insert(ProtocolType::UDP); + // Register local dial info + for di in &local_dial_info_list { + // If the local interface address is global, then register global dial info + // if no other public address is specified + if !detect_address_changes + && public_address.is_none() + && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, di) + { + editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?; + static_public = true; + } + + // Register interface dial info as well since the address is on the local interface + editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?; } - // xxx compile all dialinfo from editor into map of protocoltype+addresstype -> port for 'best port selection' code + { + let mut inner = self.inner.lock(); + if static_public { + inner.static_public_dialinfo.insert(ProtocolType::UDP); + } + for ldi in local_dial_info_list { + Self::add_preferred_local_address(&mut inner, ldi.peer_address()); + } + } // Now create tasks for udp listeners self.create_udp_listener_tasks().await @@ -293,11 +307,8 @@ impl Network { let pdi = DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone()) .wrap_err("try_ws failed")?; - if !detect_address_changes { - editor_public_internet - .register_dial_info(pdi.clone(), DialInfoClass::Direct)?; - static_public = true; - } + editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?; + static_public = true; // See if this public address is also a local interface address if !registered_addresses.contains(&gsa.ip()) @@ -310,14 +321,15 @@ impl Network { } } - for socket_address in socket_addresses { + for socket_address in &socket_addresses { // Skip addresses we already did if registered_addresses.contains(&socket_address.ip_addr()) { continue; } // Build dial info request url let local_url = format!("ws://{}/{}", socket_address, path); - let local_di = DialInfo::try_ws(socket_address, local_url).wrap_err("try_ws failed")?; + let local_di = + DialInfo::try_ws(*socket_address, local_url).wrap_err("try_ws failed")?; if !detect_address_changes && url.is_none() @@ -333,11 +345,12 @@ impl Network { editor_local_network.register_dial_info(local_di, DialInfoClass::Direct)?; } + let mut inner = self.inner.lock(); if static_public { - self.inner - .lock() - .static_public_dialinfo - .insert(ProtocolType::WS); + inner.static_public_dialinfo.insert(ProtocolType::WS); + } + for sa in socket_addresses { + Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::WS)); } Ok(()) @@ -350,7 +363,7 @@ impl Network { ) -> EyreResult<()> { log_net!("WSS: binding protocol handlers"); - let (listen_address, url, detect_address_changes) = { + let (listen_address, url, _detect_address_changes) = { let c = self.config.get(); ( c.network.protocol.wss.listen_address.clone(), @@ -411,11 +424,8 @@ impl Network { let pdi = DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone()) .wrap_err("try_wss failed")?; - if !detect_address_changes { - editor_public_internet - .register_dial_info(pdi.clone(), DialInfoClass::Direct)?; - static_public = true; - } + editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?; + static_public = true; // See if this public address is also a local interface address if !registered_addresses.contains(&gsa.ip()) @@ -430,11 +440,12 @@ impl Network { bail!("WSS URL must be specified due to TLS requirements"); } + let mut inner = self.inner.lock(); if static_public { - self.inner - .lock() - .static_public_dialinfo - .insert(ProtocolType::WSS); + inner.static_public_dialinfo.insert(ProtocolType::WSS); + } + for sa in socket_addresses { + Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::WSS)); } Ok(()) @@ -485,22 +496,6 @@ impl Network { let mut static_public = false; let mut registered_addresses: HashSet = HashSet::new(); - for socket_address in socket_addresses { - let di = DialInfo::tcp(socket_address); - - // Register global dial info if no public address is specified - if !detect_address_changes - && public_address.is_none() - && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di) - { - editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?; - static_public = true; - } - // Register interface dial info - editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?; - registered_addresses.insert(socket_address.ip_addr()); - } - // Add static public dialinfo if it's configured if let Some(public_address) = public_address.as_ref() { // Resolve statically configured public dialinfo @@ -516,11 +511,8 @@ impl Network { } let pdi = DialInfo::tcp_from_socketaddr(pdi_addr); - if !detect_address_changes { - editor_public_internet - .register_dial_info(pdi.clone(), DialInfoClass::Direct)?; - static_public = true; - } + editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?; + static_public = true; // See if this public address is also a local interface address if self.is_stable_interface_address(pdi_addr.ip()) { @@ -529,11 +521,29 @@ impl Network { } } + for socket_address in &socket_addresses { + let di = DialInfo::tcp(*socket_address); + + // Register global dial info if no public address is specified + if !detect_address_changes + && public_address.is_none() + && routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di) + { + editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?; + static_public = true; + } + // Register interface dial info + editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?; + registered_addresses.insert(socket_address.ip_addr()); + } + + let mut inner = self.inner.lock(); + if static_public { - self.inner - .lock() - .static_public_dialinfo - .insert(ProtocolType::TCP); + inner.static_public_dialinfo.insert(ProtocolType::TCP); + } + for sa in socket_addresses { + Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::TCP)); } Ok(()) diff --git a/veilid-core/src/network_manager/tasks/public_address_check.rs b/veilid-core/src/network_manager/tasks/public_address_check.rs index 07fb7547..4ba03b38 100644 --- a/veilid-core/src/network_manager/tasks/public_address_check.rs +++ b/veilid-core/src/network_manager/tasks/public_address_check.rs @@ -54,6 +54,21 @@ impl NetworkManager { return; } + // Ignore flows that do not start from our listening port (unbound connections etc), + // because a router is going to map these differently + let Some(pla) = + net.get_preferred_local_address_by_key(flow.protocol_type(), flow.address_type()) + else { + return; + }; + let Some(local) = flow.local() else { + return; + }; + if local.port() != pla.port() { + log_network_result!(debug "ignoring public internet address report because local port did not match listener: {} != {}", local.port(), pla.port()); + return; + } + // If we are a webapp we should skip this completely // because we will never get inbound dialinfo directly on our public ip address // If we have an invalid network class, this is not necessary yet