Merge branch 'refactor-net-bind' into 'main'

Refactor low level network

See merge request veilid/veilid!274
This commit is contained in:
Christien Rioux 2024-04-26 21:09:08 +00:00
commit 4bd2ee51b7
13 changed files with 560 additions and 809 deletions

View File

@ -19,6 +19,7 @@ pub enum DetectedDialInfo {
pub struct DetectionResult { pub struct DetectionResult {
pub ddi: DetectedDialInfo, pub ddi: DetectedDialInfo,
pub external_address_types: AddressTypeSet, pub external_address_types: AddressTypeSet,
pub local_port: u16,
} }
// Result of checking external address // Result of checking external address
@ -46,6 +47,7 @@ struct DiscoveryContextUnlockedInner {
existing_external_address: Option<SocketAddress>, existing_external_address: Option<SocketAddress>,
protocol_type: ProtocolType, protocol_type: ProtocolType,
address_type: AddressType, address_type: AddressType,
port: u16,
} }
#[derive(Clone)] #[derive(Clone)]
@ -62,6 +64,7 @@ impl DiscoveryContext {
net: Network, net: Network,
protocol_type: ProtocolType, protocol_type: ProtocolType,
address_type: AddressType, address_type: AddressType,
port: u16,
clear_network_callback: ClearNetworkCallback, clear_network_callback: ClearNetworkCallback,
) -> Self { ) -> Self {
let intf_addrs = let intf_addrs =
@ -95,6 +98,7 @@ impl DiscoveryContext {
existing_external_address, existing_external_address,
protocol_type, protocol_type,
address_type, address_type,
port,
}), }),
inner: Arc::new(Mutex::new(DiscoveryContextInner { inner: Arc::new(Mutex::new(DiscoveryContextInner {
external_1: None, external_1: None,
@ -327,11 +331,7 @@ impl DiscoveryContext {
let protocol_type = self.unlocked_inner.protocol_type; let protocol_type = self.unlocked_inner.protocol_type;
let low_level_protocol_type = protocol_type.low_level_protocol_type(); let low_level_protocol_type = protocol_type.low_level_protocol_type();
let address_type = self.unlocked_inner.address_type; let address_type = self.unlocked_inner.address_type;
let local_port = self let local_port = self.unlocked_inner.port;
.unlocked_inner
.net
.get_local_port(protocol_type)
.unwrap();
let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone(); let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone();
let igd_manager = self.unlocked_inner.net.unlocked_inner.igd_manager.clone(); let igd_manager = self.unlocked_inner.net.unlocked_inner.igd_manager.clone();
@ -372,7 +372,7 @@ impl DiscoveryContext {
return Some(external_mapped_dial_info); return Some(external_mapped_dial_info);
} }
if validate_tries == PORT_MAP_VALIDATE_TRY_COUNT { if validate_tries != PORT_MAP_VALIDATE_TRY_COUNT {
log_net!(debug "UPNP port mapping succeeded but port {}/{} is still unreachable.\nretrying\n", log_net!(debug "UPNP port mapping succeeded but port {}/{} is still unreachable.\nretrying\n",
local_port, match low_level_protocol_type { local_port, match low_level_protocol_type {
LowLevelProtocolType::UDP => "udp", LowLevelProtocolType::UDP => "udp",
@ -431,6 +431,7 @@ impl DiscoveryContext {
class: DialInfoClass::Direct, class: DialInfoClass::Direct,
}), }),
external_address_types: AddressTypeSet::only(external_1.address.address_type()), external_address_types: AddressTypeSet::only(external_1.address.address_type()),
local_port: this.unlocked_inner.port,
}) })
} else { } else {
// Add public dial info with Blocked dialinfo class // Add public dial info with Blocked dialinfo class
@ -440,6 +441,7 @@ impl DiscoveryContext {
class: DialInfoClass::Blocked, class: DialInfoClass::Blocked,
}), }),
external_address_types: AddressTypeSet::only(external_1.address.address_type()), external_address_types: AddressTypeSet::only(external_1.address.address_type()),
local_port: this.unlocked_inner.port,
}) })
} }
}); });
@ -463,6 +465,7 @@ impl DiscoveryContext {
// If we have two different external addresses, then this is a symmetric NAT // If we have two different external addresses, then this is a symmetric NAT
if external_2.address.address() != external_1.address.address() { if external_2.address.address() != external_1.address.address() {
let this = self.clone();
let do_symmetric_nat_fut: SendPinBoxFuture<Option<DetectionResult>> = let do_symmetric_nat_fut: SendPinBoxFuture<Option<DetectionResult>> =
Box::pin(async move { Box::pin(async move {
Some(DetectionResult { Some(DetectionResult {
@ -472,6 +475,7 @@ impl DiscoveryContext {
) | AddressTypeSet::only( ) | AddressTypeSet::only(
external_2.address.address_type(), external_2.address.address_type(),
), ),
local_port: this.unlocked_inner.port,
}) })
}); });
unord.push(do_symmetric_nat_fut); unord.push(do_symmetric_nat_fut);
@ -481,45 +485,41 @@ impl DiscoveryContext {
// Manual Mapping Detection // Manual Mapping Detection
/////////// ///////////
let this = self.clone(); let this = self.clone();
if let Some(local_port) = self let local_port = self.unlocked_inner.port;
.unlocked_inner if external_1.dial_info.port() != local_port {
.net let c_external_1 = external_1.clone();
.get_local_port(self.unlocked_inner.protocol_type) let c_this = this.clone();
{ let do_manual_map_fut: SendPinBoxFuture<Option<DetectionResult>> =
if external_1.dial_info.port() != local_port { Box::pin(async move {
let c_external_1 = external_1.clone(); // Do a validate_dial_info on the external address, but with the same port as the local port of local interface, from a redirected node
let do_manual_map_fut: SendPinBoxFuture<Option<DetectionResult>> = // This test is to see if a node had manual port forwarding done with the same port number as the local listener
Box::pin(async move { let mut external_1_dial_info_with_local_port = c_external_1.dial_info.clone();
// Do a validate_dial_info on the external address, but with the same port as the local port of local interface, from a redirected node external_1_dial_info_with_local_port.set_port(local_port);
// This test is to see if a node had manual port forwarding done with the same port number as the local listener
let mut external_1_dial_info_with_local_port =
c_external_1.dial_info.clone();
external_1_dial_info_with_local_port.set_port(local_port);
if this if this
.validate_dial_info( .validate_dial_info(
c_external_1.node.clone(), c_external_1.node.clone(),
external_1_dial_info_with_local_port.clone(), external_1_dial_info_with_local_port.clone(),
true, true,
) )
.await .await
{ {
// Add public dial info with Direct dialinfo class // Add public dial info with Direct dialinfo class
return Some(DetectionResult { return Some(DetectionResult {
ddi: DetectedDialInfo::Detected(DialInfoDetail { ddi: DetectedDialInfo::Detected(DialInfoDetail {
dial_info: external_1_dial_info_with_local_port, dial_info: external_1_dial_info_with_local_port,
class: DialInfoClass::Direct, class: DialInfoClass::Direct,
}), }),
external_address_types: AddressTypeSet::only( external_address_types: AddressTypeSet::only(
c_external_1.address.address_type(), c_external_1.address.address_type(),
), ),
}); local_port: c_this.unlocked_inner.port,
} });
}
None None
}); });
unord.push(do_manual_map_fut); unord.push(do_manual_map_fut);
}
} }
// NAT Detection // NAT Detection
@ -563,6 +563,7 @@ impl DiscoveryContext {
external_address_types: AddressTypeSet::only( external_address_types: AddressTypeSet::only(
c_external_1.address.address_type(), c_external_1.address.address_type(),
), ),
local_port: c_this.unlocked_inner.port,
}); });
} }
None None
@ -597,6 +598,7 @@ impl DiscoveryContext {
external_address_types: AddressTypeSet::only( external_address_types: AddressTypeSet::only(
c_external_1.address.address_type(), c_external_1.address.address_type(),
), ),
local_port: c_this.unlocked_inner.port,
}); });
} }
// Didn't get a reply from a non-default port, which means we are also port restricted // Didn't get a reply from a non-default port, which means we are also port restricted
@ -608,6 +610,7 @@ impl DiscoveryContext {
external_address_types: AddressTypeSet::only( external_address_types: AddressTypeSet::only(
c_external_1.address.address_type(), c_external_1.address.address_type(),
), ),
local_port: c_this.unlocked_inner.port,
}) })
}); });
ord.push_back(do_restricted_cone_fut); ord.push_back(do_restricted_cone_fut);
@ -699,6 +702,7 @@ impl DiscoveryContext {
external_address_types: AddressTypeSet::only( external_address_types: AddressTypeSet::only(
external_mapped_dial_info.address_type(), external_mapped_dial_info.address_type(),
), ),
local_port: this.unlocked_inner.port,
}); });
} }
None None

View File

@ -15,6 +15,7 @@ use protocol::tcp::RawTcpProtocolHandler;
use protocol::udp::RawUdpProtocolHandler; use protocol::udp::RawUdpProtocolHandler;
use protocol::ws::WebsocketProtocolHandler; use protocol::ws::WebsocketProtocolHandler;
pub(in crate::network_manager) use protocol::*; pub(in crate::network_manager) use protocol::*;
use start_protocols::*;
use async_tls::TlsAcceptor; use async_tls::TlsAcceptor;
use futures_util::StreamExt; use futures_util::StreamExt;
@ -84,14 +85,6 @@ struct NetworkInner {
join_handles: Vec<MustJoinHandle<()>>, join_handles: Vec<MustJoinHandle<()>>,
/// stop source for shutting down the low level network background tasks /// stop source for shutting down the low level network background tasks
stop_source: Option<StopSource>, stop_source: Option<StopSource>,
/// port we are binding raw udp listen to
udp_port: u16,
/// port we are binding raw tcp listen to
tcp_port: u16,
/// port we are binding websocket listen to
ws_port: u16,
/// port we are binding secure websocket listen to
wss_port: u16,
/// does our network have ipv4 on any network? /// does our network have ipv4 on any network?
enable_ipv4: bool, enable_ipv4: bool,
/// does our network have ipv6 on the global internet? /// does our network have ipv6 on the global internet?
@ -104,22 +97,18 @@ struct NetworkInner {
network_already_cleared: bool, network_already_cleared: bool,
/// the punishment closure to enax /// the punishment closure to enax
public_dial_info_check_punishment: Option<Box<dyn FnOnce() + Send + 'static>>, public_dial_info_check_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
/// udp socket record for bound-first sockets, which are used to guarantee a port is available before
/// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using
bound_first_udp: BTreeMap<u16, (Option<socket2::Socket>, Option<socket2::Socket>)>,
/// mapping of protocol handlers to accept messages from a set of bound socket addresses /// mapping of protocol handlers to accept messages from a set of bound socket addresses
inbound_udp_protocol_handlers: BTreeMap<SocketAddr, RawUdpProtocolHandler>, udp_protocol_handlers: BTreeMap<SocketAddr, RawUdpProtocolHandler>,
/// outbound udp protocol handler for udpv4 /// outbound udp protocol handler for udpv4
outbound_udpv4_protocol_handler: Option<RawUdpProtocolHandler>, default_udpv4_protocol_handler: Option<RawUdpProtocolHandler>,
/// outbound udp protocol handler for udpv6 /// outbound udp protocol handler for udpv6
outbound_udpv6_protocol_handler: Option<RawUdpProtocolHandler>, default_udpv6_protocol_handler: Option<RawUdpProtocolHandler>,
/// tcp socket record for bound-first sockets, which are used to guarantee a port is available before
/// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using
bound_first_tcp: BTreeMap<u16, (Option<socket2::Socket>, Option<socket2::Socket>)>,
/// TLS handling socket controller /// TLS handling socket controller
tls_acceptor: Option<TlsAcceptor>, tls_acceptor: Option<TlsAcceptor>,
/// Multiplexer record for protocols on low level TCP sockets /// Multiplexer record for protocols on low level TCP sockets
listener_states: BTreeMap<SocketAddr, Arc<RwLock<ListenerState>>>, listener_states: BTreeMap<SocketAddr, Arc<RwLock<ListenerState>>>,
/// Preferred local addresses for protocols/address combinations for outgoing connections
preferred_local_addresses: BTreeMap<(ProtocolType, AddressType), SocketAddr>,
} }
struct NetworkUnlockedInner { struct NetworkUnlockedInner {
@ -157,20 +146,15 @@ impl Network {
static_public_dialinfo: ProtocolTypeSet::empty(), static_public_dialinfo: ProtocolTypeSet::empty(),
join_handles: Vec::new(), join_handles: Vec::new(),
stop_source: None, stop_source: None,
udp_port: 0u16,
tcp_port: 0u16,
ws_port: 0u16,
wss_port: 0u16,
enable_ipv4: false, enable_ipv4: false,
enable_ipv6_global: false, enable_ipv6_global: false,
enable_ipv6_local: false, enable_ipv6_local: false,
bound_first_udp: BTreeMap::new(), udp_protocol_handlers: BTreeMap::new(),
inbound_udp_protocol_handlers: BTreeMap::new(), default_udpv4_protocol_handler: None,
outbound_udpv4_protocol_handler: None, default_udpv6_protocol_handler: None,
outbound_udpv6_protocol_handler: None,
bound_first_tcp: BTreeMap::new(),
tls_acceptor: None, tls_acceptor: None,
listener_states: BTreeMap::new(), listener_states: BTreeMap::new(),
preferred_local_addresses: BTreeMap::new(),
} }
} }
@ -332,31 +316,20 @@ impl Network {
} }
} }
pub fn get_local_port(&self, protocol_type: ProtocolType) -> Option<u16> {
let inner = self.inner.lock();
let local_port = match protocol_type {
ProtocolType::UDP => inner.udp_port,
ProtocolType::TCP => inner.tcp_port,
ProtocolType::WS => inner.ws_port,
ProtocolType::WSS => inner.wss_port,
};
Some(local_port)
}
pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option<SocketAddr> { pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option<SocketAddr> {
let inner = self.inner.lock(); let inner = self.inner.lock();
let key = (dial_info.protocol_type(), dial_info.address_type());
inner.preferred_local_addresses.get(&key).copied()
}
let local_port = match dial_info.protocol_type() { pub fn get_preferred_local_address_by_key(
ProtocolType::UDP => inner.udp_port, &self,
ProtocolType::TCP => inner.tcp_port, pt: ProtocolType,
ProtocolType::WS => inner.ws_port, at: AddressType,
ProtocolType::WSS => inner.wss_port, ) -> Option<SocketAddr> {
}; let inner = self.inner.lock();
let key = (pt, at);
Some(match dial_info.address_type() { inner.preferred_local_addresses.get(&key).copied()
AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), local_port),
AddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), local_port),
})
} }
pub fn is_stable_interface_address(&self, addr: IpAddr) -> bool { pub fn is_stable_interface_address(&self, addr: IpAddr) -> bool {
@ -846,7 +819,7 @@ impl Network {
// start listeners // start listeners
if protocol_config.inbound.contains(ProtocolType::UDP) { if protocol_config.inbound.contains(ProtocolType::UDP) {
self.start_udp_listeners(&mut editor_public_internet, &mut editor_local_network) self.bind_udp_protocol_handlers(&mut editor_public_internet, &mut editor_local_network)
.await?; .await?;
} }
if protocol_config.inbound.contains(ProtocolType::WS) { if protocol_config.inbound.contains(ProtocolType::WS) {
@ -862,11 +835,6 @@ impl Network {
.await?; .await?;
} }
// release caches of available listener ports
// this releases the 'first bound' ports we use to guarantee
// that we have ports available to us
self.free_bound_first_ports();
editor_public_internet.setup_network( editor_public_internet.setup_network(
protocol_config.outbound, protocol_config.outbound,
protocol_config.inbound, protocol_config.inbound,

View File

@ -103,21 +103,35 @@ impl Network {
_t: u64, _t: u64,
) -> EyreResult<()> { ) -> EyreResult<()> {
// Figure out if we can optimize TCP/WS checking since they are often on the same port // Figure out if we can optimize TCP/WS checking since they are often on the same port
let (protocol_config, tcp_same_port) = { let (protocol_config, inbound_protocol_map) = {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let protocol_config = inner.protocol_config.clone(); let protocol_config = inner.protocol_config.clone();
let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP)
&& protocol_config.inbound.contains(ProtocolType::WS)
{
inner.tcp_port == inner.ws_port
} else {
false
};
// Allow network to be cleared if external addresses change // Allow network to be cleared if external addresses change
inner.network_already_cleared = false; inner.network_already_cleared = false;
// let mut inbound_protocol_map = HashMap::<(AddressType, u16), Vec<ProtocolType>>::new();
(protocol_config, tcp_same_port) for at in protocol_config.family_global {
for pt in protocol_config.inbound {
let key = (pt, at);
// Skip things with static public dialinfo
// as they don't need to participate in discovery
if inner.static_public_dialinfo.contains(pt) {
continue;
}
if let Some(pla) = inner.preferred_local_addresses.get(&key) {
let itmkey = (at, pla.port());
inbound_protocol_map
.entry(itmkey)
.and_modify(|x| x.push(pt))
.or_insert_with(|| vec![pt]);
}
}
}
(protocol_config, inbound_protocol_map)
}; };
// Save off existing public dial info for change detection later // Save off existing public dial info for change detection later
@ -166,100 +180,19 @@ impl Network {
// Process all protocol and address combinations // Process all protocol and address combinations
let mut unord = FuturesUnordered::new(); let mut unord = FuturesUnordered::new();
// Do UDPv4+v6 at the same time as everything else
if protocol_config.inbound.contains(ProtocolType::UDP) {
// UDPv4
if protocol_config.family_global.contains(AddressType::IPV4) {
let udpv4_context = DiscoveryContext::new(
self.routing_table(),
self.clone(),
ProtocolType::UDP,
AddressType::IPV4,
clear_network_callback.clone(),
);
udpv4_context
.discover(&mut unord)
.instrument(trace_span!("udpv4_context.discover"))
.await;
}
// UDPv6 for ((at, port), protocols) in &inbound_protocol_map {
if protocol_config.family_global.contains(AddressType::IPV6) { let first_pt = protocols.first().unwrap();
let udpv6_context = DiscoveryContext::new(
self.routing_table(),
self.clone(),
ProtocolType::UDP,
AddressType::IPV6,
clear_network_callback.clone(),
);
udpv6_context
.discover(&mut unord)
.instrument(trace_span!("udpv6_context.discover"))
.await;
}
}
// Do TCPv4. Possibly do WSv4 if it is on a different port let discovery_context = DiscoveryContext::new(
if protocol_config.family_global.contains(AddressType::IPV4) { self.routing_table(),
if protocol_config.inbound.contains(ProtocolType::TCP) { self.clone(),
let tcpv4_context = DiscoveryContext::new( *first_pt,
self.routing_table(), *at,
self.clone(), *port,
ProtocolType::TCP, clear_network_callback.clone(),
AddressType::IPV4, );
clear_network_callback.clone(), discovery_context.discover(&mut unord).await;
);
tcpv4_context
.discover(&mut unord)
.instrument(trace_span!("tcpv4_context.discover"))
.await;
}
if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port {
let wsv4_context = DiscoveryContext::new(
self.routing_table(),
self.clone(),
ProtocolType::WS,
AddressType::IPV4,
clear_network_callback.clone(),
);
wsv4_context
.discover(&mut unord)
.instrument(trace_span!("wsv4_context.discover"))
.await;
}
}
// Do TCPv6. Possibly do WSv6 if it is on a different port
if protocol_config.family_global.contains(AddressType::IPV6) {
if protocol_config.inbound.contains(ProtocolType::TCP) {
let tcpv6_context = DiscoveryContext::new(
self.routing_table(),
self.clone(),
ProtocolType::TCP,
AddressType::IPV6,
clear_network_callback.clone(),
);
tcpv6_context
.discover(&mut unord)
.instrument(trace_span!("tcpv6_context.discover"))
.await;
}
// WSv6
if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port {
let wsv6_context = DiscoveryContext::new(
self.routing_table(),
self.clone(),
ProtocolType::WS,
AddressType::IPV6,
clear_network_callback.clone(),
);
wsv6_context
.discover(&mut unord)
.instrument(trace_span!("wsv6_context.discover"))
.await;
}
} }
// Wait for all discovery futures to complete and apply discoverycontexts // Wait for all discovery futures to complete and apply discoverycontexts
@ -273,19 +206,20 @@ impl Network {
// Add the external address kinds to the set we've seen // Add the external address kinds to the set we've seen
all_address_types |= dr.external_address_types; all_address_types |= dr.external_address_types;
// Add WS dialinfo as well if it is on the same port as TCP // Add additional dialinfo for protocols on the same port
if let DetectedDialInfo::Detected(did) = &dr.ddi { if let DetectedDialInfo::Detected(did) = &dr.ddi {
if did.dial_info.protocol_type() == ProtocolType::TCP && tcp_same_port { let ipmkey = (did.dial_info.address_type(), dr.local_port);
// Make WS dialinfo as well with same socket address as TCP for additional_pt in
let ws_ddi = DetectedDialInfo::Detected(DialInfoDetail { inbound_protocol_map.get(&ipmkey).unwrap().iter().skip(1)
dial_info: self.make_dial_info( {
did.dial_info.socket_address(), // Make dialinfo for additional protocol type
ProtocolType::WS, let additional_ddi = DetectedDialInfo::Detected(DialInfoDetail {
), dial_info: self
.make_dial_info(did.dial_info.socket_address(), *additional_pt),
class: did.class, class: did.class,
}); });
// Add additional WS dialinfo // Add additional dialinfo
self.update_with_detected_dial_info(ws_ddi).await?; self.update_with_detected_dial_info(additional_ddi).await?;
} }
} }
} }
@ -368,7 +302,14 @@ impl Network {
) )
.unwrap() .unwrap()
} }
ProtocolType::WSS => panic!("none of the discovery functions are used for wss"), ProtocolType::WSS => {
let c = self.config.get();
DialInfo::try_wss(
addr,
format!("wss://{}/{}", addr, c.network.protocol.wss.path),
)
.unwrap()
}
} }
} }
} }

View File

@ -237,7 +237,7 @@ impl Network {
} }
} }
async fn spawn_socket_listener(&self, addr: SocketAddr) -> EyreResult<()> { async fn spawn_socket_listener(&self, addr: SocketAddr) -> EyreResult<bool> {
// Get config // Get config
let (connection_initial_timeout_ms, tls_connection_initial_timeout_ms) = { let (connection_initial_timeout_ms, tls_connection_initial_timeout_ms) = {
let c = self.config.get(); let c = self.config.get();
@ -247,13 +247,27 @@ impl Network {
) )
}; };
// Create a reusable socket with no linger time, and no delay // Create a socket and bind it
let socket = new_bound_shared_tcp_socket(addr) let Some(socket) = new_bound_default_tcp_socket(addr)
.wrap_err("failed to create bound shared tcp socket")?; .wrap_err("failed to create default socket listener")?
else {
return Ok(false);
};
// Drop the socket
drop(socket);
// Create a shared socket and bind it once we have determined the port is free
let Some(socket) = new_bound_shared_tcp_socket(addr)
.wrap_err("failed to create shared socket listener")?
else {
return Ok(false);
};
// Listen on the socket // Listen on the socket
socket if socket.listen(128).is_err() {
.listen(128) return Ok(false);
.wrap_err("Couldn't listen on TCP socket")?; }
// Make an async tcplistener from the socket2 socket // Make an async tcplistener from the socket2 socket
let std_listener: std::net::TcpListener = socket.into(); let std_listener: std::net::TcpListener = socket.into();
@ -324,7 +338,7 @@ impl Network {
// Add to join handles // Add to join handles
self.add_to_join_handles(jh); self.add_to_join_handles(jh);
Ok(()) Ok(true)
} }
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
@ -332,51 +346,76 @@ impl Network {
// TCP listener that multiplexes ports so multiple protocols can exist on a single port // TCP listener that multiplexes ports so multiple protocols can exist on a single port
pub(super) async fn start_tcp_listener( pub(super) async fn start_tcp_listener(
&self, &self,
ip_addrs: Vec<IpAddr>, bind_set: NetworkBindSet,
port: u16,
is_tls: bool, is_tls: bool,
new_protocol_accept_handler: Box<NewProtocolAcceptHandler>, new_protocol_accept_handler: Box<NewProtocolAcceptHandler>,
) -> EyreResult<Vec<SocketAddress>> { ) -> EyreResult<Vec<SocketAddress>> {
let mut out = Vec::<SocketAddress>::new(); let mut out = Vec::<SocketAddress>::new();
for ip_addr in ip_addrs { for ip_addr in bind_set.addrs {
let addr = SocketAddr::new(ip_addr, port); let mut port = bind_set.port;
let idi_addrs = self.translate_unspecified_address(&addr); loop {
let addr = SocketAddr::new(ip_addr, port);
// see if we've already bound to this already // see if we've already bound to this already
// if not, spawn a listener // if not, spawn a listener
if !self.inner.lock().listener_states.contains_key(&addr) { let mut got_listener = false;
self.clone().spawn_socket_listener(addr).await?; if !self.inner.lock().listener_states.contains_key(&addr) {
} if self.clone().spawn_socket_listener(addr).await? {
got_listener = true;
let ls = if let Some(ls) = self.inner.lock().listener_states.get_mut(&addr) { }
ls.clone() } else {
} else { got_listener = true;
panic!("this shouldn't happen");
};
if is_tls {
if ls.read().tls_acceptor.is_none() {
ls.write().tls_acceptor = Some(self.clone().get_or_create_tls_acceptor()?);
} }
ls.write()
.tls_protocol_handlers
.push(new_protocol_accept_handler(
self.network_manager().config(),
true,
));
} else {
ls.write()
.protocol_accept_handlers
.push(new_protocol_accept_handler(
self.network_manager().config(),
false,
));
}
// Return interface dial infos we listen on if got_listener {
for idi_addr in idi_addrs { let ls = if let Some(ls) = self.inner.lock().listener_states.get_mut(&addr) {
out.push(SocketAddress::from_socket_addr(idi_addr)); ls.clone()
} else {
panic!("this shouldn't happen");
};
if is_tls {
if ls.read().tls_acceptor.is_none() {
ls.write().tls_acceptor =
Some(self.clone().get_or_create_tls_acceptor()?);
}
ls.write()
.tls_protocol_handlers
.push(new_protocol_accept_handler(
self.network_manager().config(),
true,
));
} else {
ls.write()
.protocol_accept_handlers
.push(new_protocol_accept_handler(
self.network_manager().config(),
false,
));
}
// Return interface dial infos we listen on
let idi_addrs = self.translate_unspecified_address(&addr);
for idi_addr in idi_addrs {
out.push(SocketAddress::from_socket_addr(idi_addr));
}
break;
}
if !bind_set.search {
bail!("unable to bind to tcp {}", addr);
}
if port == 65535u16 {
port = 1024;
} else {
port += 1;
}
if port == bind_set.port {
bail!("unable to find a free port for tcp {}", ip_addr);
}
} }
} }

View File

@ -27,19 +27,13 @@ impl Network {
log_net!("UDP listener task spawned"); log_net!("UDP listener task spawned");
// Collect all our protocol handlers into a vector // Collect all our protocol handlers into a vector
let mut protocol_handlers: Vec<RawUdpProtocolHandler> = this let protocol_handlers: Vec<RawUdpProtocolHandler> = this
.inner .inner
.lock() .lock()
.inbound_udp_protocol_handlers .udp_protocol_handlers
.values() .values()
.cloned() .cloned()
.collect(); .collect();
if let Some(ph) = this.inner.lock().outbound_udpv4_protocol_handler.clone() {
protocol_handlers.push(ph);
}
if let Some(ph) = this.inner.lock().outbound_udpv6_protocol_handler.clone() {
protocol_handlers.push(ph);
}
// Spawn a local async task for each socket // Spawn a local async task for each socket
let mut protocol_handlers_unordered = FuturesUnordered::new(); let mut protocol_handlers_unordered = FuturesUnordered::new();
@ -114,77 +108,13 @@ impl Network {
Ok(()) Ok(())
} }
pub(super) async fn create_udp_outbound_sockets(&self) -> EyreResult<()> { async fn create_udp_protocol_handler(&self, addr: SocketAddr) -> EyreResult<bool> {
let mut inner = self.inner.lock(); log_net!("create_udp_protocol_handler on {:?}", &addr);
let mut port = inner.udp_port;
// v4
let socket_addr_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
if let Ok(socket) = new_bound_shared_udp_socket(socket_addr_v4) {
// Pull the port if we randomly bound, so v6 can be on the same port
port = socket
.local_addr()
.wrap_err("failed to get local address")?
.as_socket_ipv4()
.ok_or_else(|| eyre!("expected ipv4 address type"))?
.port();
// Make an async UdpSocket from the socket2 socket
let std_udp_socket: std::net::UdpSocket = socket.into();
cfg_if! {
if #[cfg(feature="rt-async-std")] {
let udp_socket = UdpSocket::from(std_udp_socket);
} else if #[cfg(feature="rt-tokio")] {
std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking");
let udp_socket = UdpSocket::from_std(std_udp_socket).wrap_err("failed to make outbound v4 tokio udpsocket")?;
} else {
compile_error!("needs executor implementation")
}
}
let socket_arc = Arc::new(udp_socket);
// Create protocol handler
let udpv4_handler = RawUdpProtocolHandler::new(
socket_arc,
Some(self.network_manager().address_filter()),
);
inner.outbound_udpv4_protocol_handler = Some(udpv4_handler);
}
//v6
let socket_addr_v6 =
SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), port);
if let Ok(socket) = new_bound_shared_udp_socket(socket_addr_v6) {
// Make an async UdpSocket from the socket2 socket
let std_udp_socket: std::net::UdpSocket = socket.into();
cfg_if! {
if #[cfg(feature="rt-async-std")] {
let udp_socket = UdpSocket::from(std_udp_socket);
} else if #[cfg(feature="rt-tokio")] {
std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking");
let udp_socket = UdpSocket::from_std(std_udp_socket).wrap_err("failed to make outbound v6 tokio udpsocket")?;
} else {
compile_error!("needs executor implementation")
}
}
let socket_arc = Arc::new(udp_socket);
// Create protocol handler
let udpv6_handler = RawUdpProtocolHandler::new(
socket_arc,
Some(self.network_manager().address_filter()),
);
inner.outbound_udpv6_protocol_handler = Some(udpv6_handler);
}
Ok(())
}
async fn create_udp_inbound_socket(&self, addr: SocketAddr) -> EyreResult<()> {
log_net!("create_udp_inbound_socket on {:?}", &addr);
// Create a reusable socket // Create a reusable socket
let socket = new_bound_shared_udp_socket(addr)?; let Some(socket) = new_bound_default_udp_socket(addr)? else {
return Ok(false);
};
// Make an async UdpSocket from the socket2 socket // Make an async UdpSocket from the socket2 socket
let std_udp_socket: std::net::UdpSocket = socket.into(); let std_udp_socket: std::net::UdpSocket = socket.into();
@ -204,40 +134,58 @@ impl Network {
let protocol_handler = let protocol_handler =
RawUdpProtocolHandler::new(socket_arc, Some(self.network_manager().address_filter())); RawUdpProtocolHandler::new(socket_arc, Some(self.network_manager().address_filter()));
// Create message_handler records // Record protocol handler
self.inner let mut inner = self.inner.lock();
.lock() inner
.inbound_udp_protocol_handlers .udp_protocol_handlers
.insert(addr, protocol_handler); .insert(addr, protocol_handler.clone());
if addr.is_ipv4() && inner.default_udpv4_protocol_handler.is_none() {
inner.default_udpv4_protocol_handler = Some(protocol_handler);
} else if addr.is_ipv6() && inner.default_udpv6_protocol_handler.is_none() {
inner.default_udpv6_protocol_handler = Some(protocol_handler);
}
Ok(()) Ok(true)
} }
pub(super) async fn create_udp_inbound_sockets( pub(super) async fn create_udp_protocol_handlers(
&self, &self,
ip_addrs: Vec<IpAddr>, bind_set: NetworkBindSet,
port: u16,
) -> EyreResult<Vec<DialInfo>> { ) -> EyreResult<Vec<DialInfo>> {
let mut out = Vec::<DialInfo>::new(); let mut out = Vec::<DialInfo>::new();
for ip_addr in ip_addrs { for ip_addr in bind_set.addrs {
let addr = SocketAddr::new(ip_addr, port); let mut port = bind_set.port;
loop {
let addr = SocketAddr::new(ip_addr, port);
// see if we've already bound to this already // see if we've already bound to this already
// if not, spawn a listener // if not, spawn a listener
if !self if !self.inner.lock().udp_protocol_handlers.contains_key(&addr) {
.inner let bound = self.clone().create_udp_protocol_handler(addr).await?;
.lock()
.inbound_udp_protocol_handlers
.contains_key(&addr)
{
let idi_addrs = self.translate_unspecified_address(&addr);
self.clone().create_udp_inbound_socket(addr).await?; // Return interface dial infos we listen on
if bound {
let idi_addrs = self.translate_unspecified_address(&addr);
for idi_addr in idi_addrs {
out.push(DialInfo::udp_from_socketaddr(idi_addr));
}
break;
}
}
// Return interface dial infos we listen on if !bind_set.search {
for idi_addr in idi_addrs { bail!("unable to bind to udp {}", addr);
out.push(DialInfo::udp_from_socketaddr(idi_addr)); }
if port == 65535u16 {
port = 1024;
} else {
port += 1;
}
if port == bind_set.port {
bail!("unable to find a free port for udp {}", ip_addr);
} }
} }
} }
@ -251,18 +199,30 @@ impl Network {
peer_socket_addr: &SocketAddr, peer_socket_addr: &SocketAddr,
local_socket_addr: &Option<SocketAddr>, local_socket_addr: &Option<SocketAddr>,
) -> Option<RawUdpProtocolHandler> { ) -> Option<RawUdpProtocolHandler> {
let inner = self.inner.lock();
// if our last communication with this peer came from a particular inbound udp protocol handler, use it // if our last communication with this peer came from a particular inbound udp protocol handler, use it
if let Some(sa) = local_socket_addr { if let Some(sa) = local_socket_addr {
if let Some(ph) = self.inner.lock().inbound_udp_protocol_handlers.get(sa) { if let Some(ph) = inner.udp_protocol_handlers.get(sa) {
return Some(ph.clone()); return Some(ph.clone());
} }
} }
// otherwise find the outbound udp protocol handler that matches the ip protocol version of the peer addr // otherwise find the first outbound udp protocol handler that matches the ip protocol version of the peer addr
let inner = self.inner.lock();
match peer_socket_addr { match peer_socket_addr {
SocketAddr::V4(_) => inner.outbound_udpv4_protocol_handler.clone(), SocketAddr::V4(_) => inner.udp_protocol_handlers.iter().find_map(|x| {
SocketAddr::V6(_) => inner.outbound_udpv6_protocol_handler.clone(), if x.0.is_ipv4() {
Some(x.1.clone())
} else {
None
}
}),
SocketAddr::V6(_) => inner.udp_protocol_handlers.iter().find_map(|x| {
if x.0.is_ipv6() {
Some(x.1.clone())
} else {
None
}
}),
} }
} }
} }

View File

@ -15,28 +15,28 @@ cfg_if! {
use socket2::{Domain, Protocol, SockAddr, Socket, Type}; use socket2::{Domain, Protocol, SockAddr, Socket, Type};
cfg_if! { // cfg_if! {
if #[cfg(windows)] { // if #[cfg(windows)] {
use winapi::shared::ws2def::{ SOL_SOCKET, SO_EXCLUSIVEADDRUSE}; // use winapi::shared::ws2def::{ SOL_SOCKET, SO_EXCLUSIVEADDRUSE};
use winapi::um::winsock2::{SOCKET_ERROR, setsockopt}; // use winapi::um::winsock2::{SOCKET_ERROR, setsockopt};
use winapi::ctypes::c_int; // use winapi::ctypes::c_int;
use std::os::windows::io::AsRawSocket; // use std::os::windows::io::AsRawSocket;
fn set_exclusiveaddruse(socket: &Socket) -> io::Result<()> { // fn set_exclusiveaddruse(socket: &Socket) -> io::Result<()> {
unsafe { // unsafe {
let optval:c_int = 1; // let optval:c_int = 1;
if setsockopt(socket.as_raw_socket().try_into().unwrap(), SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (&optval as *const c_int).cast(), // if setsockopt(socket.as_raw_socket().try_into().unwrap(), SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (&optval as *const c_int).cast(),
std::mem::size_of::<c_int>() as c_int) == SOCKET_ERROR { // std::mem::size_of::<c_int>() as c_int) == SOCKET_ERROR {
return Err(io::Error::last_os_error()); // return Err(io::Error::last_os_error());
} // }
Ok(()) // Ok(())
} // }
} // }
} // }
} // }
#[instrument(level = "trace", ret)] #[instrument(level = "trace", ret)]
pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result<Socket> { pub fn new_shared_udp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
if domain == Domain::IPV6 { if domain == Domain::IPV6 {
socket.set_only_v6(true)?; socket.set_only_v6(true)?;
@ -52,60 +52,32 @@ pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result<Socket> {
} }
#[instrument(level = "trace", ret)] #[instrument(level = "trace", ret)]
pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result<Socket> { pub fn new_default_udp_socket(domain: Domain) -> io::Result<Socket> {
let domain = Domain::for_address(local_address);
let socket = new_unbound_shared_udp_socket(domain)?;
let socket2_addr = SockAddr::from(local_address);
socket.bind(&socket2_addr)?;
log_net!("created bound shared udp socket on {:?}", &local_address);
Ok(socket)
}
#[instrument(level = "trace", ret)]
pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
if domain == Domain::IPV6 { if domain == Domain::IPV6 {
socket.set_only_v6(true)?; socket.set_only_v6(true)?;
} }
// Bind the socket -first- before turning on 'reuse address' this way it will
// fail if the port is already taken
let socket2_addr = SockAddr::from(local_address);
// On windows, do SO_EXCLUSIVEADDRUSE before the bind to ensure the port is fully available
cfg_if! {
if #[cfg(windows)] {
set_exclusiveaddruse(&socket)?;
}
}
// Bind the socket -first- without turning on SO_REUSEPORT this way it will
// fail if the port is already taken
cfg_if! {
if #[cfg(unix)] {
socket
.set_reuse_address(true)?;
}
}
socket.bind(&socket2_addr)?;
// Set 'reuse address' so future binds to this port will succeed
// This does not work on Windows, where reuse options can not be set after the bind
cfg_if! {
if #[cfg(unix)] {
socket.set_reuse_port(true)?;
}
}
log_net!("created bound first udp socket on {:?}", &local_address);
Ok(socket) Ok(socket)
} }
#[instrument(level = "trace", ret)] #[instrument(level = "trace", ret)]
pub fn new_unbound_tcp_socket(domain: Domain) -> io::Result<Socket> { pub fn new_bound_default_udp_socket(local_address: SocketAddr) -> io::Result<Option<Socket>> {
let domain = Domain::for_address(local_address);
let socket = new_default_udp_socket(domain)?;
let socket2_addr = SockAddr::from(local_address);
if socket.bind(&socket2_addr).is_err() {
return Ok(None);
}
log_net!("created bound default udp socket on {:?}", &local_address);
Ok(Some(socket))
}
#[instrument(level = "trace", ret)]
pub fn new_default_tcp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
if let Err(e) = socket.set_nodelay(true) { if let Err(e) = socket.set_nodelay(true) {
log_net!(error "Couldn't set TCP nodelay: {}", e); log_net!(error "Couldn't set TCP nodelay: {}", e);
@ -117,7 +89,7 @@ pub fn new_unbound_tcp_socket(domain: Domain) -> io::Result<Socket> {
} }
#[instrument(level = "trace", ret)] #[instrument(level = "trace", ret)]
pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> { pub fn new_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
// if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) { // if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
// log_net!(error "Couldn't set TCP linger: {}", e); // log_net!(error "Couldn't set TCP linger: {}", e);
@ -137,64 +109,32 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
Ok(socket) Ok(socket)
} }
#[instrument(level = "trace", ret)]
pub fn new_bound_default_tcp_socket(local_address: SocketAddr) -> io::Result<Option<Socket>> {
let domain = Domain::for_address(local_address);
let socket = new_default_tcp_socket(domain)?;
let socket2_addr = SockAddr::from(local_address);
if socket.bind(&socket2_addr).is_err() {
return Ok(None);
}
log_net!("created bound default tcp socket on {:?}", &local_address);
Ok(Some(socket))
}
#[instrument(level = "trace", ret)] #[instrument(level = "trace", ret)]
pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> { pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Option<Socket>> {
let domain = Domain::for_address(local_address); let domain = Domain::for_address(local_address);
let socket = new_unbound_shared_tcp_socket(domain)?; let socket = new_shared_tcp_socket(domain)?;
let socket2_addr = SockAddr::from(local_address); let socket2_addr = SockAddr::from(local_address);
socket.bind(&socket2_addr)?; if socket.bind(&socket2_addr).is_err() {
return Ok(None);
}
log_net!("created bound shared tcp socket on {:?}", &local_address); log_net!("created bound shared tcp socket on {:?}", &local_address);
Ok(socket) Ok(Some(socket))
}
#[instrument(level = "trace", ret)]
pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
// if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
// log_net!(error "Couldn't set TCP linger: {}", e);
// }
if let Err(e) = socket.set_nodelay(true) {
log_net!(error "Couldn't set TCP nodelay: {}", e);
}
if domain == Domain::IPV6 {
socket.set_only_v6(true)?;
}
// On windows, do SO_EXCLUSIVEADDRUSE before the bind to ensure the port is fully available
cfg_if! {
if #[cfg(windows)] {
set_exclusiveaddruse(&socket)?;
}
}
// Bind the socket -first- without turning on SO_REUSEPORT this way it will
// fail if the port is already taken
let socket2_addr = SockAddr::from(local_address);
cfg_if! {
if #[cfg(unix)] {
socket
.set_reuse_address(true)?;
}
}
socket.bind(&socket2_addr)?;
// Set 'reuse address' so future binds to this port will succeed
// This does not work on Windows, where reuse options can not be set after the bind
cfg_if! {
if #[cfg(unix)] {
socket.set_reuse_port(true)?;
}
}
log_net!("created bound first tcp socket on {:?}", &local_address);
Ok(socket)
} }
// Non-blocking connect is tricky when you want to start with a prepared socket // Non-blocking connect is tricky when you want to start with a prepared socket

View File

@ -169,8 +169,10 @@ impl RawTcpProtocolHandler {
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> { ) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
// Make a shared socket // Make a shared socket
let socket = match local_address { let socket = match local_address {
Some(a) => new_bound_shared_tcp_socket(a)?, Some(a) => {
None => new_unbound_tcp_socket(socket2::Domain::for_address(socket_addr))?, new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))?
}
None => new_default_tcp_socket(socket2::Domain::for_address(socket_addr))?,
}; };
// Non-blocking connect to remote address // Non-blocking connect to remote address
@ -185,16 +187,16 @@ impl RawTcpProtocolHandler {
let ps = AsyncPeekStream::new(ts); let ps = AsyncPeekStream::new(ts);
// Wrap the stream in a network connection and return it // Wrap the stream in a network connection and return it
let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( let flow = Flow::new(
Flow::new( PeerAddress::new(
PeerAddress::new( SocketAddress::from_socket_addr(socket_addr),
SocketAddress::from_socket_addr(socket_addr), ProtocolType::TCP,
ProtocolType::TCP,
),
SocketAddress::from_socket_addr(actual_local_address),
), ),
ps, SocketAddress::from_socket_addr(actual_local_address),
)); );
log_net!("rawtcp::connect: {:?}", flow);
let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(flow, ps));
Ok(NetworkResult::Value(conn)) Ok(NetworkResult::Value(conn))
} }

View File

@ -128,6 +128,8 @@ impl RawUdpProtocolHandler {
SocketAddress::from_socket_addr(local_socket_addr), SocketAddress::from_socket_addr(local_socket_addr),
); );
log_net!("udp::send_message: {:?}", flow);
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str()); tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str());
Ok(NetworkResult::value(flow)) Ok(NetworkResult::value(flow))

View File

@ -319,8 +319,10 @@ impl WebsocketProtocolHandler {
// Make a shared socket // Make a shared socket
let socket = match local_address { let socket = match local_address {
Some(a) => new_bound_shared_tcp_socket(a)?, Some(a) => {
None => new_unbound_tcp_socket(socket2::Domain::for_address(remote_socket_addr))?, new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))?
}
None => new_default_tcp_socket(socket2::Domain::for_address(remote_socket_addr))?,
}; };
// Non-blocking connect to remote address // Non-blocking connect to remote address
@ -340,6 +342,7 @@ impl WebsocketProtocolHandler {
dial_info.peer_address(), dial_info.peer_address(),
SocketAddress::from_socket_addr(actual_local_addr), SocketAddress::from_socket_addr(actual_local_addr),
); );
log_net!("{}::connect: {:?}", scheme, flow);
// Negotiate TLS if this is WSS // Negotiate TLS if this is WSS
if tls { if tls {

View File

@ -1,4 +1,3 @@
use super::sockets::*;
use super::*; use super::*;
use lazy_static::*; use lazy_static::*;
@ -74,152 +73,29 @@ lazy_static! {
]); ]);
} }
pub(super) struct NetworkBindSet {
pub port: u16,
pub addrs: Vec<IpAddr>,
pub search: bool,
}
impl Network { impl Network {
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
// Support for binding first on ports to ensure nobody binds ahead of us
// or two copies of the app don't accidentally collide. This is tricky
// because we use 'reuseaddr/port' and we can accidentally bind in front of ourselves :P
fn bind_first_udp_port(&self, udp_port: u16) -> bool { // Returns a port, a set of ip addresses to bind to, and a
let mut inner = self.inner.lock(); // bool specifying if multiple ports should be tried
if inner.bound_first_udp.contains_key(&udp_port) { async fn convert_listen_address_to_bind_set(
return true; &self,
} listen_address: String,
) -> EyreResult<NetworkBindSet> {
// Check for ipv6
let has_v6 = is_ipv6_supported();
// If the address is specified, only use the specified port and fail otherwise
let mut bound_first_socket_v4 = None;
let mut bound_first_socket_v6 = None;
if let Ok(bfs4) =
new_bound_first_udp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port))
{
if has_v6 {
if let Ok(bfs6) = new_bound_first_udp_socket(SocketAddr::new(
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
udp_port,
)) {
bound_first_socket_v4 = Some(bfs4);
bound_first_socket_v6 = Some(bfs6);
}
} else {
bound_first_socket_v4 = Some(bfs4);
}
}
if bound_first_socket_v4.is_none() && (has_v6 && bound_first_socket_v6.is_none()) {
return false;
}
cfg_if! {
if #[cfg(windows)] {
// On windows, drop the socket. This is a race condition, but there's
// no way around it. This isn't for security anyway, it's to prevent multiple copies of the
// app from binding on the same port.
inner.bound_first_udp.insert(udp_port, (None, None));
} else {
inner.bound_first_udp.insert(udp_port, (bound_first_socket_v4, bound_first_socket_v6));
}
}
true
}
fn bind_first_tcp_port(&self, tcp_port: u16) -> bool {
let mut inner = self.inner.lock();
if inner.bound_first_tcp.contains_key(&tcp_port) {
return true;
}
// Check for ipv6
let has_v6 = is_ipv6_supported();
// If the address is specified, only use the specified port and fail otherwise
let mut bound_first_socket_v4 = None;
let mut bound_first_socket_v6 = None;
if let Ok(bfs4) =
new_bound_first_tcp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port))
{
if has_v6 {
if let Ok(bfs6) = new_bound_first_tcp_socket(SocketAddr::new(
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
tcp_port,
)) {
bound_first_socket_v4 = Some(bfs4);
bound_first_socket_v6 = Some(bfs6);
}
} else {
bound_first_socket_v4 = Some(bfs4);
}
}
if bound_first_socket_v4.is_none() && (has_v6 && bound_first_socket_v6.is_none()) {
return false;
}
cfg_if! {
if #[cfg(windows)] {
// On windows, drop the socket. This is a race condition, but there's
// no way around it. This isn't for security anyway, it's to prevent multiple copies of the
// app from binding on the same port.
inner.bound_first_tcp.insert(tcp_port, (None, None));
} else {
inner.bound_first_tcp.insert(tcp_port, (bound_first_socket_v4, bound_first_socket_v6));
}
}
true
}
pub(super) fn free_bound_first_ports(&self) {
let mut inner = self.inner.lock();
inner.bound_first_udp.clear();
inner.bound_first_tcp.clear();
}
/////////////////////////////////////////////////////
fn find_available_udp_port(&self, start_port: u16) -> EyreResult<u16> {
// If the address is empty, iterate ports until we find one we can use.
let mut udp_port = start_port;
loop {
if BAD_PORTS.contains(&udp_port) {
continue;
}
if self.bind_first_udp_port(udp_port) {
break;
}
if udp_port == 65535 {
bail!("Could not find free udp port to listen on");
}
udp_port += 1;
}
Ok(udp_port)
}
fn find_available_tcp_port(&self, start_port: u16) -> EyreResult<u16> {
// If the address is empty, iterate ports until we find one we can use.
let mut tcp_port = start_port;
loop {
if BAD_PORTS.contains(&tcp_port) {
continue;
}
if self.bind_first_tcp_port(tcp_port) {
break;
}
if tcp_port == 65535 {
bail!("Could not find free tcp port to listen on");
}
tcp_port += 1;
}
Ok(tcp_port)
}
async fn allocate_udp_port(&self, listen_address: String) -> EyreResult<(u16, Vec<IpAddr>)> {
if listen_address.is_empty() { if listen_address.is_empty() {
// If listen address is empty, find us a port iteratively // If listen address is empty, start with port 5150 and iterate
let port = self.find_available_udp_port(5150)?;
let ip_addrs = available_unspecified_addresses(); let ip_addrs = available_unspecified_addresses();
Ok((port, ip_addrs)) Ok(NetworkBindSet {
port: 5150,
addrs: ip_addrs,
search: true,
})
} else { } else {
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified // If no address is specified, but the port is, use ipv4 and ipv6 unspecified
// If the address is specified, only use the specified port and fail otherwise // If the address is specified, only use the specified port and fail otherwise
@ -229,58 +105,43 @@ impl Network {
bail!("No valid listen address: {}", listen_address); bail!("No valid listen address: {}", listen_address);
} }
let port = sockaddrs[0].port(); let port = sockaddrs[0].port();
if port == 0 {
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect())) Ok(NetworkBindSet {
port: 5150,
addrs: sockaddrs.iter().map(|s| s.ip()).collect(),
search: true,
})
} else {
Ok(NetworkBindSet {
port,
addrs: sockaddrs.iter().map(|s| s.ip()).collect(),
search: false,
})
}
} }
} }
async fn allocate_tcp_port(&self, listen_address: String) -> EyreResult<(u16, Vec<IpAddr>)> { // Add local dial info to preferred local address table
if listen_address.is_empty() { fn add_preferred_local_address(inner: &mut NetworkInner, pa: PeerAddress) {
// If listen address is empty, find us a port iteratively let key = (pa.protocol_type(), pa.address_type());
let port = self.find_available_tcp_port(5150)?; let sa = pa.socket_addr();
let ip_addrs = available_unspecified_addresses(); let unspec_sa = match sa {
Ok((port, ip_addrs)) SocketAddr::V4(a) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, a.port())),
} else { SocketAddr::V6(a) => {
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, a.port(), 0, 0))
// If the address is specified, only use the specified port and fail otherwise
let sockaddrs =
listen_address_to_socket_addrs(&listen_address).map_err(|e| eyre!("{}", e))?;
if sockaddrs.is_empty() {
bail!("No valid listen address: {}", listen_address);
} }
let port = sockaddrs[0].port(); };
inner.preferred_local_addresses.insert(key, unspec_sa);
let mut attempts = 10;
let mut success = false;
while attempts >= 0 {
if self.bind_first_tcp_port(port) {
success = true;
break;
}
attempts -= 1;
// Wait 5 seconds before trying again
log_net!(debug
"Binding TCP port at {} failed, waiting. Attempts remaining = {}",
port, attempts
);
sleep(5000).await
}
if !success {
bail!("Could not find free tcp port to listen on");
}
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect()))
}
} }
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
pub(super) async fn start_udp_listeners( pub(super) async fn bind_udp_protocol_handlers(
&self, &self,
editor_public_internet: &mut RoutingDomainEditor, editor_public_internet: &mut RoutingDomainEditor,
editor_local_network: &mut RoutingDomainEditor, editor_local_network: &mut RoutingDomainEditor,
) -> EyreResult<()> { ) -> EyreResult<()> {
log_net!("starting udp listeners"); log_net!("UDP: binding protocol handlers");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let (listen_address, public_address, detect_address_changes) = { let (listen_address, public_address, detect_address_changes) = {
let c = self.config.get(); let c = self.config.get();
@ -291,44 +152,33 @@ impl Network {
) )
}; };
// Pick out UDP port we're going to use everywhere // Get the binding parameters from the user-specified listen address
// Keep sockets around until the end of this function let bind_set = self
// to keep anyone else from binding in front of us .convert_listen_address_to_bind_set(listen_address.clone())
let (udp_port, ip_addrs) = self.allocate_udp_port(listen_address.clone()).await?; .await?;
// Save the bound udp port for use later on
self.inner.lock().udp_port = udp_port;
// First, create outbound sockets
// (unlike tcp where we create sockets for every connection)
// and we'll add protocol handlers for them too
self.create_udp_outbound_sockets().await?;
// Now create udp inbound sockets for whatever interfaces we're listening on // Now create udp inbound sockets for whatever interfaces we're listening on
info!( if bind_set.search {
"UDP: starting listeners on port {} at {:?}", info!(
udp_port, ip_addrs "UDP: searching for free port starting with {} on {:?}",
); bind_set.port, bind_set.addrs
let local_dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?; );
} else {
info!(
"UDP: binding protocol handlers at port {} on {:?}",
bind_set.port, bind_set.addrs
);
}
let mut local_dial_info_list = self.create_udp_protocol_handlers(bind_set).await?;
local_dial_info_list.sort();
let mut static_public = false; let mut static_public = false;
log_net!("UDP: listener started on {:#?}", local_dial_info_list); log_net!(
"UDP: protocol handlers bound to {:#?}",
// Register local dial info local_dial_info_list
for di in &local_dial_info_list { );
// If the local interface address is global, then register global dial info
// if no other public address is specified
if !detect_address_changes
&& public_address.is_none()
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, di)
{
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// Register interface dial info as well since the address is on the local interface
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
}
// Add static public dialinfo if it's configured // Add static public dialinfo if it's configured
if let Some(public_address) = public_address.as_ref() { if let Some(public_address) = public_address.as_ref() {
@ -342,11 +192,8 @@ impl Network {
let pdi = DialInfo::udp_from_socketaddr(pdi_addr); let pdi = DialInfo::udp_from_socketaddr(pdi_addr);
// Register the public address // Register the public address
if !detect_address_changes { editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
editor_public_internet static_public = true;
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// See if this public address is also a local interface address we haven't registered yet // See if this public address is also a local interface address we haven't registered yet
let is_interface_address = (|| { let is_interface_address = (|| {
@ -367,11 +214,30 @@ impl Network {
} }
} }
if static_public { // Register local dial info
self.inner for di in &local_dial_info_list {
.lock() // If the local interface address is global, then register global dial info
.static_public_dialinfo // if no other public address is specified
.insert(ProtocolType::UDP); if !detect_address_changes
&& public_address.is_none()
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, di)
{
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// Register interface dial info as well since the address is on the local interface
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
}
{
let mut inner = self.inner.lock();
if static_public {
inner.static_public_dialinfo.insert(ProtocolType::UDP);
}
for ldi in local_dial_info_list {
Self::add_preferred_local_address(&mut inner, ldi.peer_address());
}
} }
// Now create tasks for udp listeners // Now create tasks for udp listeners
@ -383,7 +249,7 @@ impl Network {
editor_public_internet: &mut RoutingDomainEditor, editor_public_internet: &mut RoutingDomainEditor,
editor_local_network: &mut RoutingDomainEditor, editor_local_network: &mut RoutingDomainEditor,
) -> EyreResult<()> { ) -> EyreResult<()> {
log_net!("starting ws listeners"); log_net!("WS: binding protocol handlers");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let (listen_address, url, path, detect_address_changes) = { let (listen_address, url, path, detect_address_changes) = {
let c = self.config.get(); let c = self.config.get();
@ -395,27 +261,30 @@ impl Network {
) )
}; };
// Pick out TCP port we're going to use everywhere // Get the binding parameters from the user-specified listen address
// Keep sockets around until the end of this function let bind_set = self
// to keep anyone else from binding in front of us .convert_listen_address_to_bind_set(listen_address.clone())
let (ws_port, ip_addrs) = self.allocate_tcp_port(listen_address.clone()).await?; .await?;
// Save the bound ws port for use later on if bind_set.search {
self.inner.lock().ws_port = ws_port; info!(
"WS: searching for free port starting with {} on {:?}",
info!( bind_set.port, bind_set.addrs
"WS: starting listener on port {} at {:?}", );
ws_port, ip_addrs } else {
); info!(
"WS: binding protocol handlers at port {} on {:?}",
bind_set.port, bind_set.addrs
);
}
let socket_addresses = self let socket_addresses = self
.start_tcp_listener( .start_tcp_listener(
ip_addrs, bind_set,
ws_port,
false, false,
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))), Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
) )
.await?; .await?;
log_net!("WS: listener started on {:#?}", socket_addresses); log_net!("WS: protocol handlers started on {:#?}", socket_addresses);
let mut static_public = false; let mut static_public = false;
let mut registered_addresses: HashSet<IpAddr> = HashSet::new(); let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
@ -438,11 +307,8 @@ impl Network {
let pdi = DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone()) let pdi = DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone())
.wrap_err("try_ws failed")?; .wrap_err("try_ws failed")?;
if !detect_address_changes { editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
editor_public_internet static_public = true;
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// See if this public address is also a local interface address // See if this public address is also a local interface address
if !registered_addresses.contains(&gsa.ip()) if !registered_addresses.contains(&gsa.ip())
@ -455,14 +321,15 @@ impl Network {
} }
} }
for socket_address in socket_addresses { for socket_address in &socket_addresses {
// Skip addresses we already did // Skip addresses we already did
if registered_addresses.contains(&socket_address.ip_addr()) { if registered_addresses.contains(&socket_address.ip_addr()) {
continue; continue;
} }
// Build dial info request url // Build dial info request url
let local_url = format!("ws://{}/{}", socket_address, path); let local_url = format!("ws://{}/{}", socket_address, path);
let local_di = DialInfo::try_ws(socket_address, local_url).wrap_err("try_ws failed")?; let local_di =
DialInfo::try_ws(*socket_address, local_url).wrap_err("try_ws failed")?;
if !detect_address_changes if !detect_address_changes
&& url.is_none() && url.is_none()
@ -478,11 +345,12 @@ impl Network {
editor_local_network.register_dial_info(local_di, DialInfoClass::Direct)?; editor_local_network.register_dial_info(local_di, DialInfoClass::Direct)?;
} }
let mut inner = self.inner.lock();
if static_public { if static_public {
self.inner inner.static_public_dialinfo.insert(ProtocolType::WS);
.lock() }
.static_public_dialinfo for sa in socket_addresses {
.insert(ProtocolType::WS); Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::WS));
} }
Ok(()) Ok(())
@ -493,9 +361,9 @@ impl Network {
editor_public_internet: &mut RoutingDomainEditor, editor_public_internet: &mut RoutingDomainEditor,
editor_local_network: &mut RoutingDomainEditor, editor_local_network: &mut RoutingDomainEditor,
) -> EyreResult<()> { ) -> EyreResult<()> {
log_net!("starting wss listeners"); log_net!("WSS: binding protocol handlers");
let (listen_address, url, detect_address_changes) = { let (listen_address, url, _detect_address_changes) = {
let c = self.config.get(); let c = self.config.get();
( (
c.network.protocol.wss.listen_address.clone(), c.network.protocol.wss.listen_address.clone(),
@ -504,27 +372,31 @@ impl Network {
) )
}; };
// Pick out TCP port we're going to use everywhere // Get the binding parameters from the user-specified listen address
// Keep sockets around until the end of this function let bind_set = self
// to keep anyone else from binding in front of us .convert_listen_address_to_bind_set(listen_address.clone())
let (wss_port, ip_addrs) = self.allocate_tcp_port(listen_address.clone()).await?; .await?;
// Save the bound wss port for use later on if bind_set.search {
self.inner.lock().wss_port = wss_port; info!(
"WSS: searching for free port starting with {} on {:?}",
bind_set.port, bind_set.addrs
);
} else {
info!(
"WSS: binding protocol handlers at port {} on {:?}",
bind_set.port, bind_set.addrs
);
}
info!(
"WSS: starting listener on port {} at {:?}",
wss_port, ip_addrs
);
let socket_addresses = self let socket_addresses = self
.start_tcp_listener( .start_tcp_listener(
ip_addrs, bind_set,
wss_port,
true, true,
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))), Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
) )
.await?; .await?;
log_net!("WSS: listener started on {:#?}", socket_addresses); log_net!("WSS: protocol handlers started on {:#?}", socket_addresses);
// NOTE: No interface dial info for WSS, as there is no way to connect to a local dialinfo via TLS // NOTE: No interface dial info for WSS, as there is no way to connect to a local dialinfo via TLS
// If the hostname is specified, it is the public dialinfo via the URL. If no hostname // If the hostname is specified, it is the public dialinfo via the URL. If no hostname
@ -552,11 +424,8 @@ impl Network {
let pdi = DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone()) let pdi = DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone())
.wrap_err("try_wss failed")?; .wrap_err("try_wss failed")?;
if !detect_address_changes { editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
editor_public_internet static_public = true;
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// See if this public address is also a local interface address // See if this public address is also a local interface address
if !registered_addresses.contains(&gsa.ip()) if !registered_addresses.contains(&gsa.ip())
@ -571,11 +440,12 @@ impl Network {
bail!("WSS URL must be specified due to TLS requirements"); bail!("WSS URL must be specified due to TLS requirements");
} }
let mut inner = self.inner.lock();
if static_public { if static_public {
self.inner inner.static_public_dialinfo.insert(ProtocolType::WSS);
.lock() }
.static_public_dialinfo for sa in socket_addresses {
.insert(ProtocolType::WSS); Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::WSS));
} }
Ok(()) Ok(())
@ -586,7 +456,7 @@ impl Network {
editor_public_internet: &mut RoutingDomainEditor, editor_public_internet: &mut RoutingDomainEditor,
editor_local_network: &mut RoutingDomainEditor, editor_local_network: &mut RoutingDomainEditor,
) -> EyreResult<()> { ) -> EyreResult<()> {
log_net!("starting tcp listeners"); log_net!("TCP: binding protocol handlers");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let (listen_address, public_address, detect_address_changes) = { let (listen_address, public_address, detect_address_changes) = {
@ -598,47 +468,34 @@ impl Network {
) )
}; };
// Pick out TCP port we're going to use everywhere // Get the binding parameters from the user-specified listen address
// Keep sockets around until the end of this function let bind_set = self
// to keep anyone else from binding in front of us .convert_listen_address_to_bind_set(listen_address.clone())
let (tcp_port, ip_addrs) = self.allocate_tcp_port(listen_address.clone()).await?; .await?;
// Save the bound tcp port for use later on if bind_set.search {
self.inner.lock().tcp_port = tcp_port; info!(
"TCP: searching for free port starting with {} on {:?}",
info!( bind_set.port, bind_set.addrs
"TCP: starting listener on port {} at {:?}", );
tcp_port, ip_addrs } else {
); info!(
"TCP: binding protocol handlers at port {} on {:?}",
bind_set.port, bind_set.addrs
);
}
let socket_addresses = self let socket_addresses = self
.start_tcp_listener( .start_tcp_listener(
ip_addrs, bind_set,
tcp_port,
false, false,
Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))), Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))),
) )
.await?; .await?;
log_net!("TCP: listener started on {:#?}", socket_addresses); log_net!("TCP: protocol handlers started on {:#?}", socket_addresses);
let mut static_public = false; let mut static_public = false;
let mut registered_addresses: HashSet<IpAddr> = HashSet::new(); let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
for socket_address in socket_addresses {
let di = DialInfo::tcp(socket_address);
// Register global dial info if no public address is specified
if !detect_address_changes
&& public_address.is_none()
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di)
{
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// Register interface dial info
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
registered_addresses.insert(socket_address.ip_addr());
}
// Add static public dialinfo if it's configured // Add static public dialinfo if it's configured
if let Some(public_address) = public_address.as_ref() { if let Some(public_address) = public_address.as_ref() {
// Resolve statically configured public dialinfo // Resolve statically configured public dialinfo
@ -654,11 +511,8 @@ impl Network {
} }
let pdi = DialInfo::tcp_from_socketaddr(pdi_addr); let pdi = DialInfo::tcp_from_socketaddr(pdi_addr);
if !detect_address_changes { editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
editor_public_internet static_public = true;
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// See if this public address is also a local interface address // See if this public address is also a local interface address
if self.is_stable_interface_address(pdi_addr.ip()) { if self.is_stable_interface_address(pdi_addr.ip()) {
@ -667,11 +521,29 @@ impl Network {
} }
} }
for socket_address in &socket_addresses {
let di = DialInfo::tcp(*socket_address);
// Register global dial info if no public address is specified
if !detect_address_changes
&& public_address.is_none()
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di)
{
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// Register interface dial info
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
registered_addresses.insert(socket_address.ip_addr());
}
let mut inner = self.inner.lock();
if static_public { if static_public {
self.inner inner.static_public_dialinfo.insert(ProtocolType::TCP);
.lock() }
.static_public_dialinfo for sa in socket_addresses {
.insert(ProtocolType::TCP); Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::TCP));
} }
Ok(()) Ok(())

View File

@ -54,6 +54,21 @@ impl NetworkManager {
return; return;
} }
// Ignore flows that do not start from our listening port (unbound connections etc),
// because a router is going to map these differently
let Some(pla) =
net.get_preferred_local_address_by_key(flow.protocol_type(), flow.address_type())
else {
return;
};
let Some(local) = flow.local() else {
return;
};
if local.port() != pla.port() {
log_network_result!(debug "ignoring public internet address report because local port did not match listener: {} != {}", local.port(), pla.port());
return;
}
// If we are a webapp we should skip this completely // If we are a webapp we should skip this completely
// because we will never get inbound dialinfo directly on our public ip address // because we will never get inbound dialinfo directly on our public ip address
// If we have an invalid network class, this is not necessary yet // If we have an invalid network class, this is not necessary yet

View File

@ -458,7 +458,7 @@ packages:
path: ".." path: ".."
relative: true relative: true
source: path source: path
version: "0.3.0" version: "0.3.1"
vm_service: vm_service:
dependency: transitive dependency: transitive
description: description:

View File

@ -274,12 +274,17 @@ pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result<Vec<Socket
} else if let Ok(port) = listen_address.parse::<u16>() { } else if let Ok(port) = listen_address.parse::<u16>() {
ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect() ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
} else { } else {
let listen_address_with_port = if listen_address.contains(':') {
listen_address.to_string()
} else {
format!("{}:0", listen_address)
};
cfg_if! { cfg_if! {
if #[cfg(target_arch = "wasm32")] { if #[cfg(target_arch = "wasm32")] {
use core::str::FromStr; use core::str::FromStr;
vec![SocketAddr::from_str(listen_address).map_err(|e| format!("Unable to parse address: {}",e))?] vec![SocketAddr::from_str(listen_address_with_port).map_err(|e| format!("Unable to parse address: {}",e))?]
} else { } else {
listen_address listen_address_with_port
.to_socket_addrs() .to_socket_addrs()
.map_err(|e| format!("Unable to resolve address: {}", e))? .map_err(|e| format!("Unable to resolve address: {}", e))?
.collect() .collect()