discovery work

This commit is contained in:
John Smith 2022-08-06 20:55:24 -04:00
parent 28b3b841a3
commit 4aa9f6d2b9
4 changed files with 117 additions and 21 deletions

View File

@ -1710,6 +1710,8 @@ impl NetworkManager {
if needs_public_address_detection { if needs_public_address_detection {
// Reset the address check cache now so we can start detecting fresh // Reset the address check cache now so we can start detecting fresh
info!("Public address has changed, detecting public dial info");
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.public_address_check_cache.clear(); inner.public_address_check_cache.clear();

View File

@ -653,12 +653,13 @@ impl Network {
self.free_bound_first_ports(); self.free_bound_first_ports();
// If we have static public dialinfo, upgrade our network class // If we have static public dialinfo, upgrade our network class
{ // xxx: force public address detection
let mut inner = self.inner.lock(); // {
if !inner.static_public_dialinfo.is_empty() { // let mut inner = self.inner.lock();
inner.network_class = Some(NetworkClass::InboundCapable); // if !inner.static_public_dialinfo.is_empty() {
} // inner.network_class = Some(NetworkClass::InboundCapable);
} // }
// }
info!("network started"); info!("network started");
self.inner.lock().network_started = true; self.inner.lock().network_started = true;

View File

@ -1,4 +1,5 @@
use super::*; use super::*;
//use futures_util::stream::FuturesOrdered;
use futures_util::stream::FuturesUnordered; use futures_util::stream::FuturesUnordered;
use futures_util::FutureExt; use futures_util::FutureExt;
use stop_token::future::FutureExt as StopTokenFutureExt; use stop_token::future::FutureExt as StopTokenFutureExt;
@ -367,6 +368,7 @@ impl DiscoveryContext {
{ {
None => { None => {
// If we can't get an external address, allow retry // If we can't get an external address, allow retry
log_net!(debug "failed to discover external address 2 for {:?}:{:?}, skipping node {:?}", protocol_type, address_type, node_1.node_id());
return Ok(false); return Ok(false);
} }
Some(v) => v, Some(v) => v,
@ -425,6 +427,21 @@ impl Network {
c.network.restricted_nat_retries c.network.restricted_nat_retries
}; };
// See if we already have a public dialinfo of this protocol/address type
let routing_table = self.routing_table();
let dif = DialInfoFilter::global()
.with_protocol_type(protocol_type)
.with_address_type(AddressType::IPV4);
let dids =
routing_table.all_filtered_dial_info_details(Some(RoutingDomain::PublicInternet), &dif);
if !dids.is_empty() {
log_net!(debug
"Skipping detection for public dialinfo for {:?}:IPV4",
protocol_type
);
return Ok(());
}
// Start doing ipv4 protocol // Start doing ipv4 protocol
context.protocol_begin(protocol_type, AddressType::IPV4); context.protocol_begin(protocol_type, AddressType::IPV4);
@ -482,6 +499,20 @@ impl Network {
context: &DiscoveryContext, context: &DiscoveryContext,
protocol_type: ProtocolType, protocol_type: ProtocolType,
) -> EyreResult<()> { ) -> EyreResult<()> {
// See if we already have a public dialinfo of this protocol/address type
let routing_table = self.routing_table();
let dif = DialInfoFilter::global()
.with_protocol_type(protocol_type)
.with_address_type(AddressType::IPV6);
let dids =
routing_table.all_filtered_dial_info_details(Some(RoutingDomain::PublicInternet), &dif);
if !dids.is_empty() {
log_net!(debug
"Skipping detection for public dialinfo for {:?}:IPV6",
protocol_type
);
return Ok(());
}
// Start doing ipv6 protocol // Start doing ipv6 protocol
context.protocol_begin(protocol_type, AddressType::IPV6); context.protocol_begin(protocol_type, AddressType::IPV6);
@ -538,13 +569,14 @@ impl Network {
false false
}; };
let mut unord = FuturesUnordered::new(); let mut futures = FuturesUnordered::new();
//let mut futures = FuturesOrdered::new();
// Do UDPv4+v6 at the same time as everything else // Do UDPv4+v6 at the same time as everything else
if protocol_config.inbound.contains(ProtocolType::UDP) { if protocol_config.inbound.contains(ProtocolType::UDP) {
// UDPv4 // UDPv4
if protocol_config.family_global.contains(AddressType::IPV4) { if protocol_config.family_global.contains(AddressType::IPV4) {
unord.push( futures.push(
async { async {
let udpv4_context = let udpv4_context =
DiscoveryContext::new(self.routing_table(), self.clone()); DiscoveryContext::new(self.routing_table(), self.clone());
@ -563,7 +595,7 @@ impl Network {
// UDPv6 // UDPv6
if protocol_config.family_global.contains(AddressType::IPV6) { if protocol_config.family_global.contains(AddressType::IPV6) {
unord.push( futures.push(
async { async {
let udpv6_context = let udpv6_context =
DiscoveryContext::new(self.routing_table(), self.clone()); DiscoveryContext::new(self.routing_table(), self.clone());
@ -584,7 +616,7 @@ impl Network {
// Do TCPv4 + WSv4 in series because they may use the same connection 5-tuple // Do TCPv4 + WSv4 in series because they may use the same connection 5-tuple
if protocol_config.family_global.contains(AddressType::IPV4) { if protocol_config.family_global.contains(AddressType::IPV4) {
if protocol_config.inbound.contains(ProtocolType::TCP) { if protocol_config.inbound.contains(ProtocolType::TCP) {
unord.push( futures.push(
async { async {
// TCPv4 // TCPv4
let tcpv4_context = let tcpv4_context =
@ -603,7 +635,7 @@ impl Network {
} }
if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port { if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port {
unord.push( futures.push(
async { async {
// WSv4 // WSv4
let wsv4_context = let wsv4_context =
@ -625,7 +657,7 @@ impl Network {
// Do TCPv6 + WSv6 in series because they may use the same connection 5-tuple // Do TCPv6 + WSv6 in series because they may use the same connection 5-tuple
if protocol_config.family_global.contains(AddressType::IPV6) { if protocol_config.family_global.contains(AddressType::IPV6) {
if protocol_config.inbound.contains(ProtocolType::TCP) { if protocol_config.inbound.contains(ProtocolType::TCP) {
unord.push( futures.push(
async { async {
// TCPv6 // TCPv6
let tcpv6_context = let tcpv6_context =
@ -645,7 +677,7 @@ impl Network {
// WSv6 // WSv6
if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port { if protocol_config.inbound.contains(ProtocolType::WS) && !tcp_same_port {
unord.push( futures.push(
async { async {
let wsv6_context = let wsv6_context =
DiscoveryContext::new(self.routing_table(), self.clone()); DiscoveryContext::new(self.routing_table(), self.clone());
@ -667,7 +699,7 @@ impl Network {
let mut contexts = Vec::<DiscoveryContext>::new(); let mut contexts = Vec::<DiscoveryContext>::new();
let mut network_class = Option::<NetworkClass>::None; let mut network_class = Option::<NetworkClass>::None;
loop { loop {
match unord.next().timeout_at(stop_token.clone()).await { match futures.next().timeout_at(stop_token.clone()).await {
Ok(Some(ctxvec)) => { Ok(Some(ctxvec)) => {
if let Some(ctxvec) = ctxvec { if let Some(ctxvec) = ctxvec {
for ctx in ctxvec { for ctx in ctxvec {

View File

@ -21,6 +21,31 @@ fn get_number(text: &str) -> Option<usize> {
fn get_dht_key(text: &str) -> Option<DHTKey> { fn get_dht_key(text: &str) -> Option<DHTKey> {
DHTKey::try_decode(text).ok() DHTKey::try_decode(text).ok()
} }
fn get_protocol_type(text: &str) -> Option<ProtocolType> {
let lctext = text.to_ascii_lowercase();
if lctext == "udp" {
Some(ProtocolType::UDP)
} else if lctext == "tcp" {
Some(ProtocolType::TCP)
} else if lctext == "ws" {
Some(ProtocolType::WS)
} else if lctext == "wss" {
Some(ProtocolType::WSS)
} else {
None
}
}
fn get_address_type(text: &str) -> Option<AddressType> {
let lctext = text.to_ascii_lowercase();
if lctext == "ipv4" {
Some(AddressType::IPV4)
} else if lctext == "ipv6" {
Some(AddressType::IPV6)
} else {
None
}
}
fn get_debug_argument<T, G: FnOnce(&str) -> Option<T>>( fn get_debug_argument<T, G: FnOnce(&str) -> Option<T>>(
value: &str, value: &str,
context: &str, context: &str,
@ -264,11 +289,32 @@ impl VeilidAPI {
let network_manager = self.network_manager()?; let network_manager = self.network_manager()?;
let routing_table = network_manager.routing_table(); let routing_table = network_manager.routing_table();
let nr = match routing_table.lookup_node_ref(node_id) { let mut nr = match routing_table.lookup_node_ref(node_id) {
Some(nr) => nr, Some(nr) => nr,
None => return Ok("Node id not found in routing table".to_owned()), None => return Ok("Node id not found in routing table".to_owned()),
}; };
if args.len() >= 2 {
let pt = get_debug_argument_at(
&args,
1,
"debug_contact",
"protocol_type",
get_protocol_type,
)?;
nr.merge_filter(DialInfoFilter::all().with_protocol_type(pt));
if args.len() >= 3 {
let at = get_debug_argument_at(
&args,
2,
"debug_contact",
"address_type",
get_address_type,
)?;
nr.merge_filter(DialInfoFilter::all().with_address_type(at));
}
}
let cm = network_manager.get_contact_method(nr); let cm = network_manager.get_contact_method(nr);
Ok(format!("{:#?}", cm)) Ok(format!("{:#?}", cm))
@ -280,12 +326,27 @@ impl VeilidAPI {
let node_id = get_debug_argument_at(&args, 0, "debug_ping", "node_id", get_dht_key)?; let node_id = get_debug_argument_at(&args, 0, "debug_ping", "node_id", get_dht_key)?;
let routing_table = self.network_manager()?.routing_table(); let routing_table = self.network_manager()?.routing_table();
let mut nr = match routing_table.lookup_node_ref(node_id) {
let nr = match routing_table.lookup_node_ref(node_id) {
Some(nr) => nr, Some(nr) => nr,
None => return Ok("Node id not found in routing table".to_owned()), None => return Ok("Node id not found in routing table".to_owned()),
}; };
if args.len() >= 2 {
let pt =
get_debug_argument_at(&args, 1, "debug_ping", "protocol_type", get_protocol_type)?;
nr.merge_filter(DialInfoFilter::all().with_protocol_type(pt));
if args.len() >= 3 {
let at = get_debug_argument_at(
&args,
2,
"debug_ping",
"address_type",
get_address_type,
)?;
nr.merge_filter(DialInfoFilter::all().with_address_type(at));
}
}
let rpc = self.network_manager()?.rpc_processor(); let rpc = self.network_manager()?.rpc_processor();
// Dump routing table entry // Dump routing table entry
@ -315,15 +376,15 @@ impl VeilidAPI {
buckets [dead|reliable] buckets [dead|reliable]
dialinfo dialinfo
entries [dead|reliable] [limit] entries [dead|reliable] [limit]
entry [node_id] entry <node_id>
nodeinfo nodeinfo
config [key [new value]] config [key [new value]]
purge [buckets|connections] purge <buckets|connections>
attach attach
detach detach
restart network restart network
ping [node_id] ping <node_id> [protocol_type [address_type]]
contact [node_id] contact <node_id> [protocol_type [address_type]]
"# "#
.to_owned()) .to_owned())
} }