This commit is contained in:
Christien Rioux 2023-09-04 13:33:27 -04:00
parent 1b5934dad4
commit 80b2e7b9da
4 changed files with 223 additions and 308 deletions

View File

@ -929,14 +929,14 @@ impl Network {
routing_table routing_table
.edit_routing_domain(RoutingDomain::PublicInternet) .edit_routing_domain(RoutingDomain::PublicInternet)
.clear_dial_info_details() .clear_dial_info_details(None, None)
.set_network_class(None) .set_network_class(None)
.clear_relay_node() .clear_relay_node()
.commit(); .commit();
routing_table routing_table
.edit_routing_domain(RoutingDomain::LocalNetwork) .edit_routing_domain(RoutingDomain::LocalNetwork)
.clear_dial_info_details() .clear_dial_info_details(None, None)
.set_network_class(None) .set_network_class(None)
.clear_relay_node() .clear_relay_node()
.commit(); .commit();

View File

@ -1,95 +1,96 @@
/// Detect NetworkClass and DialInfo for the DialInfo for the PublicInternet RoutingDomain /// Detect NetworkClass and DialInfo for the DialInfo for the PublicInternet RoutingDomain
use super::*; use super::*;
use futures_util::stream::FuturesUnordered; use futures_util::stream::FuturesUnordered;
use futures_util::FutureExt;
use stop_token::future::FutureExt as StopTokenFutureExt; use stop_token::future::FutureExt as StopTokenFutureExt;
impl Network { impl Network {
#[instrument(level = "trace", skip(self, context), err)]
pub async fn update_protocol_dialinfo(
&self,
context: &DiscoveryContext,
protocol_type: ProtocolType,
address_type: AddressType,
) -> EyreResult<()> {
let mut retry_count = {
let c = self.config.get();
c.network.restricted_nat_retries
};
// Start doing protocol
context.protocol_begin(protocol_type, address_type);
// UPNP Automatic Mapping
///////////
let this = self.clone();
let do_mapped_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = Box::pin(async move {
// Attempt a port mapping via all available and enabled mechanisms
// Try this before the direct mapping in the event that we are restarting
// and may not have recorded a mapping created the last time
if let Some(external_mapped_dial_info) = this.try_port_mapping().await {
// Got a port mapping, let's use it
return Some(DetectedDialInfo {
dial_info: external_mapped_dial_info.clone(),
dial_info_class: DialInfoClass::Mapped,
network_class: NetworkClass::InboundCapable,
});
}
None
});
ord.push_back(do_mapped_fut);
// Loop for restricted NAT retries
loop {
log_net!(debug
"=== update_protocol_dialinfo {:?} {:?} tries_left={} ===",
address_type,
protocol_type,
retry_count
);
// Get our external address from some fast node, call it node 1
if !context.protocol_get_external_address_1().await {
// If we couldn't get an external address, then we should just try the whole network class detection again later
return Ok(());
}
// If our local interface list contains external_1 then there is no NAT in place
let res = {
let inner = context.inner.lock();
inner
.intf_addrs
.as_ref()
.unwrap()
.contains(inner.external_1_address.as_ref().unwrap())
};
if res {
// No NAT
context.protocol_process_no_nat().await?;
// No more retries
break;
}
// There is -some NAT-
if context.protocol_process_nat().await? {
// We either got dial info or a network class without one
break;
}
// If we tried everything, break anyway after N attempts
if retry_count == 0 {
break;
}
retry_count -= 1;
}
Ok(())
}
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub async fn update_with_discovery_context(&self, ctx: DiscoveryContext) -> EyreResult<()> { pub async fn update_with_detected_dial_info(&self, ddi: DetectedDialInfo) -> EyreResult<()> {
// let existing_network_class = self
.routing_table()
.get_network_class(RoutingDomain::PublicInternet)
.unwrap_or_default();
// get existing dial info into table by protocol/address type
let mut existing_dial_info = BTreeMap::<(ProtocolType, AddressType), DialInfoDetail>::new();
for did in self.routing_table().all_filtered_dial_info_details(
RoutingDomain::PublicInternet.into(),
&DialInfoFilter::all(),
) {
// Only need to keep one per pt/at pair, since they will all have the same dialinfoclass
existing_dial_info.insert(
(did.dial_info.protocol_type(), did.dial_info.address_type()),
did,
);
}
match ddi {
DetectedDialInfo::SymmetricNAT => {
// If we get any symmetric nat dialinfo, this whole network class is outbound only,
// and all dial info should be treated as invalid
if !matches!(existing_network_class, NetworkClass::OutboundOnly) {
let mut editor = self
.routing_table()
.edit_routing_domain(RoutingDomain::PublicInternet);
editor.clear_dial_info_details(None, None);
editor.set_network_class(Some(NetworkClass::OutboundOnly));
editor.commit();
}
}
DetectedDialInfo::Detected(did) => {
// We got a dial info, upgrade everything unless we are fixed to outbound only due to a symmetric nat
if !matches!(existing_network_class, NetworkClass::OutboundOnly) {
// Get existing dial info for protocol/address type combination
let pt = did.dial_info.protocol_type();
let at = did.dial_info.address_type();
// See what operations to perform with this dialinfo
let mut clear = false;
let mut add = false;
if let Some(edi) = existing_dial_info.get(&(pt, at)) {
if did.class < edi.class {
// Better dial info class was found, clear existing dialinfo for this pt/at pair
clear = true;
add = true;
} else if did.class == edi.class {
// Same dial info class, just add dial info
add = true;
}
// Otherwise, don't upgrade, don't add, this is worse than what we have already
} else {
// No existing dial info of this type accept it, no need to upgrade, but add it
add = true;
}
if clear || add {
let mut editor = self
.routing_table()
.edit_routing_domain(RoutingDomain::PublicInternet);
if clear {
editor.clear_dial_info_details(
Some(did.dial_info.address_type()),
Some(did.dial_info.protocol_type()),
);
}
if add {
if let Err(e) =
editor.register_dial_info(did.dial_info.clone(), did.class)
{
log_net!(debug "Failed to register detected dialinfo {:?}: {}", did, e);
}
}
editor.set_network_class(Some(NetworkClass::InboundCapable));
editor.commit();
}
}
}
}
Ok(())
} }
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
@ -99,14 +100,10 @@ impl Network {
_l: u64, _l: u64,
_t: u64, _t: u64,
) -> EyreResult<()> { ) -> EyreResult<()> {
let routing_table = self.routing_table();
// Figure out if we can optimize TCP/WS checking since they are often on the same port // Figure out if we can optimize TCP/WS checking since they are often on the same port
let (protocol_config, existing_network_class, tcp_same_port) = { let (protocol_config, tcp_same_port) = {
let inner = self.inner.lock(); let inner = self.inner.lock();
let protocol_config = inner.protocol_config; let protocol_config = inner.protocol_config;
let existing_network_class =
routing_table.get_network_class(RoutingDomain::PublicInternet);
let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP) let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP)
&& protocol_config.inbound.contains(ProtocolType::WS) && protocol_config.inbound.contains(ProtocolType::WS)
{ {
@ -114,187 +111,133 @@ impl Network {
} else { } else {
false false
}; };
(protocol_config, existing_network_class, tcp_same_port) (protocol_config, tcp_same_port)
}; };
// Save off existing public dial info for change detection later
let existing_public_dial_info: HashSet<DialInfoDetail> = self
.routing_table()
.all_filtered_dial_info_details(
RoutingDomain::PublicInternet.into(),
&DialInfoFilter::all(),
)
.into_iter()
.collect();
// Clear public dialinfo and network class in prep for discovery
let mut editor = self
.routing_table()
.edit_routing_domain(RoutingDomain::PublicInternet);
editor.clear_dial_info_details(None, None);
editor.set_network_class(None);
editor.clear_relay_node();
editor.commit();
log_net!(debug "PublicInternet network class cleared");
// Process all protocol and address combinations // Process all protocol and address combinations
let mut unord = FuturesUnordered::new(); let mut unord = FuturesUnordered::new();
// Do UDPv4+v6 at the same time as everything else // Do UDPv4+v6 at the same time as everything else
if protocol_config.inbound.contains(ProtocolType::UDP) { if protocol_config.inbound.contains(ProtocolType::UDP) {
// UDPv4 // UDPv4
if protocol_config.family_global.contains(AddressType::IPV4) { if protocol_config.family_global.contains(AddressType::IPV4) {
unord.push( let udpv4_context = DiscoveryContext::new(
async { self.routing_table(),
let udpv4_context = self.clone(),
DiscoveryContext::new(self.routing_table(), self.clone());
if let Err(e) = self
.update_protocol_dialinfo(
&udpv4_context,
ProtocolType::UDP, ProtocolType::UDP,
AddressType::IPV4, AddressType::IPV4,
)
.await
{
log_net!(debug "Failed UDPv4 dialinfo discovery: {}", e);
return None;
}
Some(udpv4_context)
}
.instrument(trace_span!("do_public_dial_info_check UDPv4"))
.boxed(),
); );
udpv4_context
.discover(&mut unord)
.instrument(trace_span!("udpv4_context.discover"))
.await;
} }
// UDPv6 // UDPv6
if protocol_config.family_global.contains(AddressType::IPV6) { if protocol_config.family_global.contains(AddressType::IPV6) {
unord.push( let udpv6_context = DiscoveryContext::new(
async { self.routing_table(),
let udpv6_context = self.clone(),
DiscoveryContext::new(self.routing_table(), self.clone());
if let Err(e) = self
.update_protocol_dialinfo(
&udpv6_context,
ProtocolType::UDP, ProtocolType::UDP,
AddressType::IPV6, AddressType::IPV6,
)
.await
{
log_net!(debug "Failed UDPv6 dialinfo discovery: {}", e);
return None;
}
Some(udpv6_context)
}
.instrument(trace_span!("do_public_dial_info_check UDPv6"))
.boxed(),
); );
udpv6_context
.discover(&mut unord)
.instrument(trace_span!("udpv6_context.discover"))
.await;
} }
} }
// Do TCPv4. Possibly do WSv4 if it is on a different port // Do TCPv4. Possibly do WSv4 if it is on a different port
if protocol_config.family_global.contains(AddressType::IPV4) { if protocol_config.family_global.contains(AddressType::IPV4) {
if protocol_config.inbound.contains(ProtocolType::TCP) { if protocol_config.inbound.contains(ProtocolType::TCP) {
unord.push( let tcpv4_context = DiscoveryContext::new(
async { self.routing_table(),
// TCPv4 self.clone(),
let tcpv4_context =
DiscoveryContext::new(self.routing_table(), self.clone());
if let Err(e) = self
.update_protocol_dialinfo(
&tcpv4_context,
ProtocolType::TCP, ProtocolType::TCP,
AddressType::IPV4, AddressType::IPV4,
)
.await
{
log_net!(debug "Failed TCPv4 dialinfo discovery: {}", e);
return None;
}
Some(tcpv4_context)
}
.instrument(trace_span!("do_public_dial_info_check TCPv4"))
.boxed(),
); );
tcpv4_context
.discover(&mut unord)
.instrument(trace_span!("tcpv4_context.discover"))
.await;
} }
if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port { if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port {
unord.push( let wsv4_context = DiscoveryContext::new(
async { self.routing_table(),
// WSv4 self.clone(),
let wsv4_context =
DiscoveryContext::new(self.routing_table(), self.clone());
if let Err(e) = self
.update_protocol_dialinfo(
&wsv4_context,
ProtocolType::WS, ProtocolType::WS,
AddressType::IPV4, AddressType::IPV4,
)
.await
{
log_net!(debug "Failed WSv4 dialinfo discovery: {}", e);
return None;
}
Some(wsv4_context)
}
.instrument(trace_span!("do_public_dial_info_check WSv4"))
.boxed(),
); );
wsv4_context
.discover(&mut unord)
.instrument(trace_span!("wsv4_context.discover"))
.await;
} }
} }
// Do TCPv6. Possibly do WSv6 if it is on a different port // Do TCPv6. Possibly do WSv6 if it is on a different port
if protocol_config.family_global.contains(AddressType::IPV6) { if protocol_config.family_global.contains(AddressType::IPV6) {
if protocol_config.inbound.contains(ProtocolType::TCP) { if protocol_config.inbound.contains(ProtocolType::TCP) {
unord.push( let tcpv6_context = DiscoveryContext::new(
async { self.routing_table(),
// TCPv6 self.clone(),
let tcpv6_context =
DiscoveryContext::new(self.routing_table(), self.clone());
if let Err(e) = self
.update_protocol_dialinfo(
&tcpv6_context,
ProtocolType::TCP, ProtocolType::TCP,
AddressType::IPV6, AddressType::IPV6,
)
.await
{
log_net!(debug "Failed TCPv6 dialinfo discovery: {}", e);
return None;
}
Some(tcpv6_context)
}
.instrument(trace_span!("do_public_dial_info_check TCPv6"))
.boxed(),
); );
tcpv6_context
.discover(&mut unord)
.instrument(trace_span!("tcpv6_context.discover"))
.await;
} }
// WSv6 // WSv6
if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port { if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port {
unord.push( let wsv6_context = DiscoveryContext::new(
async { self.routing_table(),
let wsv6_context = self.clone(),
DiscoveryContext::new(self.routing_table(), self.clone());
if let Err(e) = self
.update_protocol_dialinfo(
&wsv6_context,
ProtocolType::WS, ProtocolType::WS,
AddressType::IPV6, AddressType::IPV6,
)
.await
{
log_net!(debug "Failed WSv6 dialinfo discovery: {}", e);
return None;
}
Some(wsv6_context)
}
.instrument(trace_span!("do_public_dial_info_check WSv6"))
.boxed(),
); );
wsv6_context
.discover(&mut unord)
.instrument(trace_span!("wsv6_context.discover"))
.await;
} }
} }
// Wait for all discovery futures to complete and collect contexts // Wait for all discovery futures to complete and apply discoverycontexts
let mut contexts = Vec::<DiscoveryContext>::new();
let mut new_network_class = Option::<NetworkClass>::None;
loop { loop {
match unord.next().timeout_at(stop_token.clone()).await { match unord.next().timeout_at(stop_token.clone()).await {
Ok(Some(ctx)) => { Ok(Some(Some(ddi))) => {
if let Some(ctx) = ctx { // Found some new dial info for this protocol/address combination
if let Some(nc) = ctx.inner.lock().detected_network_class { self.update_with_detected_dial_info(ddi).await?
if let Some(last_nc) = new_network_class {
if nc < last_nc {
new_network_class = Some(nc);
}
} else {
new_network_class = Some(nc);
}
}
contexts.push(ctx);
} }
Ok(Some(None)) => {
// Found no new dial info for this protocol/address combination
} }
Ok(None) => { Ok(None) => {
// Normal completion // All done, normally
break; break;
} }
Err(_) => { Err(_) => {
@ -304,13 +247,9 @@ impl Network {
} }
} }
// If a network class could be determined // All done, see if things changed
// see about updating our public dial info let new_public_dial_info: HashSet<DialInfoDetail> = self
let mut changed = false; .routing_table()
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
if new_network_class.is_some() {
// Get existing public dial info
let existing_public_dial_info: HashSet<DialInfoDetail> = routing_table
.all_filtered_dial_info_details( .all_filtered_dial_info_details(
RoutingDomain::PublicInternet.into(), RoutingDomain::PublicInternet.into(),
&DialInfoFilter::all(), &DialInfoFilter::all(),
@ -318,71 +257,11 @@ impl Network {
.into_iter() .into_iter()
.collect(); .collect();
// Get new public dial info and ensure it is valid
let mut new_public_dial_info: HashSet<DialInfoDetail> = HashSet::new();
for ctx in contexts {
let inner = ctx.inner.lock();
if let Some(pdi) = &inner.detected_public_dial_info {
if routing_table
.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &pdi.dial_info)
{
new_public_dial_info.insert(DialInfoDetail {
class: pdi.class,
dial_info: pdi.dial_info.clone(),
});
}
// duplicate for same port
if tcp_same_port && pdi.dial_info.protocol_type() == ProtocolType::TCP {
let ws_dial_info =
ctx.make_dial_info(pdi.dial_info.socket_address(), ProtocolType::WS);
if routing_table
.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &ws_dial_info)
{
new_public_dial_info.insert(DialInfoDetail {
class: pdi.class,
dial_info: ws_dial_info,
});
}
}
}
}
// Is the public dial info different?
if existing_public_dial_info != new_public_dial_info {
// If so, clear existing public dial info and re-register the new public dial info
editor.clear_dial_info_details();
for did in new_public_dial_info {
if let Err(e) = editor.register_dial_info(did.dial_info, did.class) {
log_net!(error "Failed to register detected public dial info: {}", e);
}
}
changed = true;
}
// Is the network class different?
if existing_network_class != new_network_class {
editor.set_network_class(new_network_class);
changed = true;
log_net!(debug "PublicInternet network class changed to {:?}", new_network_class);
}
} else if existing_network_class.is_some() {
// Network class could not be determined
editor.clear_dial_info_details();
editor.set_network_class(None);
editor.clear_relay_node();
changed = true;
log_net!(debug "PublicInternet network class cleared");
}
// Punish nodes that told us our public address had changed when it didn't // Punish nodes that told us our public address had changed when it didn't
if !changed { if new_public_dial_info == existing_public_dial_info {
if let Some(punish) = self.inner.lock().public_dial_info_check_punishment.take() { if let Some(punish) = self.inner.lock().public_dial_info_check_punishment.take() {
punish(); punish();
} }
} else {
// Commit updates
editor.commit();
} }
Ok(()) Ok(())
@ -398,7 +277,11 @@ impl Network {
let out = self.do_public_dial_info_check(stop_token, l, t).await; let out = self.do_public_dial_info_check(stop_token, l, t).await;
// Done with public dial info check // Done with public dial info check
self.inner.lock().needs_public_dial_info_check = false; {
let mut inner = self.inner.lock();
inner.needs_public_dial_info_check = false;
inner.public_dial_info_check_punishment = None;
}
out out
} }

View File

@ -1,7 +1,10 @@
use super::*; use super::*;
enum RoutingDomainChange { enum RoutingDomainChange {
ClearDialInfoDetails, ClearDialInfoDetails {
address_type: Option<AddressType>,
protocol_type: Option<ProtocolType>,
},
ClearRelayNode, ClearRelayNode,
SetRelayNode { SetRelayNode {
relay_node: NodeRef, relay_node: NodeRef,
@ -39,8 +42,16 @@ impl RoutingDomainEditor {
} }
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
pub fn clear_dial_info_details(&mut self) -> &mut Self { pub fn clear_dial_info_details(
self.changes.push(RoutingDomainChange::ClearDialInfoDetails); &mut self,
address_type: Option<AddressType>,
protocol_type: Option<ProtocolType>,
) -> &mut Self {
self.changes
.push(RoutingDomainChange::ClearDialInfoDetails {
address_type,
protocol_type,
});
self self
} }
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
@ -125,9 +136,17 @@ impl RoutingDomainEditor {
inner.with_routing_domain_mut(self.routing_domain, |detail| { inner.with_routing_domain_mut(self.routing_domain, |detail| {
for change in self.changes.drain(..) { for change in self.changes.drain(..) {
match change { match change {
RoutingDomainChange::ClearDialInfoDetails => { RoutingDomainChange::ClearDialInfoDetails {
debug!("[{:?}] cleared dial info details", self.routing_domain); address_type,
detail.common_mut().clear_dial_info_details(); protocol_type,
} => {
debug!(
"[{:?}] cleared dial info details: at={:?} pt={:?}",
self.routing_domain, address_type, protocol_type
);
detail
.common_mut()
.clear_dial_info_details(address_type, protocol_type);
changed = true; changed = true;
} }
RoutingDomainChange::ClearRelayNode => { RoutingDomainChange::ClearRelayNode => {

View File

@ -103,8 +103,21 @@ impl RoutingDomainDetailCommon {
pub fn dial_info_details(&self) -> &Vec<DialInfoDetail> { pub fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
&self.dial_info_details &self.dial_info_details
} }
pub(super) fn clear_dial_info_details(&mut self) { pub(super) fn clear_dial_info_details(&mut self, address_type: Option<AddressType>, protocol_type: Option<ProtocolType>) {
self.dial_info_details.clear(); self.dial_info_details.retain_mut(|e| {
let mut remove = true;
if let Some(pt) = protocol_type {
if pt != e.dial_info.protocol_type() {
remove = false;
}
}
if let Some(at) = address_type {
if at != e.dial_info.address_type() {
remove = false;
}
}
!remove
});
self.clear_cache(); self.clear_cache();
} }
pub(super) fn add_dial_info_detail(&mut self, did: DialInfoDetail) { pub(super) fn add_dial_info_detail(&mut self, did: DialInfoDetail) {