refactor done for native

This commit is contained in:
John Smith 2021-12-24 18:02:53 -05:00
parent 922470365a
commit 23abaa3c99
14 changed files with 266 additions and 350 deletions

View File

@ -18,13 +18,11 @@ use utils::network_interfaces::*;
use async_std::io;
use async_std::net::*;
use async_tls::TlsAcceptor;
use cfg_if::*;
use futures_util::StreamExt;
// xxx: rustls ^0.20
//use rustls::{server::NoClientAuth, Certificate, PrivateKey, ServerConfig};
use rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig};
use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
use socket2::{Domain, Protocol, Socket, Type};
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
@ -44,6 +42,7 @@ struct NetworkInner {
protocol_config: Option<ProtocolConfig>,
udp_static_public_dialinfo: bool,
tcp_static_public_dialinfo: bool,
ws_static_public_dialinfo: bool,
network_class: Option<NetworkClass>,
join_handles: Vec<JoinHandle<()>>,
listener_states: BTreeMap<SocketAddr, Arc<RwLock<ListenerState>>>,
@ -55,9 +54,6 @@ struct NetworkInner {
wss_port: u16,
outbound_udpv4_protocol_handler: Option<RawUdpProtocolHandler>,
outbound_udpv6_protocol_handler: Option<RawUdpProtocolHandler>,
outbound_tcp_protocol_handler: Option<RawTcpProtocolHandler>,
outbound_ws_protocol_handler: Option<WebsocketProtocolHandler>,
outbound_wss_protocol_handler: Option<WebsocketProtocolHandler>,
interfaces: NetworkInterfaces,
}
@ -84,6 +80,7 @@ impl Network {
protocol_config: None,
udp_static_public_dialinfo: false,
tcp_static_public_dialinfo: false,
ws_static_public_dialinfo: false,
network_class: None,
join_handles: Vec::new(),
listener_states: BTreeMap::new(),
@ -95,9 +92,6 @@ impl Network {
wss_port: 0u16,
outbound_udpv4_protocol_handler: None,
outbound_udpv6_protocol_handler: None,
outbound_tcp_protocol_handler: None,
outbound_ws_protocol_handler: None,
outbound_wss_protocol_handler: None,
interfaces: NetworkInterfaces::new(),
}
}
@ -464,7 +458,7 @@ impl Network {
.local_addr()
.map_err(map_to_string)?
.as_socket_ipv4()
.ok_or("expected ipv4 address type".to_owned())?
.ok_or_else(|| "expected ipv4 address type".to_owned())?
.port();
// Make an async UdpSocket from the socket2 socket
@ -639,7 +633,6 @@ impl Network {
local_port: u16,
peer_socket_addr: &SocketAddr,
) -> SocketAddr {
let inner = self.inner.lock();
match peer_socket_addr {
SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), local_port),
SocketAddr::V6(_) => SocketAddr::new(
@ -807,15 +800,27 @@ impl Network {
};
info!("UDP: starting listener at {:?}", listen_address);
let dial_infos = self.start_udp_handler(listen_address.clone()).await?;
for x in &dial_infos {
let mut static_public = false;
for di in &dial_infos {
// Pick out UDP port for outbound connections (they will all be the same)
self.inner.lock().udp_port = x.port();
// Register local dial info
routing_table.register_local_dial_info(x.clone(), DialInfoOrigin::Static);
self.inner.lock().udp_port = di.port();
// Register local dial info only here if we specify a public address
if public_address.is_none() && di.is_global() {
// Register global dial info if no public address is specified
routing_table.register_dial_info(
di.clone(),
DialInfoOrigin::Static,
Some(NetworkClass::Server),
);
static_public = true;
} else if di.is_local() {
// Register local dial info
routing_table.register_dial_info(di.clone(), DialInfoOrigin::Static, None);
}
}
// Add static public dialinfo if it's configured
let mut static_public = false;
if let Some(public_address) = public_address.as_ref() {
// Resolve statically configured public dialinfo
let mut public_sockaddrs = public_address
@ -825,26 +830,15 @@ impl Network {
// Add all resolved addresses as public dialinfo
for pdi_addr in &mut public_sockaddrs {
routing_table.register_global_dial_info(
routing_table.register_dial_info(
DialInfo::udp_from_socketaddr(pdi_addr),
Some(NetworkClass::Server),
DialInfoOrigin::Static,
Some(NetworkClass::Server),
);
static_public = true;
}
} else {
// Register local dial info as public if it is publicly routable
for x in &dial_infos {
if x.is_global() {
routing_table.register_global_dial_info(
x.clone(),
Some(NetworkClass::Server),
DialInfoOrigin::Static,
);
static_public = true;
}
}
}
self.inner.lock().udp_static_public_dialinfo = static_public;
Ok(())
}
@ -869,18 +863,35 @@ impl Network {
.await?;
trace!("WS: listener started");
let mut dial_infos: Vec<DialInfo> = Vec::new();
let mut static_public = false;
for socket_address in socket_addresses {
// Pick out WS port for outbound connections (they will all be the same)
self.inner.lock().ws_port = socket_address.port();
// Build local dial info request url
let local_url = format!("ws://{}/{}", socket_address, path);
// Create local dial info
let di = DialInfo::try_ws(socket_address, local_url)
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
dial_infos.push(di.clone());
routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
if url.is_none() && socket_address.address().is_global() {
// Build global dial info request url
let global_url = format!("ws://{}/{}", socket_address, path);
// Create global dial info
let di = DialInfo::try_ws(socket_address, global_url)
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
routing_table.register_dial_info(
di,
DialInfoOrigin::Static,
Some(NetworkClass::Server),
);
static_public = true;
} else if socket_address.address().is_local() {
// Build local dial info request url
let local_url = format!("ws://{}/{}", socket_address, path);
// Create local dial info
let di = DialInfo::try_ws(socket_address, local_url)
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
routing_table.register_dial_info(di, DialInfoOrigin::Static, None);
}
}
// Add static public dialinfo if it's configured
@ -900,15 +911,17 @@ impl Network {
.map_err(logthru_net!(error))?;
for gsa in global_socket_addrs {
routing_table.register_global_dial_info(
routing_table.register_dial_info(
DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone())
.map_err(map_to_string)
.map_err(logthru_net!(error))?,
Some(NetworkClass::Server),
DialInfoOrigin::Static,
Some(NetworkClass::Server),
);
}
static_public = true;
}
self.inner.lock().ws_static_public_dialinfo = static_public;
Ok(())
}
@ -937,13 +950,9 @@ impl Network {
// is specified, then TLS won't validate, so no local dialinfo is possible.
// This is not the case with unencrypted websockets, which can be specified solely by an IP address
//
// let mut dial_infos: Vec<DialInfo> = Vec::new();
for socket_address in socket_addresses {
if let Some(socket_address) = socket_addresses.first() {
// Pick out WSS port for outbound connections (they will all be the same)
self.inner.lock().wss_port = socket_address.port();
// Don't register local dial info because TLS won't allow that anyway without a local CA
// and we aren't doing that yet at all today.
}
// Add static public dialinfo if it's configured
@ -964,12 +973,12 @@ impl Network {
.map_err(logthru_net!(error))?;
for gsa in global_socket_addrs {
routing_table.register_global_dial_info(
routing_table.register_dial_info(
DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone())
.map_err(map_to_string)
.map_err(logthru_net!(error))?,
Some(NetworkClass::Server),
DialInfoOrigin::Static,
Some(NetworkClass::Server),
);
}
} else {
@ -998,18 +1007,29 @@ impl Network {
.await?;
trace!("TCP: listener started");
let mut dial_infos: Vec<DialInfo> = Vec::new();
let mut static_public = false;
for socket_address in socket_addresses {
// Pick out TCP port for outbound connections (they will all be the same)
self.inner.lock().tcp_port = socket_address.port();
let di = DialInfo::tcp(socket_address);
dial_infos.push(di.clone());
routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
// Register local dial info only here if we specify a public address
if public_address.is_none() && di.is_global() {
// Register global dial info if no public address is specified
routing_table.register_dial_info(
di.clone(),
DialInfoOrigin::Static,
Some(NetworkClass::Server),
);
static_public = true;
} else if di.is_local() {
// Register local dial info
routing_table.register_dial_info(di.clone(), DialInfoOrigin::Static, None);
}
}
// Add static public dialinfo if it's configured
let mut static_public = false;
if let Some(public_address) = public_address.as_ref() {
// Resolve statically configured public dialinfo
let mut public_sockaddrs = public_address
@ -1019,25 +1039,13 @@ impl Network {
// Add all resolved addresses as public dialinfo
for pdi_addr in &mut public_sockaddrs {
routing_table.register_global_dial_info(
routing_table.register_dial_info(
DialInfo::tcp_from_socketaddr(pdi_addr),
None,
DialInfoOrigin::Static,
None,
);
static_public = true;
}
} else {
// Register local dial info as public if it is publicly routable
for x in &dial_infos {
if x.is_global() {
routing_table.register_global_dial_info(
x.clone(),
Some(NetworkClass::Server),
DialInfoOrigin::Static,
);
static_public = true;
}
}
}
self.inner.lock().tcp_static_public_dialinfo = static_public;
@ -1046,12 +1054,11 @@ impl Network {
}
pub fn get_protocol_config(&self) -> Option<ProtocolConfig> {
self.inner.lock().protocol_config.clone()
self.inner.lock().protocol_config
}
pub async fn startup(&self) -> Result<(), String> {
info!("starting network");
let network_manager = self.inner.lock().network_manager.clone();
// initialize interfaces
self.inner.lock().interfaces.refresh()?;
@ -1103,8 +1110,7 @@ impl Network {
let routing_table = network_manager.routing_table();
// Drop all dial info
routing_table.clear_local_dial_info();
routing_table.clear_global_dial_info();
routing_table.clear_dial_info_details();
// Cancels all async background tasks by dropping join handles
*self.inner.lock() = Self::new_inner(network_manager);
@ -1151,10 +1157,7 @@ impl Network {
let inner = self.inner.lock();
(
inner.network_manager.routing_table(),
inner
.protocol_config
.clone()
.unwrap_or_else(|| ProtocolConfig::default()),
inner.protocol_config.unwrap_or_default(),
inner.udp_static_public_dialinfo,
inner.tcp_static_public_dialinfo,
inner.network_class.unwrap_or(NetworkClass::Invalid),
@ -1170,12 +1173,11 @@ impl Network {
&& !udp_static_public_dialinfo
&& (network_class.inbound_capable() || network_class == NetworkClass::Invalid)
{
let filter = DialInfoFilter::with_protocol_type_and_address_type(
ProtocolType::UDP,
AddressType::IPV4,
);
let filter = DialInfoFilter::global()
.with_protocol_type(ProtocolType::UDP)
.with_address_type(AddressType::IPV4);
let need_udpv4_dialinfo = routing_table
.first_filtered_global_dial_info_details(|d| d.dial_info.matches_filter(&filter))
.first_filtered_dial_info_detail(&filter)
.is_none();
if need_udpv4_dialinfo {
// If we have no public UDPv4 dialinfo, then we need to run a NAT check
@ -1192,12 +1194,11 @@ impl Network {
&& !tcp_static_public_dialinfo
&& (network_class.inbound_capable() || network_class == NetworkClass::Invalid)
{
let filter = DialInfoFilter::with_protocol_type_and_address_type(
ProtocolType::TCP,
AddressType::IPV4,
);
let filter = DialInfoFilter::global()
.with_protocol_type(ProtocolType::TCP)
.with_address_type(AddressType::IPV4);
let need_tcpv4_dialinfo = routing_table
.first_filtered_global_dial_info_details(|d| d.dial_info.matches_filter(&filter))
.first_filtered_dial_info_detail(&filter)
.is_none();
if need_tcpv4_dialinfo {
// If we have no public TCPv4 dialinfo, then we need to run a NAT check

View File

@ -4,17 +4,14 @@ pub mod wrtc;
pub mod ws;
use super::listener_state::*;
use crate::veilid_api::ProtocolType;
use crate::xx::*;
use crate::*;
use socket2::{Domain, Protocol, Socket, Type};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DummyNetworkConnection {}
impl DummyNetworkConnection {
pub fn protocol_type(&self) -> ProtocolType {
ProtocolType::UDP
}
pub fn send(&self, _message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
Box::pin(async { Ok(()) })
}
@ -34,15 +31,6 @@ pub enum NetworkConnection {
}
impl NetworkConnection {
pub fn protocol_type(&self) -> ProtocolType {
match self {
Self::Dummy(d) => d.protocol_type(),
Self::RawTcp(t) => t.protocol_type(),
Self::WsAccepted(w) => w.protocol_type(),
Self::Ws(w) => w.protocol_type(),
Self::Wss(w) => w.protocol_type(),
}
}
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
match self {
Self::Dummy(d) => d.send(message),

View File

@ -44,10 +44,6 @@ impl RawTcpNetworkConnection {
}
impl RawTcpNetworkConnection {
pub fn protocol_type(&self) -> ProtocolType {
ProtocolType::TCP
}
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
let inner = self.inner.clone();

View File

@ -80,14 +80,6 @@ where
}
}
pub fn protocol_type(&self) -> ProtocolType {
if self.tls {
ProtocolType::WSS
} else {
ProtocolType::WS
}
}
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
let inner = self.inner.clone();
@ -248,9 +240,9 @@ impl WebsocketProtocolHandler {
dial_info: &DialInfo,
) -> Result<NetworkConnection, String> {
// Split dial info up
let (tls, protocol_type, scheme) = match &dial_info {
DialInfo::WS(_) => (false, ProtocolType::WS, "ws"),
DialInfo::WSS(_) => (true, ProtocolType::WSS, "wss"),
let (tls, scheme) = match &dial_info {
DialInfo::WS(_) => (false, "ws"),
DialInfo::WSS(_) => (true, "wss"),
_ => panic!("invalid dialinfo for WS/WSS protocol"),
};
let request = dial_info.request().unwrap();

View File

@ -5,8 +5,6 @@ use crate::network_manager::*;
use crate::routing_table::*;
use crate::*;
use async_std::net::*;
impl Network {
// Ask for a public address check from a particular noderef
async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddress> {
@ -22,17 +20,20 @@ impl Network {
.unwrap_or(None)
}
xxx convert to filter
// find fast peers with a particular address type, and ask them to tell us what our external address is
async fn discover_external_address(
&self,
protocol_address_type: ProtocolAddressType,
protocol_type: ProtocolType,
address_type: AddressType,
ignore_node: Option<DHTKey>,
) -> Option<(SocketAddr, NodeRef)> {
) -> Option<(SocketAddress, NodeRef)> {
let routing_table = self.routing_table();
let peers = routing_table.get_fast_nodes_of_type(protocol_address_type);
let filter = DialInfoFilter::global()
.with_protocol_type(protocol_type)
.with_address_type(address_type);
let peers = routing_table.find_fast_nodes_filtered(&filter);
if peers.is_empty() {
log_net!("no peers of type '{:?}'", protocol_address_type);
log_net!("no peers of type '{:?}'", filter);
return None;
}
for peer in peers {
@ -49,24 +50,20 @@ impl Network {
None
}
fn get_interface_addresses(
fn get_local_addresses(
&self,
protocol_address_type: ProtocolAddressType,
) -> Vec<SocketAddr> {
protocol_type: ProtocolType,
address_type: AddressType,
) -> Vec<SocketAddress> {
let routing_table = self.routing_table();
let filter = DialInfoFilter::local()
.with_protocol_type(protocol_type)
.with_address_type(address_type);
routing_table
.get_own_peer_info(PeerScope::Local)
.dial_infos
.all_filtered_dial_info_details(&filter)
.iter()
.filter_map(|di| {
if di.protocol_address_type() == protocol_address_type {
if let Ok(addr) = di.to_socket_addr() {
return Some(addr);
}
}
None
})
.map(|did| did.dial_info.socket_address())
.collect()
}
@ -88,11 +85,12 @@ impl Network {
.unwrap_or(false)
}
async fn try_port_mapping<I: AsRef<[SocketAddr]>>(
async fn try_port_mapping<I: AsRef<[SocketAddress]>>(
&self,
_intf_addrs: I,
_protocol_address_type: ProtocolAddressType,
) -> Option<SocketAddr> {
_protocol_type: ProtocolType,
_address_type: AddressType,
) -> Option<SocketAddress> {
//xxx
None
}
@ -107,13 +105,13 @@ impl Network {
};
// Get our interface addresses
let intf_addrs = self.get_interface_addresses(ProtocolAddressType::UDPv4);
let intf_addrs = self.get_local_addresses(ProtocolType::UDP, AddressType::IPV4);
// Loop for restricted NAT retries
loop {
// Get our external address from some fast node, call it node B
let (external1, node_b) = match self
.discover_external_address(ProtocolAddressType::UDPv4, None)
.discover_external_address(ProtocolType::UDP, AddressType::IPV4, None)
.await
{
None => {
@ -122,7 +120,7 @@ impl Network {
}
Some(v) => v,
};
let external1_dial_info = DialInfo::udp_from_socketaddr(external1);
let external1_dial_info = DialInfo::udp(external1);
// If our local interface list contains external1 then there is no NAT in place
if intf_addrs.contains(&external1) {
@ -133,10 +131,10 @@ impl Network {
.await
{
// Add public dial info with Server network class
routing_table.register_global_dial_info(
routing_table.register_dial_info(
external1_dial_info,
Some(NetworkClass::Server),
DialInfoOrigin::Discovered,
Some(NetworkClass::Server),
);
// No more retries
@ -149,15 +147,15 @@ impl Network {
// There is -some NAT-
// Attempt a UDP port mapping via all available and enabled mechanisms
if let Some(external_mapped) = self
.try_port_mapping(&intf_addrs, ProtocolAddressType::UDPv4)
.try_port_mapping(&intf_addrs, ProtocolType::UDP, AddressType::IPV4)
.await
{
// Got a port mapping, let's use it
let external_mapped_dial_info = DialInfo::udp_from_socketaddr(external_mapped);
routing_table.register_global_dial_info(
let external_mapped_dial_info = DialInfo::udp(external_mapped);
routing_table.register_dial_info(
external_mapped_dial_info,
Some(NetworkClass::Mapped),
DialInfoOrigin::Mapped,
Some(NetworkClass::Mapped),
);
// No more retries
@ -177,10 +175,10 @@ impl Network {
{
// Yes, another machine can use the dial info directly, so Full Cone
// Add public dial info with full cone NAT network class
routing_table.register_global_dial_info(
routing_table.register_dial_info(
external1_dial_info,
Some(NetworkClass::FullNAT),
DialInfoOrigin::Discovered,
Some(NetworkClass::FullNAT),
);
// No more retries
@ -191,7 +189,8 @@ impl Network {
// Get our external address from some fast node, that is not node B, call it node D
let (external2, node_d) = match self
.discover_external_address(
ProtocolAddressType::UDPv4,
ProtocolType::UDP,
AddressType::IPV4,
Some(node_b.node_id()),
)
.await
@ -214,7 +213,7 @@ impl Network {
// we should go through our retries before we assign a dial info
if retry_count == 0 {
// Address is the same, so it's address or port restricted
let external2_dial_info = DialInfo::udp_from_socketaddr(external2);
let external2_dial_info = DialInfo::udp(external2);
// Do a validate_dial_info on the external address from a routed node
if self
.validate_dial_info(
@ -226,17 +225,17 @@ impl Network {
.await
{
// Got a reply from a non-default port, which means we're only address restricted
routing_table.register_global_dial_info(
routing_table.register_dial_info(
external1_dial_info,
Some(NetworkClass::AddressRestrictedNAT),
DialInfoOrigin::Discovered,
Some(NetworkClass::AddressRestrictedNAT),
);
} else {
// Didn't get a reply from a non-default port, which means we are also port restricted
routing_table.register_global_dial_info(
routing_table.register_dial_info(
external1_dial_info,
Some(NetworkClass::PortRestrictedNAT),
DialInfoOrigin::Discovered,
Some(NetworkClass::PortRestrictedNAT),
);
}
}

View File

@ -185,8 +185,7 @@ impl Network {
let routing_table = network_manager.routing_table();
// Drop all dial info
routing_table.clear_local_dial_info();
routing_table.clear_global_dial_info();
routing_table.clear_dial_info_details();
// Cancels all async background tasks by dropping join handles
*self.inner.lock() = Self::new_inner(network_manager);

View File

@ -27,12 +27,6 @@ pub enum NetworkConnection {
}
impl NetworkConnection {
pub fn protocol_type(&self) -> ProtocolType {
match self {
Self::Dummy(d) => d.protocol_type(),
Self::WS(w) => w.protocol_type(),
}
}
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
match self {
Self::Dummy(d) => d.send(message),

View File

@ -45,13 +45,6 @@ impl WebsocketNetworkConnection {
}
impl WebsocketNetworkConnection {
pub fn protocol_type(&self) -> ProtocolType {
if self.tls {
ProtocolType::WSS
} else {
ProtocolType::WS
}
}
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
let inner = self.inner.clone();
Box::pin(async move {

View File

@ -76,7 +76,7 @@ impl BucketEntry {
where
F: Fn(&DialInfo) -> bool,
{
let ret = Vec::new();
let mut ret = Vec::new();
for di in &self.dial_infos {
if filter(di) {
ret.push(di.clone());
@ -86,7 +86,7 @@ impl BucketEntry {
}
pub fn dial_infos(&self) -> &[DialInfo] {
&self.dial_infos.clone()
&self.dial_infos
}
pub fn get_peer_info(&self, key: DHTKey, scope: PeerScope) -> PeerInfo {

View File

@ -8,10 +8,11 @@ use crate::*;
pub type FilterType = Box<dyn Fn(&(&DHTKey, Option<&mut BucketEntry>)) -> bool>;
impl RoutingTable {
// Retrieve the fastest nodes in the routing table with a particular kind of protocol address type
// Retrieve the fastest nodes in the routing table with a particular kind of protocol and address type
// Returns noderefs are are scoped to that address type only
pub fn get_fast_nodes_filtered(&self, dial_info_filter: &DialInfoFilter) -> Vec<NodeRef> {
let dial_info_filter = dial_info_filter.clone();
pub fn find_fast_nodes_filtered(&self, dial_info_filter: &DialInfoFilter) -> Vec<NodeRef> {
let dial_info_filter1 = dial_info_filter.clone();
let dial_info_filter2 = dial_info_filter.clone();
self.find_fastest_nodes(
// filter
Some(Box::new(
@ -20,7 +21,7 @@ impl RoutingTable {
.1
.as_ref()
.unwrap()
.first_filtered_dial_info(|di| di.matches_filter(&dial_info_filter))
.first_filtered_dial_info(|di| di.matches_filter(&dial_info_filter1))
.is_some()
},
)),
@ -30,27 +31,22 @@ impl RoutingTable {
self.clone(),
*e.0,
e.1.as_mut().unwrap(),
dial_info_filter.clone(),
dial_info_filter2.clone(),
)
},
)
}
pub fn get_own_peer_info(&self, scope: PeerScope) -> PeerInfo {
let dial_infos = match scope {
PeerScope::All => {
let mut divec = self.global_dial_info_details();
divec.append(&mut self.local_dial_info_details());
divec.dedup();
divec
}
PeerScope::Global => self.global_dial_info_details(),
PeerScope::Local => self.local_dial_info_details(),
};
let filter = DialInfoFilter::scoped(scope);
PeerInfo {
node_id: NodeId::new(self.node_id()),
dial_infos: dial_infos.iter().map(|x| x.dial_info.clone()).collect(),
dial_infos: self
.all_filtered_dial_info_details(&filter)
.iter()
.map(|did| did.dial_info.clone())
.collect(),
}
}

View File

@ -37,13 +37,18 @@ pub struct DialInfoDetail {
pub timestamp: u64,
}
impl MatchesDialInfoFilter for DialInfoDetail {
fn matches_filter(&self, filter: &DialInfoFilter) -> bool {
self.dial_info.matches_filter(filter)
}
}
struct RoutingTableInner {
network_manager: NetworkManager,
node_id: DHTKey,
node_id_secret: DHTKeySecret,
buckets: Vec<Bucket>,
local_dial_info: Vec<DialInfoDetail>,
global_dial_info: Vec<DialInfoDetail>,
dial_info_details: Vec<DialInfoDetail>,
bucket_entry_count: usize,
// Waiters
eventual_changed_dial_info: Eventual,
@ -75,8 +80,7 @@ impl RoutingTable {
node_id: DHTKey::default(),
node_id_secret: DHTKeySecret::default(),
buckets: Vec::new(),
local_dial_info: Vec::new(),
global_dial_info: Vec::new(),
dial_info_details: Vec::new(),
bucket_entry_count: 0,
eventual_changed_dial_info: Eventual::new(),
stats_accounting: StatsAccounting::new(),
@ -150,125 +154,71 @@ impl RoutingTable {
}
pub fn has_local_dial_info(&self) -> bool {
let inner = self.inner.lock();
!inner.local_dial_info.is_empty()
self.first_filtered_dial_info_detail(&DialInfoFilter::local())
.is_some()
}
pub fn has_global_dial_info(&self) -> bool {
self.first_filtered_dial_info_detail(&DialInfoFilter::global())
.is_some()
}
pub fn global_dial_info_details(&self) -> Vec<DialInfoDetail> {
self.all_filtered_dial_info_details(&DialInfoFilter::global())
}
pub fn local_dial_info_details(&self) -> Vec<DialInfoDetail> {
let inner = self.inner.lock();
inner.local_dial_info.clone()
self.all_filtered_dial_info_details(&DialInfoFilter::local())
}
pub fn first_filtered_local_dial_info_details<F>(&self, filter: F) -> Option<DialInfoDetail>
where
F: Fn(&DialInfoDetail) -> bool,
{
pub fn first_filtered_dial_info_detail(
&self,
filter: &DialInfoFilter,
) -> Option<DialInfoDetail> {
let inner = self.inner.lock();
for did in &inner.local_dial_info {
if filter(did) {
for did in &inner.dial_info_details {
if did.matches_filter(filter) {
return Some(did.clone());
}
}
None
}
pub fn all_filtered_local_dial_info_details<F>(&self, filter: F) -> Vec<DialInfoDetail>
where
F: Fn(&DialInfoDetail) -> bool,
{
pub fn all_filtered_dial_info_details(&self, filter: &DialInfoFilter) -> Vec<DialInfoDetail> {
let inner = self.inner.lock();
let ret = Vec::new();
for did in &inner.local_dial_info {
if filter(did) {
let mut ret = Vec::new();
for did in &inner.dial_info_details {
if did.matches_filter(filter) {
ret.push(did.clone());
}
}
ret
}
pub fn register_local_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) {
pub fn register_dial_info(
&self,
dial_info: DialInfo,
origin: DialInfoOrigin,
network_class: Option<NetworkClass>,
) {
let timestamp = get_timestamp();
let mut inner = self.inner.lock();
inner.local_dial_info.push(DialInfoDetail {
inner.dial_info_details.push(DialInfoDetail {
dial_info: dial_info.clone(),
origin,
network_class: None,
network_class,
timestamp,
});
info!(
"Local Dial Info: {}",
NodeDialInfoSingle {
node_id: NodeId::new(inner.node_id),
dial_info
}
.to_string(),
);
debug!(" Origin: {:?}", origin);
Self::trigger_changed_dial_info(&mut *inner);
}
pub fn clear_local_dial_info(&self) {
let mut inner = self.inner.lock();
inner.local_dial_info.clear();
Self::trigger_changed_dial_info(&mut *inner);
}
pub fn has_global_dial_info(&self) -> bool {
let inner = self.inner.lock();
!inner.global_dial_info.is_empty()
}
pub fn global_dial_info_details(&self) -> Vec<DialInfoDetail> {
let inner = self.inner.lock();
inner.global_dial_info.clone()
}
pub fn first_filtered_global_dial_info_details<F>(&self, filter: F) -> Option<DialInfoDetail>
where
F: Fn(&DialInfoDetail) -> bool,
{
let inner = self.inner.lock();
for did in &inner.global_dial_info {
if filter(did) {
return Some(did.clone());
}
}
None
}
pub fn all_filtered_global_dial_info_details<F>(&self, filter: F) -> Vec<DialInfoDetail>
where
F: Fn(&DialInfoDetail) -> bool,
{
let inner = self.inner.lock();
let ret = Vec::new();
for did in &inner.global_dial_info {
if filter(did) {
ret.push(did.clone());
}
}
ret
}
pub fn register_global_dial_info(
&self,
dial_info: DialInfo,
network_class: Option<NetworkClass>,
origin: DialInfoOrigin,
) {
let ts = get_timestamp();
let mut inner = self.inner.lock();
inner.global_dial_info.push(DialInfoDetail {
dial_info: dial_info.clone(),
origin,
network_class,
timestamp: ts,
});
info!(
"Global Dial Info: {}",
"{}Dial Info: {}",
if dial_info.is_local() {
"Local "
} else if dial_info.is_global() {
"Global "
} else {
"Other "
},
NodeDialInfoSingle {
node_id: NodeId::new(inner.node_id),
dial_info
@ -277,12 +227,13 @@ impl RoutingTable {
);
debug!(" Origin: {:?}", origin);
debug!(" Network Class: {:?}", network_class);
Self::trigger_changed_dial_info(&mut *inner);
}
pub fn clear_global_dial_info(&self) {
pub fn clear_dial_info_details(&self) {
let mut inner = self.inner.lock();
inner.global_dial_info.clear();
inner.dial_info_details.clear();
Self::trigger_changed_dial_info(&mut *inner);
}

View File

@ -49,12 +49,7 @@ impl NodeRef {
// Returns the best dial info to attempt a connection to this node
pub fn best_dial_info(&self) -> Option<DialInfo> {
let nm = self.routing_table.network_manager();
let protocol_config = nm.get_protocol_config();
if protocol_config.is_none() {
return None;
}
let protocol_config = protocol_config.unwrap();
let protocol_config = nm.get_protocol_config()?;
self.operate(|e| {
e.first_filtered_dial_info(|di| {
// Does it match the dial info filter
@ -99,7 +94,7 @@ impl Clone for NodeRef {
impl fmt::Debug for NodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let out = format!("{}", self.node_id.encode());
let mut out = self.node_id.encode();
if !self.dial_info_filter.is_empty() {
out += &format!("{:?}", self.dial_info_filter);
}

View File

@ -867,23 +867,23 @@ impl RPCProcessor {
if redirect {
let routing_table = self.routing_table();
let filter = dial_info.make_filter(true);
let peers = routing_table.get_fast_nodes_filtered(&filter);
let peers = routing_table.find_fast_nodes_filtered(&filter);
if peers.is_empty() {
return Err(rpc_error_internal(format!(
"no peers matching filter '{:?}'",
filter
)));
}
for peer in peers {
// See if this peer will validate dial info
if !peer.operate(|e| {
let will_validate_dial_info = peer.operate(|e: &mut BucketEntry| {
if let Some(ni) = &e.peer_stats().node_info {
ni.will_validate_dial_info
} else {
true
}
}) {
});
if !will_validate_dial_info {
continue;
}
// Make a copy of the request, without the redirect flag

View File

@ -1,3 +1,5 @@
#![allow(dead_code)]
mod debug;
pub use debug::*;
@ -209,8 +211,8 @@ impl Address {
}
pub fn address_type(&self) -> AddressType {
match self {
Address::IPV4(v4) => AddressType::IPV4,
Address::IPV6(v6) => AddressType::IPV6,
Address::IPV4(_) => AddressType::IPV4,
Address::IPV6(_) => AddressType::IPV6,
}
}
pub fn address_string(&self) -> String {
@ -227,14 +229,14 @@ impl Address {
}
pub fn is_global(&self) -> bool {
match self {
Address::IPV4(v4) => ipv4addr_is_global(&v4),
Address::IPV6(v6) => ipv6addr_is_global(&v6),
Address::IPV4(v4) => ipv4addr_is_global(v4),
Address::IPV6(v6) => ipv6addr_is_global(v6),
}
}
pub fn is_local(&self) -> bool {
match self {
Address::IPV4(v4) => ipv4addr_is_private(&v4),
Address::IPV6(v6) => ipv6addr_is_unicast_site_local(&v6),
Address::IPV4(v4) => ipv4addr_is_private(v4),
Address::IPV6(v6) => ipv6addr_is_unicast_site_local(v6),
}
}
pub fn to_ip_addr(&self) -> IpAddr {
@ -334,30 +336,42 @@ pub struct DialInfoFilter {
}
impl DialInfoFilter {
pub fn new_empty() -> Self {
pub fn all() -> Self {
Self {
peer_scope: PeerScope::All,
protocol_type: None,
address_type: None,
}
}
pub fn with_protocol_type(protocol_type: ProtocolType) -> Self {
pub fn global() -> Self {
Self {
peer_scope: PeerScope::All,
protocol_type: Some(protocol_type),
peer_scope: PeerScope::Global,
protocol_type: None,
address_type: None,
}
}
pub fn with_protocol_type_and_address_type(
protocol_type: ProtocolType,
address_type: AddressType,
) -> Self {
pub fn local() -> Self {
Self {
peer_scope: PeerScope::All,
protocol_type: Some(protocol_type),
address_type: Some(address_type),
peer_scope: PeerScope::Local,
protocol_type: None,
address_type: None,
}
}
pub fn scoped(peer_scope: PeerScope) -> Self {
Self {
peer_scope,
protocol_type: None,
address_type: None,
}
}
pub fn with_protocol_type(mut self, protocol_type: ProtocolType) -> Self {
self.protocol_type = Some(protocol_type);
self
}
pub fn with_address_type(mut self, address_type: AddressType) -> Self {
self.address_type = Some(address_type);
self
}
pub fn is_empty(&self) -> bool {
self.peer_scope == PeerScope::All
&& self.protocol_type.is_none()
@ -379,6 +393,10 @@ impl fmt::Debug for DialInfoFilter {
}
}
pub trait MatchesDialInfoFilter {
fn matches_filter(&self, filter: &DialInfoFilter) -> bool;
}
#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq)]
pub struct DialInfoUDP {
pub socket_address: SocketAddress,
@ -430,9 +448,9 @@ impl fmt::Display for DialInfo {
impl FromStr for DialInfo {
type Err = VeilidAPIError;
fn from_str(s: &str) -> Result<DialInfo, VeilidAPIError> {
let (proto, rest) = s.split_once('|').ok_or_else(|| {
parse_error!("SocketAddress::from_str missing protocol '|' separator", s)
})?;
let (proto, rest) = s
.split_once('|')
.ok_or_else(|| parse_error!("DialInfo::from_str missing protocol '|' separator", s))?;
match proto {
"udp" => {
let socket_address = SocketAddress::from_str(rest)?;
@ -444,24 +462,19 @@ impl FromStr for DialInfo {
}
"ws" => {
let (sa, rest) = s.split_once('|').ok_or_else(|| {
parse_error!(
"SocketAddress::from_str missing socket address '|' separator",
s
)
parse_error!("DialInfo::from_str missing socket address '|' separator", s)
})?;
let socket_address = SocketAddress::from_str(sa)?;
DialInfo::try_ws(socket_address, rest.to_string())
}
"wss" => {
let (sa, rest) = s.split_once('|').ok_or_else(|| {
parse_error!(
"SocketAddress::from_str missing socket address '|' separator",
s
)
parse_error!("DialInfo::from_str missing socket address '|' separator", s)
})?;
let socket_address = SocketAddress::from_str(sa)?;
DialInfo::try_wss(socket_address, rest.to_string())
}
_ => Err(parse_error!("DialInfo::from_str has invalid scheme", s)),
}
}
}
@ -518,7 +531,7 @@ impl DialInfo {
url
));
}
if !Address::from_str(&split_url.host).is_err() {
if Address::from_str(&split_url.host).is_ok() {
return Err(parse_error!(
"WSS url can not use address format, only hostname format",
url
@ -599,22 +612,6 @@ impl DialInfo {
PeerScope::Local => self.is_local(),
}
}
pub fn matches_filter(&self, filter: &DialInfoFilter) -> bool {
if !self.matches_peer_scope(filter.peer_scope) {
return false;
}
if let Some(pt) = filter.protocol_type {
if self.protocol_type() != pt {
return false;
}
}
if let Some(at) = filter.address_type {
if self.address_type() != at {
return false;
}
}
true
}
pub fn make_filter(&self, scoped: bool) -> DialInfoFilter {
DialInfoFilter {
peer_scope: if scoped {
@ -634,6 +631,25 @@ impl DialInfo {
}
}
impl MatchesDialInfoFilter for DialInfo {
fn matches_filter(&self, filter: &DialInfoFilter) -> bool {
if !self.matches_peer_scope(filter.peer_scope) {
return false;
}
if let Some(pt) = filter.protocol_type {
if self.protocol_type() != pt {
return false;
}
}
if let Some(at) = filter.address_type {
if self.address_type() != at {
return false;
}
}
true
}
}
//////////////////////////////////////////////////////////////////////////
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
@ -709,7 +725,10 @@ impl ConnectionDescriptor {
PeerScope::Local => self.remote.socket_address.address().is_local(),
}
}
pub fn matches_filter(&self, filter: &DialInfoFilter) -> bool {
}
impl MatchesDialInfoFilter for ConnectionDescriptor {
fn matches_filter(&self, filter: &DialInfoFilter) -> bool {
if !self.matches_peer_scope(filter.peer_scope) {
return false;
}
@ -1093,18 +1112,11 @@ impl VeilidAPI {
}
// wait for state change
// xxx: this should not use 'sleep', perhaps this function should be eliminated anyway
// xxx: it should really only be used for test anyway, and there is probably a better way to do this regardless
// xxx: that doesn't wait forever and can time out
// xxx: should have optional timeout
pub async fn wait_for_state(&self, state: VeilidState) -> Result<(), VeilidAPIError> {
loop {
intf::sleep(500).await;
match state {
VeilidState::Attachment(cs) => {
if self.attachment_manager()?.get_state() == cs {
break;
}
}
match state {
VeilidState::Attachment(cs) => {
self.attachment_manager()?.wait_for_state(cs).await;
}
}
Ok(())