fix bugs and lints

This commit is contained in:
John Smith 2022-04-25 11:29:02 -04:00
parent 2d7cffee3d
commit 911d0c563f
9 changed files with 307 additions and 279 deletions

View File

@ -487,11 +487,11 @@ impl Network {
////////////////////////////////////////// //////////////////////////////////////////
pub fn get_network_class(&self) -> Option<NetworkClass> { pub fn get_network_class(&self) -> Option<NetworkClass> {
let inner = self.inner.lock(); let inner = self.inner.lock();
return inner.network_class; inner.network_class
} }
pub fn reset_network_class(&self) { pub fn reset_network_class(&self) {
let inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.network_class = None; inner.network_class = None;
} }

View File

@ -16,7 +16,7 @@ struct DiscoveryContextInner {
node_b: Option<NodeRef>, node_b: Option<NodeRef>,
} }
struct DiscoveryContext { pub struct DiscoveryContext {
routing_table: RoutingTable, routing_table: RoutingTable,
net: Network, net: Network,
inner: Arc<Mutex<DiscoveryContextInner>>, inner: Arc<Mutex<DiscoveryContextInner>>,
@ -43,10 +43,10 @@ impl DiscoveryContext {
/////// ///////
// Utilities // Utilities
xxxx continue converting to async safe inner
// Pick the best network class we have seen so far // Pick the best network class we have seen so far
pub fn upgrade_network_class(&self, network_class: NetworkClass) { pub fn upgrade_network_class(&self, network_class: NetworkClass) {
let inner = self.inner.lock(); let mut inner = self.inner.lock();
if let Some(old_nc) = inner.network_class { if let Some(old_nc) = inner.network_class {
if network_class < old_nc { if network_class < old_nc {
@ -137,7 +137,7 @@ xxxx continue converting to async safe inner
.unwrap_or(false) .unwrap_or(false)
} }
async fn try_port_mapping(&mut self) -> Option<DialInfo> { async fn try_port_mapping(&self) -> Option<DialInfo> {
//xxx //xxx
None None
} }
@ -161,25 +161,30 @@ xxxx continue converting to async safe inner
/////// ///////
// Per-protocol discovery routines // Per-protocol discovery routines
pub fn protocol_begin(&mut self, protocol_type: ProtocolType, address_type: AddressType) { pub fn protocol_begin(&self, protocol_type: ProtocolType, address_type: AddressType) {
// Get our interface addresses // Get our interface addresses
self.intf_addrs = Some(self.get_local_addresses(protocol_type, address_type)); let intf_addrs = self.get_local_addresses(protocol_type, address_type);
self.protocol_type = Some(protocol_type);
self.address_type = Some(address_type); let mut inner = self.inner.lock();
self.low_level_protocol_type = Some(match protocol_type { inner.intf_addrs = Some(intf_addrs);
inner.protocol_type = Some(protocol_type);
inner.address_type = Some(address_type);
inner.low_level_protocol_type = Some(match protocol_type {
ProtocolType::UDP => ProtocolType::UDP, ProtocolType::UDP => ProtocolType::UDP,
ProtocolType::TCP => ProtocolType::TCP, ProtocolType::TCP => ProtocolType::TCP,
ProtocolType::WS => ProtocolType::TCP, ProtocolType::WS => ProtocolType::TCP,
ProtocolType::WSS => ProtocolType::TCP, ProtocolType::WSS => ProtocolType::TCP,
}); });
self.external1_dial_info = None; inner.external1_dial_info = None;
self.external1 = None; inner.external1 = None;
self.node_b = None; inner.node_b = None;
} }
pub async fn protocol_get_external_address_1(&mut self) -> bool { pub async fn protocol_get_external_address_1(&self) -> bool {
let protocol_type = self.protocol_type.unwrap(); let (protocol_type, address_type) = {
let address_type = self.address_type.unwrap(); let inner = self.inner.lock();
(inner.protocol_type.unwrap(), inner.address_type.unwrap())
};
// Get our external address from some fast node, call it node B // Get our external address from some fast node, call it node B
let (external1, node_b) = match self let (external1, node_b) = match self
@ -194,20 +199,22 @@ xxxx continue converting to async safe inner
}; };
let external1_dial_info = self.make_dial_info(external1, protocol_type); let external1_dial_info = self.make_dial_info(external1, protocol_type);
self.external1_dial_info = Some(external1_dial_info); let mut inner = self.inner.lock();
self.external1 = Some(external1); inner.external1_dial_info = Some(external1_dial_info);
self.node_b = Some(node_b); inner.external1 = Some(external1);
inner.node_b = Some(node_b);
true true
} }
pub async fn protocol_process_no_nat(&mut self) { pub async fn protocol_process_no_nat(&self) {
let node_b = self.node_b.as_ref().unwrap().clone(); let (node_b, external1_dial_info) = {
let external1_dial_info = self.external1_dial_info.as_ref().unwrap().clone(); let inner = self.inner.lock();
let external1 = self.external1.unwrap(); (
let protocol_type = self.protocol_type.unwrap(); inner.node_b.as_ref().unwrap().clone(),
let address_type = self.address_type.unwrap(); inner.external1_dial_info.as_ref().unwrap().clone(),
let intf_addrs = self.intf_addrs.as_ref().unwrap(); )
};
// Do a validate_dial_info on the external address from a redirected node // Do a validate_dial_info on the external address from a redirected node
if self if self
@ -240,13 +247,17 @@ xxxx continue converting to async safe inner
self.upgrade_network_class(NetworkClass::InboundCapable); self.upgrade_network_class(NetworkClass::InboundCapable);
} }
pub async fn protocol_process_nat(&mut self) -> bool { pub async fn protocol_process_nat(&self) -> bool {
let node_b = self.node_b.as_ref().unwrap().clone(); let (node_b, external1_dial_info, external1, protocol_type, address_type) = {
let external1_dial_info = self.external1_dial_info.as_ref().unwrap().clone(); let inner = self.inner.lock();
let external1 = self.external1.unwrap(); (
let protocol_type = self.protocol_type.unwrap(); inner.node_b.as_ref().unwrap().clone(),
let address_type = self.address_type.unwrap(); inner.external1_dial_info.as_ref().unwrap().clone(),
let intf_addrs = self.intf_addrs.as_ref().unwrap(); inner.external1.unwrap(),
inner.protocol_type.unwrap(),
inner.address_type.unwrap(),
)
};
// Attempt a UDP port mapping via all available and enabled mechanisms // Attempt a UDP port mapping via all available and enabled mechanisms
if let Some(external_mapped_dial_info) = self.try_port_mapping().await { if let Some(external_mapped_dial_info) = self.try_port_mapping().await {
@ -337,7 +348,7 @@ xxxx continue converting to async safe inner
impl Network { impl Network {
pub async fn update_ipv4_protocol_dialinfo( pub async fn update_ipv4_protocol_dialinfo(
&self, &self,
context: &mut DiscoveryContext, context: &DiscoveryContext,
protocol_type: ProtocolType, protocol_type: ProtocolType,
) -> Result<(), String> { ) -> Result<(), String> {
let mut retry_count = { let mut retry_count = {
@ -357,17 +368,22 @@ impl Network {
} }
// If our local interface list contains external1 then there is no NAT in place // If our local interface list contains external1 then there is no NAT in place
if context
.intf_addrs
.as_ref()
.unwrap()
.contains(&context.external1.as_ref().unwrap())
{ {
// No NAT let res = {
context.protocol_process_no_nat().await; let inner = context.inner.lock();
inner
.intf_addrs
.as_ref()
.unwrap()
.contains(inner.external1.as_ref().unwrap())
};
if res {
// No NAT
context.protocol_process_no_nat().await;
// No more retries // No more retries
break; break;
}
} }
// There is -some NAT- // There is -some NAT-
@ -388,7 +404,7 @@ impl Network {
pub async fn update_ipv6_protocol_dialinfo( pub async fn update_ipv6_protocol_dialinfo(
&self, &self,
context: &mut DiscoveryContext, context: &DiscoveryContext,
protocol_type: ProtocolType, protocol_type: ProtocolType,
) -> Result<(), String> { ) -> Result<(), String> {
// Start doing ipv6 protocol // Start doing ipv6 protocol
@ -401,18 +417,21 @@ impl Network {
} }
// If our local interface list doesn't contain external1 then there is an Ipv6 NAT in place // If our local interface list doesn't contain external1 then there is an Ipv6 NAT in place
if !context
.intf_addrs
.as_ref()
.unwrap()
.contains(&context.external1.as_ref().unwrap())
{ {
// IPv6 NAT is not supported today let inner = context.inner.lock();
log_net!(warn if !inner
"IPv6 NAT is not supported for external address: {}", .intf_addrs
context.external1.unwrap() .as_ref()
); .unwrap()
return Ok(()); .contains(inner.external1.as_ref().unwrap())
{
// IPv6 NAT is not supported today
log_net!(warn
"IPv6 NAT is not supported for external address: {}",
inner.external1.unwrap()
);
return Ok(());
}
} }
// No NAT // No NAT
@ -424,37 +443,32 @@ impl Network {
pub async fn update_network_class_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { pub async fn update_network_class_task_routine(self, _l: u64, _t: u64) -> Result<(), String> {
log_net!("updating network class"); log_net!("updating network class");
let protocol_config = self let protocol_config = self.inner.lock().protocol_config.unwrap_or_default();
.inner
.lock()
.protocol_config
.clone()
.unwrap_or_default();
let context = DiscoveryContext::new(self.routing_table(), self.clone()); let context = DiscoveryContext::new(self.routing_table(), self.clone());
if protocol_config.inbound.contains(ProtocolType::UDP) { if protocol_config.inbound.contains(ProtocolType::UDP) {
self.update_ipv4_protocol_dialinfo(&mut context, ProtocolType::UDP) self.update_ipv4_protocol_dialinfo(&context, ProtocolType::UDP)
.await?; .await?;
self.update_ipv6_protocol_dialinfo(&mut context, ProtocolType::UDP) self.update_ipv6_protocol_dialinfo(&context, ProtocolType::UDP)
.await?; .await?;
} }
if protocol_config.inbound.contains(ProtocolType::TCP) { if protocol_config.inbound.contains(ProtocolType::TCP) {
self.update_ipv4_protocol_dialinfo(&mut context, ProtocolType::TCP) self.update_ipv4_protocol_dialinfo(&context, ProtocolType::TCP)
.await?; .await?;
self.update_ipv6_protocol_dialinfo(&mut context, ProtocolType::TCP) self.update_ipv6_protocol_dialinfo(&context, ProtocolType::TCP)
.await?; .await?;
} }
if protocol_config.inbound.contains(ProtocolType::WS) { if protocol_config.inbound.contains(ProtocolType::WS) {
self.update_ipv4_protocol_dialinfo(&mut context, ProtocolType::WS) self.update_ipv4_protocol_dialinfo(&context, ProtocolType::WS)
.await?; .await?;
self.update_ipv6_protocol_dialinfo(&mut context, ProtocolType::WS) self.update_ipv6_protocol_dialinfo(&context, ProtocolType::WS)
.await?; .await?;
} }
self.inner.lock().network_class = context.network_class; self.inner.lock().network_class = context.inner.lock().network_class;
Ok(()) Ok(())
} }

View File

@ -331,17 +331,16 @@ impl Network {
DialInfoClass::Direct, DialInfoClass::Direct,
); );
// See if this public address is also a local interface address // See if this public address is also a local interface address we haven't registered yet
if !local_dial_info_list.contains(&pdi) let is_interface_address = self.with_interface_addresses(|ip_addrs| {
&& self.with_interface_addresses(|ip_addrs| { for ip_addr in ip_addrs {
for ip_addr in ip_addrs { if pdi_addr.ip() == *ip_addr {
if pdi_addr.ip() == *ip_addr { return true;
return true;
}
} }
false }
}) false
{ });
if !local_dial_info_list.contains(&pdi) && is_interface_address {
routing_table.register_dial_info( routing_table.register_dial_info(
RoutingDomain::LocalNetwork, RoutingDomain::LocalNetwork,
DialInfo::udp_from_socketaddr(pdi_addr), DialInfo::udp_from_socketaddr(pdi_addr),
@ -432,16 +431,15 @@ impl Network {
static_public = true; static_public = true;
// See if this public address is also a local interface address // See if this public address is also a local interface address
if !registered_addresses.contains(&gsa.ip()) let is_interface_address = self.with_interface_addresses(|ip_addrs| {
&& self.with_interface_addresses(|ip_addrs| { for ip_addr in ip_addrs {
for ip_addr in ip_addrs { if gsa.ip() == *ip_addr {
if gsa.ip() == *ip_addr { return true;
return true;
}
} }
false }
}) false
{ });
if !registered_addresses.contains(&gsa.ip()) && is_interface_address {
routing_table.register_dial_info( routing_table.register_dial_info(
RoutingDomain::LocalNetwork, RoutingDomain::LocalNetwork,
pdi, pdi,
@ -496,12 +494,11 @@ impl Network {
trace!("starting wss listeners"); trace!("starting wss listeners");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let (listen_address, url, enable_local_peer_scope) = { let (listen_address, url) = {
let c = self.config.get(); let c = self.config.get();
( (
c.network.protocol.wss.listen_address.clone(), c.network.protocol.wss.listen_address.clone(),
c.network.protocol.wss.url.clone(), c.network.protocol.wss.url.clone(),
c.network.enable_local_peer_scope,
) )
}; };
@ -566,16 +563,15 @@ impl Network {
static_public = true; static_public = true;
// See if this public address is also a local interface address // See if this public address is also a local interface address
if !registered_addresses.contains(&gsa.ip()) let is_interface_address = self.with_interface_addresses(|ip_addrs| {
&& self.with_interface_addresses(|ip_addrs| { for ip_addr in ip_addrs {
for ip_addr in ip_addrs { if gsa.ip() == *ip_addr {
if gsa.ip() == *ip_addr { return true;
return true;
}
} }
false }
}) false
{ });
if !registered_addresses.contains(&gsa.ip()) && is_interface_address {
routing_table.register_dial_info( routing_table.register_dial_info(
RoutingDomain::LocalNetwork, RoutingDomain::LocalNetwork,
pdi, pdi,
@ -683,22 +679,21 @@ impl Network {
static_public = true; static_public = true;
// See if this public address is also a local interface address // See if this public address is also a local interface address
if self.with_interface_addresses(|ip_addrs| { let is_interface_address = self.with_interface_addresses(|ip_addrs| {
for ip_addr in ip_addrs { for ip_addr in ip_addrs {
if pdi_addr.ip() == *ip_addr { if pdi_addr.ip() == *ip_addr {
return true; return true;
} }
} }
false false
}) { });
if is_interface_address {
routing_table.register_dial_info( routing_table.register_dial_info(
RoutingDomain::LocalNetwork, RoutingDomain::LocalNetwork,
pdi, pdi,
DialInfoClass::Direct, DialInfoClass::Direct,
); );
} }
static_public = true;
} }
} }

View File

@ -599,7 +599,7 @@ impl NetworkManager {
if envelope_node_id != via_node_id { if envelope_node_id != via_node_id {
return Ok(SendDataKind::GlobalIndirect); return Ok(SendDataKind::GlobalIndirect);
} }
return Ok(send_data_kind); Ok(send_data_kind)
} }
// Called by the RPC handler when we want to issue an direct receipt // Called by the RPC handler when we want to issue an direct receipt
@ -626,7 +626,7 @@ impl NetworkManager {
} }
// Figure out how to reach a node // Figure out how to reach a node
fn get_contact_method(&self, target_node_ref: NodeRef) -> Result<ContactMethod, String> { fn get_contact_method(&self, mut target_node_ref: NodeRef) -> Result<ContactMethod, String> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
// Get our network class and protocol config // Get our network class and protocol config
@ -639,105 +639,94 @@ impl NetworkManager {
} }
// Get the best matching local direct dial info if we have it // Get the best matching local direct dial info if we have it
let opt_local_did = let opt_target_local_did =
target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::LocalNetwork)); target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::LocalNetwork));
if let Some(local_did) = opt_local_did { if let Some(target_local_did) = opt_target_local_did {
return Ok(ContactMethod::Direct(local_did.dial_info)); return Ok(ContactMethod::Direct(target_local_did.dial_info));
} }
// Get the best match internet dial info if we have it // Get the best match internet dial info if we have it
let opt_public_did = let opt_target_public_did =
target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)); target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet));
if let Some(target_public_did) = opt_target_public_did {
// Can the target node do inbound?
let target_network_class = target_node_ref.network_class();
//if matches!(target_network_class, NetworkClass::InboundCapable) {
if let Some(public_did) = opt_public_did {
// Do we need to signal before going inbound? // Do we need to signal before going inbound?
if public_did.class.requires_signal() { if !target_public_did.class.requires_signal() {
// Get the target's inbound relay, it must have one or it is not reachable // Go direct without signaling
if let Some(inbound_relay_nr) = target_node_ref.relay() { return Ok(ContactMethod::Direct(target_public_did.dial_info));
// Can we reach the inbound relay?
if inbound_relay_nr
.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet))
.is_some()
{
// Can we receive anything inbound ever?
if matches!(our_network_class, NetworkClass::InboundCapable) {
// Get the best match dial info for an reverse inbound connection
let reverse_dif = DialInfoFilter::global()
.with_protocol_set(target_node_ref.outbound_protocols());
if let Some(reverse_did) = routing_table
.first_filtered_dial_info_detail(
RoutingDomain::PublicInternet,
&reverse_dif,
)
{
// Can we receive a direct reverse connection?
if !reverse_did.class.requires_signal() {
return Ok(ContactMethod::SignalReverse(
inbound_relay_nr,
target_node_ref,
));
}
}
// Does we and the target have outbound protocols to hole-punch?
if our_protocol_config.outbound.contains(ProtocolType::UDP)
&& target_node_ref
.outbound_protocols()
.contains(ProtocolType::UDP)
{
// Do the target and self nodes have a direct udp dialinfo
let udp_dif =
DialInfoFilter::global().with_protocol_type(ProtocolType::UDP);
let udp_target_nr = target_node_ref.clone();
udp_target_nr
.filter_protocols(ProtocolSet::only(ProtocolType::UDP));
let target_has_udp_dialinfo = target_node_ref
.first_filtered_dial_info_detail(Some(
RoutingDomain::PublicInternet,
))
.is_some();
let self_has_udp_dialinfo = routing_table
.first_filtered_dial_info_detail(
RoutingDomain::PublicInternet,
&udp_dif,
)
.is_some();
if target_has_udp_dialinfo && self_has_udp_dialinfo {
return Ok(ContactMethod::SignalHolePunch(
inbound_relay_nr,
udp_target_nr,
));
}
}
// Otherwise we have to inbound relay
}
return Ok(ContactMethod::InboundRelay(inbound_relay_nr));
}
}
} }
// Go direct without signaling
else { // Get the target's inbound relay, it must have one or it is not reachable
// If we have direct dial info we can use, do it if let Some(inbound_relay_nr) = target_node_ref.relay() {
if let Some(did) = opt_public_did { // Can we reach the inbound relay?
return Ok(ContactMethod::Direct(did.dial_info)); if inbound_relay_nr
}
}
} else {
// If the other node is not inbound capable at all, it is using a full relay
if let Some(target_inbound_relay_nr) = target_node_ref.relay() {
// Can we reach the full relay?
if target_inbound_relay_nr
.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet))
.is_some() .is_some()
{ {
return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr)); // Can we receive anything inbound ever?
if matches!(our_network_class, NetworkClass::InboundCapable) {
// Get the best match dial info for an reverse inbound connection
let reverse_dif = DialInfoFilter::global()
.with_protocol_set(target_node_ref.outbound_protocols());
if let Some(reverse_did) = routing_table.first_filtered_dial_info_detail(
Some(RoutingDomain::PublicInternet),
&reverse_dif,
) {
// Can we receive a direct reverse connection?
if !reverse_did.class.requires_signal() {
return Ok(ContactMethod::SignalReverse(
inbound_relay_nr,
target_node_ref,
));
}
}
// Does we and the target have outbound protocols to hole-punch?
if our_protocol_config.outbound.contains(ProtocolType::UDP)
&& target_node_ref
.outbound_protocols()
.contains(ProtocolType::UDP)
{
// Do the target and self nodes have a direct udp dialinfo
let udp_dif =
DialInfoFilter::global().with_protocol_type(ProtocolType::UDP);
let mut udp_target_nr = target_node_ref.clone();
udp_target_nr.filter_protocols(ProtocolSet::only(ProtocolType::UDP));
let target_has_udp_dialinfo = target_node_ref
.first_filtered_dial_info_detail(Some(
RoutingDomain::PublicInternet,
))
.is_some();
let self_has_udp_dialinfo = routing_table
.first_filtered_dial_info_detail(
Some(RoutingDomain::PublicInternet),
&udp_dif,
)
.is_some();
if target_has_udp_dialinfo && self_has_udp_dialinfo {
return Ok(ContactMethod::SignalHolePunch(
inbound_relay_nr,
udp_target_nr,
));
}
}
// Otherwise we have to inbound relay
}
return Ok(ContactMethod::InboundRelay(inbound_relay_nr));
} }
} }
} }
// If the other node is not inbound capable at all, it is using a full relay
else if let Some(target_inbound_relay_nr) = target_node_ref.relay() {
// Can we reach the full relay?
if target_inbound_relay_nr
.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet))
.is_some()
{
return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr));
}
}
// If we can't reach the node by other means, try our outbound relay if we have one // If we can't reach the node by other means, try our outbound relay if we have one
if let Some(relay_node) = self.relay_node() { if let Some(relay_node) = self.relay_node() {
return Ok(ContactMethod::OutboundRelay(relay_node)); return Ok(ContactMethod::OutboundRelay(relay_node));
@ -868,23 +857,17 @@ impl NetworkManager {
let inbound_nr = match eventual_value.await { let inbound_nr = match eventual_value.await {
ReceiptEvent::Returned(inbound_nr) => inbound_nr, ReceiptEvent::Returned(inbound_nr) => inbound_nr,
ReceiptEvent::Expired => { ReceiptEvent::Expired => {
return Err(format!( return Err(format!("hole punch receipt expired from {:?}", target_nr));
"reverse connect receipt expired from {:?}",
target_nr
));
} }
ReceiptEvent::Cancelled => { ReceiptEvent::Cancelled => {
return Err(format!( return Err(format!("hole punch receipt cancelled from {:?}", target_nr));
"reverse connect receipt cancelled from {:?}",
target_nr
));
} }
}; };
// We expect the inbound noderef to be the same as the target noderef // We expect the inbound noderef to be the same as the target noderef
// if they aren't the same, we should error on this and figure out what then hell is up // if they aren't the same, we should error on this and figure out what then hell is up
if target_nr != inbound_nr { if target_nr != inbound_nr {
error!("unexpected noderef mismatch on reverse connect"); error!("unexpected noderef mismatch on hole punch");
} }
// And now use the existing connection to send over // And now use the existing connection to send over
@ -896,10 +879,10 @@ impl NetworkManager {
.map_err(logthru_net!())? .map_err(logthru_net!())?
{ {
None => Ok(()), None => Ok(()),
Some(_) => Err("unable to send over reverse connection".to_owned()), Some(_) => Err("unable to send over hole punch".to_owned()),
} }
} else { } else {
Err("no reverse connection available".to_owned()) Err("no hole punch available".to_owned())
} }
} }
@ -947,17 +930,17 @@ impl NetworkManager {
.await .await
.map(|_| SendDataKind::GlobalIndirect) .map(|_| SendDataKind::GlobalIndirect)
} }
ContactMethod::Direct(dial_info) => this ContactMethod::Direct(dial_info) => {
.net() let send_data_kind = if dial_info.is_local() {
.send_data_to_dial_info(dial_info, data) SendDataKind::LocalDirect
.await } else {
.map(|_| { SendDataKind::GlobalDirect
if dial_info.is_local() { };
SendDataKind::LocalDirect this.net()
} else { .send_data_to_dial_info(dial_info, data)
SendDataKind::GlobalDirect .await
} .map(|_| send_data_kind)
}), }
ContactMethod::SignalReverse(relay_nr, target_node_ref) => this ContactMethod::SignalReverse(relay_nr, target_node_ref) => this
.do_reverse_connect(relay_nr, target_node_ref, data) .do_reverse_connect(relay_nr, target_node_ref, data)
.await .await

View File

@ -50,7 +50,6 @@ impl RoutingTable {
// Get our own node's peer info (public node info) so we can share it with other nodes // Get our own node's peer info (public node info) so we can share it with other nodes
pub fn get_own_peer_info(&self) -> PeerInfo { pub fn get_own_peer_info(&self) -> PeerInfo {
let netman = self.network_manager(); let netman = self.network_manager();
let enable_local_peer_scope = netman.config().get().network.enable_local_peer_scope;
let relay_node = netman.relay_node(); let relay_node = netman.relay_node();
PeerInfo { PeerInfo {
node_id: NodeId::new(self.node_id()), node_id: NodeId::new(self.node_id()),

View File

@ -187,17 +187,36 @@ impl RoutingTable {
pub fn first_filtered_dial_info_detail( pub fn first_filtered_dial_info_detail(
&self, &self,
domain: RoutingDomain, domain: Option<RoutingDomain>,
filter: &DialInfoFilter, filter: &DialInfoFilter,
) -> Option<DialInfoDetail> { ) -> Option<DialInfoDetail> {
let inner = self.inner.lock(); let inner = self.inner.lock();
Self::with_routing_domain(&*inner, domain, |rd| { // Prefer local network first if it isn't filtered out
for did in rd.dial_info_details { if domain == None || domain == Some(RoutingDomain::LocalNetwork) {
if did.matches_filter(filter) { Self::with_routing_domain(&*inner, RoutingDomain::LocalNetwork, |rd| {
return Some(did.clone()); for did in &rd.dial_info_details {
if did.matches_filter(filter) {
return Some(did.clone());
}
} }
} None
})
} else {
None None
}
.or_else(|| {
if domain == None || domain == Some(RoutingDomain::PublicInternet) {
Self::with_routing_domain(&*inner, RoutingDomain::PublicInternet, |rd| {
for did in &rd.dial_info_details {
if did.matches_filter(filter) {
return Some(did.clone());
}
}
None
})
} else {
None
}
}) })
} }
@ -211,7 +230,7 @@ impl RoutingTable {
if domain == None || domain == Some(RoutingDomain::LocalNetwork) { if domain == None || domain == Some(RoutingDomain::LocalNetwork) {
Self::with_routing_domain(&*inner, RoutingDomain::LocalNetwork, |rd| { Self::with_routing_domain(&*inner, RoutingDomain::LocalNetwork, |rd| {
for did in rd.dial_info_details { for did in &rd.dial_info_details {
if did.matches_filter(filter) { if did.matches_filter(filter) {
ret.push(did.clone()); ret.push(did.clone());
} }
@ -220,7 +239,7 @@ impl RoutingTable {
} }
if domain == None || domain == Some(RoutingDomain::PublicInternet) { if domain == None || domain == Some(RoutingDomain::PublicInternet) {
Self::with_routing_domain(&*inner, RoutingDomain::PublicInternet, |rd| { Self::with_routing_domain(&*inner, RoutingDomain::PublicInternet, |rd| {
for did in rd.dial_info_details { for did in &rd.dial_info_details {
if did.matches_filter(filter) { if did.matches_filter(filter) {
ret.push(did.clone()); ret.push(did.clone());
} }

View File

@ -46,11 +46,12 @@ impl NodeRef {
// Returns true if some protocols can still pass the filter and false if no protocols remain // Returns true if some protocols can still pass the filter and false if no protocols remain
pub fn filter_protocols(&mut self, protocol_set: ProtocolSet) -> bool { pub fn filter_protocols(&mut self, protocol_set: ProtocolSet) -> bool {
if protocol_set != ProtocolSet::all() { if protocol_set != ProtocolSet::all() {
let mut dif = self.filter.unwrap_or_default(); let mut dif = self.filter.clone().unwrap_or_default();
dif.protocol_set &= protocol_set; dif.protocol_set &= protocol_set;
self.filter = Some(dif); self.filter = Some(dif);
} }
self.filter self.filter
.as_ref()
.map(|f| !f.protocol_set.is_empty()) .map(|f| !f.protocol_set.is_empty())
.unwrap_or(true) .unwrap_or(true)
} }
@ -79,13 +80,12 @@ impl NodeRef {
self.operate(|e| e.node_info().outbound_protocols) self.operate(|e| e.node_info().outbound_protocols)
} }
pub fn relay(&self) -> Option<NodeRef> { pub fn relay(&self) -> Option<NodeRef> {
let target_rpi = self.operate(|e| e.node_info().relay_peer_info)?; let target_rpi = self.operate(|e| e.node_info().relay_peer_info.clone())?;
self.routing_table self.routing_table
.register_node_with_node_info(target_rpi.node_id.key, target_rpi.node_info) .register_node_with_node_info(target_rpi.node_id.key, target_rpi.node_info)
.map_err(logthru_rtab!(error)) .map_err(logthru_rtab!(error))
.ok() .ok()
.map(|nr| { .map(|mut nr| {
nr.set_filter(self.filter_ref().cloned()); nr.set_filter(self.filter_ref().cloned());
nr nr
}) })
@ -95,16 +95,20 @@ impl NodeRef {
routing_domain: Option<RoutingDomain>, routing_domain: Option<RoutingDomain>,
) -> Option<DialInfoDetail> { ) -> Option<DialInfoDetail> {
self.operate(|e| { self.operate(|e| {
// Prefer local dial info first unless it is filtered out
if (routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork)) if (routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork))
&& matches!( && matches!(
self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), self.filter
.as_ref()
.map(|f| f.peer_scope)
.unwrap_or(PeerScope::All),
PeerScope::All | PeerScope::Local PeerScope::All | PeerScope::Local
) )
{ {
e.local_node_info() e.local_node_info()
.first_filtered_dial_info(|di| { .first_filtered_dial_info(|di| {
if let Some(filter) = self.filter { if let Some(filter) = self.filter.as_ref() {
di.matches_filter(&filter) di.matches_filter(filter)
} else { } else {
true true
} }
@ -119,13 +123,16 @@ impl NodeRef {
.or_else(|| { .or_else(|| {
if (routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet)) if (routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet))
&& matches!( && matches!(
self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), self.filter
.as_ref()
.map(|f| f.peer_scope)
.unwrap_or(PeerScope::All),
PeerScope::All | PeerScope::Global PeerScope::All | PeerScope::Global
) )
{ {
e.node_info().first_filtered_dial_info_detail(|did| { e.node_info().first_filtered_dial_info_detail(|did| {
if let Some(filter) = self.filter { if let Some(filter) = self.filter.as_ref() {
did.matches_filter(&filter) did.matches_filter(filter)
} else { } else {
true true
} }
@ -143,15 +150,19 @@ impl NodeRef {
) -> Vec<DialInfoDetail> { ) -> Vec<DialInfoDetail> {
let mut out = Vec::new(); let mut out = Vec::new();
self.operate(|e| { self.operate(|e| {
// Prefer local dial info first unless it is filtered out
if (routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork)) if (routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork))
&& matches!( && matches!(
self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), self.filter
.as_ref()
.map(|f| f.peer_scope)
.unwrap_or(PeerScope::All),
PeerScope::All | PeerScope::Local PeerScope::All | PeerScope::Local
) )
{ {
for di in e.local_node_info().all_filtered_dial_info(|di| { for di in e.local_node_info().all_filtered_dial_info(|di| {
if let Some(filter) = self.filter { if let Some(filter) = self.filter.as_ref() {
di.matches_filter(&filter) di.matches_filter(filter)
} else { } else {
true true
} }
@ -164,13 +175,16 @@ impl NodeRef {
} }
if (routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet)) if (routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet))
&& matches!( && matches!(
self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), self.filter
.as_ref()
.map(|f| f.peer_scope)
.unwrap_or(PeerScope::All),
PeerScope::All | PeerScope::Global PeerScope::All | PeerScope::Global
) )
{ {
out.append(&mut e.node_info().all_filtered_dial_info_details(|did| { out.append(&mut e.node_info().all_filtered_dial_info_details(|did| {
if let Some(filter) = self.filter { if let Some(filter) = self.filter.as_ref() {
did.matches_filter(&filter) did.matches_filter(filter)
} else { } else {
true true
} }

View File

@ -1378,49 +1378,52 @@ impl RPCProcessor {
// Wait for reply // Wait for reply
let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?; let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?;
let response_operation = rpcreader let (sender_info, node_status) = {
.reader let response_operation = rpcreader
.get_root::<veilid_capnp::operation::Reader>() .reader
.map_err(map_error_capnp_error!()) .get_root::<veilid_capnp::operation::Reader>()
.map_err(logthru_rpc!())?; .map_err(map_error_capnp_error!())
let info_a = match response_operation .map_err(logthru_rpc!())?;
.get_detail() let info_a = match response_operation
.which() .get_detail()
.map_err(map_error_capnp_notinschema!()) .which()
.map_err(logthru_rpc!())? .map_err(map_error_capnp_notinschema!())
{ .map_err(logthru_rpc!())?
veilid_capnp::operation::detail::InfoA(a) => { {
a.map_err(map_error_internal!("Invalid InfoA"))? veilid_capnp::operation::detail::InfoA(a) => {
a.map_err(map_error_internal!("Invalid InfoA"))?
}
_ => return Err(rpc_error_internal("Incorrect RPC answer for question")),
};
// Decode node info
if !info_a.has_node_status() {
return Err(rpc_error_internal("Missing node status"));
} }
_ => return Err(rpc_error_internal("Incorrect RPC answer for question")), let nsr = info_a
.get_node_status()
.map_err(map_error_internal!("Broken node status"))?;
let node_status = decode_node_status(&nsr)?;
// Decode sender info
let sender_info = if info_a.has_sender_info() {
let sir = info_a
.get_sender_info()
.map_err(map_error_internal!("Broken sender info"))?;
decode_sender_info(&sir)?
} else {
SenderInfo::default()
};
// Update latest node status in routing table
peer.operate(|e| {
e.update_node_status(node_status.clone());
});
(sender_info, node_status)
}; };
// Decode node info
if !info_a.has_node_status() {
return Err(rpc_error_internal("Missing node status"));
}
let nsr = info_a
.get_node_status()
.map_err(map_error_internal!("Broken node status"))?;
let node_status = decode_node_status(&nsr)?;
// Decode sender info
let sender_info = if info_a.has_sender_info() {
let sir = info_a
.get_sender_info()
.map_err(map_error_internal!("Broken sender info"))?;
decode_sender_info(&sir)?
} else {
SenderInfo::default()
};
// Update latest node status in routing table
peer.operate(|e| {
e.update_node_status(node_status.clone());
});
// Report sender_info IP addresses to network manager // Report sender_info IP addresses to network manager
if let Some(socket_address) = sender_info.socket_address { if let Some(socket_address) = sender_info.socket_address {
match send_data_kind { match send_data_kind {
SendDataKind::LocalDirect => { SendDataKind::LocalDirect => {

View File

@ -326,7 +326,7 @@ impl NodeInfo {
F: Fn(&DialInfoDetail) -> bool, F: Fn(&DialInfoDetail) -> bool,
{ {
for did in &self.dial_info_detail_list { for did in &self.dial_info_detail_list {
if filter(&did) { if filter(did) {
return Some(did.clone()); return Some(did.clone());
} }
} }
@ -340,7 +340,7 @@ impl NodeInfo {
let mut dial_info_detail_list = Vec::new(); let mut dial_info_detail_list = Vec::new();
for did in &self.dial_info_detail_list { for did in &self.dial_info_detail_list {
if filter(&did) { if filter(did) {
dial_info_detail_list.push(did.clone()); dial_info_detail_list.push(did.clone());
} }
} }
@ -446,6 +446,7 @@ impl LocalNodeInfo {
} }
} }
#[allow(clippy::derive_hash_xor_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] #[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)]
// Keep member order appropriate for sorting < preference // Keep member order appropriate for sorting < preference
// Must match DialInfo order // Must match DialInfo order