network class refactor checkpoint

This commit is contained in:
John Smith 2022-04-22 21:30:09 -04:00
parent 5b0ade9f49
commit 99dc4e16f9
10 changed files with 571 additions and 394 deletions

View File

@ -1,7 +1,7 @@
mod network_class_discovery;
mod network_tcp; mod network_tcp;
mod network_udp; mod network_udp;
mod protocol; mod protocol;
mod public_dialinfo_discovery;
mod start_protocols; mod start_protocols;
use crate::connection_manager::*; use crate::connection_manager::*;
@ -41,9 +41,7 @@ struct NetworkInner {
network_started: bool, network_started: bool,
network_needs_restart: bool, network_needs_restart: bool,
protocol_config: Option<ProtocolConfig>, protocol_config: Option<ProtocolConfig>,
udp_static_public_dialinfo: bool, static_public_dialinfo: ProtocolSet,
tcp_static_public_dialinfo: bool,
ws_static_public_dialinfo: bool,
network_class: Option<NetworkClass>, network_class: Option<NetworkClass>,
join_handles: Vec<JoinHandle<()>>, join_handles: Vec<JoinHandle<()>>,
udp_port: u16, udp_port: u16,
@ -64,7 +62,7 @@ struct NetworkInner {
struct NetworkUnlockedInner { struct NetworkUnlockedInner {
// Background processes // Background processes
update_public_dialinfo_task: TickTask, update_network_class_task: TickTask,
} }
#[derive(Clone)] #[derive(Clone)]
@ -82,9 +80,7 @@ impl Network {
network_started: false, network_started: false,
network_needs_restart: false, network_needs_restart: false,
protocol_config: None, protocol_config: None,
udp_static_public_dialinfo: false, static_public_dialinfo: ProtocolSet::empty(),
tcp_static_public_dialinfo: false,
ws_static_public_dialinfo: false,
network_class: None, network_class: None,
join_handles: Vec::new(), join_handles: Vec::new(),
udp_port: 0u16, udp_port: 0u16,
@ -104,7 +100,7 @@ impl Network {
fn new_unlocked_inner() -> NetworkUnlockedInner { fn new_unlocked_inner() -> NetworkUnlockedInner {
NetworkUnlockedInner { NetworkUnlockedInner {
update_public_dialinfo_task: TickTask::new(1), update_network_class_task: TickTask::new(1),
} }
} }
@ -115,13 +111,13 @@ impl Network {
unlocked_inner: Arc::new(Self::new_unlocked_inner()), unlocked_inner: Arc::new(Self::new_unlocked_inner()),
}; };
// Set public dialinfo tick task // Set update network class tick task
{ {
let this2 = this.clone(); let this2 = this.clone();
this.unlocked_inner this.unlocked_inner
.update_public_dialinfo_task .update_network_class_task
.set_routine(move |l, t| { .set_routine(move |l, t| {
Box::pin(this2.clone().update_public_dialinfo_task_routine(l, t)) Box::pin(this2.clone().update_network_class_task_routine(l, t))
}); });
} }
@ -239,6 +235,26 @@ impl Network {
} }
} }
pub fn with_interface_addresses<F, R>(&self, f: F) -> R
where
F: FnOnce(&[IpAddr]) -> R,
{
let inner = self.inner.lock();
inner.interfaces.with_best_addresses(f)
}
// See if our interface addresses have changed, if so we need to punt the network
// and redo all our addresses. This is overkill, but anything more accurate
// would require inspection of routing tables that we dont want to bother with
pub async fn check_interface_addresses(&self) -> Result<bool, String> {
let mut inner = self.inner.lock();
if !inner.interfaces.refresh().await? {
return Ok(false);
}
inner.network_needs_restart = true;
Ok(true)
}
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
// Send data to a dial info, unbound, using a new connection from a random port // Send data to a dial info, unbound, using a new connection from a random port
@ -386,34 +402,50 @@ impl Network {
// get protocol config // get protocol config
let protocol_config = { let protocol_config = {
let c = self.config.get(); let c = self.config.get();
ProtocolConfig { let mut inbound = ProtocolSet::new();
inbound: ProtocolSet {
udp: c.network.protocol.udp.enabled && c.capabilities.protocol_udp, if c.network.protocol.udp.enabled && c.capabilities.protocol_udp {
tcp: c.network.protocol.tcp.listen && c.capabilities.protocol_accept_tcp, inbound.insert(ProtocolType::UDP);
ws: c.network.protocol.ws.listen && c.capabilities.protocol_accept_ws,
wss: c.network.protocol.wss.listen && c.capabilities.protocol_accept_wss,
},
outbound: ProtocolSet {
udp: c.network.protocol.udp.enabled && c.capabilities.protocol_udp,
tcp: c.network.protocol.tcp.connect && c.capabilities.protocol_connect_tcp,
ws: c.network.protocol.ws.connect && c.capabilities.protocol_connect_ws,
wss: c.network.protocol.wss.connect && c.capabilities.protocol_connect_wss,
},
} }
if c.network.protocol.tcp.listen && c.capabilities.protocol_accept_tcp {
inbound.insert(ProtocolType::TCP);
}
if c.network.protocol.ws.listen && c.capabilities.protocol_accept_ws {
inbound.insert(ProtocolType::WS);
}
if c.network.protocol.wss.listen && c.capabilities.protocol_accept_wss {
inbound.insert(ProtocolType::WSS);
}
let mut outbound = ProtocolSet::new();
if c.network.protocol.udp.enabled && c.capabilities.protocol_udp {
outbound.insert(ProtocolType::UDP);
}
if c.network.protocol.tcp.connect && c.capabilities.protocol_connect_tcp {
outbound.insert(ProtocolType::TCP);
}
if c.network.protocol.ws.connect && c.capabilities.protocol_connect_ws {
outbound.insert(ProtocolType::WS);
}
if c.network.protocol.wss.connect && c.capabilities.protocol_connect_wss {
outbound.insert(ProtocolType::WSS);
}
ProtocolConfig { inbound, outbound }
}; };
self.inner.lock().protocol_config = Some(protocol_config); self.inner.lock().protocol_config = Some(protocol_config);
// start listeners // start listeners
if protocol_config.inbound.udp { if protocol_config.inbound.contains(ProtocolType::UDP) {
self.start_udp_listeners().await?; self.start_udp_listeners().await?;
} }
if protocol_config.inbound.ws { if protocol_config.inbound.contains(ProtocolType::WS) {
self.start_ws_listeners().await?; self.start_ws_listeners().await?;
} }
if protocol_config.inbound.wss { if protocol_config.inbound.contains(ProtocolType::WSS) {
self.start_wss_listeners().await?; self.start_wss_listeners().await?;
} }
if protocol_config.inbound.tcp { if protocol_config.inbound.contains(ProtocolType::TCP) {
self.start_tcp_listeners().await?; self.start_tcp_listeners().await?;
} }
@ -431,17 +463,21 @@ impl Network {
self.inner.lock().network_needs_restart self.inner.lock().network_needs_restart
} }
pub fn restart_network(&self) {
self.inner.lock().network_needs_restart = true;
}
pub async fn shutdown(&self) { pub async fn shutdown(&self) {
info!("stopping network"); info!("stopping network");
let network_manager = self.network_manager(); let network_manager = self.network_manager();
let routing_table = self.routing_table(); let routing_table = self.routing_table();
// Reset state
// Drop all dial info // Drop all dial info
routing_table.clear_dial_info_details(); routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
routing_table.clear_dial_info_details(RoutingDomain::LocalNetwork);
// Reset state including network class
// Cancels all async background tasks by dropping join handles // Cancels all async background tasks by dropping join handles
*self.inner.lock() = Self::new_inner(network_manager); *self.inner.lock() = Self::new_inner(network_manager);
@ -451,109 +487,22 @@ impl Network {
////////////////////////////////////////// //////////////////////////////////////////
pub fn get_network_class(&self) -> Option<NetworkClass> { pub fn get_network_class(&self) -> Option<NetworkClass> {
let inner = self.inner.lock(); let inner = self.inner.lock();
if !inner.network_started {
return None;
}
// If we've fixed the network class, return it rather than calculating it
if inner.network_class.is_some() {
return inner.network_class; return inner.network_class;
} }
// Go through our global dialinfo and see what our best network class is pub fn reset_network_class(&self) {
let mut network_class = NetworkClass::Invalid; let inner = self.inner.lock();
for did in inner.routing_table.public_dial_info_details() { inner.network_class = None;
if let Some(nc) = did.network_class {
if nc < network_class {
network_class = nc;
}
}
}
Some(network_class)
} }
////////////////////////////////////////// //////////////////////////////////////////
pub async fn tick(&self) -> Result<(), String> { pub async fn tick(&self) -> Result<(), String> {
let ( let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid);
routing_table,
protocol_config,
udp_static_public_dialinfo,
tcp_static_public_dialinfo,
ws_static_public_dialinfo,
network_class,
) = {
let inner = self.inner.lock();
(
inner.routing_table.clone(),
inner.protocol_config.unwrap_or_default(),
inner.udp_static_public_dialinfo,
inner.tcp_static_public_dialinfo,
inner.ws_static_public_dialinfo,
inner.network_class.unwrap_or(NetworkClass::Invalid),
)
};
// See if we have any UDPv4 public dialinfo, and if we should have it // If we need to figure out our network class, tick the task for it
// If we have statically configured public dialinfo, don't bother with this if network_class == NetworkClass::Invalid {
// If we can have public dialinfo, or we haven't figured out our network class yet, self.unlocked_inner.update_network_class_task.tick().await?;
// and we're active for UDP, we should attempt to get our public dialinfo sorted out
// and assess our network class if we haven't already
if (network_class.inbound_capable() || network_class == NetworkClass::Invalid)
&&
{
let filter = DialInfoFilter::global()
.with_protocol_type(ProtocolType::TCP)
.with_address_type(AddressType::IPV4);
let need_tcpv4_dialinfo = routing_table
.first_public_filtered_dial_info_detail(&filter)
.is_none();
if need_tcpv4_dialinfo {
}
self.unlocked_inner
.update_public_dialinfo_task
.tick()
.await?;
}
// Same but for TCPv4
if protocol_config.inbound.tcp
&& !tcp_static_public_dialinfo
&& (network_class.inbound_capable() || network_class == NetworkClass::Invalid)
{
let filter = DialInfoFilter::all()
.with_protocol_type(ProtocolType::TCP)
.with_address_type(AddressType::IPV4);
let need_tcpv4_dialinfo = routing_table
.first_public_filtered_dial_info_detail(&filter)
.is_none();
if need_tcpv4_dialinfo {
// If we have no public TCPv4 dialinfo, then we need to run a NAT check
// ensure the singlefuture is running for this
self.unlocked_inner
.update_tcpv4_dialinfo_task
.tick()
.await?;
}
}
// Same but for WSv4
if protocol_config.inbound.ws
&& !ws_static_public_dialinfo
&& (network_class.inbound_capable() || network_class == NetworkClass::Invalid)
{
let filter = DialInfoFilter::all()
.with_protocol_type(ProtocolType::WS)
.with_address_type(AddressType::IPV4);
let need_wsv4_dialinfo = routing_table
.first_public_filtered_dial_info_detail(&filter)
.is_none();
if need_wsv4_dialinfo {
// If we have no public TCPv4 dialinfo, then we need to run a NAT check
// ensure the singlefuture is running for this
self.unlocked_inner.update_wsv4_dialinfo_task.tick().await?;
}
} }
Ok(()) Ok(())

View File

@ -60,7 +60,7 @@ impl Network {
.with_protocol_type(protocol_type) .with_protocol_type(protocol_type)
.with_address_type(address_type); .with_address_type(address_type);
routing_table routing_table
.interface_dial_info_details() .dial_info_details(RoutingDomain::LocalNetwork)
.iter() .iter()
.filter_map(|did| { .filter_map(|did| {
if did.dial_info.matches_filter(&filter) { if did.dial_info.matches_filter(&filter) {
@ -100,7 +100,7 @@ impl Network {
None None
} }
pub async fn update_udpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { pub async fn update_udpv4_dialinfo(&self) -> Result<(), String> {
log_net!("looking for udpv4 public dial info"); log_net!("looking for udpv4 public dial info");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
@ -258,17 +258,42 @@ impl Network {
Ok(()) Ok(())
} }
pub async fn update_tcpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { pub async fn update_tcpv4_dialinfo(&self) -> Result<(), String> {
log_net!("looking for tcpv4 public dial info"); log_net!("looking for tcpv4 public dial info");
// xxx // xxx
//Err("unimplemented".to_owned()) //Err("unimplemented".to_owned())
Ok(()) Ok(())
} }
pub async fn update_wsv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { pub async fn update_wsv4_dialinfo(&self) -> Result<(), String> {
log_net!("looking for wsv4 public dial info"); log_net!("looking for wsv4 public dial info");
// xxx // xxx
//Err("unimplemented".to_owned()) //Err("unimplemented".to_owned())
Ok(()) Ok(())
} }
pub async fn update_network_class_task_routine(self, _l: u64, _t: u64) -> Result<(), String> {
log_net!("updating network class");
let protocol_config = self
.inner
.lock()
.protocol_config
.clone()
.unwrap_or_default();
if protocol_config.inbound.contains(ProtocolType::UDP) {
self.update_udpv4_dialinfo().await?;
}
if protocol_config.inbound.contains(ProtocolType::TCP) {
self.update_tcpv4_dialinfo().await?;
}
if protocol_config.inbound.contains(ProtocolType::WS) {
self.update_wsv4_dialinfo().await?;
}
Ok(())
}
} }

View File

@ -261,11 +261,12 @@ impl Network {
pub(super) async fn start_udp_listeners(&self) -> Result<(), String> { pub(super) async fn start_udp_listeners(&self) -> Result<(), String> {
trace!("starting udp listeners"); trace!("starting udp listeners");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let (listen_address, public_address) = { let (listen_address, public_address, enable_local_peer_scope) = {
let c = self.config.get(); let c = self.config.get();
( (
c.network.protocol.udp.listen_address.clone(), c.network.protocol.udp.listen_address.clone(),
c.network.protocol.udp.public_address.clone(), c.network.protocol.udp.public_address.clone(),
c.network.enable_local_peer_scope,
) )
}; };
@ -287,23 +288,28 @@ impl Network {
"UDP: starting listeners on port {} at {:?}", "UDP: starting listeners on port {} at {:?}",
udp_port, ip_addrs udp_port, ip_addrs
); );
let dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?; let local_dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?;
let mut static_public = false; let mut static_public = false;
for di in &dial_info_list {
// If the local interface address is global, // Register local dial info
for di in &local_dial_info_list {
// If the local interface address is global, or we are enabling local peer scope
// register global dial info if no public address is specified // register global dial info if no public address is specified
if public_address.is_none() && di.is_global() { if public_address.is_none() && (di.is_global() || enable_local_peer_scope) {
routing_table.register_public_dial_info( routing_table.register_dial_info(
RoutingDomain::PublicInternet,
di.clone(), di.clone(),
DialInfoOrigin::Static, DialInfoOrigin::Static,
Some(NetworkClass::Server),
); );
static_public = true; static_public = true;
} }
// Register interface dial info as well since the address is on the local interface // Register interface dial info as well since the address is on the local interface
routing_table.register_interface_dial_info(di.clone(), DialInfoOrigin::Static); routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
di.clone(),
DialInfoOrigin::Static,
);
} }
// Add static public dialinfo if it's configured // Add static public dialinfo if it's configured
@ -316,16 +322,43 @@ impl Network {
// Add all resolved addresses as public dialinfo // Add all resolved addresses as public dialinfo
for pdi_addr in &mut public_sockaddrs { for pdi_addr in &mut public_sockaddrs {
routing_table.register_public_dial_info( let pdi = DialInfo::udp_from_socketaddr(pdi_addr);
// Register the public address
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
pdi.clone(),
DialInfoOrigin::Static,
);
// See if this public address is also a local interface address
if !local_dial_info_list.contains(&pdi)
&& self.with_interface_addresses(|ip_addrs| {
for ip_addr in ip_addrs {
if pdi_addr.ip() == *ip_addr {
return true;
}
}
false
})
{
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
DialInfo::udp_from_socketaddr(pdi_addr), DialInfo::udp_from_socketaddr(pdi_addr),
DialInfoOrigin::Static, DialInfoOrigin::Static,
Some(NetworkClass::Server),
); );
}
static_public = true; static_public = true;
} }
} }
self.inner.lock().udp_static_public_dialinfo = static_public; if static_public {
self.inner
.lock()
.static_public_dialinfo
.insert(ProtocolType::UDP);
}
// Now create tasks for udp listeners // Now create tasks for udp listeners
self.create_udp_listener_tasks().await self.create_udp_listener_tasks().await
@ -334,12 +367,13 @@ impl Network {
pub(super) async fn start_ws_listeners(&self) -> Result<(), String> { pub(super) async fn start_ws_listeners(&self) -> Result<(), String> {
trace!("starting ws listeners"); trace!("starting ws listeners");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let (listen_address, url, path) = { let (listen_address, url, path, enable_local_peer_scope) = {
let c = self.config.get(); let c = self.config.get();
( (
c.network.protocol.ws.listen_address.clone(), c.network.protocol.ws.listen_address.clone(),
c.network.protocol.ws.url.clone(), c.network.protocol.ws.url.clone(),
c.network.protocol.ws.path.clone(), c.network.protocol.ws.path.clone(),
c.network.enable_local_peer_scope,
) )
}; };
@ -367,31 +401,7 @@ impl Network {
trace!("WS: listener started"); trace!("WS: listener started");
let mut static_public = false; let mut static_public = false;
for socket_address in socket_addresses { let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
if url.is_none() && socket_address.address().is_global() {
// Build global dial info request url
let global_url = format!("ws://{}/{}", socket_address, path);
// Create global dial info
let di = DialInfo::try_ws(socket_address, global_url)
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
routing_table.register_public_dial_info(
di,
DialInfoOrigin::Static,
Some(NetworkClass::Server),
);
static_public = true;
}
// Build interface dial info request url
let interface_url = format!("ws://{}/{}", socket_address, path);
// Create interface dial info
let di = DialInfo::try_ws(socket_address, interface_url)
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
routing_table.register_interface_dial_info(di, DialInfoOrigin::Static);
}
// Add static public dialinfo if it's configured // Add static public dialinfo if it's configured
if let Some(url) = url.as_ref() { if let Some(url) = url.as_ref() {
@ -410,17 +420,74 @@ impl Network {
.map_err(logthru_net!(error))?; .map_err(logthru_net!(error))?;
for gsa in global_socket_addrs { for gsa in global_socket_addrs {
routing_table.register_public_dial_info( let pdi = DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone())
DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone())
.map_err(map_to_string) .map_err(map_to_string)
.map_err(logthru_net!(error))?, .map_err(logthru_net!(error))?;
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
pdi.clone(),
DialInfoOrigin::Static,
);
static_public = true;
// See if this public address is also a local interface address
if !registered_addresses.contains(&gsa.ip())
&& self.with_interface_addresses(|ip_addrs| {
for ip_addr in ip_addrs {
if gsa.ip() == *ip_addr {
return true;
}
}
false
})
{
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
pdi,
DialInfoOrigin::Static, DialInfoOrigin::Static,
Some(NetworkClass::Server),
); );
} }
registered_addresses.insert(gsa.ip());
}
}
for socket_address in socket_addresses {
// Skip addresses we already did
if registered_addresses.contains(&socket_address.to_ip_addr()) {
continue;
}
// Build dial info request url
let local_url = format!("ws://{}/{}", socket_address, path);
let local_di = DialInfo::try_ws(socket_address, local_url)
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
if url.is_none() && (socket_address.address().is_global() || enable_local_peer_scope) {
// Register public dial info
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
local_di.clone(),
DialInfoOrigin::Static,
);
static_public = true; static_public = true;
} }
self.inner.lock().ws_static_public_dialinfo = static_public;
// Register local dial info
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
local_di,
DialInfoOrigin::Static,
);
}
if static_public {
self.inner
.lock()
.static_public_dialinfo
.insert(ProtocolType::WS);
}
Ok(()) Ok(())
} }
@ -429,11 +496,12 @@ impl Network {
trace!("starting wss listeners"); trace!("starting wss listeners");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let (listen_address, url) = { let (listen_address, url, enable_local_peer_scope) = {
let c = self.config.get(); let c = self.config.get();
( (
c.network.protocol.wss.listen_address.clone(), c.network.protocol.wss.listen_address.clone(),
c.network.protocol.wss.url.clone(), c.network.protocol.wss.url.clone(),
c.network.enable_local_peer_scope,
) )
}; };
@ -465,6 +533,9 @@ impl Network {
// is specified, then TLS won't validate, so no local dialinfo is possible. // is specified, then TLS won't validate, so no local dialinfo is possible.
// This is not the case with unencrypted websockets, which can be specified solely by an IP address // This is not the case with unencrypted websockets, which can be specified solely by an IP address
let mut static_public = false;
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
// Add static public dialinfo if it's configured // Add static public dialinfo if it's configured
if let Some(url) = url.as_ref() { if let Some(url) = url.as_ref() {
// Add static public dialinfo if it's configured // Add static public dialinfo if it's configured
@ -483,18 +554,48 @@ impl Network {
.map_err(logthru_net!(error))?; .map_err(logthru_net!(error))?;
for gsa in global_socket_addrs { for gsa in global_socket_addrs {
routing_table.register_public_dial_info( let pdi = DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone())
DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone())
.map_err(map_to_string) .map_err(map_to_string)
.map_err(logthru_net!(error))?, .map_err(logthru_net!(error))?;
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
pdi.clone(),
DialInfoOrigin::Static, DialInfoOrigin::Static,
Some(NetworkClass::Server),
); );
static_public = true;
// See if this public address is also a local interface address
if !registered_addresses.contains(&gsa.ip())
&& self.with_interface_addresses(|ip_addrs| {
for ip_addr in ip_addrs {
if gsa.ip() == *ip_addr {
return true;
}
}
false
})
{
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
pdi,
DialInfoOrigin::Static,
);
}
registered_addresses.insert(gsa.ip());
} }
} else { } else {
return Err("WSS URL must be specified due to TLS requirements".to_owned()); return Err("WSS URL must be specified due to TLS requirements".to_owned());
} }
if static_public {
self.inner
.lock()
.static_public_dialinfo
.insert(ProtocolType::WSS);
}
Ok(()) Ok(())
} }
@ -502,11 +603,12 @@ impl Network {
trace!("starting tcp listeners"); trace!("starting tcp listeners");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let (listen_address, public_address) = { let (listen_address, public_address, enable_local_peer_scope) = {
let c = self.config.get(); let c = self.config.get();
( (
c.network.protocol.tcp.listen_address.clone(), c.network.protocol.tcp.listen_address.clone(),
c.network.protocol.tcp.public_address.clone(), c.network.protocol.tcp.public_address.clone(),
c.network.enable_local_peer_scope,
) )
}; };
@ -534,20 +636,27 @@ impl Network {
trace!("TCP: listener started"); trace!("TCP: listener started");
let mut static_public = false; let mut static_public = false;
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
for socket_address in socket_addresses { for socket_address in socket_addresses {
let di = DialInfo::tcp(socket_address); let di = DialInfo::tcp(socket_address);
// Register global dial info if no public address is specified // Register global dial info if no public address is specified
if public_address.is_none() && di.is_global() { if public_address.is_none() && (di.is_global() || enable_local_peer_scope) {
routing_table.register_public_dial_info( routing_table.register_dial_info(
RoutingDomain::PublicInternet,
di.clone(), di.clone(),
DialInfoOrigin::Static, DialInfoOrigin::Static,
Some(NetworkClass::Server),
); );
static_public = true; static_public = true;
} }
// Register interface dial info // Register interface dial info
routing_table.register_interface_dial_info(di.clone(), DialInfoOrigin::Static); routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
di.clone(),
DialInfoOrigin::Static,
);
registered_addresses.insert(socket_address.to_ip_addr());
} }
// Add static public dialinfo if it's configured // Add static public dialinfo if it's configured
@ -560,16 +669,45 @@ impl Network {
// Add all resolved addresses as public dialinfo // Add all resolved addresses as public dialinfo
for pdi_addr in &mut public_sockaddrs { for pdi_addr in &mut public_sockaddrs {
routing_table.register_public_dial_info( // Skip addresses we already did
DialInfo::tcp_from_socketaddr(pdi_addr), if registered_addresses.contains(&pdi_addr.ip()) {
continue;
}
let pdi = DialInfo::tcp_from_socketaddr(pdi_addr);
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
pdi.clone(),
DialInfoOrigin::Static, DialInfoOrigin::Static,
None,
); );
static_public = true; static_public = true;
// See if this public address is also a local interface address
if self.with_interface_addresses(|ip_addrs| {
for ip_addr in ip_addrs {
if pdi_addr.ip() == *ip_addr {
return true;
}
}
false
}) {
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
pdi,
DialInfoOrigin::Static,
);
}
static_public = true;
} }
} }
self.inner.lock().tcp_static_public_dialinfo = static_public; if static_public {
self.inner
.lock()
.static_public_dialinfo
.insert(ProtocolType::TCP);
}
Ok(()) Ok(())
} }

View File

@ -295,6 +295,7 @@ impl NetworkInterface {
pub struct NetworkInterfaces { pub struct NetworkInterfaces {
valid: bool, valid: bool,
interfaces: BTreeMap<String, NetworkInterface>, interfaces: BTreeMap<String, NetworkInterface>,
interface_address_cache: Vec<IpAddr>,
} }
impl fmt::Debug for NetworkInterfaces { impl fmt::Debug for NetworkInterfaces {
@ -317,6 +318,7 @@ impl NetworkInterfaces {
Self { Self {
valid: false, valid: false,
interfaces: BTreeMap::new(), interfaces: BTreeMap::new(),
interface_address_cache: Vec::new(),
} }
} }
pub fn is_valid(&self) -> bool { pub fn is_valid(&self) -> bool {
@ -324,6 +326,7 @@ impl NetworkInterfaces {
} }
pub fn clear(&mut self) { pub fn clear(&mut self) {
self.interfaces.clear(); self.interfaces.clear();
self.interface_address_cache.clear();
self.valid = false; self.valid = false;
} }
// returns Ok(false) if refresh had no changes, Ok(true) if changes were present // returns Ok(false) if refresh had no changes, Ok(true) if changes were present
@ -341,6 +344,8 @@ impl NetworkInterfaces {
let changed = last_interfaces != self.interfaces; let changed = last_interfaces != self.interfaces;
if changed { if changed {
trace!("NetworkInterfaces refreshed: {:#?}?", self); trace!("NetworkInterfaces refreshed: {:#?}?", self);
self.cache_best_addresses();
} }
Ok(changed) Ok(changed)
} }
@ -351,7 +356,7 @@ impl NetworkInterfaces {
self.interfaces.iter() self.interfaces.iter()
} }
pub fn best_addresses(&self) -> Vec<IpAddr> { fn cache_best_addresses(&mut self) {
// Reduce interfaces to their best routable ip addresses // Reduce interfaces to their best routable ip addresses
let mut intf_addrs = Vec::new(); let mut intf_addrs = Vec::new();
for intf in self.interfaces.values() { for intf in self.interfaces.values() {
@ -370,6 +375,17 @@ impl NetworkInterfaces {
intf_addrs.sort(); intf_addrs.sort();
// Now export just the addresses // Now export just the addresses
intf_addrs.iter().map(|x| x.if_addr().ip()).collect() self.interface_address_cache = intf_addrs.iter().map(|x| x.if_addr().ip()).collect()
}
pub fn best_addresses(&self) -> Vec<IpAddr> {
self.interface_address_cache.clone()
}
pub fn with_best_addresses<F, R>(&self, f: F) -> R
where
F: FnOnce(&[IpAddr]) -> R,
{
f(&self.interface_address_cache)
} }
} }

View File

@ -162,6 +162,10 @@ impl Network {
self.inner.lock().network_needs_restart self.inner.lock().network_needs_restart
} }
pub fn restart_network(&self) {
self.inner.lock().network_needs_restart = true;
}
pub async fn shutdown(&self) { pub async fn shutdown(&self) {
trace!("stopping network"); trace!("stopping network");
@ -178,6 +182,18 @@ impl Network {
trace!("network stopped"); trace!("network stopped");
} }
pub fn with_interface_addresses<F, R>(&self, f: F) -> R
where
F: FnOnce(&[IpAddr]) -> R,
{
f(&[])
}
pub async fn check_interface_addresses(&self) -> Result<bool, String> {
Ok(false)
}
////////////////////////////////////////// //////////////////////////////////////////
pub fn get_network_class(&self) -> Option<NetworkClass> { pub fn get_network_class(&self) -> Option<NetworkClass> {
// xxx eventually detect tor browser? // xxx eventually detect tor browser?

View File

@ -14,6 +14,7 @@ pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1;
pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE;
pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_TABLE_SIZE: usize = 1024;
pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes
pub const GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3;
#[derive(Copy, Clone, Debug, Default)] #[derive(Copy, Clone, Debug, Default)]
pub struct ProtocolConfig { pub struct ProtocolConfig {
@ -80,18 +81,19 @@ enum ContactMethod {
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub enum SendDataKind { pub enum SendDataKind {
Direct, LocalDirect,
Indirect, GlobalDirect,
GlobalIndirect,
} }
// The mutable state of the network manager // The mutable state of the network manager
struct NetworkManagerInner { struct NetworkManagerInner {
routing_table: Option<RoutingTable>, routing_table: Option<RoutingTable>,
components: Option<NetworkComponents>, components: Option<NetworkComponents>,
network_class: Option<NetworkClass>,
stats: NetworkManagerStats, stats: NetworkManagerStats,
client_whitelist: LruCache<key::DHTKey, ClientWhitelistEntry>, client_whitelist: LruCache<key::DHTKey, ClientWhitelistEntry>,
relay_node: Option<NodeRef>, relay_node: Option<NodeRef>,
global_address_check_cache: LruCache<key::DHTKey, SocketAddress>,
} }
struct NetworkManagerUnlockedInner { struct NetworkManagerUnlockedInner {
@ -114,10 +116,10 @@ impl NetworkManager {
NetworkManagerInner { NetworkManagerInner {
routing_table: None, routing_table: None,
components: None, components: None,
network_class: None,
stats: NetworkManagerStats::default(), stats: NetworkManagerStats::default(),
client_whitelist: LruCache::new_unbounded(), client_whitelist: LruCache::new_unbounded(),
relay_node: None, relay_node: None,
global_address_check_cache: LruCache::new(8),
} }
} }
fn new_unlocked_inner(_config: VeilidConfig) -> NetworkManagerUnlockedInner { fn new_unlocked_inner(_config: VeilidConfig) -> NetworkManagerUnlockedInner {
@ -272,7 +274,6 @@ impl NetworkManager {
// reset the state // reset the state
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.components = None; inner.components = None;
inner.network_class = None;
trace!("NetworkManager::shutdown end"); trace!("NetworkManager::shutdown end");
} }
@ -551,11 +552,18 @@ impl NetworkManager {
pub async fn send_envelope<B: AsRef<[u8]>>( pub async fn send_envelope<B: AsRef<[u8]>>(
&self, &self,
node_ref: NodeRef, node_ref: NodeRef,
node_id: Option<DHTKey>, envelope_node_id: Option<DHTKey>,
body: B, body: B,
) -> Result<(), String> { ) -> Result<SendDataKind, String> {
if let Some(node_id) = node_id { let via_node_id = node_ref.node_id();
log_net!("sending envelope to {:?} via {:?}", node_id, node_ref); let envelope_node_id = envelope_node_id.unwrap_or(via_node_id);
if envelope_node_id != via_node_id {
log_net!(
"sending envelope to {:?} via {:?}",
envelope_node_id,
node_ref
);
} else { } else {
log_net!("sending envelope to {:?}", node_ref); log_net!("sending envelope to {:?}", node_ref);
} }
@ -567,9 +575,7 @@ impl NetworkManager {
if node_min > MAX_VERSION || node_max < MIN_VERSION { if node_min > MAX_VERSION || node_max < MIN_VERSION {
return Err(format!( return Err(format!(
"can't talk to this node {} because version is unsupported: ({},{})", "can't talk to this node {} because version is unsupported: ({},{})",
node_ref.node_id(), via_node_id, node_min, node_max
node_min,
node_max
)) ))
.map_err(logthru_rpc!(warn)); .map_err(logthru_rpc!(warn));
} }
@ -580,11 +586,17 @@ impl NetworkManager {
// Build the envelope to send // Build the envelope to send
let out = self let out = self
.build_envelope(node_id.unwrap_or_else(|| node_ref.node_id()), version, body) .build_envelope(envelope_node_id, version, body)
.map_err(logthru_rpc!(error))?; .map_err(logthru_rpc!(error))?;
// Send the envelope via whatever means necessary // Send the envelope via whatever means necessary
self.send_data(node_ref, out).await let send_data_kind = self.send_data(node_ref.clone(), out).await?;
// If we asked to relay from the start, then this is always indirect
if envelope_node_id != via_node_id {
return Ok(SendDataKind::GlobalIndirect);
}
return Ok(send_data_kind);
} }
// Called by the RPC handler when we want to issue an direct receipt // Called by the RPC handler when we want to issue an direct receipt
@ -867,7 +879,7 @@ impl NetworkManager {
&self, &self,
node_ref: NodeRef, node_ref: NodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> SystemPinBoxFuture<Result<(), String>> { ) -> SystemPinBoxFuture<Result<SendDataKind, String>> {
let this = self.clone(); let this = self.clone();
Box::pin(async move { Box::pin(async move {
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
@ -879,7 +891,11 @@ impl NetworkManager {
.map_err(logthru_net!())? .map_err(logthru_net!())?
{ {
None => { None => {
return Ok(()); return Ok(if descriptor.matches_peer_scope(PeerScope::Local) {
SendDataKind::LocalDirect
} else {
SendDataKind::GlobalDirect
});
} }
Some(d) => d, Some(d) => d,
} }
@ -890,19 +906,30 @@ impl NetworkManager {
// If we don't have last_connection, try to reach out to the peer via its dial info // If we don't have last_connection, try to reach out to the peer via its dial info
match this.get_contact_method(node_ref).map_err(logthru_net!())? { match this.get_contact_method(node_ref).map_err(logthru_net!())? {
ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => { ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => {
this.send_data(relay_nr, data).await this.send_data(relay_nr, data)
}
ContactMethod::Direct(dial_info) => {
this.net().send_data_to_dial_info(dial_info, data).await
}
ContactMethod::SignalReverse(relay_nr, target_node_ref) => {
this.do_reverse_connect(relay_nr, target_node_ref, data)
.await .await
.map(|_| SendDataKind::GlobalIndirect)
} }
ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { ContactMethod::Direct(dial_info) => this
this.do_hole_punch(relay_nr, target_node_ref, data).await .net()
.send_data_to_dial_info(dial_info, data)
.await
.map(|_| {
if dial_info.is_local() {
SendDataKind::LocalDirect
} else {
SendDataKind::GlobalDirect
} }
ContactMethod::Unreachable => Err("Can't send to this relay".to_owned()), }),
ContactMethod::SignalReverse(relay_nr, target_node_ref) => this
.do_reverse_connect(relay_nr, target_node_ref, data)
.await
.map(|_| SendDataKind::GlobalDirect),
ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => this
.do_hole_punch(relay_nr, target_node_ref, data)
.await
.map(|_| SendDataKind::GlobalDirect),
ContactMethod::Unreachable => Err("Can't send to this node".to_owned()),
} }
.map_err(logthru_net!()) .map_err(logthru_net!())
}) })
@ -1156,4 +1183,49 @@ impl NetworkManager {
.transfer_stats_accounting .transfer_stats_accounting
.add_down(bytes); .add_down(bytes);
} }
// Determine if a local IP address has changed
// this means we should restart the low level network and and recreate all of our dial info
// Wait until we have received confirmation from N different peers
pub async fn report_local_socket_address(
&self,
_socket_address: SocketAddress,
_reporting_peer: NodeRef,
) {
// XXX: Nothing here yet.
}
// Determine if a global IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers
pub async fn report_global_socket_address(
&self,
socket_address: SocketAddress,
reporting_peer: NodeRef,
) {
let mut inner = self.inner.lock();
inner
.global_address_check_cache
.insert(reporting_peer.node_id(), socket_address);
let network_class = inner
.components
.unwrap()
.net
.get_network_class()
.unwrap_or(NetworkClass::Invalid);
if network_class.inbound_capable() {
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
// then we zap the network class and re-detect it
// If we are inbound capable but start to see consistently different socket addresses from multiple reporting peers
// then we zap the network class and global dial info and re-detect it
} else {
// If we are currently outbound only, we don't have any public dial info
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
inner.components.unwrap().net.reset_network_class();
}
}
} }

View File

@ -23,15 +23,15 @@ impl RoutingTable {
out out
} }
pub fn debug_info_dialinfo(&self) -> String { pub fn debug_info_dialinfo(&self) -> String {
let ldis = self.interface_dial_info_details(); let ldis = self.dial_info_details(RoutingDomain::LocalNetwork);
let gdis = self.public_dial_info_details(); let gdis = self.dial_info_details(RoutingDomain::PublicInternet);
let mut out = String::new(); let mut out = String::new();
out += "Interface Dial Info Details:\n"; out += "Local Network Dial Info Details:\n";
for (n, ldi) in ldis.iter().enumerate() { for (n, ldi) in ldis.iter().enumerate() {
out += &format!(" {:>2}: {:?}\n", n, ldi); out += &format!(" {:>2}: {:?}\n", n, ldi);
} }
out += "Public Dial Info Details:\n"; out += "Public Internet Dial Info Details:\n";
for (n, gdi) in gdis.iter().enumerate() { for (n, gdi) in gdis.iter().enumerate() {
out += &format!(" {:>2}: {:?}\n", n, gdi); out += &format!(" {:>2}: {:?}\n", n, gdi);
} }

View File

@ -55,22 +55,11 @@ impl RoutingTable {
node_info: NodeInfo { node_info: NodeInfo {
network_class: netman.get_network_class().unwrap_or(NetworkClass::Invalid), network_class: netman.get_network_class().unwrap_or(NetworkClass::Invalid),
outbound_protocols: netman.get_protocol_config().unwrap_or_default().outbound, outbound_protocols: netman.get_protocol_config().unwrap_or_default().outbound,
dial_info_list: if !enable_local_peer_scope { dial_info_list: self
self.public_dial_info_details() .dial_info_details(RoutingDomain::PublicInternet)
.iter() .iter()
.map(|did| did.dial_info.clone()) .map(|did| did.dial_info.clone())
.collect() .collect(),
} else {
self.public_dial_info_details()
.iter()
.map(|did| did.dial_info.clone())
.chain(
self.interface_dial_info_details()
.iter()
.map(|did| did.dial_info.clone()),
)
.collect()
},
relay_peer_info: relay_node.map(|rn| Box::new(rn.peer_info())), relay_peer_info: relay_node.map(|rn| Box::new(rn.peer_info())),
}, },
} }

View File

@ -29,11 +29,21 @@ pub enum DialInfoOrigin {
Mapped, Mapped,
} }
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq)]
pub enum RoutingDomain {
PublicInternet,
LocalNetwork,
}
#[derive(Debug, Default)]
pub struct RoutingDomainDetail {
dial_info_details: Vec<DialInfoDetail>,
}
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)] #[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)]
pub struct DialInfoDetail { pub struct DialInfoDetail {
pub dial_info: DialInfo, pub dial_info: DialInfo,
pub origin: DialInfoOrigin, pub origin: DialInfoOrigin,
pub network_class: Option<NetworkClass>,
pub timestamp: u64, pub timestamp: u64,
} }
@ -48,13 +58,10 @@ struct RoutingTableInner {
node_id: DHTKey, node_id: DHTKey,
node_id_secret: DHTKeySecret, node_id_secret: DHTKeySecret,
buckets: Vec<Bucket>, buckets: Vec<Bucket>,
public_dial_info_details: Vec<DialInfoDetail>, public_internet_routing_domain: RoutingDomainDetail,
interface_dial_info_details: Vec<DialInfoDetail>, local_network_routing_domain: RoutingDomainDetail,
bucket_entry_count: usize, bucket_entry_count: usize,
// Waiters
eventual_changed_dial_info: Eventual,
// Transfer stats for this node // Transfer stats for this node
self_latency_stats_accounting: LatencyStatsAccounting, self_latency_stats_accounting: LatencyStatsAccounting,
self_transfer_stats_accounting: TransferStatsAccounting, self_transfer_stats_accounting: TransferStatsAccounting,
@ -90,10 +97,9 @@ impl RoutingTable {
node_id: DHTKey::default(), node_id: DHTKey::default(),
node_id_secret: DHTKeySecret::default(), node_id_secret: DHTKeySecret::default(),
buckets: Vec::new(), buckets: Vec::new(),
public_dial_info_details: Vec::new(), public_internet_routing_domain: RoutingDomainDetail::default(),
interface_dial_info_details: Vec::new(), local_network_routing_domain: RoutingDomainDetail::default(),
bucket_entry_count: 0, bucket_entry_count: 0,
eventual_changed_dial_info: Eventual::new(),
self_latency_stats_accounting: LatencyStatsAccounting::new(), self_latency_stats_accounting: LatencyStatsAccounting::new(),
self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats_accounting: TransferStatsAccounting::new(),
self_transfer_stats: TransferStatsDownUp::default(), self_transfer_stats: TransferStatsDownUp::default(),
@ -165,81 +171,78 @@ impl RoutingTable {
self.inner.lock().node_id_secret self.inner.lock().node_id_secret
} }
pub fn has_interface_dial_info(&self) -> bool { fn with_routing_domain<F, R>(inner: &RoutingTableInner, domain: RoutingDomain, f: F) -> R
!self.inner.lock().interface_dial_info_details.is_empty() where
F: FnOnce(&RoutingDomainDetail) -> R,
{
match domain {
RoutingDomain::PublicInternet => f(&inner.public_internet_routing_domain),
RoutingDomain::LocalNetwork => f(&inner.local_network_routing_domain),
}
} }
pub fn has_public_dial_info(&self) -> bool { fn with_routing_domain_mut<F, R>(
!self.inner.lock().public_dial_info_details.is_empty() inner: &mut RoutingTableInner,
domain: RoutingDomain,
f: F,
) -> R
where
F: FnOnce(&mut RoutingDomainDetail) -> R,
{
match domain {
RoutingDomain::PublicInternet => f(&mut inner.public_internet_routing_domain),
RoutingDomain::LocalNetwork => f(&mut inner.local_network_routing_domain),
}
} }
pub fn public_dial_info_details(&self) -> Vec<DialInfoDetail> { pub fn has_dial_info(&self, domain: RoutingDomain) -> bool {
self.inner.lock().public_dial_info_details.clone() let inner = self.inner.lock();
Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details.is_empty())
} }
pub fn interface_dial_info_details(&self) -> Vec<DialInfoDetail> { pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec<DialInfoDetail> {
self.inner.lock().interface_dial_info_details.clone() let inner = self.inner.lock();
Self::with_routing_domain(&*inner, domain, |rd| rd.dial_info_details.clone())
} }
pub fn first_public_filtered_dial_info_detail( pub fn first_filtered_dial_info_detail(
&self, &self,
domain: RoutingDomain,
filter: &DialInfoFilter, filter: &DialInfoFilter,
) -> Option<DialInfoDetail> { ) -> Option<DialInfoDetail> {
let inner = self.inner.lock(); let inner = self.inner.lock();
for did in &inner.public_dial_info_details { Self::with_routing_domain(&*inner, domain, |rd| {
for did in rd.dial_info_details {
if did.matches_filter(filter) { if did.matches_filter(filter) {
return Some(did.clone()); return Some(did.clone());
} }
} }
None None
})
} }
pub fn all_public_filtered_dial_info_details( pub fn all_filtered_dial_info_details(
&self, &self,
domain: RoutingDomain,
filter: &DialInfoFilter, filter: &DialInfoFilter,
) -> Vec<DialInfoDetail> { ) -> Vec<DialInfoDetail> {
let inner = self.inner.lock(); let inner = self.inner.lock();
Self::with_routing_domain(&*inner, domain, |rd| {
let mut ret = Vec::new(); let mut ret = Vec::new();
for did in &inner.public_dial_info_details { for did in rd.dial_info_details {
if did.matches_filter(filter) { if did.matches_filter(filter) {
ret.push(did.clone()); ret.push(did.clone());
} }
} }
ret ret
})
} }
pub fn first_interface_filtered_dial_info_detail( pub fn register_dial_info(
&self,
filter: &DialInfoFilter,
) -> Option<DialInfoDetail> {
let inner = self.inner.lock();
for did in &inner.interface_dial_info_details {
if did.matches_filter(filter) {
return Some(did.clone());
}
}
None
}
pub fn all_interface_filtered_dial_info_details(
&self,
filter: &DialInfoFilter,
) -> Vec<DialInfoDetail> {
let inner = self.inner.lock();
let mut ret = Vec::new();
for did in &inner.interface_dial_info_details {
if did.matches_filter(filter) {
ret.push(did.clone());
}
}
ret
}
pub fn register_public_dial_info(
&self, &self,
domain: RoutingDomain,
dial_info: DialInfo, dial_info: DialInfo,
origin: DialInfoOrigin, origin: DialInfoOrigin,
network_class: Option<NetworkClass>,
) { ) {
let timestamp = get_timestamp(); let timestamp = get_timestamp();
let enable_local_peer_scope = { let enable_local_peer_scope = {
@ -248,7 +251,10 @@ impl RoutingTable {
c.network.enable_local_peer_scope c.network.enable_local_peer_scope
}; };
if !enable_local_peer_scope && dial_info.is_local() { if !enable_local_peer_scope
&& matches!(domain, RoutingDomain::PublicInternet)
&& dial_info.is_local()
{
error!("shouldn't be registering local addresses as public"); error!("shouldn't be registering local addresses as public");
return; return;
} }
@ -258,21 +264,21 @@ impl RoutingTable {
} }
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
Self::with_routing_domain_mut(&mut *inner, domain, |rd| {
inner.public_dial_info_details.push(DialInfoDetail { rd.dial_info_details.push(DialInfoDetail {
dial_info: dial_info.clone(), dial_info: dial_info.clone(),
origin, origin,
network_class,
timestamp, timestamp,
}); });
});
// Re-sort dial info to endure preference ordering let domain_str = match domain {
inner RoutingDomain::PublicInternet => "Public",
.public_dial_info_details RoutingDomain::LocalNetwork => "Local",
.sort_by(|a, b| a.dial_info.cmp(&b.dial_info)); };
info!( info!(
"Public Dial Info: {}", "{} Dial Info: {}",
domain_str,
NodeDialInfo { NodeDialInfo {
node_id: NodeId::new(inner.node_id), node_id: NodeId::new(inner.node_id),
dial_info dial_info
@ -280,74 +286,13 @@ impl RoutingTable {
.to_string(), .to_string(),
); );
debug!(" Origin: {:?}", origin); debug!(" Origin: {:?}", origin);
debug!(" Network Class: {:?}", network_class);
Self::trigger_changed_dial_info(&mut *inner);
} }
pub fn register_interface_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) { pub fn clear_dial_info_details(&self, domain: RoutingDomain) {
if !dial_info.is_valid() {
error!("shouldn't be registering invalid interface addresses");
return;
}
let timestamp = get_timestamp();
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
Self::with_routing_domain_mut(&mut *inner, domain, |rd| {
inner.interface_dial_info_details.push(DialInfoDetail { rd.dial_info_details.clear();
dial_info: dial_info.clone(), })
origin,
network_class: None,
timestamp,
});
// Re-sort dial info to endure preference ordering
inner
.interface_dial_info_details
.sort_by(|a, b| a.dial_info.cmp(&b.dial_info));
info!(
"Interface Dial Info: {}",
NodeDialInfo {
node_id: NodeId::new(inner.node_id),
dial_info
}
.to_string(),
);
debug!(" Origin: {:?}", origin);
Self::trigger_changed_dial_info(&mut *inner);
}
pub fn clear_dial_info_details(&self) {
let mut inner = self.inner.lock();
inner.public_dial_info_details.clear();
inner.interface_dial_info_details.clear();
Self::trigger_changed_dial_info(&mut *inner);
}
pub async fn wait_changed_dial_info(&self) {
let inst = self
.inner
.lock()
.eventual_changed_dial_info
.instance_empty();
inst.await;
}
fn trigger_changed_dial_info(inner: &mut RoutingTableInner) {
// Clear 'seen node info' bits on routing table entries so we know to ping them
for b in &mut inner.buckets {
for e in b.entries_mut() {
e.1.set_seen_our_node_info(false);
}
}
//
// Release any waiters
let mut new_eventual = Eventual::new();
core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual);
spawn(new_eventual.resolve()).detach();
} }
fn bucket_depth(index: usize) -> usize { fn bucket_depth(index: usize) -> usize {
@ -688,7 +633,7 @@ impl RoutingTable {
k, k,
NodeInfo { NodeInfo {
network_class: NetworkClass::Server, // Bootstraps are always full servers network_class: NetworkClass::Server, // Bootstraps are always full servers
outbound_protocols: ProtocolSet::default(), // Bootstraps do not participate in relaying and will not make outbound requests outbound_protocols: ProtocolSet::empty(), // Bootstraps do not participate in relaying and will not make outbound requests
dial_info_list: v, // Dial info is as specified in the bootstrap list dial_info_list: v, // Dial info is as specified in the bootstrap list
relay_peer_info: None, // Bootstraps never require a relay themselves relay_peer_info: None, // Bootstraps never require a relay themselves
}, },

View File

@ -124,6 +124,7 @@ struct WaitableReply {
timeout: u64, timeout: u64,
node_ref: NodeRef, node_ref: NodeRef,
send_ts: u64, send_ts: u64,
send_data_kind: SendDataKind,
} }
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
@ -389,9 +390,6 @@ impl RPCProcessor {
} }
// Issue a request over the network, possibly using an anonymized route // Issue a request over the network, possibly using an anonymized route
// If the request doesn't want a reply, returns immediately
// If the request wants a reply then it waits for one asynchronously
// If it doesn't receive a response in a sufficient time, then it returns a timeout error
async fn request<T: capnp::message::ReaderSegments>( async fn request<T: capnp::message::ReaderSegments>(
&self, &self,
dest: Destination, dest: Destination,
@ -411,11 +409,13 @@ impl RPCProcessor {
(op_id, wants_answer) (op_id, wants_answer)
}; };
let out_node_id; let out_node_id; // Envelope Node Id
let mut out_noderef: Option<NodeRef> = None; let mut out_noderef: Option<NodeRef> = None; // Node to send envelope to
let hopcount: usize; let hopcount: usize; // Total safety + private route hop count
// Create envelope data
let out = { let out = {
let out; let out; // Envelope data
// To where are we sending the request // To where are we sending the request
match &dest { match &dest {
@ -539,19 +539,22 @@ impl RPCProcessor {
// send question // send question
let bytes = out.len() as u64; let bytes = out.len() as u64;
if let Err(e) = self let send_data_kind = match self
.network_manager() .network_manager()
.send_envelope(node_ref.clone(), Some(out_node_id), out) .send_envelope(node_ref.clone(), Some(out_node_id), out)
.await .await
.map_err(logthru_rpc!(error)) .map_err(logthru_rpc!(error))
.map_err(RPCError::Internal) .map_err(RPCError::Internal)
{ {
Ok(v) => v,
Err(e) => {
// Make sure to clean up op id waiter in case of error // Make sure to clean up op id waiter in case of error
if eventual.is_some() { if eventual.is_some() {
self.cancel_op_id_waiter(op_id); self.cancel_op_id_waiter(op_id);
} }
return Err(e); return Err(e);
} }
};
// Successfully sent // Successfully sent
let send_ts = get_timestamp(); let send_ts = get_timestamp();
@ -570,12 +573,13 @@ impl RPCProcessor {
timeout, timeout,
node_ref, node_ref,
send_ts, send_ts,
send_data_kind,
})), })),
} }
} }
// Issue a reply over the network, possibly using an anonymized route // Issue a reply over the network, possibly using an anonymized route
// If the request doesn't want a reply, this routine does nothing // The request must want a response, or this routine fails
async fn reply<T: capnp::message::ReaderSegments>( async fn reply<T: capnp::message::ReaderSegments>(
&self, &self,
request_rpcreader: RPCMessageReader, request_rpcreader: RPCMessageReader,
@ -1379,6 +1383,9 @@ impl RPCProcessor {
.await? .await?
.unwrap(); .unwrap();
// Note what kind of ping this was and to what peer scope
let send_data_kind = waitable_reply.send_data_kind;
// Wait for reply // Wait for reply
let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?; let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?;
@ -1423,6 +1430,26 @@ impl RPCProcessor {
e.update_node_status(node_status.clone()); e.update_node_status(node_status.clone());
}); });
// Report sender_info IP addresses to network manager
if let Some(socket_address) = sender_info.socket_address {
match send_data_kind {
SendDataKind::LocalDirect => {
self.network_manager()
.report_local_socket_address(socket_address, peer)
.await;
}
SendDataKind::GlobalDirect => {
self.network_manager()
.report_global_socket_address(socket_address, peer)
.await;
}
SendDataKind::GlobalIndirect => {
// Do nothing in this case, as the socket address returned here would be for any node other than ours
}
}
}
// Return the answer for anyone who may care // Return the answer for anyone who may care
let out = InfoAnswer { let out = InfoAnswer {
latency, latency,