From 03e872c128f52817aaa452a13899c87fdabcc156 Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 25 Apr 2024 21:32:35 -0400 Subject: [PATCH] refactor net bind, wip --- veilid-core/src/network_manager/native/mod.rs | 39 +- .../src/network_manager/native/network_tcp.rs | 116 +++--- .../src/network_manager/native/network_udp.rs | 171 ++++----- .../native/protocol/sockets.rs | 107 +----- .../network_manager/native/protocol/tcp.rs | 4 +- .../network_manager/native/protocol/udp.rs | 2 + .../src/network_manager/native/protocol/ws.rs | 4 +- .../network_manager/native/start_protocols.rs | 354 ++++++------------ veilid-flutter/example/pubspec.lock | 2 +- veilid-tools/src/tools.rs | 9 +- 10 files changed, 287 insertions(+), 521 deletions(-) diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 6573ed79..9ff31171 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -15,6 +15,7 @@ use protocol::tcp::RawTcpProtocolHandler; use protocol::udp::RawUdpProtocolHandler; use protocol::ws::WebsocketProtocolHandler; pub(in crate::network_manager) use protocol::*; +use start_protocols::*; use async_tls::TlsAcceptor; use futures_util::StreamExt; @@ -104,18 +105,12 @@ struct NetworkInner { network_already_cleared: bool, /// the punishment closure to enax public_dial_info_check_punishment: Option>, - /// udp socket record for bound-first sockets, which are used to guarantee a port is available before - /// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using - bound_first_udp: BTreeMap, Option)>, /// mapping of protocol handlers to accept messages from a set of bound socket addresses - inbound_udp_protocol_handlers: BTreeMap, + udp_protocol_handlers: BTreeMap, /// outbound udp protocol handler for udpv4 - outbound_udpv4_protocol_handler: Option, + default_udpv4_protocol_handler: Option, /// outbound udp protocol handler for udpv6 - outbound_udpv6_protocol_handler: Option, - /// tcp socket record for bound-first sockets, which are used to guarantee a port is available before - /// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using - bound_first_tcp: BTreeMap, Option)>, + default_udpv6_protocol_handler: Option, /// TLS handling socket controller tls_acceptor: Option, /// Multiplexer record for protocols on low level TCP sockets @@ -164,11 +159,9 @@ impl Network { enable_ipv4: false, enable_ipv6_global: false, enable_ipv6_local: false, - bound_first_udp: BTreeMap::new(), - inbound_udp_protocol_handlers: BTreeMap::new(), - outbound_udpv4_protocol_handler: None, - outbound_udpv6_protocol_handler: None, - bound_first_tcp: BTreeMap::new(), + udp_protocol_handlers: BTreeMap::new(), + default_udpv4_protocol_handler: None, + default_udpv6_protocol_handler: None, tls_acceptor: None, listener_states: BTreeMap::new(), } @@ -332,17 +325,6 @@ impl Network { } } - pub fn get_local_port(&self, protocol_type: ProtocolType) -> Option { - let inner = self.inner.lock(); - let local_port = match protocol_type { - ProtocolType::UDP => inner.udp_port, - ProtocolType::TCP => inner.tcp_port, - ProtocolType::WS => inner.ws_port, - ProtocolType::WSS => inner.wss_port, - }; - Some(local_port) - } - pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option { let inner = self.inner.lock(); @@ -846,7 +828,7 @@ impl Network { // start listeners if protocol_config.inbound.contains(ProtocolType::UDP) { - self.start_udp_listeners(&mut editor_public_internet, &mut editor_local_network) + self.bind_udp_protocol_handlers(&mut editor_public_internet, &mut editor_local_network) .await?; } if protocol_config.inbound.contains(ProtocolType::WS) { @@ -862,11 +844,6 @@ impl Network { .await?; } - // release caches of available listener ports - // this releases the 'first bound' ports we use to guarantee - // that we have ports available to us - self.free_bound_first_ports(); - editor_public_internet.setup_network( protocol_config.outbound, protocol_config.inbound, diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index ee2477ab..a94cac18 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -237,7 +237,7 @@ impl Network { } } - async fn spawn_socket_listener(&self, addr: SocketAddr) -> EyreResult<()> { + async fn spawn_socket_listener(&self, addr: SocketAddr) -> EyreResult { // Get config let (connection_initial_timeout_ms, tls_connection_initial_timeout_ms) = { let c = self.config.get(); @@ -248,12 +248,15 @@ impl Network { }; // Create a reusable socket with no linger time, and no delay - let socket = new_bound_shared_tcp_socket(addr) - .wrap_err("failed to create bound shared tcp socket")?; + let Some(socket) = new_bound_shared_tcp_socket(addr) + .wrap_err("failed to create bound shared tcp socket")? + else { + return Ok(false); + }; // Listen on the socket - socket - .listen(128) - .wrap_err("Couldn't listen on TCP socket")?; + if socket.listen(128).is_err() { + return Ok(false); + } // Make an async tcplistener from the socket2 socket let std_listener: std::net::TcpListener = socket.into(); @@ -324,7 +327,7 @@ impl Network { // Add to join handles self.add_to_join_handles(jh); - Ok(()) + Ok(true) } ///////////////////////////////////////////////////////////////// @@ -332,51 +335,76 @@ impl Network { // TCP listener that multiplexes ports so multiple protocols can exist on a single port pub(super) async fn start_tcp_listener( &self, - ip_addrs: Vec, - port: u16, + bind_set: NetworkBindSet, is_tls: bool, new_protocol_accept_handler: Box, ) -> EyreResult> { let mut out = Vec::::new(); - for ip_addr in ip_addrs { - let addr = SocketAddr::new(ip_addr, port); - let idi_addrs = self.translate_unspecified_address(&addr); + for ip_addr in bind_set.addrs { + let mut port = bind_set.port; + loop { + let addr = SocketAddr::new(ip_addr, port); - // see if we've already bound to this already - // if not, spawn a listener - if !self.inner.lock().listener_states.contains_key(&addr) { - self.clone().spawn_socket_listener(addr).await?; - } - - let ls = if let Some(ls) = self.inner.lock().listener_states.get_mut(&addr) { - ls.clone() - } else { - panic!("this shouldn't happen"); - }; - - if is_tls { - if ls.read().tls_acceptor.is_none() { - ls.write().tls_acceptor = Some(self.clone().get_or_create_tls_acceptor()?); + // see if we've already bound to this already + // if not, spawn a listener + let mut got_listener = false; + if !self.inner.lock().listener_states.contains_key(&addr) { + if self.clone().spawn_socket_listener(addr).await? { + got_listener = true; + } + } else { + got_listener = true; } - ls.write() - .tls_protocol_handlers - .push(new_protocol_accept_handler( - self.network_manager().config(), - true, - )); - } else { - ls.write() - .protocol_accept_handlers - .push(new_protocol_accept_handler( - self.network_manager().config(), - false, - )); - } - // Return interface dial infos we listen on - for idi_addr in idi_addrs { - out.push(SocketAddress::from_socket_addr(idi_addr)); + if got_listener { + let ls = if let Some(ls) = self.inner.lock().listener_states.get_mut(&addr) { + ls.clone() + } else { + panic!("this shouldn't happen"); + }; + + if is_tls { + if ls.read().tls_acceptor.is_none() { + ls.write().tls_acceptor = + Some(self.clone().get_or_create_tls_acceptor()?); + } + ls.write() + .tls_protocol_handlers + .push(new_protocol_accept_handler( + self.network_manager().config(), + true, + )); + } else { + ls.write() + .protocol_accept_handlers + .push(new_protocol_accept_handler( + self.network_manager().config(), + false, + )); + } + + // Return interface dial infos we listen on + let idi_addrs = self.translate_unspecified_address(&addr); + for idi_addr in idi_addrs { + out.push(SocketAddress::from_socket_addr(idi_addr)); + } + break; + } + + if !bind_set.search { + bail!("unable to bind to tcp {}", addr); + } + + if port == 65535u16 { + port = 1024; + } else { + port += 1; + } + + if port == bind_set.port { + bail!("unable to find a free port for tcp {}", ip_addr); + } } } diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 32e251b9..416b501a 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -27,19 +27,13 @@ impl Network { log_net!("UDP listener task spawned"); // Collect all our protocol handlers into a vector - let mut protocol_handlers: Vec = this + let protocol_handlers: Vec = this .inner .lock() - .inbound_udp_protocol_handlers + .udp_protocol_handlers .values() .cloned() .collect(); - if let Some(ph) = this.inner.lock().outbound_udpv4_protocol_handler.clone() { - protocol_handlers.push(ph); - } - if let Some(ph) = this.inner.lock().outbound_udpv6_protocol_handler.clone() { - protocol_handlers.push(ph); - } // Spawn a local async task for each socket let mut protocol_handlers_unordered = FuturesUnordered::new(); @@ -114,77 +108,13 @@ impl Network { Ok(()) } - pub(super) async fn create_udp_outbound_sockets(&self) -> EyreResult<()> { - let mut inner = self.inner.lock(); - let mut port = inner.udp_port; - // v4 - let socket_addr_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); - if let Ok(socket) = new_bound_shared_udp_socket(socket_addr_v4) { - // Pull the port if we randomly bound, so v6 can be on the same port - port = socket - .local_addr() - .wrap_err("failed to get local address")? - .as_socket_ipv4() - .ok_or_else(|| eyre!("expected ipv4 address type"))? - .port(); - - // Make an async UdpSocket from the socket2 socket - let std_udp_socket: std::net::UdpSocket = socket.into(); - cfg_if! { - if #[cfg(feature="rt-async-std")] { - let udp_socket = UdpSocket::from(std_udp_socket); - } else if #[cfg(feature="rt-tokio")] { - std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking"); - let udp_socket = UdpSocket::from_std(std_udp_socket).wrap_err("failed to make outbound v4 tokio udpsocket")?; - } else { - compile_error!("needs executor implementation") - } - } - let socket_arc = Arc::new(udp_socket); - - // Create protocol handler - let udpv4_handler = RawUdpProtocolHandler::new( - socket_arc, - Some(self.network_manager().address_filter()), - ); - - inner.outbound_udpv4_protocol_handler = Some(udpv4_handler); - } - //v6 - let socket_addr_v6 = - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), port); - if let Ok(socket) = new_bound_shared_udp_socket(socket_addr_v6) { - // Make an async UdpSocket from the socket2 socket - let std_udp_socket: std::net::UdpSocket = socket.into(); - cfg_if! { - if #[cfg(feature="rt-async-std")] { - let udp_socket = UdpSocket::from(std_udp_socket); - } else if #[cfg(feature="rt-tokio")] { - std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking"); - let udp_socket = UdpSocket::from_std(std_udp_socket).wrap_err("failed to make outbound v6 tokio udpsocket")?; - } else { - compile_error!("needs executor implementation") - } - } - let socket_arc = Arc::new(udp_socket); - - // Create protocol handler - let udpv6_handler = RawUdpProtocolHandler::new( - socket_arc, - Some(self.network_manager().address_filter()), - ); - - inner.outbound_udpv6_protocol_handler = Some(udpv6_handler); - } - - Ok(()) - } - - async fn create_udp_inbound_socket(&self, addr: SocketAddr) -> EyreResult<()> { - log_net!("create_udp_inbound_socket on {:?}", &addr); + async fn create_udp_protocol_handler(&self, addr: SocketAddr) -> EyreResult { + log_net!("create_udp_protocol_handler on {:?}", &addr); // Create a reusable socket - let socket = new_bound_shared_udp_socket(addr)?; + let Some(socket) = new_bound_default_udp_socket(addr)? else { + return Ok(false); + }; // Make an async UdpSocket from the socket2 socket let std_udp_socket: std::net::UdpSocket = socket.into(); @@ -204,40 +134,58 @@ impl Network { let protocol_handler = RawUdpProtocolHandler::new(socket_arc, Some(self.network_manager().address_filter())); - // Create message_handler records - self.inner - .lock() - .inbound_udp_protocol_handlers - .insert(addr, protocol_handler); + // Record protocol handler + let mut inner = self.inner.lock(); + inner + .udp_protocol_handlers + .insert(addr, protocol_handler.clone()); + if addr.is_ipv4() && inner.default_udpv4_protocol_handler.is_none() { + inner.default_udpv4_protocol_handler = Some(protocol_handler); + } else if addr.is_ipv6() && inner.default_udpv6_protocol_handler.is_none() { + inner.default_udpv6_protocol_handler = Some(protocol_handler); + } - Ok(()) + Ok(true) } - pub(super) async fn create_udp_inbound_sockets( + pub(super) async fn create_udp_protocol_handlers( &self, - ip_addrs: Vec, - port: u16, + bind_set: NetworkBindSet, ) -> EyreResult> { let mut out = Vec::::new(); - for ip_addr in ip_addrs { - let addr = SocketAddr::new(ip_addr, port); + for ip_addr in bind_set.addrs { + let mut port = bind_set.port; + loop { + let addr = SocketAddr::new(ip_addr, port); - // see if we've already bound to this already - // if not, spawn a listener - if !self - .inner - .lock() - .inbound_udp_protocol_handlers - .contains_key(&addr) - { - let idi_addrs = self.translate_unspecified_address(&addr); + // see if we've already bound to this already + // if not, spawn a listener + if !self.inner.lock().udp_protocol_handlers.contains_key(&addr) { + let bound = self.clone().create_udp_protocol_handler(addr).await?; - self.clone().create_udp_inbound_socket(addr).await?; + // Return interface dial infos we listen on + if bound { + let idi_addrs = self.translate_unspecified_address(&addr); + for idi_addr in idi_addrs { + out.push(DialInfo::udp_from_socketaddr(idi_addr)); + } + break; + } + } - // Return interface dial infos we listen on - for idi_addr in idi_addrs { - out.push(DialInfo::udp_from_socketaddr(idi_addr)); + if !bind_set.search { + bail!("unable to bind to udp {}", addr); + } + + if port == 65535u16 { + port = 1024; + } else { + port += 1; + } + + if port == bind_set.port { + bail!("unable to find a free port for udp {}", ip_addr); } } } @@ -251,18 +199,31 @@ impl Network { peer_socket_addr: &SocketAddr, local_socket_addr: &Option, ) -> Option { + let inner = self.inner.lock(); // if our last communication with this peer came from a particular inbound udp protocol handler, use it if let Some(sa) = local_socket_addr { - if let Some(ph) = self.inner.lock().inbound_udp_protocol_handlers.get(sa) { + if let Some(ph) = inner.udp_protocol_handlers.get(sa) { return Some(ph.clone()); } } - // otherwise find the outbound udp protocol handler that matches the ip protocol version of the peer addr + // otherwise find the first outbound udp protocol handler that matches the ip protocol version of the peer addr let inner = self.inner.lock(); match peer_socket_addr { - SocketAddr::V4(_) => inner.outbound_udpv4_protocol_handler.clone(), - SocketAddr::V6(_) => inner.outbound_udpv6_protocol_handler.clone(), + SocketAddr::V4(_) => inner.udp_protocol_handlers.iter().find_map(|x| { + if x.0.is_ipv4() { + Some(x.1.clone()) + } else { + None + } + }), + SocketAddr::V6(_) => inner.udp_protocol_handlers.iter().find_map(|x| { + if x.0.is_ipv6() { + Some(x.1.clone()) + } else { + None + } + }), } } } diff --git a/veilid-core/src/network_manager/native/protocol/sockets.rs b/veilid-core/src/network_manager/native/protocol/sockets.rs index e345a8ce..5f758c40 100644 --- a/veilid-core/src/network_manager/native/protocol/sockets.rs +++ b/veilid-core/src/network_manager/native/protocol/sockets.rs @@ -36,7 +36,7 @@ cfg_if! { } #[instrument(level = "trace", ret)] -pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result { +pub fn new_shared_udp_socket(domain: Domain) -> io::Result { let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; if domain == Domain::IPV6 { socket.set_only_v6(true)?; @@ -52,56 +52,28 @@ pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result { } #[instrument(level = "trace", ret)] -pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result { - let domain = Domain::for_address(local_address); - let socket = new_unbound_shared_udp_socket(domain)?; - let socket2_addr = SockAddr::from(local_address); - socket.bind(&socket2_addr)?; - - log_net!("created bound shared udp socket on {:?}", &local_address); +pub fn new_default_udp_socket(domain: Domain) -> io::Result { + let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; + if domain == Domain::IPV6 { + socket.set_only_v6(true)?; + } Ok(socket) } #[instrument(level = "trace", ret)] -pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> io::Result { +pub fn new_bound_default_udp_socket(local_address: SocketAddr) -> io::Result> { let domain = Domain::for_address(local_address); - let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; - if domain == Domain::IPV6 { - socket.set_only_v6(true)?; - } - // Bind the socket -first- before turning on 'reuse address' this way it will - // fail if the port is already taken + let socket = new_default_udp_socket(domain)?; let socket2_addr = SockAddr::from(local_address); - // On windows, do SO_EXCLUSIVEADDRUSE before the bind to ensure the port is fully available - cfg_if! { - if #[cfg(windows)] { - set_exclusiveaddruse(&socket)?; - } + if socket.bind(&socket2_addr).is_err() { + return Ok(None); } - // Bind the socket -first- without turning on SO_REUSEPORT this way it will - // fail if the port is already taken - cfg_if! { - if #[cfg(unix)] { - socket - .set_reuse_address(true)?; - } - } + log_net!("created bound default udp socket on {:?}", &local_address); - socket.bind(&socket2_addr)?; - - // Set 'reuse address' so future binds to this port will succeed - // This does not work on Windows, where reuse options can not be set after the bind - cfg_if! { - if #[cfg(unix)] { - socket.set_reuse_port(true)?; - } - } - log_net!("created bound first udp socket on {:?}", &local_address); - - Ok(socket) + Ok(Some(socket)) } #[instrument(level = "trace", ret)] @@ -139,62 +111,17 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result { } #[instrument(level = "trace", ret)] -pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result { +pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result> { let domain = Domain::for_address(local_address); let socket = new_unbound_shared_tcp_socket(domain)?; let socket2_addr = SockAddr::from(local_address); - socket.bind(&socket2_addr)?; + if socket.bind(&socket2_addr).is_err() { + return Ok(None); + } log_net!("created bound shared tcp socket on {:?}", &local_address); - Ok(socket) -} - -#[instrument(level = "trace", ret)] -pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result { - let domain = Domain::for_address(local_address); - - let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; - // if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) { - // log_net!(error "Couldn't set TCP linger: {}", e); - // } - if let Err(e) = socket.set_nodelay(true) { - log_net!(error "Couldn't set TCP nodelay: {}", e); - } - if domain == Domain::IPV6 { - socket.set_only_v6(true)?; - } - - // On windows, do SO_EXCLUSIVEADDRUSE before the bind to ensure the port is fully available - cfg_if! { - if #[cfg(windows)] { - set_exclusiveaddruse(&socket)?; - } - } - - // Bind the socket -first- without turning on SO_REUSEPORT this way it will - // fail if the port is already taken - let socket2_addr = SockAddr::from(local_address); - - cfg_if! { - if #[cfg(unix)] { - socket - .set_reuse_address(true)?; - } - } - - socket.bind(&socket2_addr)?; - - // Set 'reuse address' so future binds to this port will succeed - // This does not work on Windows, where reuse options can not be set after the bind - cfg_if! { - if #[cfg(unix)] { - socket.set_reuse_port(true)?; - } - } - log_net!("created bound first tcp socket on {:?}", &local_address); - - Ok(socket) + Ok(Some(socket)) } // Non-blocking connect is tricky when you want to start with a prepared socket diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index de7f28f0..7e52b87d 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -169,7 +169,9 @@ impl RawTcpProtocolHandler { ) -> io::Result> { // Make a shared socket let socket = match local_address { - Some(a) => new_bound_shared_tcp_socket(a)?, + Some(a) => { + new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))? + } None => new_unbound_tcp_socket(socket2::Domain::for_address(socket_addr))?, }; diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index ad54aa49..55b0dd1c 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -128,6 +128,8 @@ impl RawUdpProtocolHandler { SocketAddress::from_socket_addr(local_socket_addr), ); + eprintln!("udp::send_message: {:?}", flow); + #[cfg(feature = "verbose-tracing")] tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str()); Ok(NetworkResult::value(flow)) diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 4a35b35b..82ad9b0a 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -319,7 +319,9 @@ impl WebsocketProtocolHandler { // Make a shared socket let socket = match local_address { - Some(a) => new_bound_shared_tcp_socket(a)?, + Some(a) => { + new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))? + } None => new_unbound_tcp_socket(socket2::Domain::for_address(remote_socket_addr))?, }; diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index d73d4928..bd73cad2 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -1,4 +1,3 @@ -use super::sockets::*; use super::*; use lazy_static::*; @@ -74,152 +73,29 @@ lazy_static! { ]); } +pub(super) struct NetworkBindSet { + pub port: u16, + pub addrs: Vec, + pub search: bool, +} + impl Network { ///////////////////////////////////////////////////// - // Support for binding first on ports to ensure nobody binds ahead of us - // or two copies of the app don't accidentally collide. This is tricky - // because we use 'reuseaddr/port' and we can accidentally bind in front of ourselves :P - fn bind_first_udp_port(&self, udp_port: u16) -> bool { - let mut inner = self.inner.lock(); - if inner.bound_first_udp.contains_key(&udp_port) { - return true; - } - - // Check for ipv6 - let has_v6 = is_ipv6_supported(); - - // If the address is specified, only use the specified port and fail otherwise - let mut bound_first_socket_v4 = None; - let mut bound_first_socket_v6 = None; - if let Ok(bfs4) = - new_bound_first_udp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port)) - { - if has_v6 { - if let Ok(bfs6) = new_bound_first_udp_socket(SocketAddr::new( - IpAddr::V6(Ipv6Addr::UNSPECIFIED), - udp_port, - )) { - bound_first_socket_v4 = Some(bfs4); - bound_first_socket_v6 = Some(bfs6); - } - } else { - bound_first_socket_v4 = Some(bfs4); - } - } - - if bound_first_socket_v4.is_none() && (has_v6 && bound_first_socket_v6.is_none()) { - return false; - } - - cfg_if! { - if #[cfg(windows)] { - // On windows, drop the socket. This is a race condition, but there's - // no way around it. This isn't for security anyway, it's to prevent multiple copies of the - // app from binding on the same port. - inner.bound_first_udp.insert(udp_port, (None, None)); - } else { - inner.bound_first_udp.insert(udp_port, (bound_first_socket_v4, bound_first_socket_v6)); - } - } - true - } - - fn bind_first_tcp_port(&self, tcp_port: u16) -> bool { - let mut inner = self.inner.lock(); - if inner.bound_first_tcp.contains_key(&tcp_port) { - return true; - } - - // Check for ipv6 - let has_v6 = is_ipv6_supported(); - - // If the address is specified, only use the specified port and fail otherwise - let mut bound_first_socket_v4 = None; - let mut bound_first_socket_v6 = None; - if let Ok(bfs4) = - new_bound_first_tcp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port)) - { - if has_v6 { - if let Ok(bfs6) = new_bound_first_tcp_socket(SocketAddr::new( - IpAddr::V6(Ipv6Addr::UNSPECIFIED), - tcp_port, - )) { - bound_first_socket_v4 = Some(bfs4); - bound_first_socket_v6 = Some(bfs6); - } - } else { - bound_first_socket_v4 = Some(bfs4); - } - } - - if bound_first_socket_v4.is_none() && (has_v6 && bound_first_socket_v6.is_none()) { - return false; - } - - cfg_if! { - if #[cfg(windows)] { - // On windows, drop the socket. This is a race condition, but there's - // no way around it. This isn't for security anyway, it's to prevent multiple copies of the - // app from binding on the same port. - inner.bound_first_tcp.insert(tcp_port, (None, None)); - } else { - inner.bound_first_tcp.insert(tcp_port, (bound_first_socket_v4, bound_first_socket_v6)); - } - } - true - } - - pub(super) fn free_bound_first_ports(&self) { - let mut inner = self.inner.lock(); - inner.bound_first_udp.clear(); - inner.bound_first_tcp.clear(); - } - - ///////////////////////////////////////////////////// - - fn find_available_udp_port(&self, start_port: u16) -> EyreResult { - // If the address is empty, iterate ports until we find one we can use. - let mut udp_port = start_port; - loop { - if BAD_PORTS.contains(&udp_port) { - continue; - } - if self.bind_first_udp_port(udp_port) { - break; - } - if udp_port == 65535 { - bail!("Could not find free udp port to listen on"); - } - udp_port += 1; - } - Ok(udp_port) - } - - fn find_available_tcp_port(&self, start_port: u16) -> EyreResult { - // If the address is empty, iterate ports until we find one we can use. - let mut tcp_port = start_port; - loop { - if BAD_PORTS.contains(&tcp_port) { - continue; - } - if self.bind_first_tcp_port(tcp_port) { - break; - } - if tcp_port == 65535 { - bail!("Could not find free tcp port to listen on"); - } - tcp_port += 1; - } - Ok(tcp_port) - } - - async fn allocate_udp_port(&self, listen_address: String) -> EyreResult<(u16, Vec)> { + // Returns a port, a set of ip addresses to bind to, and a + // bool specifying if multiple ports should be tried + async fn convert_listen_address_to_bind_set( + &self, + listen_address: String, + ) -> EyreResult { if listen_address.is_empty() { - // If listen address is empty, find us a port iteratively - let port = self.find_available_udp_port(5150)?; + // If listen address is empty, start with port 5150 and iterate let ip_addrs = available_unspecified_addresses(); - Ok((port, ip_addrs)) + Ok(NetworkBindSet { + port: 5150, + addrs: ip_addrs, + search: true, + }) } else { // If no address is specified, but the port is, use ipv4 and ipv6 unspecified // If the address is specified, only use the specified port and fail otherwise @@ -229,58 +105,30 @@ impl Network { bail!("No valid listen address: {}", listen_address); } let port = sockaddrs[0].port(); - - Ok((port, sockaddrs.iter().map(|s| s.ip()).collect())) - } - } - - async fn allocate_tcp_port(&self, listen_address: String) -> EyreResult<(u16, Vec)> { - if listen_address.is_empty() { - // If listen address is empty, find us a port iteratively - let port = self.find_available_tcp_port(5150)?; - let ip_addrs = available_unspecified_addresses(); - Ok((port, ip_addrs)) - } else { - // If no address is specified, but the port is, use ipv4 and ipv6 unspecified - // If the address is specified, only use the specified port and fail otherwise - let sockaddrs = - listen_address_to_socket_addrs(&listen_address).map_err(|e| eyre!("{}", e))?; - if sockaddrs.is_empty() { - bail!("No valid listen address: {}", listen_address); + if port == 0 { + Ok(NetworkBindSet { + port: 5150, + addrs: sockaddrs.iter().map(|s| s.ip()).collect(), + search: true, + }) + } else { + Ok(NetworkBindSet { + port, + addrs: sockaddrs.iter().map(|s| s.ip()).collect(), + search: false, + }) } - let port = sockaddrs[0].port(); - - let mut attempts = 10; - let mut success = false; - while attempts >= 0 { - if self.bind_first_tcp_port(port) { - success = true; - break; - } - attempts -= 1; - - // Wait 5 seconds before trying again - log_net!(debug - "Binding TCP port at {} failed, waiting. Attempts remaining = {}", - port, attempts - ); - sleep(5000).await - } - if !success { - bail!("Could not find free tcp port to listen on"); - } - Ok((port, sockaddrs.iter().map(|s| s.ip()).collect())) } } ///////////////////////////////////////////////////// - pub(super) async fn start_udp_listeners( + pub(super) async fn bind_udp_protocol_handlers( &self, editor_public_internet: &mut RoutingDomainEditor, editor_local_network: &mut RoutingDomainEditor, ) -> EyreResult<()> { - log_net!("starting udp listeners"); + log_net!("UDP: binding protocol handlers"); let routing_table = self.routing_table(); let (listen_address, public_address, detect_address_changes) = { let c = self.config.get(); @@ -291,28 +139,30 @@ impl Network { ) }; - // Pick out UDP port we're going to use everywhere - // Keep sockets around until the end of this function - // to keep anyone else from binding in front of us - let (udp_port, ip_addrs) = self.allocate_udp_port(listen_address.clone()).await?; - - // Save the bound udp port for use later on - self.inner.lock().udp_port = udp_port; - - // First, create outbound sockets - // (unlike tcp where we create sockets for every connection) - // and we'll add protocol handlers for them too - self.create_udp_outbound_sockets().await?; + // Get the binding parameters from the user-specified listen address + let bind_set = self + .convert_listen_address_to_bind_set(listen_address.clone()) + .await?; // Now create udp inbound sockets for whatever interfaces we're listening on - info!( - "UDP: starting listeners on port {} at {:?}", - udp_port, ip_addrs - ); - let local_dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?; + if bind_set.search { + info!( + "UDP: searching for free port starting with {} on {:?}", + bind_set.port, bind_set.addrs + ); + } else { + info!( + "UDP: binding protocol handlers at port {} on {:?}", + bind_set.port, bind_set.addrs + ); + } + let local_dial_info_list = self.create_udp_protocol_handlers(bind_set).await?; let mut static_public = false; - log_net!("UDP: listener started on {:#?}", local_dial_info_list); + log_net!( + "UDP: protocol handlers bound to {:#?}", + local_dial_info_list + ); // Register local dial info for di in &local_dial_info_list { @@ -374,6 +224,8 @@ impl Network { .insert(ProtocolType::UDP); } + // xxx compile all dialinfo from editor into map of protocoltype+addresstype -> port for 'best port selection' code + // Now create tasks for udp listeners self.create_udp_listener_tasks().await } @@ -383,7 +235,7 @@ impl Network { editor_public_internet: &mut RoutingDomainEditor, editor_local_network: &mut RoutingDomainEditor, ) -> EyreResult<()> { - log_net!("starting ws listeners"); + log_net!("WS: binding protocol handlers"); let routing_table = self.routing_table(); let (listen_address, url, path, detect_address_changes) = { let c = self.config.get(); @@ -395,27 +247,30 @@ impl Network { ) }; - // Pick out TCP port we're going to use everywhere - // Keep sockets around until the end of this function - // to keep anyone else from binding in front of us - let (ws_port, ip_addrs) = self.allocate_tcp_port(listen_address.clone()).await?; + // Get the binding parameters from the user-specified listen address + let bind_set = self + .convert_listen_address_to_bind_set(listen_address.clone()) + .await?; - // Save the bound ws port for use later on - self.inner.lock().ws_port = ws_port; - - info!( - "WS: starting listener on port {} at {:?}", - ws_port, ip_addrs - ); + if bind_set.search { + info!( + "WS: searching for free port starting with {} on {:?}", + bind_set.port, bind_set.addrs + ); + } else { + info!( + "WS: binding protocol handlers at port {} on {:?}", + bind_set.port, bind_set.addrs + ); + } let socket_addresses = self .start_tcp_listener( - ip_addrs, - ws_port, + bind_set, false, Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))), ) .await?; - log_net!("WS: listener started on {:#?}", socket_addresses); + log_net!("WS: protocol handlers started on {:#?}", socket_addresses); let mut static_public = false; let mut registered_addresses: HashSet = HashSet::new(); @@ -493,7 +348,7 @@ impl Network { editor_public_internet: &mut RoutingDomainEditor, editor_local_network: &mut RoutingDomainEditor, ) -> EyreResult<()> { - log_net!("starting wss listeners"); + log_net!("WSS: binding protocol handlers"); let (listen_address, url, detect_address_changes) = { let c = self.config.get(); @@ -504,27 +359,31 @@ impl Network { ) }; - // Pick out TCP port we're going to use everywhere - // Keep sockets around until the end of this function - // to keep anyone else from binding in front of us - let (wss_port, ip_addrs) = self.allocate_tcp_port(listen_address.clone()).await?; + // Get the binding parameters from the user-specified listen address + let bind_set = self + .convert_listen_address_to_bind_set(listen_address.clone()) + .await?; - // Save the bound wss port for use later on - self.inner.lock().wss_port = wss_port; + if bind_set.search { + info!( + "WSS: searching for free port starting with {} on {:?}", + bind_set.port, bind_set.addrs + ); + } else { + info!( + "WSS: binding protocol handlers at port {} on {:?}", + bind_set.port, bind_set.addrs + ); + } - info!( - "WSS: starting listener on port {} at {:?}", - wss_port, ip_addrs - ); let socket_addresses = self .start_tcp_listener( - ip_addrs, - wss_port, + bind_set, true, Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))), ) .await?; - log_net!("WSS: listener started on {:#?}", socket_addresses); + log_net!("WSS: protocol handlers started on {:#?}", socket_addresses); // NOTE: No interface dial info for WSS, as there is no way to connect to a local dialinfo via TLS // If the hostname is specified, it is the public dialinfo via the URL. If no hostname @@ -586,7 +445,7 @@ impl Network { editor_public_internet: &mut RoutingDomainEditor, editor_local_network: &mut RoutingDomainEditor, ) -> EyreResult<()> { - log_net!("starting tcp listeners"); + log_net!("TCP: binding protocol handlers"); let routing_table = self.routing_table(); let (listen_address, public_address, detect_address_changes) = { @@ -598,27 +457,30 @@ impl Network { ) }; - // Pick out TCP port we're going to use everywhere - // Keep sockets around until the end of this function - // to keep anyone else from binding in front of us - let (tcp_port, ip_addrs) = self.allocate_tcp_port(listen_address.clone()).await?; + // Get the binding parameters from the user-specified listen address + let bind_set = self + .convert_listen_address_to_bind_set(listen_address.clone()) + .await?; - // Save the bound tcp port for use later on - self.inner.lock().tcp_port = tcp_port; - - info!( - "TCP: starting listener on port {} at {:?}", - tcp_port, ip_addrs - ); + if bind_set.search { + info!( + "TCP: searching for free port starting with {} on {:?}", + bind_set.port, bind_set.addrs + ); + } else { + info!( + "TCP: binding protocol handlers at port {} on {:?}", + bind_set.port, bind_set.addrs + ); + } let socket_addresses = self .start_tcp_listener( - ip_addrs, - tcp_port, + bind_set, false, Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))), ) .await?; - log_net!("TCP: listener started on {:#?}", socket_addresses); + log_net!("TCP: protocol handlers started on {:#?}", socket_addresses); let mut static_public = false; let mut registered_addresses: HashSet = HashSet::new(); diff --git a/veilid-flutter/example/pubspec.lock b/veilid-flutter/example/pubspec.lock index edbddda9..097383ef 100644 --- a/veilid-flutter/example/pubspec.lock +++ b/veilid-flutter/example/pubspec.lock @@ -458,7 +458,7 @@ packages: path: ".." relative: true source: path - version: "0.3.0" + version: "0.3.1" vm_service: dependency: transitive description: diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index 02c050e1..22a3165b 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -274,12 +274,17 @@ pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result() { ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect() } else { + let listen_address_with_port = if listen_address.contains(':') { + listen_address.to_string() + } else { + format!("{}:0", listen_address) + }; cfg_if! { if #[cfg(target_arch = "wasm32")] { use core::str::FromStr; - vec![SocketAddr::from_str(listen_address).map_err(|e| format!("Unable to parse address: {}",e))?] + vec![SocketAddr::from_str(listen_address_with_port).map_err(|e| format!("Unable to parse address: {}",e))?] } else { - listen_address + listen_address_with_port .to_socket_addrs() .map_err(|e| format!("Unable to resolve address: {}", e))? .collect()