mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-01-15 09:17:24 -05:00
more refactor and clean up low level networking
This commit is contained in:
parent
03e872c128
commit
4906c5df78
@ -19,6 +19,7 @@ pub enum DetectedDialInfo {
|
||||
pub struct DetectionResult {
|
||||
pub ddi: DetectedDialInfo,
|
||||
pub external_address_types: AddressTypeSet,
|
||||
pub local_port: u16,
|
||||
}
|
||||
|
||||
// Result of checking external address
|
||||
@ -46,6 +47,7 @@ struct DiscoveryContextUnlockedInner {
|
||||
existing_external_address: Option<SocketAddress>,
|
||||
protocol_type: ProtocolType,
|
||||
address_type: AddressType,
|
||||
port: u16,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -62,6 +64,7 @@ impl DiscoveryContext {
|
||||
net: Network,
|
||||
protocol_type: ProtocolType,
|
||||
address_type: AddressType,
|
||||
port: u16,
|
||||
clear_network_callback: ClearNetworkCallback,
|
||||
) -> Self {
|
||||
let intf_addrs =
|
||||
@ -95,6 +98,7 @@ impl DiscoveryContext {
|
||||
existing_external_address,
|
||||
protocol_type,
|
||||
address_type,
|
||||
port,
|
||||
}),
|
||||
inner: Arc::new(Mutex::new(DiscoveryContextInner {
|
||||
external_1: None,
|
||||
@ -327,11 +331,7 @@ impl DiscoveryContext {
|
||||
let protocol_type = self.unlocked_inner.protocol_type;
|
||||
let low_level_protocol_type = protocol_type.low_level_protocol_type();
|
||||
let address_type = self.unlocked_inner.address_type;
|
||||
let local_port = self
|
||||
.unlocked_inner
|
||||
.net
|
||||
.get_local_port(protocol_type)
|
||||
.unwrap();
|
||||
let local_port = self.unlocked_inner.port;
|
||||
let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone();
|
||||
|
||||
let igd_manager = self.unlocked_inner.net.unlocked_inner.igd_manager.clone();
|
||||
@ -372,7 +372,7 @@ impl DiscoveryContext {
|
||||
return Some(external_mapped_dial_info);
|
||||
}
|
||||
|
||||
if validate_tries == PORT_MAP_VALIDATE_TRY_COUNT {
|
||||
if validate_tries != PORT_MAP_VALIDATE_TRY_COUNT {
|
||||
log_net!(debug "UPNP port mapping succeeded but port {}/{} is still unreachable.\nretrying\n",
|
||||
local_port, match low_level_protocol_type {
|
||||
LowLevelProtocolType::UDP => "udp",
|
||||
@ -431,6 +431,7 @@ impl DiscoveryContext {
|
||||
class: DialInfoClass::Direct,
|
||||
}),
|
||||
external_address_types: AddressTypeSet::only(external_1.address.address_type()),
|
||||
local_port: this.unlocked_inner.port,
|
||||
})
|
||||
} else {
|
||||
// Add public dial info with Blocked dialinfo class
|
||||
@ -440,6 +441,7 @@ impl DiscoveryContext {
|
||||
class: DialInfoClass::Blocked,
|
||||
}),
|
||||
external_address_types: AddressTypeSet::only(external_1.address.address_type()),
|
||||
local_port: this.unlocked_inner.port,
|
||||
})
|
||||
}
|
||||
});
|
||||
@ -463,6 +465,7 @@ impl DiscoveryContext {
|
||||
|
||||
// If we have two different external addresses, then this is a symmetric NAT
|
||||
if external_2.address.address() != external_1.address.address() {
|
||||
let this = self.clone();
|
||||
let do_symmetric_nat_fut: SendPinBoxFuture<Option<DetectionResult>> =
|
||||
Box::pin(async move {
|
||||
Some(DetectionResult {
|
||||
@ -472,6 +475,7 @@ impl DiscoveryContext {
|
||||
) | AddressTypeSet::only(
|
||||
external_2.address.address_type(),
|
||||
),
|
||||
local_port: this.unlocked_inner.port,
|
||||
})
|
||||
});
|
||||
unord.push(do_symmetric_nat_fut);
|
||||
@ -481,45 +485,41 @@ impl DiscoveryContext {
|
||||
// Manual Mapping Detection
|
||||
///////////
|
||||
let this = self.clone();
|
||||
if let Some(local_port) = self
|
||||
.unlocked_inner
|
||||
.net
|
||||
.get_local_port(self.unlocked_inner.protocol_type)
|
||||
{
|
||||
if external_1.dial_info.port() != local_port {
|
||||
let c_external_1 = external_1.clone();
|
||||
let do_manual_map_fut: SendPinBoxFuture<Option<DetectionResult>> =
|
||||
Box::pin(async move {
|
||||
// Do a validate_dial_info on the external address, but with the same port as the local port of local interface, from a redirected node
|
||||
// This test is to see if a node had manual port forwarding done with the same port number as the local listener
|
||||
let mut external_1_dial_info_with_local_port =
|
||||
c_external_1.dial_info.clone();
|
||||
external_1_dial_info_with_local_port.set_port(local_port);
|
||||
let local_port = self.unlocked_inner.port;
|
||||
if external_1.dial_info.port() != local_port {
|
||||
let c_external_1 = external_1.clone();
|
||||
let c_this = this.clone();
|
||||
let do_manual_map_fut: SendPinBoxFuture<Option<DetectionResult>> =
|
||||
Box::pin(async move {
|
||||
// Do a validate_dial_info on the external address, but with the same port as the local port of local interface, from a redirected node
|
||||
// This test is to see if a node had manual port forwarding done with the same port number as the local listener
|
||||
let mut external_1_dial_info_with_local_port = c_external_1.dial_info.clone();
|
||||
external_1_dial_info_with_local_port.set_port(local_port);
|
||||
|
||||
if this
|
||||
.validate_dial_info(
|
||||
c_external_1.node.clone(),
|
||||
external_1_dial_info_with_local_port.clone(),
|
||||
true,
|
||||
)
|
||||
.await
|
||||
{
|
||||
// Add public dial info with Direct dialinfo class
|
||||
return Some(DetectionResult {
|
||||
ddi: DetectedDialInfo::Detected(DialInfoDetail {
|
||||
dial_info: external_1_dial_info_with_local_port,
|
||||
class: DialInfoClass::Direct,
|
||||
}),
|
||||
external_address_types: AddressTypeSet::only(
|
||||
c_external_1.address.address_type(),
|
||||
),
|
||||
});
|
||||
}
|
||||
if this
|
||||
.validate_dial_info(
|
||||
c_external_1.node.clone(),
|
||||
external_1_dial_info_with_local_port.clone(),
|
||||
true,
|
||||
)
|
||||
.await
|
||||
{
|
||||
// Add public dial info with Direct dialinfo class
|
||||
return Some(DetectionResult {
|
||||
ddi: DetectedDialInfo::Detected(DialInfoDetail {
|
||||
dial_info: external_1_dial_info_with_local_port,
|
||||
class: DialInfoClass::Direct,
|
||||
}),
|
||||
external_address_types: AddressTypeSet::only(
|
||||
c_external_1.address.address_type(),
|
||||
),
|
||||
local_port: c_this.unlocked_inner.port,
|
||||
});
|
||||
}
|
||||
|
||||
None
|
||||
});
|
||||
unord.push(do_manual_map_fut);
|
||||
}
|
||||
None
|
||||
});
|
||||
unord.push(do_manual_map_fut);
|
||||
}
|
||||
|
||||
// NAT Detection
|
||||
@ -563,6 +563,7 @@ impl DiscoveryContext {
|
||||
external_address_types: AddressTypeSet::only(
|
||||
c_external_1.address.address_type(),
|
||||
),
|
||||
local_port: c_this.unlocked_inner.port,
|
||||
});
|
||||
}
|
||||
None
|
||||
@ -597,6 +598,7 @@ impl DiscoveryContext {
|
||||
external_address_types: AddressTypeSet::only(
|
||||
c_external_1.address.address_type(),
|
||||
),
|
||||
local_port: c_this.unlocked_inner.port,
|
||||
});
|
||||
}
|
||||
// Didn't get a reply from a non-default port, which means we are also port restricted
|
||||
@ -608,6 +610,7 @@ impl DiscoveryContext {
|
||||
external_address_types: AddressTypeSet::only(
|
||||
c_external_1.address.address_type(),
|
||||
),
|
||||
local_port: c_this.unlocked_inner.port,
|
||||
})
|
||||
});
|
||||
ord.push_back(do_restricted_cone_fut);
|
||||
@ -699,6 +702,7 @@ impl DiscoveryContext {
|
||||
external_address_types: AddressTypeSet::only(
|
||||
external_mapped_dial_info.address_type(),
|
||||
),
|
||||
local_port: this.unlocked_inner.port,
|
||||
});
|
||||
}
|
||||
None
|
||||
|
@ -85,14 +85,6 @@ struct NetworkInner {
|
||||
join_handles: Vec<MustJoinHandle<()>>,
|
||||
/// stop source for shutting down the low level network background tasks
|
||||
stop_source: Option<StopSource>,
|
||||
/// port we are binding raw udp listen to
|
||||
udp_port: u16,
|
||||
/// port we are binding raw tcp listen to
|
||||
tcp_port: u16,
|
||||
/// port we are binding websocket listen to
|
||||
ws_port: u16,
|
||||
/// port we are binding secure websocket listen to
|
||||
wss_port: u16,
|
||||
/// does our network have ipv4 on any network?
|
||||
enable_ipv4: bool,
|
||||
/// does our network have ipv6 on the global internet?
|
||||
@ -115,6 +107,8 @@ struct NetworkInner {
|
||||
tls_acceptor: Option<TlsAcceptor>,
|
||||
/// Multiplexer record for protocols on low level TCP sockets
|
||||
listener_states: BTreeMap<SocketAddr, Arc<RwLock<ListenerState>>>,
|
||||
/// Preferred local addresses for protocols/address combinations for outgoing connections
|
||||
preferred_local_addresses: BTreeMap<(ProtocolType, AddressType), SocketAddr>,
|
||||
}
|
||||
|
||||
struct NetworkUnlockedInner {
|
||||
@ -152,10 +146,6 @@ impl Network {
|
||||
static_public_dialinfo: ProtocolTypeSet::empty(),
|
||||
join_handles: Vec::new(),
|
||||
stop_source: None,
|
||||
udp_port: 0u16,
|
||||
tcp_port: 0u16,
|
||||
ws_port: 0u16,
|
||||
wss_port: 0u16,
|
||||
enable_ipv4: false,
|
||||
enable_ipv6_global: false,
|
||||
enable_ipv6_local: false,
|
||||
@ -164,6 +154,7 @@ impl Network {
|
||||
default_udpv6_protocol_handler: None,
|
||||
tls_acceptor: None,
|
||||
listener_states: BTreeMap::new(),
|
||||
preferred_local_addresses: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -327,18 +318,18 @@ impl Network {
|
||||
|
||||
pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option<SocketAddr> {
|
||||
let inner = self.inner.lock();
|
||||
let key = (dial_info.protocol_type(), dial_info.address_type());
|
||||
inner.preferred_local_addresses.get(&key).copied()
|
||||
}
|
||||
|
||||
let local_port = match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => inner.udp_port,
|
||||
ProtocolType::TCP => inner.tcp_port,
|
||||
ProtocolType::WS => inner.ws_port,
|
||||
ProtocolType::WSS => inner.wss_port,
|
||||
};
|
||||
|
||||
Some(match dial_info.address_type() {
|
||||
AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), local_port),
|
||||
AddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), local_port),
|
||||
})
|
||||
pub fn get_preferred_local_address_by_key(
|
||||
&self,
|
||||
pt: ProtocolType,
|
||||
at: AddressType,
|
||||
) -> Option<SocketAddr> {
|
||||
let inner = self.inner.lock();
|
||||
let key = (pt, at);
|
||||
inner.preferred_local_addresses.get(&key).copied()
|
||||
}
|
||||
|
||||
pub fn is_stable_interface_address(&self, addr: IpAddr) -> bool {
|
||||
|
@ -103,21 +103,35 @@ impl Network {
|
||||
_t: u64,
|
||||
) -> EyreResult<()> {
|
||||
// Figure out if we can optimize TCP/WS checking since they are often on the same port
|
||||
let (protocol_config, tcp_same_port) = {
|
||||
let (protocol_config, inbound_protocol_map) = {
|
||||
let mut inner = self.inner.lock();
|
||||
let protocol_config = inner.protocol_config.clone();
|
||||
let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP)
|
||||
&& protocol_config.inbound.contains(ProtocolType::WS)
|
||||
{
|
||||
inner.tcp_port == inner.ws_port
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
// Allow network to be cleared if external addresses change
|
||||
inner.network_already_cleared = false;
|
||||
|
||||
//
|
||||
(protocol_config, tcp_same_port)
|
||||
let mut inbound_protocol_map = HashMap::<(AddressType, u16), Vec<ProtocolType>>::new();
|
||||
for at in protocol_config.family_global {
|
||||
for pt in protocol_config.inbound {
|
||||
let key = (pt, at);
|
||||
|
||||
// Skip things with static public dialinfo
|
||||
// as they don't need to participate in discovery
|
||||
if inner.static_public_dialinfo.contains(pt) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(pla) = inner.preferred_local_addresses.get(&key) {
|
||||
let itmkey = (at, pla.port());
|
||||
inbound_protocol_map
|
||||
.entry(itmkey)
|
||||
.and_modify(|x| x.push(pt))
|
||||
.or_insert_with(|| vec![pt]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(protocol_config, inbound_protocol_map)
|
||||
};
|
||||
|
||||
// Save off existing public dial info for change detection later
|
||||
@ -166,100 +180,19 @@ impl Network {
|
||||
|
||||
// Process all protocol and address combinations
|
||||
let mut unord = FuturesUnordered::new();
|
||||
// Do UDPv4+v6 at the same time as everything else
|
||||
if protocol_config.inbound.contains(ProtocolType::UDP) {
|
||||
// UDPv4
|
||||
if protocol_config.family_global.contains(AddressType::IPV4) {
|
||||
let udpv4_context = DiscoveryContext::new(
|
||||
self.routing_table(),
|
||||
self.clone(),
|
||||
ProtocolType::UDP,
|
||||
AddressType::IPV4,
|
||||
clear_network_callback.clone(),
|
||||
);
|
||||
udpv4_context
|
||||
.discover(&mut unord)
|
||||
.instrument(trace_span!("udpv4_context.discover"))
|
||||
.await;
|
||||
}
|
||||
|
||||
// UDPv6
|
||||
if protocol_config.family_global.contains(AddressType::IPV6) {
|
||||
let udpv6_context = DiscoveryContext::new(
|
||||
self.routing_table(),
|
||||
self.clone(),
|
||||
ProtocolType::UDP,
|
||||
AddressType::IPV6,
|
||||
clear_network_callback.clone(),
|
||||
);
|
||||
udpv6_context
|
||||
.discover(&mut unord)
|
||||
.instrument(trace_span!("udpv6_context.discover"))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
for ((at, port), protocols) in &inbound_protocol_map {
|
||||
let first_pt = protocols.first().unwrap();
|
||||
|
||||
// Do TCPv4. Possibly do WSv4 if it is on a different port
|
||||
if protocol_config.family_global.contains(AddressType::IPV4) {
|
||||
if protocol_config.inbound.contains(ProtocolType::TCP) {
|
||||
let tcpv4_context = DiscoveryContext::new(
|
||||
self.routing_table(),
|
||||
self.clone(),
|
||||
ProtocolType::TCP,
|
||||
AddressType::IPV4,
|
||||
clear_network_callback.clone(),
|
||||
);
|
||||
tcpv4_context
|
||||
.discover(&mut unord)
|
||||
.instrument(trace_span!("tcpv4_context.discover"))
|
||||
.await;
|
||||
}
|
||||
|
||||
if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port {
|
||||
let wsv4_context = DiscoveryContext::new(
|
||||
self.routing_table(),
|
||||
self.clone(),
|
||||
ProtocolType::WS,
|
||||
AddressType::IPV4,
|
||||
clear_network_callback.clone(),
|
||||
);
|
||||
wsv4_context
|
||||
.discover(&mut unord)
|
||||
.instrument(trace_span!("wsv4_context.discover"))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Do TCPv6. Possibly do WSv6 if it is on a different port
|
||||
if protocol_config.family_global.contains(AddressType::IPV6) {
|
||||
if protocol_config.inbound.contains(ProtocolType::TCP) {
|
||||
let tcpv6_context = DiscoveryContext::new(
|
||||
self.routing_table(),
|
||||
self.clone(),
|
||||
ProtocolType::TCP,
|
||||
AddressType::IPV6,
|
||||
clear_network_callback.clone(),
|
||||
);
|
||||
tcpv6_context
|
||||
.discover(&mut unord)
|
||||
.instrument(trace_span!("tcpv6_context.discover"))
|
||||
.await;
|
||||
}
|
||||
|
||||
// WSv6
|
||||
if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port {
|
||||
let wsv6_context = DiscoveryContext::new(
|
||||
self.routing_table(),
|
||||
self.clone(),
|
||||
ProtocolType::WS,
|
||||
AddressType::IPV6,
|
||||
clear_network_callback.clone(),
|
||||
);
|
||||
wsv6_context
|
||||
.discover(&mut unord)
|
||||
.instrument(trace_span!("wsv6_context.discover"))
|
||||
.await;
|
||||
}
|
||||
let discovery_context = DiscoveryContext::new(
|
||||
self.routing_table(),
|
||||
self.clone(),
|
||||
*first_pt,
|
||||
*at,
|
||||
*port,
|
||||
clear_network_callback.clone(),
|
||||
);
|
||||
discovery_context.discover(&mut unord).await;
|
||||
}
|
||||
|
||||
// Wait for all discovery futures to complete and apply discoverycontexts
|
||||
@ -273,19 +206,20 @@ impl Network {
|
||||
// Add the external address kinds to the set we've seen
|
||||
all_address_types |= dr.external_address_types;
|
||||
|
||||
// Add WS dialinfo as well if it is on the same port as TCP
|
||||
// Add additional dialinfo for protocols on the same port
|
||||
if let DetectedDialInfo::Detected(did) = &dr.ddi {
|
||||
if did.dial_info.protocol_type() == ProtocolType::TCP && tcp_same_port {
|
||||
// Make WS dialinfo as well with same socket address as TCP
|
||||
let ws_ddi = DetectedDialInfo::Detected(DialInfoDetail {
|
||||
dial_info: self.make_dial_info(
|
||||
did.dial_info.socket_address(),
|
||||
ProtocolType::WS,
|
||||
),
|
||||
let ipmkey = (did.dial_info.address_type(), dr.local_port);
|
||||
for additional_pt in
|
||||
inbound_protocol_map.get(&ipmkey).unwrap().iter().skip(1)
|
||||
{
|
||||
// Make dialinfo for additional protocol type
|
||||
let additional_ddi = DetectedDialInfo::Detected(DialInfoDetail {
|
||||
dial_info: self
|
||||
.make_dial_info(did.dial_info.socket_address(), *additional_pt),
|
||||
class: did.class,
|
||||
});
|
||||
// Add additional WS dialinfo
|
||||
self.update_with_detected_dial_info(ws_ddi).await?;
|
||||
// Add additional dialinfo
|
||||
self.update_with_detected_dial_info(additional_ddi).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -368,7 +302,14 @@ impl Network {
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
ProtocolType::WSS => panic!("none of the discovery functions are used for wss"),
|
||||
ProtocolType::WSS => {
|
||||
let c = self.config.get();
|
||||
DialInfo::try_wss(
|
||||
addr,
|
||||
format!("wss://{}/{}", addr, c.network.protocol.wss.path),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -247,12 +247,23 @@ impl Network {
|
||||
)
|
||||
};
|
||||
|
||||
// Create a reusable socket with no linger time, and no delay
|
||||
let Some(socket) = new_bound_shared_tcp_socket(addr)
|
||||
.wrap_err("failed to create bound shared tcp socket")?
|
||||
// Create a socket and bind it
|
||||
let Some(socket) = new_bound_default_tcp_socket(addr)
|
||||
.wrap_err("failed to create default socket listener")?
|
||||
else {
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
// Drop the socket
|
||||
drop(socket);
|
||||
|
||||
// Create a shared socket and bind it once we have determined the port is free
|
||||
let Some(socket) = new_bound_shared_tcp_socket(addr)
|
||||
.wrap_err("failed to create shared socket listener")?
|
||||
else {
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
// Listen on the socket
|
||||
if socket.listen(128).is_err() {
|
||||
return Ok(false);
|
||||
|
@ -208,7 +208,6 @@ impl Network {
|
||||
}
|
||||
|
||||
// otherwise find the first outbound udp protocol handler that matches the ip protocol version of the peer addr
|
||||
let inner = self.inner.lock();
|
||||
match peer_socket_addr {
|
||||
SocketAddr::V4(_) => inner.udp_protocol_handlers.iter().find_map(|x| {
|
||||
if x.0.is_ipv4() {
|
||||
|
@ -77,7 +77,7 @@ pub fn new_bound_default_udp_socket(local_address: SocketAddr) -> io::Result<Opt
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", ret)]
|
||||
pub fn new_unbound_tcp_socket(domain: Domain) -> io::Result<Socket> {
|
||||
pub fn new_default_tcp_socket(domain: Domain) -> io::Result<Socket> {
|
||||
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
|
||||
if let Err(e) = socket.set_nodelay(true) {
|
||||
log_net!(error "Couldn't set TCP nodelay: {}", e);
|
||||
@ -89,7 +89,7 @@ pub fn new_unbound_tcp_socket(domain: Domain) -> io::Result<Socket> {
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", ret)]
|
||||
pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
|
||||
pub fn new_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
|
||||
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
|
||||
// if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
|
||||
// log_net!(error "Couldn't set TCP linger: {}", e);
|
||||
@ -109,11 +109,24 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
|
||||
|
||||
Ok(socket)
|
||||
}
|
||||
#[instrument(level = "trace", ret)]
|
||||
pub fn new_bound_default_tcp_socket(local_address: SocketAddr) -> io::Result<Option<Socket>> {
|
||||
let domain = Domain::for_address(local_address);
|
||||
let socket = new_default_tcp_socket(domain)?;
|
||||
let socket2_addr = SockAddr::from(local_address);
|
||||
if socket.bind(&socket2_addr).is_err() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
log_net!("created bound default tcp socket on {:?}", &local_address);
|
||||
|
||||
Ok(Some(socket))
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", ret)]
|
||||
pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Option<Socket>> {
|
||||
let domain = Domain::for_address(local_address);
|
||||
let socket = new_unbound_shared_tcp_socket(domain)?;
|
||||
let socket = new_shared_tcp_socket(domain)?;
|
||||
let socket2_addr = SockAddr::from(local_address);
|
||||
if socket.bind(&socket2_addr).is_err() {
|
||||
return Ok(None);
|
||||
|
@ -172,7 +172,7 @@ impl RawTcpProtocolHandler {
|
||||
Some(a) => {
|
||||
new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))?
|
||||
}
|
||||
None => new_unbound_tcp_socket(socket2::Domain::for_address(socket_addr))?,
|
||||
None => new_default_tcp_socket(socket2::Domain::for_address(socket_addr))?,
|
||||
};
|
||||
|
||||
// Non-blocking connect to remote address
|
||||
@ -187,16 +187,16 @@ impl RawTcpProtocolHandler {
|
||||
let ps = AsyncPeekStream::new(ts);
|
||||
|
||||
// Wrap the stream in a network connection and return it
|
||||
let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(
|
||||
Flow::new(
|
||||
PeerAddress::new(
|
||||
SocketAddress::from_socket_addr(socket_addr),
|
||||
ProtocolType::TCP,
|
||||
),
|
||||
SocketAddress::from_socket_addr(actual_local_address),
|
||||
let flow = Flow::new(
|
||||
PeerAddress::new(
|
||||
SocketAddress::from_socket_addr(socket_addr),
|
||||
ProtocolType::TCP,
|
||||
),
|
||||
ps,
|
||||
));
|
||||
SocketAddress::from_socket_addr(actual_local_address),
|
||||
);
|
||||
log_net!("rawtcp::connect: {:?}", flow);
|
||||
|
||||
let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(flow, ps));
|
||||
|
||||
Ok(NetworkResult::Value(conn))
|
||||
}
|
||||
|
@ -128,7 +128,7 @@ impl RawUdpProtocolHandler {
|
||||
SocketAddress::from_socket_addr(local_socket_addr),
|
||||
);
|
||||
|
||||
eprintln!("udp::send_message: {:?}", flow);
|
||||
log_net!("udp::send_message: {:?}", flow);
|
||||
|
||||
#[cfg(feature = "verbose-tracing")]
|
||||
tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str());
|
||||
|
@ -322,7 +322,7 @@ impl WebsocketProtocolHandler {
|
||||
Some(a) => {
|
||||
new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))?
|
||||
}
|
||||
None => new_unbound_tcp_socket(socket2::Domain::for_address(remote_socket_addr))?,
|
||||
None => new_default_tcp_socket(socket2::Domain::for_address(remote_socket_addr))?,
|
||||
};
|
||||
|
||||
// Non-blocking connect to remote address
|
||||
@ -342,6 +342,7 @@ impl WebsocketProtocolHandler {
|
||||
dial_info.peer_address(),
|
||||
SocketAddress::from_socket_addr(actual_local_addr),
|
||||
);
|
||||
log_net!("{}::connect: {:?}", scheme, flow);
|
||||
|
||||
// Negotiate TLS if this is WSS
|
||||
if tls {
|
||||
|
@ -121,6 +121,19 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
// Add local dial info to preferred local address table
|
||||
fn add_preferred_local_address(inner: &mut NetworkInner, pa: PeerAddress) {
|
||||
let key = (pa.protocol_type(), pa.address_type());
|
||||
let sa = pa.socket_addr();
|
||||
let unspec_sa = match sa {
|
||||
SocketAddr::V4(a) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, a.port())),
|
||||
SocketAddr::V6(a) => {
|
||||
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, a.port(), 0, 0))
|
||||
}
|
||||
};
|
||||
inner.preferred_local_addresses.insert(key, unspec_sa);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////
|
||||
|
||||
pub(super) async fn bind_udp_protocol_handlers(
|
||||
@ -156,7 +169,10 @@ impl Network {
|
||||
bind_set.port, bind_set.addrs
|
||||
);
|
||||
}
|
||||
let local_dial_info_list = self.create_udp_protocol_handlers(bind_set).await?;
|
||||
|
||||
let mut local_dial_info_list = self.create_udp_protocol_handlers(bind_set).await?;
|
||||
local_dial_info_list.sort();
|
||||
|
||||
let mut static_public = false;
|
||||
|
||||
log_net!(
|
||||
@ -164,22 +180,6 @@ impl Network {
|
||||
local_dial_info_list
|
||||
);
|
||||
|
||||
// Register local dial info
|
||||
for di in &local_dial_info_list {
|
||||
// If the local interface address is global, then register global dial info
|
||||
// if no other public address is specified
|
||||
if !detect_address_changes
|
||||
&& public_address.is_none()
|
||||
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, di)
|
||||
{
|
||||
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
}
|
||||
|
||||
// Register interface dial info as well since the address is on the local interface
|
||||
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
|
||||
}
|
||||
|
||||
// Add static public dialinfo if it's configured
|
||||
if let Some(public_address) = public_address.as_ref() {
|
||||
// Resolve statically configured public dialinfo
|
||||
@ -192,11 +192,8 @@ impl Network {
|
||||
let pdi = DialInfo::udp_from_socketaddr(pdi_addr);
|
||||
|
||||
// Register the public address
|
||||
if !detect_address_changes {
|
||||
editor_public_internet
|
||||
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
}
|
||||
editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
|
||||
// See if this public address is also a local interface address we haven't registered yet
|
||||
let is_interface_address = (|| {
|
||||
@ -217,14 +214,31 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
if static_public {
|
||||
self.inner
|
||||
.lock()
|
||||
.static_public_dialinfo
|
||||
.insert(ProtocolType::UDP);
|
||||
// Register local dial info
|
||||
for di in &local_dial_info_list {
|
||||
// If the local interface address is global, then register global dial info
|
||||
// if no other public address is specified
|
||||
if !detect_address_changes
|
||||
&& public_address.is_none()
|
||||
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, di)
|
||||
{
|
||||
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
}
|
||||
|
||||
// Register interface dial info as well since the address is on the local interface
|
||||
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
|
||||
}
|
||||
|
||||
// xxx compile all dialinfo from editor into map of protocoltype+addresstype -> port for 'best port selection' code
|
||||
{
|
||||
let mut inner = self.inner.lock();
|
||||
if static_public {
|
||||
inner.static_public_dialinfo.insert(ProtocolType::UDP);
|
||||
}
|
||||
for ldi in local_dial_info_list {
|
||||
Self::add_preferred_local_address(&mut inner, ldi.peer_address());
|
||||
}
|
||||
}
|
||||
|
||||
// Now create tasks for udp listeners
|
||||
self.create_udp_listener_tasks().await
|
||||
@ -293,11 +307,8 @@ impl Network {
|
||||
let pdi = DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone())
|
||||
.wrap_err("try_ws failed")?;
|
||||
|
||||
if !detect_address_changes {
|
||||
editor_public_internet
|
||||
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
}
|
||||
editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
|
||||
// See if this public address is also a local interface address
|
||||
if !registered_addresses.contains(&gsa.ip())
|
||||
@ -310,14 +321,15 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
for socket_address in socket_addresses {
|
||||
for socket_address in &socket_addresses {
|
||||
// Skip addresses we already did
|
||||
if registered_addresses.contains(&socket_address.ip_addr()) {
|
||||
continue;
|
||||
}
|
||||
// Build dial info request url
|
||||
let local_url = format!("ws://{}/{}", socket_address, path);
|
||||
let local_di = DialInfo::try_ws(socket_address, local_url).wrap_err("try_ws failed")?;
|
||||
let local_di =
|
||||
DialInfo::try_ws(*socket_address, local_url).wrap_err("try_ws failed")?;
|
||||
|
||||
if !detect_address_changes
|
||||
&& url.is_none()
|
||||
@ -333,11 +345,12 @@ impl Network {
|
||||
editor_local_network.register_dial_info(local_di, DialInfoClass::Direct)?;
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
if static_public {
|
||||
self.inner
|
||||
.lock()
|
||||
.static_public_dialinfo
|
||||
.insert(ProtocolType::WS);
|
||||
inner.static_public_dialinfo.insert(ProtocolType::WS);
|
||||
}
|
||||
for sa in socket_addresses {
|
||||
Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::WS));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -350,7 +363,7 @@ impl Network {
|
||||
) -> EyreResult<()> {
|
||||
log_net!("WSS: binding protocol handlers");
|
||||
|
||||
let (listen_address, url, detect_address_changes) = {
|
||||
let (listen_address, url, _detect_address_changes) = {
|
||||
let c = self.config.get();
|
||||
(
|
||||
c.network.protocol.wss.listen_address.clone(),
|
||||
@ -411,11 +424,8 @@ impl Network {
|
||||
let pdi = DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone())
|
||||
.wrap_err("try_wss failed")?;
|
||||
|
||||
if !detect_address_changes {
|
||||
editor_public_internet
|
||||
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
}
|
||||
editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
|
||||
// See if this public address is also a local interface address
|
||||
if !registered_addresses.contains(&gsa.ip())
|
||||
@ -430,11 +440,12 @@ impl Network {
|
||||
bail!("WSS URL must be specified due to TLS requirements");
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
if static_public {
|
||||
self.inner
|
||||
.lock()
|
||||
.static_public_dialinfo
|
||||
.insert(ProtocolType::WSS);
|
||||
inner.static_public_dialinfo.insert(ProtocolType::WSS);
|
||||
}
|
||||
for sa in socket_addresses {
|
||||
Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::WSS));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -485,22 +496,6 @@ impl Network {
|
||||
let mut static_public = false;
|
||||
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
|
||||
|
||||
for socket_address in socket_addresses {
|
||||
let di = DialInfo::tcp(socket_address);
|
||||
|
||||
// Register global dial info if no public address is specified
|
||||
if !detect_address_changes
|
||||
&& public_address.is_none()
|
||||
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di)
|
||||
{
|
||||
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
}
|
||||
// Register interface dial info
|
||||
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
|
||||
registered_addresses.insert(socket_address.ip_addr());
|
||||
}
|
||||
|
||||
// Add static public dialinfo if it's configured
|
||||
if let Some(public_address) = public_address.as_ref() {
|
||||
// Resolve statically configured public dialinfo
|
||||
@ -516,11 +511,8 @@ impl Network {
|
||||
}
|
||||
let pdi = DialInfo::tcp_from_socketaddr(pdi_addr);
|
||||
|
||||
if !detect_address_changes {
|
||||
editor_public_internet
|
||||
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
}
|
||||
editor_public_internet.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
|
||||
// See if this public address is also a local interface address
|
||||
if self.is_stable_interface_address(pdi_addr.ip()) {
|
||||
@ -529,11 +521,29 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
for socket_address in &socket_addresses {
|
||||
let di = DialInfo::tcp(*socket_address);
|
||||
|
||||
// Register global dial info if no public address is specified
|
||||
if !detect_address_changes
|
||||
&& public_address.is_none()
|
||||
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di)
|
||||
{
|
||||
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
|
||||
static_public = true;
|
||||
}
|
||||
// Register interface dial info
|
||||
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
|
||||
registered_addresses.insert(socket_address.ip_addr());
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
|
||||
if static_public {
|
||||
self.inner
|
||||
.lock()
|
||||
.static_public_dialinfo
|
||||
.insert(ProtocolType::TCP);
|
||||
inner.static_public_dialinfo.insert(ProtocolType::TCP);
|
||||
}
|
||||
for sa in socket_addresses {
|
||||
Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::TCP));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -54,6 +54,21 @@ impl NetworkManager {
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore flows that do not start from our listening port (unbound connections etc),
|
||||
// because a router is going to map these differently
|
||||
let Some(pla) =
|
||||
net.get_preferred_local_address_by_key(flow.protocol_type(), flow.address_type())
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let Some(local) = flow.local() else {
|
||||
return;
|
||||
};
|
||||
if local.port() != pla.port() {
|
||||
log_network_result!(debug "ignoring public internet address report because local port did not match listener: {} != {}", local.port(), pla.port());
|
||||
return;
|
||||
}
|
||||
|
||||
// If we are a webapp we should skip this completely
|
||||
// because we will never get inbound dialinfo directly on our public ip address
|
||||
// If we have an invalid network class, this is not necessary yet
|
||||
|
Loading…
Reference in New Issue
Block a user