From b325c82b9a2a2b4fb495362ac4055d0173e96feb Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 5 Sep 2023 19:27:16 -0400 Subject: [PATCH] more parallel --- .../native/discovery_context.rs | 230 ++++++++++-------- 1 file changed, 131 insertions(+), 99 deletions(-) diff --git a/veilid-core/src/network_manager/native/discovery_context.rs b/veilid-core/src/network_manager/native/discovery_context.rs index 7a4b1a01..b4175e28 100644 --- a/veilid-core/src/network_manager/native/discovery_context.rs +++ b/veilid-core/src/network_manager/native/discovery_context.rs @@ -378,26 +378,30 @@ impl DiscoveryContext { async fn protocol_process_no_nat( &self, unord: &mut FuturesUnordered>>, - ) -> DetectedDialInfo { + ) { let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone(); - // Do a validate_dial_info on the external address from a redirected node - if self - .validate_dial_info(external_1.node.clone(), external_1.dial_info.clone(), true) - .await - { - // Add public dial info with Direct dialinfo class - DetectedDialInfo::Detected(DialInfoDetail { - dial_info: external_1.dial_info.clone(), - class: DialInfoClass::Direct, - }) - } else { - // Add public dial info with Blocked dialinfo class - DetectedDialInfo::Detected(DialInfoDetail { - dial_info: external_1.dial_info.clone(), - class: DialInfoClass::Blocked, - }) - } + let this = self.clone(); + let do_no_nat_fut: SendPinBoxFuture> = Box::pin(async move { + // Do a validate_dial_info on the external address from a redirected node + if this + .validate_dial_info(external_1.node.clone(), external_1.dial_info.clone(), true) + .await + { + // Add public dial info with Direct dialinfo class + Some(DetectedDialInfo::Detected(DialInfoDetail { + dial_info: external_1.dial_info.clone(), + class: DialInfoClass::Direct, + })) + } else { + // Add public dial info with Blocked dialinfo class + Some(DetectedDialInfo::Detected(DialInfoDetail { + dial_info: external_1.dial_info.clone(), + class: DialInfoClass::Blocked, + })) + } + }); + unord.push(do_no_nat_fut); } // If we know we are behind NAT check what kind @@ -405,20 +409,24 @@ impl DiscoveryContext { async fn protocol_process_nat( &self, unord: &mut FuturesUnordered>>, - ) -> Option { + ) { // Get the external dial info for our use here - let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone(); - let external_2 = self.inner.lock().external_2.as_ref().unwrap().clone(); + let (external_1, external_2) = { + let inner = self.inner.lock(); + ( + inner.external_1.as_ref().unwrap().clone(), + inner.external_2.as_ref().unwrap().clone(), + ) + }; // If we have two different external addresses, then this is a symmetric NAT if external_2.address != external_1.address { - // No more retries - return Some(DetectedDialInfo::SymmetricNAT); + let do_symmetric_nat_fut: SendPinBoxFuture> = + Box::pin(async move { Some(DetectedDialInfo::SymmetricNAT) }); + unord.push(do_symmetric_nat_fut); + return; } - // Do these detections in parallel, but with ordering preference - let mut ord = FuturesOrdered::new(); - // Manual Mapping Detection /////////// let this = self.clone(); @@ -454,65 +462,111 @@ impl DiscoveryContext { None }); - ord.push_back(do_manual_map_fut); + unord.push(do_manual_map_fut); } } + // NAT Detection + /////////// + // Full Cone NAT Detection /////////// let this = self.clone(); - let c_external_1 = external_1.clone(); - let do_full_cone_fut: SendPinBoxFuture> = Box::pin(async move { - // Let's see what kind of NAT we have - // Does a redirected dial info validation from a different address and a random port find us? - if this - .validate_dial_info( - c_external_1.node.clone(), - c_external_1.dial_info.clone(), - true, - ) - .await - { - // Yes, another machine can use the dial info directly, so Full Cone - // Add public dial info with full cone NAT network class + let do_nat_detect_fut: SendPinBoxFuture> = Box::pin(async move { + let mut retry_count = { + let c = this.unlocked_inner.net.config.get(); + c.network.restricted_nat_retries + }; - return Some(DetectedDialInfo::Detected(DialInfoDetail { - dial_info: c_external_1.dial_info, - class: DialInfoClass::FullConeNAT, - })); + // Loop for restricted NAT retries + loop { + let mut ord = FuturesOrdered::new(); + + let c_this = this.clone(); + let c_external_1 = external_1.clone(); + let do_full_cone_fut: SendPinBoxFuture> = + Box::pin(async move { + // Let's see what kind of NAT we have + // Does a redirected dial info validation from a different address and a random port find us? + if c_this + .validate_dial_info( + c_external_1.node.clone(), + c_external_1.dial_info.clone(), + true, + ) + .await + { + // Yes, another machine can use the dial info directly, so Full Cone + // Add public dial info with full cone NAT network class + + return Some(DetectedDialInfo::Detected(DialInfoDetail { + dial_info: c_external_1.dial_info, + class: DialInfoClass::FullConeNAT, + })); + } + None + }); + ord.push_back(do_full_cone_fut); + + let c_this = this.clone(); + let c_external_1 = external_1.clone(); + let c_external_2 = external_2.clone(); + let do_restricted_cone_fut: SendPinBoxFuture> = + Box::pin(async move { + // We are restricted, determine what kind of restriction + + // If we're going to end up as a restricted NAT of some sort + // Address is the same, so it's address or port restricted + + // Do a validate_dial_info on the external address from a random port + if c_this + .validate_dial_info( + c_external_2.node.clone(), + c_external_1.dial_info.clone(), + false, + ) + .await + { + // Got a reply from a non-default port, which means we're only address restricted + return Some(DetectedDialInfo::Detected(DialInfoDetail { + dial_info: c_external_1.dial_info.clone(), + class: DialInfoClass::AddressRestrictedNAT, + })); + } + // Didn't get a reply from a non-default port, which means we are also port restricted + Some(DetectedDialInfo::Detected(DialInfoDetail { + dial_info: c_external_1.dial_info.clone(), + class: DialInfoClass::PortRestrictedNAT, + })) + }); + ord.push_back(do_restricted_cone_fut); + + // Return the first result we get + let mut some_ddi = None; + while let Some(res) = ord.next().await { + if let Some(ddi) = res { + some_ddi = Some(ddi); + break; + } + } + + if let Some(ddi) = some_ddi { + if let DetectedDialInfo::Detected(did) = &ddi { + // If we got something better than restricted NAT or we're done retrying + if did.class < DialInfoClass::AddressRestrictedNAT || retry_count == 0 { + return Some(ddi); + } + } + } + if retry_count == 0 { + break; + } + retry_count -= 1; } + None }); - ord.push_back(do_full_cone_fut); - - // Run detections in parallel and take the first one, ordered by preference, that returns a result - while let Some(res) = ord.next().await { - if let Some(ddi) = res { - return Some(ddi); - } - } - - // We are restricted, determine what kind of restriction - - // If we're going to end up as a restricted NAT of some sort - // Address is the same, so it's address or port restricted - - // Do a validate_dial_info on the external address from a random port - if self - .validate_dial_info(external_2.node.clone(), external_1.dial_info.clone(), false) - .await - { - // Got a reply from a non-default port, which means we're only address restricted - return Some(DetectedDialInfo::Detected(DialInfoDetail { - dial_info: external_1.dial_info.clone(), - class: DialInfoClass::AddressRestrictedNAT, - })); - } - // Didn't get a reply from a non-default port, which means we are also port restricted - Some(DetectedDialInfo::Detected(DialInfoDetail { - dial_info: external_1.dial_info.clone(), - class: DialInfoClass::PortRestrictedNAT, - })) + unord.push(do_nat_detect_fut); } /// Add discovery futures to an unordered set that may detect dialinfo when they complete @@ -520,9 +574,9 @@ impl DiscoveryContext { &self, unord: &mut FuturesUnordered>>, ) { - let (mut retry_count, enable_upnp) = { + let enable_upnp = { let c = self.unlocked_inner.net.config.get(); - (c.network.restricted_nat_retries, c.network.upnp) + c.network.upnp }; // Do this right away because it's fast and every detection is going to need it @@ -562,28 +616,6 @@ impl DiscoveryContext { self.protocol_process_no_nat(unord).await; } else { self.protocol_process_nat(unord).await; - - // // Loop for restricted NAT retries - // let this = self.clone(); - // let do_nat_fut: SendPinBoxFuture> = Box::pin(async move { - // loop { - // // There is -some NAT- - // if let Some(ddi) = this.protocol_process_nat().await { - // if let DetectedDialInfo::Detected(did) = &ddi { - // // If we got something better than restricted NAT or we're done retrying - // if did.class < DialInfoClass::AddressRestrictedNAT || retry_count == 0 { - // return Some(ddi); - // } - // } - // } - // if retry_count == 0 { - // break; - // } - // retry_count -= 1; - // } - // None - // }); - // unord.push(do_nat_fut); } } }