diff --git a/veilid-cli/src/main.rs b/veilid-cli/src/main.rs index 836a7a66..1698cfa0 100644 --- a/veilid-cli/src/main.rs +++ b/veilid-cli/src/main.rs @@ -152,7 +152,7 @@ fn main() -> Result<(), String> { (Box::pin(f.compat()) as Pin>, tokio::io::stdout().compat_write()) }; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } @@ -171,7 +171,7 @@ fn main() -> Result<(), String> { let in_str = format!("{}\n", evaluate); let (in_obj, out_obj) = (futures::io::Cursor::new(in_str), tokio::io::stdout().compat_write()); } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } @@ -347,7 +347,7 @@ fn main() -> Result<(), String> { // Wait for ui and connection to complete let _ = tokio::join!(ui_future, connection_future); } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } Ok(()) diff --git a/veilid-cli/src/tools.rs b/veilid-cli/src/tools.rs index 2223a935..ffe92f2d 100644 --- a/veilid-cli/src/tools.rs +++ b/veilid-cli/src/tools.rs @@ -20,7 +20,7 @@ cfg_if! { local.block_on(&rt, f) } } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-core/src/crypto/types/byte_array_types.rs b/veilid-core/src/crypto/types/byte_array_types.rs index e270f8b1..b2096a82 100644 --- a/veilid-core/src/crypto/types/byte_array_types.rs +++ b/veilid-core/src/crypto/types/byte_array_types.rs @@ -267,7 +267,7 @@ macro_rules! byte_array_type { Self::new(value) } } - + impl From<$name> for [u8; $size] { fn from(value: $name) -> Self { value.bytes diff --git a/veilid-core/src/intf/native/system.rs b/veilid-core/src/intf/native/system.rs index b6759555..ed8dba81 100644 --- a/veilid-core/src/intf/native/system.rs +++ b/veilid-core/src/intf/native/system.rs @@ -29,7 +29,7 @@ cfg_if! 
{ AsyncResolver::tokio(config, options) } } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-core/src/network_manager/native/igd_manager.rs b/veilid-core/src/network_manager/native/igd_manager.rs index 2cf28977..f332ba81 100644 --- a/veilid-core/src/network_manager/native/igd_manager.rs +++ b/veilid-core/src/network_manager/native/igd_manager.rs @@ -2,11 +2,11 @@ use super::*; use igd::*; use std::net::UdpSocket; - const UPNP_GATEWAY_DETECT_TIMEOUT_MS: u32 = 5_000; const UPNP_MAPPING_LIFETIME_MS: u32 = 120_000; const UPNP_MAPPING_ATTEMPTS: u32 = 3; -const UPNP_MAPPING_LIFETIME_US:TimestampDuration = TimestampDuration::new(UPNP_MAPPING_LIFETIME_MS as u64 * 1000u64); +const UPNP_MAPPING_LIFETIME_US: TimestampDuration = + TimestampDuration::new(UPNP_MAPPING_LIFETIME_MS as u64 * 1000u64); #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] struct PortMapKey { @@ -36,7 +36,6 @@ pub struct IGDManager { inner: Arc>, } - fn convert_llpt(llpt: LowLevelProtocolType) -> PortMappingProtocol { match llpt { LowLevelProtocolType::UDP => PortMappingProtocol::UDP, @@ -44,7 +43,6 @@ fn convert_llpt(llpt: LowLevelProtocolType) -> PortMappingProtocol { } } - impl IGDManager { // @@ -71,7 +69,7 @@ impl IGDManager { return None; } }; - + // can be any routable ip address, // this is just to make the system routing table calculate the appropriate local ip address // using google's dns, but it wont actually send any packets to it @@ -82,7 +80,8 @@ impl IGDManager { IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)), 80, ), - }).map_err(|e| { + }) + .map_err(|e| { log_net!(debug "failed to connect to dummy address: {}", e); e }) @@ -92,9 +91,7 @@ impl IGDManager { } #[instrument(level = "trace", target = "net", skip_all)] - fn find_local_ip(inner: &mut IGDManagerInner, - address_type: AddressType, - ) -> Option { + fn find_local_ip(inner: &mut IGDManagerInner, address_type: AddressType) -> Option { if let Some(ip) = inner.local_ip_addrs.get(&address_type) { return Some(*ip); } @@ -112,10 +109,7 @@ impl IGDManager { } #[instrument(level = "trace", target = "net", skip_all)] - fn get_local_ip( - inner: &mut IGDManagerInner, - address_type: AddressType, - ) -> Option { + fn get_local_ip(inner: &mut IGDManagerInner, address_type: AddressType) -> Option { if let Some(ip) = inner.local_ip_addrs.get(&address_type) { return Some(*ip); } @@ -123,20 +117,14 @@ impl IGDManager { } #[instrument(level = "trace", target = "net", skip_all)] - fn find_gateway( - inner: &mut IGDManagerInner, - local_ip: IpAddr, - ) -> Option> { - + fn find_gateway(inner: &mut IGDManagerInner, local_ip: IpAddr) -> Option> { if let Some(gw) = inner.gateways.get(&local_ip) { return Some(gw.clone()); } let gateway = match local_ip { IpAddr::V4(v4) => { - let mut opts = SearchOptions::new_v4( - UPNP_GATEWAY_DETECT_TIMEOUT_MS as u64, - ); + let mut opts = SearchOptions::new_v4(UPNP_GATEWAY_DETECT_TIMEOUT_MS as u64); opts.bind_addr = SocketAddr::V4(SocketAddrV4::new(v4, 0)); match igd::search_gateway(opts) { @@ -162,7 +150,6 @@ impl IGDManager { } } } - }; let gw = Arc::new(gateway); inner.gateways.insert(local_ip, gw.clone()); @@ -170,59 +157,68 @@ impl IGDManager { } #[instrument(level = "trace", target = "net", skip_all)] - fn get_gateway( - inner: &mut IGDManagerInner, - local_ip: IpAddr, - ) -> Option> { + fn get_gateway(inner: &mut IGDManagerInner, local_ip: IpAddr) -> Option> { if let Some(gw) = inner.gateways.get(&local_ip) { 
return Some(gw.clone()); } None } - fn get_description(&self, llpt: LowLevelProtocolType, local_port:u16) -> String { - format!("{} map {} for port {}", self.config.get().program_name, convert_llpt(llpt), local_port ) + fn get_description(&self, llpt: LowLevelProtocolType, local_port: u16) -> String { + format!( + "{} map {} for port {}", + self.config.get().program_name, + convert_llpt(llpt), + local_port + ) } #[instrument(level = "trace", target = "net", skip_all)] - pub async fn unmap_port(&self, + pub async fn unmap_port( + &self, llpt: LowLevelProtocolType, at: AddressType, mapped_port: u16, ) -> Option<()> { let this = self.clone(); - blocking_wrapper("igd unmap_port", move || { - let mut inner = this.inner.lock(); + blocking_wrapper( + "igd unmap_port", + move || { + let mut inner = this.inner.lock(); - // If we already have this port mapped, just return the existing portmap - let mut found = None; - for (pmk, pmv) in &inner.port_maps { - if pmk.llpt == llpt && pmk.at == at && pmv.mapped_port == mapped_port { - found = Some(*pmk); - break; + // If we already have this port mapped, just return the existing portmap + let mut found = None; + for (pmk, pmv) in &inner.port_maps { + if pmk.llpt == llpt && pmk.at == at && pmv.mapped_port == mapped_port { + found = Some(*pmk); + break; + } } - } - let pmk = found?; - let _pmv = inner.port_maps.remove(&pmk).expect("key found but remove failed"); + let pmk = found?; + let _pmv = inner + .port_maps + .remove(&pmk) + .expect("key found but remove failed"); - - // Get local ip address - let local_ip = Self::find_local_ip(&mut inner, at)?; + // Get local ip address + let local_ip = Self::find_local_ip(&mut inner, at)?; - // Find gateway - let gw = Self::find_gateway(&mut inner, local_ip)?; + // Find gateway + let gw = Self::find_gateway(&mut inner, local_ip)?; - // Unmap port - match gw.remove_port(convert_llpt(llpt), mapped_port) { - Ok(()) => (), - Err(e) => { - // Failed to map external port - log_net!(debug "upnp failed to remove external port: {}", e); - return None; - } - }; - Some(()) - }, None) + // Unmap port + match gw.remove_port(convert_llpt(llpt), mapped_port) { + Ok(()) => (), + Err(e) => { + // Failed to map external port + log_net!(debug "upnp failed to remove external port: {}", e); + return None; + } + }; + Some(()) + }, + None, + ) .await } @@ -310,7 +306,13 @@ impl IGDManager { .await } - #[instrument(level = "trace", target = "net", name = "IGDManager::tick", skip_all, err)] + #[instrument( + level = "trace", + target = "net", + name = "IGDManager::tick", + skip_all, + err + )] pub async fn tick(&self) -> EyreResult { // Refresh mappings if we have them // If an error is received, then return false to restart the local network @@ -322,13 +324,14 @@ impl IGDManager { for (k, v) in &inner.port_maps { let mapping_lifetime = now.saturating_sub(v.timestamp); - if mapping_lifetime >= UPNP_MAPPING_LIFETIME_US || v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS { + if mapping_lifetime >= UPNP_MAPPING_LIFETIME_US + || v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS + { // Past expiration time or tried N times, do a full renew and fail out if we can't full_renews.push((*k, *v)); - } - else if mapping_lifetime >= v.renewal_lifetime { + } else if mapping_lifetime >= v.renewal_lifetime { // Attempt a normal renewal - renews.push((*k, *v)); + renews.push((*k, *v)); } } @@ -340,100 +343,125 @@ impl IGDManager { } let this = self.clone(); - blocking_wrapper("igd tick", move || { - let mut inner = this.inner.lock(); + blocking_wrapper( + "igd tick", 
+ move || { + let mut inner = this.inner.lock(); - // Process full renewals - for (k, v) in full_renews { + // Process full renewals + for (k, v) in full_renews { + // Get local ip for address type + let local_ip = match Self::get_local_ip(&mut inner, k.at) { + Some(ip) => ip, + None => { + return Err(eyre!("local ip missing for address type")); + } + }; - // Get local ip for address type - let local_ip = match Self::get_local_ip(&mut inner, k.at) { - Some(ip) => ip, - None => { - return Err(eyre!("local ip missing for address type")); - } - }; - - // Get gateway for interface - let gw = match Self::get_gateway(&mut inner, local_ip) { - Some(gw) => gw, - None => { - return Err(eyre!("gateway missing for interface")); - } - }; + // Get gateway for interface + let gw = match Self::get_gateway(&mut inner, local_ip) { + Some(gw) => gw, + None => { + return Err(eyre!("gateway missing for interface")); + } + }; - // Delete the mapping if it exists, ignore any errors here - let _ = gw.remove_port(convert_llpt(k.llpt), v.mapped_port); - inner.port_maps.remove(&k); + // Delete the mapping if it exists, ignore any errors here + let _ = gw.remove_port(convert_llpt(k.llpt), v.mapped_port); + inner.port_maps.remove(&k); - let desc = this.get_description(k.llpt, k.local_port); - match gw.add_any_port(convert_llpt(k.llpt), SocketAddr::new(local_ip, k.local_port), (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, &desc) { - Ok(mapped_port) => { - log_net!(debug "full-renewed mapped port {:?} -> {:?}", v, k); - inner.port_maps.insert(k, PortMapValue { - ext_ip: v.ext_ip, - mapped_port, - timestamp: Timestamp::now(), - renewal_lifetime: TimestampDuration::new((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64), - renewal_attempts: 0, - }); - }, - Err(e) => { - info!("failed to full-renew mapped port {:?} -> {:?}: {}", v, k, e); - - // Must restart network now :( - return Ok(false); - } - }; + let desc = this.get_description(k.llpt, k.local_port); + match gw.add_any_port( + convert_llpt(k.llpt), + SocketAddr::new(local_ip, k.local_port), + (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, + &desc, + ) { + Ok(mapped_port) => { + log_net!(debug "full-renewed mapped port {:?} -> {:?}", v, k); + inner.port_maps.insert( + k, + PortMapValue { + ext_ip: v.ext_ip, + mapped_port, + timestamp: Timestamp::now(), + renewal_lifetime: TimestampDuration::new( + (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64, + ), + renewal_attempts: 0, + }, + ); + } + Err(e) => { + info!("failed to full-renew mapped port {:?} -> {:?}: {}", v, k, e); - } - // Process normal renewals - for (k, mut v) in renews { - - // Get local ip for address type - let local_ip = match Self::get_local_ip(&mut inner, k.at) { - Some(ip) => ip, - None => { - return Err(eyre!("local ip missing for address type")); - } - }; + // Must restart network now :( + return Ok(false); + } + }; + } + // Process normal renewals + for (k, mut v) in renews { + // Get local ip for address type + let local_ip = match Self::get_local_ip(&mut inner, k.at) { + Some(ip) => ip, + None => { + return Err(eyre!("local ip missing for address type")); + } + }; - // Get gateway for interface - let gw = match Self::get_gateway(&mut inner, local_ip) { - Some(gw) => gw, - None => { - return Err(eyre!("gateway missing for address type")); - } - }; + // Get gateway for interface + let gw = match Self::get_gateway(&mut inner, local_ip) { + Some(gw) => gw, + None => { + return Err(eyre!("gateway missing for address type")); + } + }; - let desc = this.get_description(k.llpt, k.local_port); - match 
gw.add_port(convert_llpt(k.llpt), v.mapped_port, SocketAddr::new(local_ip, k.local_port), (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, &desc) { - Ok(()) => { - log_net!("renewed mapped port {:?} -> {:?}", v, k); + let desc = this.get_description(k.llpt, k.local_port); + match gw.add_port( + convert_llpt(k.llpt), + v.mapped_port, + SocketAddr::new(local_ip, k.local_port), + (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, + &desc, + ) { + Ok(()) => { + log_net!("renewed mapped port {:?} -> {:?}", v, k); - inner.port_maps.insert(k, PortMapValue { - ext_ip: v.ext_ip, - mapped_port: v.mapped_port, - timestamp: Timestamp::now(), - renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64).into(), - renewal_attempts: 0, - }); - }, - Err(e) => { - log_net!(debug "failed to renew mapped port {:?} -> {:?}: {}", v, k, e); - - // Get closer to the maximum renewal timeline by a factor of two each time - v.renewal_lifetime = (v.renewal_lifetime + UPNP_MAPPING_LIFETIME_US) / 2u64; - v.renewal_attempts += 1; - - // Store new value to try again - inner.port_maps.insert(k, v); - } - }; - } - - // Normal exit, no restart - Ok(true) - }, Err(eyre!("failed to process blocking task"))).instrument(tracing::trace_span!("igd tick fut")).await + inner.port_maps.insert( + k, + PortMapValue { + ext_ip: v.ext_ip, + mapped_port: v.mapped_port, + timestamp: Timestamp::now(), + renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 + * 1000u64) + .into(), + renewal_attempts: 0, + }, + ); + } + Err(e) => { + log_net!(debug "failed to renew mapped port {:?} -> {:?}: {}", v, k, e); + + // Get closer to the maximum renewal timeline by a factor of two each time + v.renewal_lifetime = + (v.renewal_lifetime + UPNP_MAPPING_LIFETIME_US) / 2u64; + v.renewal_attempts += 1; + + // Store new value to try again + inner.port_maps.insert(k, v); + } + }; + } + + // Normal exit, no restart + Ok(true) + }, + Err(eyre!("failed to process blocking task")), + ) + .instrument(tracing::trace_span!("igd tick fut")) + .await } } diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index a3cca4a1..94eee1ae 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -288,7 +288,7 @@ impl Network { std_listener.set_nonblocking(true).expect("failed to set nonblocking"); let listener = TcpListener::from_std(std_listener).wrap_err("failed to create tokio tcp listener")?; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } @@ -317,7 +317,7 @@ impl Network { } else if #[cfg(feature="rt-tokio")] { let incoming_stream = tokio_stream::wrappers::TcpListenerStream::new(listener); } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 089bd687..dd4865fa 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -128,7 +128,7 @@ impl Network { std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking"); let udp_socket = UdpSocket::from_std(std_udp_socket).wrap_err("failed to make inbound tokio udpsocket")?; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } let socket_arc = Arc::new(udp_socket); diff --git 
a/veilid-core/src/network_manager/native/protocol/sockets.rs b/veilid-core/src/network_manager/native/protocol/sockets.rs index 2ef43277..6f918c95 100644 --- a/veilid-core/src/network_manager/native/protocol/sockets.rs +++ b/veilid-core/src/network_manager/native/protocol/sockets.rs @@ -9,7 +9,7 @@ cfg_if! { pub use tokio::net::{TcpStream, TcpListener, UdpSocket}; pub use tokio_util::compat::*; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } @@ -182,7 +182,7 @@ pub async fn nonblocking_connect( } else if #[cfg(feature="rt-tokio")] { Ok(TimeoutOr::value(TcpStream::from_std(async_stream.into_inner()?)?)) } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } } diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index c2919c77..0dab86f4 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -39,7 +39,7 @@ impl RawTcpNetworkConnection { // .shutdown() // .await // } else { - // compile_error!("needs executor implementation") + // compile_error!("needs executor implementation"); // } // } } diff --git a/veilid-core/src/network_manager/native/protocol/wrtc.rs b/veilid-core/src/network_manager/native/protocol/wrtc.rs index e69de29b..8b137891 100644 --- a/veilid-core/src/network_manager/native/protocol/wrtc.rs +++ b/veilid-core/src/network_manager/native/protocol/wrtc.rs @@ -0,0 +1 @@ + diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 6ef18bec..9adf0d1e 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -31,7 +31,7 @@ cfg_if! 
{ WebsocketNetworkConnection>>; pub type WebsocketNetworkConnectionWS = WebsocketNetworkConnection>; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index b55795e8..4a4d49c0 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -82,7 +82,6 @@ pub struct NetworkConnectionStats { last_message_recv_time: Option, } - #[derive(Debug)] pub(in crate::network_manager) struct NetworkConnection { connection_id: NetworkConnectionId, @@ -104,7 +103,6 @@ impl Drop for NetworkConnection { } } - impl NetworkConnection { pub(super) fn dummy(id: NetworkConnectionId, flow: Flow) -> Self { // Create handle for sending (dummy is immediately disconnected) @@ -149,16 +147,19 @@ impl NetworkConnection { let local_stop_token = stop_source.token(); // Spawn connection processor and pass in protocol connection - let processor = spawn("connection processor", Self::process_connection( - connection_manager, - local_stop_token, - manager_stop_token, - connection_id, - flow, - receiver, - protocol_connection, - stats.clone(), - )); + let processor = spawn( + "connection processor", + Self::process_connection( + connection_manager, + local_stop_token, + manager_stop_token, + connection_id, + flow, + receiver, + protocol_connection, + stats.clone(), + ), + ); // Return the connection Self { @@ -198,7 +199,7 @@ impl NetworkConnection { self.ref_count > 0 } - pub fn protected_node_ref(&self) -> Option{ + pub fn protected_node_ref(&self) -> Option { self.protected_nr.clone() } @@ -221,7 +222,7 @@ impl NetworkConnection { } } - #[instrument(level="trace", target="net", skip_all)] + #[instrument(level = "trace", target = "net", skip_all)] async fn send_internal( protocol_connection: &ProtocolNetworkConnection, stats: Arc>, @@ -236,7 +237,7 @@ impl NetworkConnection { Ok(NetworkResult::Value(())) } - #[instrument(level="trace", target="net", skip_all)] + #[instrument(level = "trace", target = "net", skip_all)] async fn recv_internal( protocol_connection: &ProtocolNetworkConnection, stats: Arc>, @@ -446,16 +447,30 @@ impl NetworkConnection { } pub fn debug_print(&self, cur_ts: Timestamp) -> String { - format!("{} <- {} | {} | est {} sent {} rcvd {} refcount {}{}", - self.flow.remote_address(), - self.flow.local().map(|x| x.to_string()).unwrap_or("---".to_owned()), + format!( + "{} <- {} | {} | est {} sent {} rcvd {} refcount {}{}", + self.flow.remote_address(), + self.flow + .local() + .map(|x| x.to_string()) + .unwrap_or("---".to_owned()), self.connection_id.as_u64(), - debug_duration(cur_ts.as_u64().saturating_sub(self.established_time.as_u64())), - self.stats().last_message_sent_time.map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64())) ).unwrap_or("---".to_owned()), - self.stats().last_message_recv_time.map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64())) ).unwrap_or("---".to_owned()), - self.ref_count, + debug_duration( + cur_ts + .as_u64() + .saturating_sub(self.established_time.as_u64()) + ), + self.stats() + .last_message_sent_time + .map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64()))) + .unwrap_or("---".to_owned()), + self.stats() + .last_message_recv_time + .map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64()))) + .unwrap_or("---".to_owned()), + self.ref_count, if let Some(pnr) = 
&self.protected_nr { - format!(" PROTECTED:{}",pnr) + format!(" PROTECTED:{}", pnr) } else { "".to_owned() } diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index fabe30a8..46a4d5ec 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -10,9 +10,9 @@ impl NetworkManager { /// in this case, if it matches the node ref's filters and no more permissive flow /// could be established. /// - /// Sending to a node requires determining a NetworkClass compatible contact method + /// Sending to a node requires determining a NetworkClass compatible contact method /// between the source and destination node - #[instrument(level="trace", target="net", skip_all, err)] + #[instrument(level = "trace", target = "net", skip_all, err)] pub(crate) async fn send_data( &self, destination_node_ref: NodeRef, @@ -20,15 +20,10 @@ impl NetworkManager { ) -> EyreResult> { // First try to send data to the last flow we've seen this peer on let data = if let Some(flow) = destination_node_ref.last_flow() { - match self - .net() - .send_data_to_existing_flow(flow, data) - .await? - { + match self.net().send_data_to_existing_flow(flow, data).await? { SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last flow since we just sent to it - destination_node_ref - .set_last_flow(unique_flow.flow, Timestamp::now()); + destination_node_ref.set_last_flow(unique_flow.flow, Timestamp::now()); return Ok(NetworkResult::value(SendDataMethod { opt_relayed_contact_method: None, @@ -48,15 +43,22 @@ impl NetworkManager { }; // No existing connection was found or usable, so we proceed to see how to make a new one - - // Get the best way to contact this node - let possibly_relayed_contact_method = self.get_node_contact_method(destination_node_ref.clone())?; - self.try_possibly_relayed_contact_method(possibly_relayed_contact_method, destination_node_ref, data).await + // Get the best way to contact this node + let possibly_relayed_contact_method = + self.get_node_contact_method(destination_node_ref.clone())?; + + self.try_possibly_relayed_contact_method( + possibly_relayed_contact_method, + destination_node_ref, + data, + ) + .await } - #[instrument(level="trace", target="net", skip_all)] - pub(crate) fn try_possibly_relayed_contact_method(&self, + #[instrument(level = "trace", target = "net", skip_all)] + pub(crate) fn try_possibly_relayed_contact_method( + &self, possibly_relayed_contact_method: NodeContactMethod, destination_node_ref: NodeRef, data: Vec, @@ -74,9 +76,9 @@ impl NetworkManager { } cm => (cm, destination_node_ref.clone(), None), }; - + #[cfg(feature = "verbose-tracing")] - log_net!(debug + log_net!(debug "ContactMethod: {:?} for {:?}", contact_method, destination_node_ref ); @@ -154,7 +156,7 @@ impl NetworkManager { } /// Send data using NodeContactMethod::Existing - #[instrument(level="trace", target="net", skip_all, err)] + #[instrument(level = "trace", target = "net", skip_all, err)] async fn send_data_ncm_existing( &self, target_node_ref: NodeRef, @@ -162,16 +164,13 @@ impl NetworkManager { ) -> EyreResult> { // First try to send data to the last connection we've seen this peer on let Some(flow) = target_node_ref.last_flow() else { - return Ok(NetworkResult::no_connection_other( - format!("should have found an existing connection: {}", target_node_ref) - )); + return Ok(NetworkResult::no_connection_other(format!( + "should have found an existing connection: {}", + target_node_ref 
+ ))); }; - let unique_flow = match self - .net() - .send_data_to_existing_flow(flow, data) - .await? - { + let unique_flow = match self.net().send_data_to_existing_flow(flow, data).await? { SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, SendDataToExistingFlowResult::NotSent(_) => { return Ok(NetworkResult::no_connection_other( @@ -183,43 +182,41 @@ impl NetworkManager { // Update timestamp for this last connection since we just sent to it target_node_ref.set_last_flow(flow, Timestamp::now()); - Ok(NetworkResult::value(SendDataMethod{ - contact_method: NodeContactMethod::Existing, - opt_relayed_contact_method: None, - unique_flow - })) - } - - /// Send data using NodeContactMethod::Unreachable - #[instrument(level="trace", target="net", skip_all, err)] - async fn send_data_ncm_unreachable( - &self, - target_node_ref: NodeRef, - data: Vec, - ) -> EyreResult> { - // Try to send data to the last socket we've seen this peer on - let Some(flow) = target_node_ref.last_flow() else { - return Ok(NetworkResult::no_connection_other( - format!("Node is not reachable and has no existing connection: {}", target_node_ref) - )); - }; - - let unique_flow = match self - .net() - .send_data_to_existing_flow(flow, data) - .await? - { - SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, - SendDataToExistingFlowResult::NotSent(_) => { - return Ok(NetworkResult::no_connection_other( - format!("failed to send to unreachable node over existing connection: {:?}", flow) - )); - } - }; - - // Update timestamp for this last connection since we just sent to it - target_node_ref.set_last_flow(flow, Timestamp::now()); - + Ok(NetworkResult::value(SendDataMethod { + contact_method: NodeContactMethod::Existing, + opt_relayed_contact_method: None, + unique_flow, + })) + } + + /// Send data using NodeContactMethod::Unreachable + #[instrument(level = "trace", target = "net", skip_all, err)] + async fn send_data_ncm_unreachable( + &self, + target_node_ref: NodeRef, + data: Vec, + ) -> EyreResult> { + // Try to send data to the last socket we've seen this peer on + let Some(flow) = target_node_ref.last_flow() else { + return Ok(NetworkResult::no_connection_other(format!( + "Node is not reachable and has no existing connection: {}", + target_node_ref + ))); + }; + + let unique_flow = match self.net().send_data_to_existing_flow(flow, data).await? { + SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow, + SendDataToExistingFlowResult::NotSent(_) => { + return Ok(NetworkResult::no_connection_other(format!( + "failed to send to unreachable node over existing connection: {:?}", + flow + ))); + } + }; + + // Update timestamp for this last connection since we just sent to it + target_node_ref.set_last_flow(flow, Timestamp::now()); + Ok(NetworkResult::value(SendDataMethod { contact_method: NodeContactMethod::Existing, opt_relayed_contact_method: None, @@ -228,7 +225,7 @@ impl NetworkManager { } /// Send data using NodeContactMethod::SignalReverse - #[instrument(level="trace", target="net", skip_all, err)] + #[instrument(level = "trace", target = "net", skip_all, err)] async fn send_data_ncm_signal_reverse( &self, relay_nr: NodeRef, @@ -237,20 +234,15 @@ impl NetworkManager { ) -> EyreResult> { // First try to send data to the last socket we've seen this peer on let data = if let Some(flow) = target_node_ref.last_flow() { - match self - .net() - .send_data_to_existing_flow(flow, data) - .await? - { + match self.net().send_data_to_existing_flow(flow, data).await? 
{ SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it - target_node_ref - .set_last_flow(flow, Timestamp::now()); + target_node_ref.set_last_flow(flow, Timestamp::now()); - return Ok(NetworkResult::value(SendDataMethod{ + return Ok(NetworkResult::value(SendDataMethod { contact_method: NodeContactMethod::Existing, opt_relayed_contact_method: None, - unique_flow + unique_flow, })); } SendDataToExistingFlowResult::NotSent(data) => { @@ -276,7 +268,7 @@ impl NetworkManager { } /// Send data using NodeContactMethod::SignalHolePunch - #[instrument(level="trace", target="net", skip_all, err)] + #[instrument(level = "trace", target = "net", skip_all, err)] async fn send_data_ncm_signal_hole_punch( &self, relay_nr: NodeRef, @@ -285,20 +277,15 @@ impl NetworkManager { ) -> EyreResult> { // First try to send data to the last socket we've seen this peer on let data = if let Some(flow) = target_node_ref.last_flow() { - match self - .net() - .send_data_to_existing_flow(flow, data) - .await? - { + match self.net().send_data_to_existing_flow(flow, data).await? { SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it - target_node_ref - .set_last_flow(flow, Timestamp::now()); + target_node_ref.set_last_flow(flow, Timestamp::now()); - return Ok(NetworkResult::value(SendDataMethod{ + return Ok(NetworkResult::value(SendDataMethod { contact_method: NodeContactMethod::Existing, opt_relayed_contact_method: None, - unique_flow + unique_flow, })); } SendDataToExistingFlowResult::NotSent(data) => { @@ -312,8 +299,10 @@ impl NetworkManager { data }; - let unique_flow = - network_result_try!(self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data).await?); + let unique_flow = network_result_try!( + self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data) + .await? + ); Ok(NetworkResult::value(SendDataMethod { contact_method: NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref), opt_relayed_contact_method: None, @@ -322,7 +311,7 @@ impl NetworkManager { } /// Send data using NodeContactMethod::Direct - #[instrument(level="trace", target="net", skip_all, err)] + #[instrument(level = "trace", target = "net", skip_all, err)] async fn send_data_ncm_direct( &self, node_ref: NodeRef, @@ -335,24 +324,20 @@ impl NetworkManager { // First try to send data to the last socket we've seen this peer on let data = if let Some(flow) = node_ref.last_flow() { #[cfg(feature = "verbose-tracing")] - log_net!(debug + log_net!(debug "ExistingConnection: {:?} for {:?}", flow, node_ref ); - match self - .net() - .send_data_to_existing_flow(flow, data) - .await? - { + match self.net().send_data_to_existing_flow(flow, data).await? { SendDataToExistingFlowResult::Sent(unique_flow) => { // Update timestamp for this last connection since we just sent to it node_ref.set_last_flow(flow, Timestamp::now()); - return Ok(NetworkResult::value(SendDataMethod{ + return Ok(NetworkResult::value(SendDataMethod { contact_method: NodeContactMethod::Existing, opt_relayed_contact_method: None, - unique_flow + unique_flow, })); } SendDataToExistingFlowResult::NotSent(d) => { @@ -366,8 +351,11 @@ impl NetworkManager { }; // New direct connection was necessary for this dial info - let unique_flow = - network_result_try!(self.net().send_data_to_dial_info(dial_info.clone(), data).await?); + let unique_flow = network_result_try!( + self.net() + .send_data_to_dial_info(dial_info.clone(), data) + .await? 
+ ); // If we connected to this node directly, save off the last connection so we can use it again node_ref.set_last_flow(unique_flow.flow, Timestamp::now()); @@ -376,13 +364,13 @@ impl NetworkManager { contact_method: NodeContactMethod::Direct(dial_info), opt_relayed_contact_method: None, unique_flow, - })) + })) } /// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access /// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not /// allowed to use NodeRefs due to recursive locking - #[instrument(level="trace", target="net", skip_all, err)] + #[instrument(level = "trace", target = "net", skip_all, err)] pub(crate) fn get_node_contact_method( &self, target_node_ref: NodeRef, @@ -390,7 +378,11 @@ impl NetworkManager { let routing_table = self.routing_table(); // If a node is punished, then don't try to contact it - if target_node_ref.node_ids().iter().any(|nid| self.address_filter().is_node_id_punished(*nid)) { + if target_node_ref + .node_ids() + .iter() + .any(|nid| self.address_filter().is_node_id_punished(*nid)) + { return Ok(NodeContactMethod::Unreachable); } @@ -432,9 +424,10 @@ impl NetworkManager { let dial_info_filter = target_node_ref.dial_info_filter().filtered( &DialInfoFilter::all() .with_address_type_set(peer_a.signed_node_info().node_info().address_types()) - .with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols())); + .with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()), + ); let sequencing = target_node_ref.sequencing(); - + // If the node has had lost questions or failures to send, prefer sequencing // to improve reliability. The node may be experiencing UDP fragmentation drops // or other firewalling issues and may perform better with TCP. 
@@ -443,11 +436,15 @@ impl NetworkManager { // log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan()); // sequencing = Sequencing::PreferOrdered; // } - + // Deprioritize dial info that have recently failed let address_filter = self.address_filter(); let mut dial_info_failures_map = BTreeMap::::new(); - for did in peer_b.signed_node_info().node_info().all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |_| true) { + for did in peer_b + .signed_node_info() + .node_info() + .all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |_| true) + { if let Some(ts) = address_filter.get_dial_info_failed_ts(&did.dial_info) { dial_info_failures_map.insert(did.dial_info, ts); } @@ -455,9 +452,15 @@ impl NetworkManager { let dif_sort: Option> = if dial_info_failures_map.is_empty() { None } else { - Some(Arc::new(move |a: &DialInfoDetail, b: &DialInfoDetail| { - let ats = dial_info_failures_map.get(&a.dial_info).copied().unwrap_or_default(); - let bts = dial_info_failures_map.get(&b.dial_info).copied().unwrap_or_default(); + Some(Arc::new(move |a: &DialInfoDetail, b: &DialInfoDetail| { + let ats = dial_info_failures_map + .get(&a.dial_info) + .copied() + .unwrap_or_default(); + let bts = dial_info_failures_map + .get(&b.dial_info) + .copied() + .unwrap_or_default(); ats.cmp(&bts) })) }; @@ -491,7 +494,8 @@ impl NetworkManager { bail!("signalreverse target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key ); } relay_nr.set_sequencing(sequencing); - let target_node_ref = target_node_ref.filtered_clone(NodeRefFilter::from(dial_info_filter)); + let target_node_ref = + target_node_ref.filtered_clone(NodeRefFilter::from(dial_info_filter)); NodeContactMethod::SignalReverse(relay_nr, target_node_ref) } ContactMethod::SignalHolePunch(relay_key, target_key) => { @@ -511,8 +515,11 @@ impl NetworkManager { // if any other protocol were possible here we could update this and do_hole_punch // but tcp hole punch is very very unreliable it seems - let udp_target_node_ref = target_node_ref - .filtered_clone(NodeRefFilter::new().with_dial_info_filter(dial_info_filter).with_protocol_type(ProtocolType::UDP)); + let udp_target_node_ref = target_node_ref.filtered_clone( + NodeRefFilter::new() + .with_dial_info_filter(dial_info_filter) + .with_protocol_type(ProtocolType::UDP), + ); NodeContactMethod::SignalHolePunch(relay_nr, udp_target_node_ref) } @@ -555,14 +562,13 @@ impl NetworkManager { /// Send a reverse connection signal and wait for the return receipt over it /// Then send the data across the new connection /// Only usable for PublicInternet routing domain - #[instrument(level="trace", target="net", skip_all, err)] + #[instrument(level = "trace", target = "net", skip_all, err)] async fn do_reverse_connect( &self, relay_nr: NodeRef, target_nr: NodeRef, data: Vec, ) -> EyreResult> { - // Detect if network is stopping so we can break out of this let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else { return Ok(NetworkResult::service_unavailable("network is stopping")); @@ -580,18 +586,20 @@ impl NetworkManager { // Get target routing domain let Some(routing_domain) = target_nr.best_routing_domain() else { - return Ok(NetworkResult::no_connection_other("No routing domain for target for reverse connect")); + return Ok(NetworkResult::no_connection_other( + "No routing domain for target for reverse connect", + )); }; // Ensure we have a valid network class so our peer info is useful - if 
!self.routing_table().has_valid_network_class(routing_domain){ - return Ok(NetworkResult::no_connection_other("Network class not yet valid for reverse connect")); + if !self.routing_table().has_valid_network_class(routing_domain) { + return Ok(NetworkResult::no_connection_other( + "Network class not yet valid for reverse connect", + )); }; // Get our peer info - let peer_info = self - .routing_table() - .get_own_peer_info(routing_domain); + let peer_info = self.routing_table().get_own_peer_info(routing_domain); // Issue the signal let rpc = self.rpc_processor(); @@ -604,7 +612,11 @@ impl NetworkManager { .wrap_err("failed to send signal")?); // Wait for the return receipt - let inbound_nr = match eventual_value.timeout_at(stop_token).in_current_span().await { + let inbound_nr = match eventual_value + .timeout_at(stop_token) + .in_current_span() + .await + { Err(_) => { return Ok(NetworkResult::service_unavailable("network is stopping")); } @@ -640,27 +652,26 @@ impl NetworkManager { // And now use the existing connection to send over if let Some(flow) = inbound_nr.last_flow() { - match self - .net() - .send_data_to_existing_flow(flow, data) - .await? - { - SendDataToExistingFlowResult::Sent(unique_flow) => Ok(NetworkResult::value(unique_flow)), + match self.net().send_data_to_existing_flow(flow, data).await? { + SendDataToExistingFlowResult::Sent(unique_flow) => { + Ok(NetworkResult::value(unique_flow)) + } SendDataToExistingFlowResult::NotSent(_) => Ok(NetworkResult::no_connection_other( "unable to send over reverse connection", )), } } else { return Ok(NetworkResult::no_connection_other(format!( - "reverse connection dropped from {}", target_nr) - )); + "reverse connection dropped from {}", + target_nr + ))); } } /// Send a hole punch signal and do a negotiating ping and wait for the return receipt /// Then send the data across the new connection /// Only usable for PublicInternet routing domain - #[instrument(level="trace", target="net", skip_all, err)] + #[instrument(level = "trace", target = "net", skip_all, err)] async fn do_hole_punch( &self, relay_nr: NodeRef, @@ -691,18 +702,20 @@ impl NetworkManager { // Get target routing domain let Some(routing_domain) = target_nr.best_routing_domain() else { - return Ok(NetworkResult::no_connection_other("No routing domain for target for hole punch")); + return Ok(NetworkResult::no_connection_other( + "No routing domain for target for hole punch", + )); }; // Ensure we have a valid network class so our peer info is useful - if !self.routing_table().has_valid_network_class(routing_domain){ - return Ok(NetworkResult::no_connection_other("Network class not yet valid for hole punch")); + if !self.routing_table().has_valid_network_class(routing_domain) { + return Ok(NetworkResult::no_connection_other( + "Network class not yet valid for hole punch", + )); }; // Get our peer info - let peer_info = self - .routing_table() - .get_own_peer_info(routing_domain); + let peer_info = self.routing_table().get_own_peer_info(routing_domain); // Get the udp direct dialinfo for the hole punch let hole_punch_did = target_nr @@ -730,7 +743,11 @@ impl NetworkManager { .wrap_err("failed to send signal")?); // Wait for the return receipt - let inbound_nr = match eventual_value.timeout_at(stop_token).in_current_span().await { + let inbound_nr = match eventual_value + .timeout_at(stop_token) + .in_current_span() + .await + { Err(_) => { return Ok(NetworkResult::service_unavailable("network is stopping")); } @@ -770,20 +787,19 @@ impl NetworkManager { // And now use the 
existing connection to send over if let Some(flow) = inbound_nr.last_flow() { - match self - .net() - .send_data_to_existing_flow(flow, data) - .await? - { - SendDataToExistingFlowResult::Sent(unique_flow) => Ok(NetworkResult::value(unique_flow)), + match self.net().send_data_to_existing_flow(flow, data).await? { + SendDataToExistingFlowResult::Sent(unique_flow) => { + Ok(NetworkResult::value(unique_flow)) + } SendDataToExistingFlowResult::NotSent(_) => Ok(NetworkResult::no_connection_other( "unable to send over hole punch", )), } } else { return Ok(NetworkResult::no_connection_other(format!( - "hole punch dropped from {}", target_nr) - )); + "hole punch dropped from {}", + target_nr + ))); } } } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 94bba8c2..d0b8979e 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -1,7 +1,6 @@ use super::*; use core::sync::atomic::{AtomicU32, Ordering}; - /// Reliable pings are done with increased spacing between pings /// - Start secs is the number of seconds between the first two pings @@ -75,12 +74,10 @@ impl BucketEntryState { BucketEntryState::Reliable => 3, } } - } impl From for BucketEntryState { - fn from(value: BucketEntryStateReason) -> Self - { + fn from(value: BucketEntryStateReason) -> Self { match value { BucketEntryStateReason::Punished(_) => BucketEntryState::Punished, BucketEntryStateReason::Dead(_) => BucketEntryState::Dead, @@ -90,7 +87,6 @@ impl From for BucketEntryState { } } - #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] pub(crate) struct LastFlowKey(ProtocolType, AddressType); @@ -199,7 +195,7 @@ impl BucketEntryInner { return Ok(None); } // Won't change number of crypto kinds - node_ids.add(node_id); + node_ids.add(node_id); return Ok(Some(old_node_id)); } // Check to ensure we aren't adding more crypto kinds than we support @@ -223,7 +219,11 @@ impl BucketEntryInner { } /// All-of capability check - pub fn has_all_capabilities(&self, routing_domain: RoutingDomain, capabilities: &[Capability]) -> bool { + pub fn has_all_capabilities( + &self, + routing_domain: RoutingDomain, + capabilities: &[Capability], + ) -> bool { let Some(ni) = self.node_info(routing_domain) else { return false; }; @@ -231,7 +231,11 @@ impl BucketEntryInner { } /// Any-of capability check - pub fn has_any_capabilities(&self, routing_domain: RoutingDomain, capabilities: &[Capability]) -> bool { + pub fn has_any_capabilities( + &self, + routing_domain: RoutingDomain, + capabilities: &[Capability], + ) -> bool { let Some(ni) = self.node_info(routing_domain) else { return false; }; @@ -300,7 +304,9 @@ impl BucketEntryInner { } #[allow(dead_code)] - pub fn sort_fastest_reliable_fn(cur_ts: Timestamp) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering { + pub fn sort_fastest_reliable_fn( + cur_ts: Timestamp, + ) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering { move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2) } @@ -355,7 +361,7 @@ impl BucketEntryInner { // Update the envelope version support we have to use let envelope_support = signed_node_info.node_info().envelope_support().to_vec(); - + // Update the signed node info // Let the node try to live again but don't mark it as seen yet *opt_current_sni = Some(Box::new(signed_node_info)); @@ -363,7 +369,7 @@ impl BucketEntryInner { self.updated_since_last_network_change = true; self.make_not_dead(Timestamp::now()); - // If we're updating an entry's node info, purge all + 
// If we're updating an entry's node info, purge all // but the last connection in our last connections list // because the dial info could have changed and it's safer to just reconnect. // The latest connection would have been the one we got the new node info @@ -398,11 +404,7 @@ impl BucketEntryInner { } // Check connections - let last_flows = self.last_flows( - rti, - true, - NodeRefFilter::from(routing_domain), - ); + let last_flows = self.last_flows(rti, true, NodeRefFilter::from(routing_domain)); !last_flows.is_empty() } @@ -429,10 +431,9 @@ impl BucketEntryInner { }; // Peer info includes all node ids, even unvalidated ones let node_ids = self.node_ids(); - opt_current_sni.as_ref().map(|s| PeerInfo::new( - node_ids, - *s.clone(), - )) + opt_current_sni + .as_ref() + .map(|s| PeerInfo::new(node_ids, *s.clone())) } pub fn best_routing_domain( @@ -452,15 +453,9 @@ impl BucketEntryInner { } // Check connections let mut best_routing_domain: Option = None; - let last_connections = self.last_flows( - rti, - true, - NodeRefFilter::from(routing_domain_set), - ); + let last_connections = self.last_flows(rti, true, NodeRefFilter::from(routing_domain_set)); for lc in last_connections { - if let Some(rd) = - rti.routing_domain_for_address(lc.0.remote_address().address()) - { + if let Some(rd) = rti.routing_domain_for_address(lc.0.remote_address().address()) { if let Some(brd) = best_routing_domain { if rd < brd { best_routing_domain = Some(rd); @@ -474,10 +469,7 @@ impl BucketEntryInner { } fn flow_to_key(&self, last_flow: Flow) -> LastFlowKey { - LastFlowKey( - last_flow.protocol_type(), - last_flow.address_type(), - ) + LastFlowKey(last_flow.protocol_type(), last_flow.address_type()) } // Stores a flow in this entry's table of last flows @@ -487,15 +479,13 @@ impl BucketEntryInner { return; } let key = self.flow_to_key(last_flow); - self.last_flows - .insert(key, (last_flow, timestamp)); + self.last_flows.insert(key, (last_flow, timestamp)); } - // Removes a flow in this entry's table of last flows + // Removes a flow in this entry's table of last flows pub fn remove_last_flow(&mut self, last_flow: Flow) { let key = self.flow_to_key(last_flow); - self.last_flows - .remove(&key); + self.last_flows.remove(&key); } // Clears the table of last flows to ensure we create new ones and drop any existing ones @@ -509,7 +499,7 @@ impl BucketEntryInner { // No last_connections return; } - let mut dead_keys = Vec::with_capacity(self.last_flows.len()-1); + let mut dead_keys = Vec::with_capacity(self.last_flows.len() - 1); let mut most_recent_flow = None; let mut most_recent_flow_time = 0u64; for (k, v) in &self.last_flows { @@ -539,8 +529,7 @@ impl BucketEntryInner { only_live: bool, filter: NodeRefFilter, ) -> Vec<(Flow, Timestamp)> { - let opt_connection_manager = - rti.unlocked_inner.network_manager.opt_connection_manager(); + let opt_connection_manager = rti.unlocked_inner.network_manager.opt_connection_manager(); let mut out: Vec<(Flow, Timestamp)> = self .last_flows @@ -588,9 +577,7 @@ impl BucketEntryInner { }) .collect(); // Sort with newest timestamps - out.sort_by(|a, b| { - b.1.cmp(&a.1) - }); + out.sort_by(|a, b| b.1.cmp(&a.1)); out } @@ -615,7 +602,11 @@ impl BucketEntryInner { } pub fn best_envelope_version(&self) -> Option { - self.envelope_support.iter().rev().find(|x| VALID_ENVELOPE_VERSIONS.contains(x)).copied() + self.envelope_support + .iter() + .rev() + .find(|x| VALID_ENVELOPE_VERSIONS.contains(x)) + .copied() } pub fn state_reason(&self, cur_ts: Timestamp) -> BucketEntryStateReason { 
@@ -657,14 +648,8 @@ impl BucketEntryInner { } pub fn node_status(&self, routing_domain: RoutingDomain) -> Option { match routing_domain { - RoutingDomain::LocalNetwork => self - .local_network - .node_status - .as_ref().cloned(), - RoutingDomain::PublicInternet => self - .public_internet - .node_status - .as_ref().cloned() + RoutingDomain::LocalNetwork => self.local_network.node_status.as_ref().cloned(), + RoutingDomain::PublicInternet => self.public_internet.node_status.as_ref().cloned(), } } @@ -714,7 +699,10 @@ impl BucketEntryInner { } ///// state machine handling - pub(super) fn check_unreliable(&self, cur_ts: Timestamp) -> Option { + pub(super) fn check_unreliable( + &self, + cur_ts: Timestamp, + ) -> Option { // If we have had any failures to send, this is not reliable if self.peer_stats.rpc_stats.failed_to_send > 0 { return Some(BucketEntryUnreliableReason::FailedToSend); @@ -730,7 +718,8 @@ impl BucketEntryInner { None => return Some(BucketEntryUnreliableReason::NotSeenConsecutively), // If not have seen the node consistently for longer than UNRELIABLE_PING_SPAN_SECS then it is unreliable Some(ts) => { - let seen_consecutively = cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1_000_000u64); + let seen_consecutively = cur_ts.saturating_sub(ts) + >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1_000_000u64); if !seen_consecutively { return Some(BucketEntryUnreliableReason::InUnreliablePingSpan); } @@ -749,20 +738,23 @@ impl BucketEntryInner { // a node is not dead if we haven't heard from it yet, // but we give it NEVER_REACHED_PING_COUNT chances to ping before we say it's dead None => { - let no_answers = self.peer_stats.rpc_stats.recent_lost_answers >= NEVER_SEEN_PING_COUNT; + let no_answers = + self.peer_stats.rpc_stats.recent_lost_answers >= NEVER_SEEN_PING_COUNT; if no_answers { - return Some(BucketEntryDeadReason::TooManyLostAnswers) + return Some(BucketEntryDeadReason::TooManyLostAnswers); } } - + // return dead if we have not heard from the node at all for the duration of the unreliable ping span // and we have tried to reach it and failed the entire time of unreliable ping span Some(ts) => { - let not_seen = cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1_000_000u64); - let no_answers = self.peer_stats.rpc_stats.recent_lost_answers >= (UNRELIABLE_PING_SPAN_SECS / UNRELIABLE_PING_INTERVAL_SECS); + let not_seen = cur_ts.saturating_sub(ts) + >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1_000_000u64); + let no_answers = self.peer_stats.rpc_stats.recent_lost_answers + >= (UNRELIABLE_PING_SPAN_SECS / UNRELIABLE_PING_INTERVAL_SECS); if not_seen && no_answers { - return Some(BucketEntryDeadReason::NoPingResponse) - } + return Some(BucketEntryDeadReason::NoPingResponse); + } } } @@ -794,7 +786,7 @@ impl BucketEntryInner { pub(super) fn needs_ping(&self, cur_ts: Timestamp) -> bool { // See which ping pattern we are to use let state = self.state(cur_ts); - + match state { BucketEntryState::Reliable => { // If we are in a reliable state, we need a ping on an exponential scale @@ -809,7 +801,9 @@ impl BucketEntryInner { let first_consecutive_seen_ts = self.peer_stats.rpc_stats.first_consecutive_seen_ts.unwrap(); let start_of_reliable_time = first_consecutive_seen_ts - + TimestampDuration::new_secs(UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS); + + TimestampDuration::new_secs( + UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS, + ); let reliable_cur = 
cur_ts.saturating_sub(start_of_reliable_time); let reliable_last = latest_contact_time.saturating_sub(start_of_reliable_time); @@ -826,7 +820,10 @@ impl BucketEntryInner { } BucketEntryState::Unreliable => { // If we are in an unreliable state, we need a ping every UNRELIABLE_PING_INTERVAL_SECS seconds - self.needs_constant_ping(cur_ts, TimestampDuration::new(UNRELIABLE_PING_INTERVAL_SECS as u64 * 1_000_000u64)) + self.needs_constant_ping( + cur_ts, + TimestampDuration::new(UNRELIABLE_PING_INTERVAL_SECS as u64 * 1_000_000u64), + ) } BucketEntryState::Dead => { error!("Should not be asking this for dead nodes"); @@ -836,7 +833,6 @@ impl BucketEntryInner { error!("Should not be asking this for punished nodes"); false } - } } @@ -941,7 +937,6 @@ pub(crate) struct BucketEntry { impl BucketEntry { pub(super) fn new(first_node_id: TypedKey) -> Self { - // First node id should always be one we support since TypedKeySets are sorted and we must have at least one supported key assert!(VALID_CRYPTO_KINDS.contains(&first_node_id.kind)); diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index 85537830..19be5262 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -254,26 +254,28 @@ impl RoutingTable { } //#[instrument(level = "trace", skip(self), err)] - pub(crate) fn bootstrap_with_peer(self, crypto_kinds: Vec, pi: PeerInfo, unord: &FuturesUnordered>) { - + pub(crate) fn bootstrap_with_peer( + self, + crypto_kinds: Vec, + pi: PeerInfo, + unord: &FuturesUnordered>, + ) { log_rtab!( "--- bootstrapping {} with {:?}", pi.node_ids(), pi.signed_node_info().node_info().dial_info_detail_list() ); - let nr = - match self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, true) { + let nr = match self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, true) { Ok(nr) => nr, Err(e) => { log_rtab!(error "failed to register bootstrap peer info: {}", e); return; } }; - + // Add this our futures to process in parallel for crypto_kind in crypto_kinds { - // Bootstrap this crypto kind let nr = nr.clone(); let routing_table = self.clone(); @@ -320,8 +322,11 @@ impl RoutingTable { } #[instrument(level = "trace", skip(self), err)] - pub(crate) async fn bootstrap_with_peer_list(self, peers: Vec, stop_token: StopToken) -> EyreResult<()> { - + pub(crate) async fn bootstrap_with_peer_list( + self, + peers: Vec, + stop_token: StopToken, + ) -> EyreResult<()> { log_rtab!(debug " bootstrapped peers: {:?}", &peers); // Get crypto kinds to bootstrap @@ -332,7 +337,8 @@ impl RoutingTable { // Run all bootstrap operations concurrently let mut unord = FuturesUnordered::>::new(); for peer in peers { - self.clone().bootstrap_with_peer(crypto_kinds.clone(), peer, &unord); + self.clone() + .bootstrap_with_peer(crypto_kinds.clone(), peer, &unord); } // Wait for all bootstrap operations to complete before we complete the singlefuture @@ -355,7 +361,6 @@ impl RoutingTable { crypto_kinds } - #[instrument(level = "trace", skip(self), err)] pub(crate) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> { let bootstrap = self @@ -378,7 +383,7 @@ impl RoutingTable { } } } - + // Get a peer list from bootstrap to process let peers = if !bootstrap_dialinfos.is_empty() { // Direct bootstrap @@ -398,28 +403,34 @@ impl RoutingTable { } else { // If not direct, resolve bootstrap servers and recurse their TXT entries let bsrecs = self.resolve_bootstrap(bootstrap).await?; - 
let peers : Vec = bsrecs.into_iter().map(|bsrec| { - // Get crypto support from list of node ids - let crypto_support = bsrec.node_ids.kinds(); + let peers: Vec = bsrecs + .into_iter() + .map(|bsrec| { + // Get crypto support from list of node ids + let crypto_support = bsrec.node_ids.kinds(); - // Make unsigned SignedNodeInfo - let sni = - SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo::new( - NetworkClass::InboundCapable, // Bootstraps are always inbound capable - ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled - AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable - bsrec.envelope_support, // Envelope support is as specified in the bootstrap list - crypto_support, // Crypto support is derived from list of node ids - vec![], // Bootstrap needs no capabilities - bsrec.dial_info_details, // Dial info is as specified in the bootstrap list - ))); + // Make unsigned SignedNodeInfo + let sni = SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature( + NodeInfo::new( + NetworkClass::InboundCapable, // Bootstraps are always inbound capable + ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled + AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable + bsrec.envelope_support, // Envelope support is as specified in the bootstrap list + crypto_support, // Crypto support is derived from list of node ids + vec![], // Bootstrap needs no capabilities + bsrec.dial_info_details, // Dial info is as specified in the bootstrap list + ), + )); - PeerInfo::new(bsrec.node_ids, sni) - }).collect(); + PeerInfo::new(bsrec.node_ids, sni) + }) + .collect(); peers }; - self.clone().bootstrap_with_peer_list(peers, stop_token).await + self.clone() + .bootstrap_with_peer_list(peers, stop_token) + .await } } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index ed322ccc..0d55628a 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -32,18 +32,18 @@ mod rpc_start_tunnel; pub(crate) use coders::*; pub(crate) use destination::*; +pub(crate) use fanout_call::*; pub(crate) use operation_waiter::*; pub(crate) use rpc_error::*; pub(crate) use rpc_status::*; -pub(crate) use fanout_call::*; use super::*; use crypto::*; +use fanout_queue::*; use futures_util::StreamExt; use network_manager::*; use routing_table::*; -use fanout_queue::*; use stop_token::future::FutureExt; use storage_manager::*; @@ -53,7 +53,7 @@ use storage_manager::*; struct RPCMessageHeaderDetailDirect { /// The decoded header of the envelope envelope: Envelope, - /// The noderef of the peer that sent the message (not the original sender). + /// The noderef of the peer that sent the message (not the original sender). 
/// Ensures node doesn't get evicted from routing table until we're done with it
    /// Should be filtered to the routing domain of the peer that we received from
    peer_noderef: NodeRef,
@@ -171,14 +171,13 @@ pub(crate) struct RPCMessage {
    opt_sender_nr: Option,
}

-#[instrument(level="trace", target="rpc", skip_all, err)]
+#[instrument(level = "trace", target = "rpc", skip_all, err)]
pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder) -> Result, RPCError>
where
    T: capnp::message::Allocator + 'a,
{
    let mut buffer = vec![];
-    capnp::serialize_packed::write_message(&mut buffer, &builder)
-        .map_err(RPCError::protocol)?;
+    capnp::serialize_packed::write_message(&mut buffer, &builder).map_err(RPCError::protocol)?;
    Ok(buffer)
}

@@ -200,15 +199,23 @@ struct WaitableReply {
#[derive(Clone, Debug, Default)]
pub struct Answer {
    /// How long it took to get this answer
-    pub _latency: TimestampDuration,
+    pub _latency: TimestampDuration,
    /// The private route requested to receive the reply
    pub reply_private_route: Option,
    /// The answer itself
-    pub answer: T,
+    pub answer: T,
}
impl Answer {
-    pub fn new(latency: TimestampDuration, reply_private_route: Option, answer: T) -> Self {
-        Self { _latency: latency, reply_private_route, answer }
+    pub fn new(
+        latency: TimestampDuration,
+        reply_private_route: Option,
+        answer: T,
+    ) -> Self {
+        Self {
+            _latency: latency,
+            reply_private_route,
+            answer,
+        }
    }
}

@@ -395,11 +402,10 @@ impl RPCProcessor {
        for task_n in 0..self.unlocked_inner.concurrency {
            let this = self.clone();
            let receiver = channel.1.clone();
-            let jh = spawn(&format!("rpc worker {}",task_n), Self::rpc_worker(
-                this,
-                inner.stop_source.as_ref().unwrap().token(),
-                receiver,
-            ));
+            let jh = spawn(
+                &format!("rpc worker {}", task_n),
+                Self::rpc_worker(this, inner.stop_source.as_ref().unwrap().token(), receiver),
+            );
            inner.worker_join_handles.push(jh);
        }
    }
@@ -408,7 +414,7 @@ impl RPCProcessor {
        self.storage_manager
            .set_rpc_processor(Some(self.clone()))
            .await;
-
+
        guard.success();
        Ok(())
    }
@@ -453,11 +459,13 @@ impl RPCProcessor {

    /// Get waiting app call id for debugging purposes
    pub fn get_app_call_ids(&self) -> Vec {
-        self.unlocked_inner.waiting_app_call_table.get_operation_ids()
+        self.unlocked_inner
+            .waiting_app_call_table
+            .get_operation_ids()
    }

    /// Determine if a SignedNodeInfo can be placed into the specified routing domain
-    #[instrument(level="trace", target="rpc", skip_all)]
+    #[instrument(level = "trace", target = "rpc", skip_all)]
    fn verify_node_info(
        &self,
        routing_domain: RoutingDomain,
@@ -466,14 +474,16 @@ impl RPCProcessor {
    ) -> bool {
        let routing_table = self.routing_table();
        routing_table.signed_node_info_is_valid_in_routing_domain(routing_domain, signed_node_info)
-            && signed_node_info.node_info().has_all_capabilities(capabilities)
+            && signed_node_info
+                .node_info()
+                .has_all_capabilities(capabilities)
    }

    //////////////////////////////////////////////////////////////////////

    /// Search the network for a single node and add it to the routing table and return the node reference
    /// If no node was found in the timeout, this returns None
-    #[instrument(level="trace", target="rpc", skip_all)]
+    #[instrument(level = "trace", target = "rpc", skip_all)]
    async fn search_for_node_id(
        &self,
        node_id: TypedKey,
@@ -493,33 +503,32 @@ impl RPCProcessor {
        let call_routine = |next_node: NodeRef| {
            let this = self.clone();
            async move {
-                let v = network_result_try!(this
-                    .clone()
-                    .rpc_call_find_node(
-                        Destination::direct(next_node).with_safety(safety_selection),
- node_id, - vec![], - ) - .await?); + let v = network_result_try!( + this.clone() + .rpc_call_find_node( + Destination::direct(next_node).with_safety(safety_selection), + node_id, + vec![], + ) + .await? + ); Ok(NetworkResult::value(v.answer)) } }; // Routine to call to check if we're done at each step - let check_done = |_:&[NodeRef]| { - let Ok(Some(nr)) = routing_table - .lookup_node_ref(node_id) else { - return None; - }; - + let check_done = |_: &[NodeRef]| { + let Ok(Some(nr)) = routing_table.lookup_node_ref(node_id) else { + return None; + }; + // ensure we have some dial info for the entry already, // and that the node is still alive // if not, we should keep looking for better info - if nr.state(Timestamp::now()).is_alive() && - nr.has_any_dial_info() { + if nr.state(Timestamp::now()).is_alive() && nr.has_any_dial_info() { return Some(nr); } - + None }; @@ -540,60 +549,66 @@ impl RPCProcessor { /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference /// Note: This routine can possibly be recursive, hence the SendPinBoxFuture async form - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] pub fn resolve_node( &self, node_id: TypedKey, safety_selection: SafetySelection, - ) -> SendPinBoxFuture, RPCError>> { + ) -> SendPinBoxFuture, RPCError>> { let this = self.clone(); - Box::pin(async move { - let _guard = this.unlocked_inner.startup_lock.enter().map_err(RPCError::map_try_again("not started up"))?; + Box::pin( + async move { + let _guard = this + .unlocked_inner + .startup_lock + .enter() + .map_err(RPCError::map_try_again("not started up"))?; - let routing_table = this.routing_table(); + let routing_table = this.routing_table(); - // First see if we have the node in our routing table already - if let Some(nr) = routing_table - .lookup_node_ref(node_id) - .map_err(RPCError::internal)? - { - // ensure we have some dial info for the entry already, - // and that the node is still alive - // if not, we should do the find_node anyway - if nr.state(Timestamp::now()).is_alive() && - nr.has_any_dial_info() { - return Ok(Some(nr)); + // First see if we have the node in our routing table already + if let Some(nr) = routing_table + .lookup_node_ref(node_id) + .map_err(RPCError::internal)? 
+ { + // ensure we have some dial info for the entry already, + // and that the node is still alive + // if not, we should do the find_node anyway + if nr.state(Timestamp::now()).is_alive() && nr.has_any_dial_info() { + return Ok(Some(nr)); + } } + + // If nobody knows where this node is, ask the DHT for it + let (node_count, _consensus_count, fanout, timeout) = { + let c = this.config.get(); + ( + c.network.dht.max_find_node_count as usize, + c.network.dht.resolve_node_count as usize, + c.network.dht.resolve_node_fanout as usize, + TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)), + ) + }; + + // Search in preferred cryptosystem order + let nr = match this + .search_for_node_id(node_id, node_count, fanout, timeout, safety_selection) + .await + { + TimeoutOr::Timeout => None, + TimeoutOr::Value(Ok(v)) => v, + TimeoutOr::Value(Err(e)) => { + return Err(e); + } + }; + + Ok(nr) } - - // If nobody knows where this node is, ask the DHT for it - let (node_count, _consensus_count, fanout, timeout) = { - let c = this.config.get(); - ( - c.network.dht.max_find_node_count as usize, - c.network.dht.resolve_node_count as usize, - c.network.dht.resolve_node_fanout as usize, - TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)), - ) - }; - - // Search in preferred cryptosystem order - let nr = match this - .search_for_node_id(node_id, node_count, fanout, timeout, safety_selection) - .await - { - TimeoutOr::Timeout => None, - TimeoutOr::Value(Ok(v)) => v, - TimeoutOr::Value(Err(e)) => { - return Err(e); - } - }; - - Ok(nr) - }.in_current_span()) + .in_current_span(), + ) } - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] async fn wait_for_reply( &self, waitable_reply: WaitableReply, @@ -645,17 +660,25 @@ impl RPCProcessor { if let Some(reply_private_route) = waitable_reply.reply_private_route { match &rpcreader.header.detail { RPCMessageHeaderDetail::Direct(_) => { - return Err(RPCError::protocol("should have received reply over private route or stub")); - }, + return Err(RPCError::protocol( + "should have received reply over private route or stub", + )); + } RPCMessageHeaderDetail::SafetyRouted(sr) => { - let node_id = self.routing_table.node_id(sr.direct.envelope.get_crypto_kind()); + let node_id = self + .routing_table + .node_id(sr.direct.envelope.get_crypto_kind()); if node_id.value != reply_private_route { - return Err(RPCError::protocol("should have received reply from safety route to a stub")); + return Err(RPCError::protocol( + "should have received reply from safety route to a stub", + )); } - }, + } RPCMessageHeaderDetail::PrivateRouted(pr) => { if pr.private_route != reply_private_route { - return Err(RPCError::protocol("received reply over the wrong private route")); + return Err(RPCError::protocol( + "received reply over the wrong private route", + )); } } }; @@ -666,7 +689,7 @@ impl RPCProcessor { } /// Wrap an operation with a private route inside a safety route - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn wrap_with_route( &self, safety_selection: SafetySelection, @@ -683,12 +706,15 @@ impl RPCProcessor { let pr_pubkey = remote_private_route.public_key.value; let crypto_kind = remote_private_route.crypto_kind(); let Some(vcrypto) = self.crypto.get(crypto_kind) else { - return Err(RPCError::internal("crypto not available for selected private route")); + return Err(RPCError::internal( + "crypto not available for 
selected private route", + )); }; // Compile the safety route with the private route let compiled_route: CompiledRoute = network_result_try!(rss - .compile_safety_route(safety_selection, remote_private_route).to_rpc_network_result()?); + .compile_safety_route(safety_selection, remote_private_route) + .to_rpc_network_result()?); let sr_is_stub = compiled_route.safety_route.is_stub(); let sr_pubkey = compiled_route.safety_route.public_key.value; @@ -741,12 +767,12 @@ impl RPCProcessor { /// Produce a byte buffer that represents the wire encoding of the entire /// unencrypted envelope body for a RPC message. This incorporates /// wrapping a private and/or safety route if they are specified. - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn render_operation( &self, dest: Destination, operation: &RPCOperation, - ) ->RPCNetworkResult { + ) -> RPCNetworkResult { let out: NetworkResult; // Encode message to a builder and make a message reader for it @@ -873,14 +899,17 @@ impl RPCProcessor { /// routing table caching when it is okay to do so /// Also check target's timestamp of our own node info, to see if we should send that /// And send our timestamp of the target's node info so they can determine if they should update us on their next rpc - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn get_sender_peer_info(&self, dest: &Destination) -> SenderPeerInfo { // Don't do this if the sender is to remain private // Otherwise we would be attaching the original sender's identity to the final destination, // thus defeating the purpose of the safety route entirely :P let Some(UnsafeRoutingInfo { - opt_node, opt_relay: _, opt_routing_domain - }) = dest.get_unsafe_routing_info(self.routing_table.clone()) else { + opt_node, + opt_relay: _, + opt_routing_domain, + }) = dest.get_unsafe_routing_info(self.routing_table.clone()) + else { return SenderPeerInfo::default(); }; let Some(node) = opt_node else { @@ -915,7 +944,7 @@ impl RPCProcessor { } /// Record failure to send to node or route - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn record_send_failure( &self, rpc_kind: RPCKind, @@ -925,7 +954,7 @@ impl RPCProcessor { remote_private_route: Option, ) { let wants_answer = matches!(rpc_kind, RPCKind::Question); - + // Record for node if this was not sent via a route if safety_route.is_none() && remote_private_route.is_none() { node_ref.stats_failed_to_send(send_ts, wants_answer); @@ -950,7 +979,7 @@ impl RPCProcessor { } /// Record question lost to node or route - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn record_question_lost( &self, send_ts: Timestamp, @@ -993,7 +1022,7 @@ impl RPCProcessor { } /// Record success sending to node or route - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn record_send_success( &self, rpc_kind: RPCKind, @@ -1037,7 +1066,7 @@ impl RPCProcessor { /// Record answer received from node or route #[allow(clippy::too_many_arguments)] - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn record_answer_received( &self, send_ts: Timestamp, @@ -1123,7 +1152,7 @@ impl RPCProcessor { } /// Record question or statement received from node or route - #[instrument(level="trace", target="rpc", 
skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn record_question_received(&self, msg: &RPCMessage) { let recv_ts = msg.header.timestamp; let bytes = msg.header.body_len; @@ -1168,7 +1197,7 @@ impl RPCProcessor { /// Issue a question over the network, possibly using an anonymized route /// Optionally keeps a context to be passed to the answer processor when an answer is received - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] async fn question( &self, dest: Destination, @@ -1248,12 +1277,12 @@ impl RPCProcessor { remote_private_route, ); - // Ref the connection so it doesn't go away until we're done with the waitable reply - let opt_connection_ref_scope = send_data_method.unique_flow.connection_id.and_then(|id| self - .network_manager() - .connection_manager() - .try_connection_ref_scope(id)); + let opt_connection_ref_scope = send_data_method.unique_flow.connection_id.and_then(|id| { + self.network_manager() + .connection_manager() + .try_connection_ref_scope(id) + }); // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { @@ -1270,12 +1299,8 @@ impl RPCProcessor { } /// Issue a statement over the network, possibly using an anonymized route - #[instrument(level="trace", target="rpc", skip_all)] - async fn statement( - &self, - dest: Destination, - statement: RPCStatement, - ) ->RPCNetworkResult<()> { + #[instrument(level = "trace", target = "rpc", skip_all)] + async fn statement(&self, dest: Destination, statement: RPCStatement) -> RPCNetworkResult<()> { // Get sender peer info if we should send that let spi = self.get_sender_peer_info(&dest); @@ -1342,13 +1367,8 @@ impl RPCProcessor { } /// Issue a reply over the network, possibly using an anonymized route /// The request must want a response, or this routine fails - #[instrument(level="trace", target="rpc", skip_all)] - async fn answer( - &self, - request: RPCMessage, - answer: RPCAnswer, - ) ->RPCNetworkResult<()> { - + #[instrument(level = "trace", target = "rpc", skip_all)] + async fn answer(&self, request: RPCMessage, answer: RPCAnswer) -> RPCNetworkResult<()> { // Extract destination from respond_to let dest = network_result_try!(self.get_respond_to_destination(&request)); @@ -1420,7 +1440,7 @@ impl RPCProcessor { /// Decoding RPC from the wire /// This performs a capnp decode on the data, and if it passes the capnp schema /// it performs the cryptographic validation required to pass the operation up for processing - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn decode_rpc_operation( &self, encoded_msg: &RPCMessageEncoded, @@ -1438,17 +1458,17 @@ impl RPCProcessor { } /// Cryptographic RPC validation and sanitization - /// + /// /// This code may modify the RPC operation to remove elements that are inappropriate for this node - /// or reject the RPC operation entirely. For example, PeerInfo in fanout peer lists may be + /// or reject the RPC operation entirely. For example, PeerInfo in fanout peer lists may be /// removed if they are deemed inappropriate for this node, without rejecting the entire operation. - /// + /// /// We do this as part of the RPC network layer to ensure that any RPC operations that are /// processed have already been validated cryptographically and it is not the job of the /// caller or receiver. This does not mean the operation is 'semantically correct'. 
For /// complex operations that require stateful validation and a more robust context than /// 'signatures', the caller must still perform whatever validation is necessary - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn validate_rpc_operation(&self, operation: &mut RPCOperation) -> Result<(), RPCError> { // If this is an answer, get the question context for this answer // If we received an answer for a question we did not ask, this will return an error @@ -1473,11 +1493,8 @@ impl RPCProcessor { } ////////////////////////////////////////////////////////////////////// - #[instrument(level="trace", target="rpc", skip_all)] - async fn process_rpc_message( - &self, - encoded_msg: RPCMessageEncoded, - ) ->RPCNetworkResult<()> { + #[instrument(level = "trace", target = "rpc", skip_all)] + async fn process_rpc_message(&self, encoded_msg: RPCMessageEncoded) -> RPCNetworkResult<()> { let address_filter = self.network_manager.address_filter(); // Decode operation appropriately based on header detail @@ -1497,19 +1514,22 @@ impl RPCProcessor { log_rpc!(debug "Invalid RPC Operation: {}", e); // Punish nodes that send direct undecodable crap - address_filter.punish_node_id(sender_node_id, PunishmentReason::FailedToDecodeOperation); - }, + address_filter.punish_node_id( + sender_node_id, + PunishmentReason::FailedToDecodeOperation, + ); + } // Ignored messages that should be dropped RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => { log_rpc!("Dropping RPC Operation: {}", e); - }, + } // Internal errors that deserve louder logging RPCError::Unimplemented(_) | RPCError::Internal(_) => { log_rpc!(error "Error decoding RPC operation: {}", e); } }; return Ok(NetworkResult::invalid_message(e)); - }, + } }; // Get the routing domain this message came over @@ -1521,7 +1541,8 @@ impl RPCProcessor { // Ensure the sender peer info is for the actual sender specified in the envelope if !sender_peer_info.node_ids().contains(&sender_node_id) { // Attempted to update peer info for the wrong node id - address_filter.punish_node_id(sender_node_id, PunishmentReason::WrongSenderPeerInfo); + address_filter + .punish_node_id(sender_node_id, PunishmentReason::WrongSenderPeerInfo); return Ok(NetworkResult::invalid_message( "attempt to update peer info for non-sender node id", )); @@ -1533,10 +1554,14 @@ impl RPCProcessor { sender_peer_info.signed_node_info(), &[], ) { - address_filter.punish_node_id(sender_node_id, PunishmentReason::FailedToVerifySenderPeerInfo); - return Ok(NetworkResult::invalid_message( - format!("sender peerinfo has invalid peer scope: {:?}",sender_peer_info.signed_node_info()) - )); + address_filter.punish_node_id( + sender_node_id, + PunishmentReason::FailedToVerifySenderPeerInfo, + ); + return Ok(NetworkResult::invalid_message(format!( + "sender peerinfo has invalid peer scope: {:?}", + sender_peer_info.signed_node_info() + ))); } opt_sender_nr = match self.routing_table().register_node_with_peer_info( routing_domain, @@ -1545,9 +1570,12 @@ impl RPCProcessor { ) { Ok(v) => Some(v), Err(e) => { - address_filter.punish_node_id(sender_node_id, PunishmentReason::FailedToRegisterSenderPeerInfo); + address_filter.punish_node_id( + sender_node_id, + PunishmentReason::FailedToRegisterSenderPeerInfo, + ); return Ok(NetworkResult::invalid_message(e)); - } + } } } @@ -1556,7 +1584,7 @@ impl RPCProcessor { opt_sender_nr = match self.routing_table().lookup_node_ref(sender_node_id) { Ok(v) => v, Err(e) => { - // If this fails 
it's not the other node's fault. We should be able to look up a + // If this fails it's not the other node's fault. We should be able to look up a // node ref for a registered sender node id that just sent a message to us return Ok(NetworkResult::no_connection_other(e)); } @@ -1608,26 +1636,26 @@ impl RPCProcessor { if let Some(sender_nr) = msg.opt_sender_nr.clone() { sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len); } - + // Log rpc receive #[cfg(feature = "verbose-tracing")] - debug!(target: "rpc_message", dir = "recv", kind = "question", op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header); + debug!(target: "rpc_message", dir = "recv", kind = "question", op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header); } RPCOperationKind::Statement(_) => { if let Some(sender_nr) = msg.opt_sender_nr.clone() { sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len); } - + // Log rpc receive #[cfg(feature = "verbose-tracing")] - debug!(target: "rpc_message", dir = "recv", kind = "statement", op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header); + debug!(target: "rpc_message", dir = "recv", kind = "statement", op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header); } RPCOperationKind::Answer(_) => { // Answer stats are processed in wait_for_reply // Log rpc receive #[cfg(feature = "verbose-tracing")] - debug!(target: "rpc_message", dir = "recv", kind = "answer", op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header); + debug!(target: "rpc_message", dir = "recv", kind = "answer", op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header); } }; @@ -1664,26 +1692,27 @@ impl RPCProcessor { }, RPCOperationKind::Answer(_) => { let op_id = msg.operation.op_id(); - if let Err(e) = self.unlocked_inner + if let Err(e) = self + .unlocked_inner .waiting_rpc_table - .complete_op_waiter(op_id, msg) { - match e { - RPCError::Unimplemented(_) | - RPCError::Internal(_) => { - log_rpc!(error "Could not complete rpc operation: id = {}: {}", op_id, e); - }, - RPCError::InvalidFormat(_) | - RPCError::Protocol(_) | - RPCError::Network(_) | - RPCError::TryAgain(_) => { - log_rpc!(debug "Could not complete rpc operation: id = {}: {}", op_id, e); - }, - RPCError::Ignore(_) => { - log_rpc!("Answer late: id = {}", op_id); - }, - }; - // Don't throw an error here because it's okay if the original operation timed out - } + .complete_op_waiter(op_id, msg) + { + match e { + RPCError::Unimplemented(_) | RPCError::Internal(_) => { + log_rpc!(error "Could not complete rpc operation: id = {}: {}", op_id, e); + } + RPCError::InvalidFormat(_) + | RPCError::Protocol(_) + | RPCError::Network(_) + | RPCError::TryAgain(_) => { + log_rpc!(debug "Could not complete rpc operation: id = {}: {}", op_id, e); + } + RPCError::Ignore(_) => { + log_rpc!("Answer late: id = {}", op_id); + } + }; + // Don't throw an error here because it's okay if the original operation timed out + } Ok(NetworkResult::value(())) } } @@ -1696,10 +1725,10 @@ impl RPCProcessor { ) { while let Ok(Ok((prev_span, msg))) = receiver.recv_async().timeout_at(stop_token.clone()).await - { + { let rpc_message_span = tracing::trace_span!("rpc message"); rpc_message_span.follows_from(prev_span); - + network_result_value_or_log!(match self 
.process_rpc_message(msg).instrument(rpc_message_span) .await @@ -1716,7 +1745,7 @@ impl RPCProcessor { } } - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] pub fn enqueue_direct_message( &self, envelope: Envelope, @@ -1725,7 +1754,11 @@ impl RPCProcessor { routing_domain: RoutingDomain, body: Vec, ) -> EyreResult<()> { - let _guard = self.unlocked_inner.startup_lock.enter().map_err(RPCError::map_try_again("not started up"))?; + let _guard = self + .unlocked_inner + .startup_lock + .enter() + .map_err(RPCError::map_try_again("not started up"))?; let header = RPCMessageHeader { detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect { @@ -1756,7 +1789,7 @@ impl RPCProcessor { Ok(()) } - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn enqueue_safety_routed_message( &self, direct: RPCMessageHeaderDetailDirect, @@ -1791,7 +1824,7 @@ impl RPCProcessor { Ok(()) } - #[instrument(level="trace", target="rpc", skip_all)] + #[instrument(level = "trace", target = "rpc", skip_all)] fn enqueue_private_routed_message( &self, direct: RPCMessageHeaderDetailDirect, diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 36aad09c..3a7b4f33 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -15,7 +15,7 @@ impl RPCProcessor { /// Because this leaks information about the identity of the node itself, /// replying to this request received over a private route will leak /// the identity of the node and defeat the private route. - + #[instrument(level = "trace", target = "rpc", skip(self, last_descriptor), fields(ret.value.data.len, ret.value.data.seq, @@ -29,7 +29,7 @@ impl RPCProcessor { key: TypedKey, subkey: ValueSubkey, last_descriptor: Option, - ) ->RPCNetworkResult> { + ) -> RPCNetworkResult> { let _guard = self .unlocked_inner .startup_lock @@ -105,31 +105,34 @@ impl RPCProcessor { let (value, peers, descriptor) = get_value_a.destructure(); if debug_target_enabled!("dht") { - let debug_string_value = value.as_ref().map(|v| { - format!(" len={} seq={} writer={}", - v.value_data().data().len(), - v.value_data().seq(), - v.value_data().writer(), - ) - }).unwrap_or_default(); - + let debug_string_value = value + .as_ref() + .map(|v| { + format!( + " len={} seq={} writer={}", + v.value_data().data().len(), + v.value_data().seq(), + v.value_data().writer(), + ) + }) + .unwrap_or_default(); + let debug_string_answer = format!( "OUT <== GetValueA({} #{}{}{} peers={}) <= {}", key, subkey, debug_string_value, - if descriptor.is_some() { - " +desc" - } else { - "" - }, + if descriptor.is_some() { " +desc" } else { "" }, peers.len(), dest ); log_dht!(debug "{}", debug_string_answer); - - let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); + + let peer_ids: Vec = peers + .iter() + .filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())) + .collect(); log_dht!(debug "Peers: {:#?}", peer_ids); } @@ -153,7 +156,10 @@ impl RPCProcessor { if let Some(value) = &value { tracing::Span::current().record("ret.value.data.len", value.value_data().data().len()); tracing::Span::current().record("ret.value.data.seq", value.value_data().seq()); - tracing::Span::current().record("ret.value.data.writer", value.value_data().writer().to_string()); + tracing::Span::current().record( + "ret.value.data.writer", + 
value.value_data().writer().to_string(),
+            );
        }
        #[cfg(feature = "verbose-tracing")]
        tracing::Span::current().record("ret.peers.len", peers.len());
@@ -172,11 +178,7 @@ impl RPCProcessor {
    ////////////////////////////////////////////////////////////////////////////////////////////////

    #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
-    pub(crate) async fn process_get_value_q(
-        &self,
-        msg: RPCMessage,
-    ) ->RPCNetworkResult<()> {
-
+    pub(crate) async fn process_get_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
        // Ensure this never came over a private route, safety route is okay though
        match &msg.header.detail {
            RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
@@ -189,14 +191,8 @@ impl RPCProcessor {
        // Ignore if disabled
        let routing_table = self.routing_table();
        let opi = routing_table.get_own_peer_info(msg.header.routing_domain());
-        if !opi
-            .signed_node_info()
-            .node_info()
-            .has_capability(CAP_DHT)
-        {
-            return Ok(NetworkResult::service_unavailable(
-                "dht is not available",
-            ));
+        if !opi.signed_node_info().node_info().has_capability(CAP_DHT) {
+            return Ok(NetworkResult::service_unavailable("dht is not available"));
        }

        // Get the question
@@ -214,18 +210,16 @@ impl RPCProcessor {

        // Get the nodes that we know about that are closer to the key than our own node
        let routing_table = self.routing_table();
-        let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
+        let closer_to_key_peers = network_result_try!(
+            routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])
+        );

        if debug_target_enabled!("dht") {
            let debug_string = format!(
                "IN <=== GetValueQ({} #{}{}) <== {}",
                key,
                subkey,
-                if want_descriptor {
-                    " +wantdesc"
-                } else {
-                    ""
-                },
+                if want_descriptor { " +wantdesc" } else { "" },
                msg.header.direct_sender_node_id()
            );
@@ -237,29 +231,34 @@ impl RPCProcessor {
            let c = self.config.get();
            c.network.dht.set_value_count as usize
        };
-        let (get_result_value, get_result_descriptor) = if closer_to_key_peers.len() >= set_value_count {
-            // Not close enough
-            (None, None)
-        } else {
-            // Close enough, lets get it
+        let (get_result_value, get_result_descriptor) =
+            if closer_to_key_peers.len() >= set_value_count {
+                // Not close enough
+                (None, None)
+            } else {
+                // Close enough, let's get it

-            // See if we have this record ourselves
-            let storage_manager = self.storage_manager();
-            let get_result = network_result_try!(storage_manager
-                .inbound_get_value(key, subkey, want_descriptor)
-                .await
-                .map_err(RPCError::internal)?);
-            (get_result.opt_value, get_result.opt_descriptor)
-        };
+                // See if we have this record ourselves
+                let storage_manager = self.storage_manager();
+                let get_result = network_result_try!(storage_manager
+                    .inbound_get_value(key, subkey, want_descriptor)
+                    .await
+                    .map_err(RPCError::internal)?);
+                (get_result.opt_value, get_result.opt_descriptor)
+            };

        if debug_target_enabled!("dht") {
-            let debug_string_value = get_result_value.as_ref().map(|v| {
-                format!(" len={} seq={} writer={}",
-                    v.value_data().data().len(),
-                    v.value_data().seq(),
-                    v.value_data().writer(),
-                )
-            }).unwrap_or_default();
+            let debug_string_value = get_result_value
+                .as_ref()
+                .map(|v| {
+                    format!(
+                        " len={} seq={} writer={}",
+                        v.value_data().data().len(),
+                        v.value_data().seq(),
+                        v.value_data().writer(),
+                    )
+                })
+                .unwrap_or_default();

            let debug_string_answer = format!(
                "IN ===> GetValueA({} #{}{}{} peers={}) ==>
{}", @@ -274,10 +273,10 @@ impl RPCProcessor { closer_to_key_peers.len(), msg.header.direct_sender_node_id() ); - + log_dht!(debug "{}", debug_string_answer); } - + // Make GetValue answer let get_value_a = RPCOperationGetValueA::new( get_result_value.map(|x| (*x).clone()), @@ -286,7 +285,10 @@ impl RPCProcessor { )?; // Send GetValue answer - self.answer(msg, RPCAnswer::new(RPCAnswerDetail::GetValueA(Box::new(get_value_a)))) - .await + self.answer( + msg, + RPCAnswer::new(RPCAnswerDetail::GetValueA(Box::new(get_value_a))), + ) + .await } } diff --git a/veilid-core/src/rpc_processor/rpc_inspect_value.rs b/veilid-core/src/rpc_processor/rpc_inspect_value.rs index bebb5a11..589ab45a 100644 --- a/veilid-core/src/rpc_processor/rpc_inspect_value.rs +++ b/veilid-core/src/rpc_processor/rpc_inspect_value.rs @@ -37,7 +37,7 @@ impl RPCProcessor { .startup_lock .enter() .map_err(RPCError::map_try_again("not started up"))?; - + // Ensure destination never has a private route // and get the target noderef so we can validate the response let Some(target) = dest.node() else { diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 7ba3cb1f..2b7ac63e 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -33,7 +33,7 @@ impl RPCProcessor { value: SignedValueData, descriptor: SignedValueDescriptor, send_descriptor: bool, - ) ->RPCNetworkResult> { + ) -> RPCNetworkResult> { let _guard = self .unlocked_inner .startup_lock @@ -62,11 +62,7 @@ impl RPCProcessor { subkey, value.value_data().data().len(), value.value_data().writer(), - if send_descriptor { - " +senddesc" - } else { - "" - }, + if send_descriptor { " +senddesc" } else { "" }, dest ); @@ -122,31 +118,33 @@ impl RPCProcessor { let (set, value, peers) = set_value_a.destructure(); if debug_target_enabled!("dht") { - let debug_string_value = value.as_ref().map(|v| { - format!(" len={} writer={}", - v.value_data().data().len(), - v.value_data().writer(), - ) - }).unwrap_or_default(); - + let debug_string_value = value + .as_ref() + .map(|v| { + format!( + " len={} writer={}", + v.value_data().data().len(), + v.value_data().writer(), + ) + }) + .unwrap_or_default(); let debug_string_answer = format!( "OUT <== SetValueA({} #{}{}{} peers={}) <= {}", key, subkey, - if set { - " +set" - } else { - "" - }, + if set { " +set" } else { "" }, debug_string_value, peers.len(), dest, ); log_dht!(debug "{}", debug_string_answer); - - let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); + + let peer_ids: Vec = peers + .iter() + .filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())) + .collect(); log_dht!(debug "Peers: {:#?}", peer_ids); } @@ -172,7 +170,10 @@ impl RPCProcessor { if let Some(value) = &value { tracing::Span::current().record("ret.value.data.len", value.value_data().data().len()); tracing::Span::current().record("ret.value.data.seq", value.value_data().seq()); - tracing::Span::current().record("ret.value.data.writer", value.value_data().writer().to_string()); + tracing::Span::current().record( + "ret.value.data.writer", + value.value_data().writer().to_string(), + ); } #[cfg(feature = "verbose-tracing")] tracing::Span::current().record("ret.peers.len", peers.len()); @@ -187,25 +188,16 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// #[instrument(level = "trace", target = "rpc", skip(self, msg), 
fields(msg.operation.op_id), ret, err)]
-    pub(crate) async fn process_set_value_q(
-        &self,
-        msg: RPCMessage,
-    ) ->RPCNetworkResult<()> {
+    pub(crate) async fn process_set_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
        // Ignore if disabled
        let routing_table = self.routing_table();
        let rss = routing_table.route_spec_store();
-
+
        let opi = routing_table.get_own_peer_info(msg.header.routing_domain());
-        if !opi
-            .signed_node_info()
-            .node_info()
-            .has_capability(CAP_DHT)
-        {
-            return Ok(NetworkResult::service_unavailable(
-                "dht is not available",
-            ));
+        if !opi.signed_node_info().node_info().has_capability(CAP_DHT) {
+            return Ok(NetworkResult::service_unavailable("dht is not available"));
        }
-
+
        // Ensure this never came over a private route, safety route is okay though
        match &msg.header.detail {
            RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
@@ -232,10 +224,12 @@ impl RPCProcessor {
        // Get target for ValueChanged notifications
        let dest = network_result_try!(self.get_respond_to_destination(&msg));
        let target = dest.get_target(rss)?;
-
+
        // Get the nodes that we know about that are closer to the key than our own node
        let routing_table = self.routing_table();
-        let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
+        let closer_to_key_peers = network_result_try!(
+            routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])
+        );

        let debug_string = format!(
            "IN <=== SetValueQ({} #{} len={} seq={} writer={}{}) <== {}",
            key,
            subkey,
            value.value_data().data().len(),
            value.value_data().seq(),
            value.value_data().writer(),
-            if descriptor.is_some() {
-                " +desc"
-            } else {
-                ""
-            },
+            if descriptor.is_some() { " +desc" } else { "" },
            msg.header.direct_sender_node_id()
        );
@@ -268,7 +258,13 @@ impl RPCProcessor {
            // Save the subkey, creating a new record if necessary
            let storage_manager = self.storage_manager();
            let new_value = network_result_try!(storage_manager
-                .inbound_set_value(key, subkey, Arc::new(value), descriptor.map(Arc::new), target)
+                .inbound_set_value(
+                    key,
+                    subkey,
+                    Arc::new(value),
+                    descriptor.map(Arc::new),
+                    target
+                )
                .await
                .map_err(RPCError::internal)?);

@@ -276,23 +272,23 @@ impl RPCProcessor {
        };

        if debug_target_enabled!("dht") {
-            let debug_string_value = new_value.as_ref().map(|v| {
-                format!(" len={} seq={} writer={}",
-                    v.value_data().data().len(),
-                    v.value_data().seq(),
-                    v.value_data().writer(),
-                )
-            }).unwrap_or_default();
+            let debug_string_value = new_value
+                .as_ref()
+                .map(|v| {
+                    format!(
+                        " len={} seq={} writer={}",
+                        v.value_data().data().len(),
+                        v.value_data().seq(),
+                        v.value_data().writer(),
+                    )
+                })
+                .unwrap_or_default();

            let debug_string_answer = format!(
                "IN ===> SetValueA({} #{}{}{} peers={}) ==> {}",
                key,
                subkey,
-                if set {
-                    " +set"
-                } else {
-                    ""
-                },
+                if set { " +set" } else { "" },
                debug_string_value,
                closer_to_key_peers.len(),
                msg.header.direct_sender_node_id()
@@ -302,10 +298,14 @@ impl RPCProcessor {
        }

        // Make SetValue answer
-        let set_value_a = RPCOperationSetValueA::new(set, new_value.map(|x| (*x).clone()), closer_to_key_peers)?;
+        let set_value_a =
+            RPCOperationSetValueA::new(set, new_value.map(|x| (*x).clone()), closer_to_key_peers)?;

        // Send SetValue answer
-        self.answer(msg, RPCAnswer::new(RPCAnswerDetail::SetValueA(Box::new(set_value_a))))
-            .await
+        self.answer(
+            msg,
+            RPCAnswer::new(RPCAnswerDetail::SetValueA(Box::new(set_value_a))),
+        )
+        .await
    }
}
diff --git
a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs
index 675aef6f..f84f4e64 100644
--- a/veilid-core/src/storage_manager/get_value.rs
+++ b/veilid-core/src/storage_manager/get_value.rs
@@ -195,7 +195,7 @@ impl StorageManager {

            // send partial update if desired
            if ctx.send_partial_update {
-                ctx.send_partial_update=false;
+                ctx.send_partial_update = false;

                // return partial result
                let fanout_result = FanoutResult {
@@ -225,60 +225,73 @@ impl StorageManager {
        };

        // Call the fanout in a spawned task
-        spawn("outbound_get_value fanout", Box::pin(async move {
-            let fanout_call = FanoutCall::new(
-                routing_table.clone(),
-                key,
-                key_count,
-                fanout,
-                timeout_us,
-                capability_fanout_node_info_filter(vec![CAP_DHT]),
-                call_routine,
-                check_done,
-            );
+        spawn(
+            "outbound_get_value fanout",
+            Box::pin(
+                async move {
+                    let fanout_call = FanoutCall::new(
+                        routing_table.clone(),
+                        key,
+                        key_count,
+                        fanout,
+                        timeout_us,
+                        capability_fanout_node_info_filter(vec![CAP_DHT]),
+                        call_routine,
+                        check_done,
+                    );

-            let kind = match fanout_call.run(init_fanout_queue).await {
-                // If we don't finish in the timeout (too much time passed checking for consensus)
-                TimeoutOr::Timeout => FanoutResultKind::Timeout,
-                // If we finished with or without consensus (enough nodes returning the same value)
-                TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished,
-                // If we ran out of nodes before getting consensus)
-                TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted,
-                // Failed
-                TimeoutOr::Value(Err(e)) => {
-                    // If we finished with an error, return that
-                    log_dht!(debug "GetValue fanout error: {}", e);
-                    if let Err(e) = out_tx.send(Err(e.into())) {
-                        log_dht!(debug "Sending GetValue fanout error failed: {}", e);
+                    let kind = match fanout_call.run(init_fanout_queue).await {
+                        // If we don't finish in the timeout (too much time passed checking for consensus)
+                        TimeoutOr::Timeout => FanoutResultKind::Timeout,
+                        // If we finished with or without consensus (enough nodes returning the same value)
+                        TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished,
+                        // If we ran out of nodes before getting consensus
+                        TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted,
+                        // Failed
+                        TimeoutOr::Value(Err(e)) => {
+                            // If we finished with an error, return that
+                            log_dht!(debug "GetValue fanout error: {}", e);
+                            if let Err(e) = out_tx.send(Err(e.into())) {
+                                log_dht!(debug "Sending GetValue fanout error failed: {}", e);
+                            }
+                            return;
+                        }
+                    };
+
+                    let ctx = context.lock();
+                    let fanout_result = FanoutResult {
+                        kind,
+                        value_nodes: ctx.value_nodes.clone(),
+                    };
+                    log_network_result!(debug "GetValue Fanout: {:?}", fanout_result);
+
+                    if let Err(e) = out_tx.send(Ok(OutboundGetValueResult {
+                        fanout_result,
+                        get_result: GetResult {
+                            opt_value: ctx.value.clone(),
+                            opt_descriptor: ctx.descriptor.clone(),
+                        },
+                    })) {
+                        log_dht!(debug "Sending GetValue result failed: {}", e);
                    }
-                    return;
                }
-            };
-
-            let ctx = context.lock();
-            let fanout_result = FanoutResult {
-                kind,
-                value_nodes: ctx.value_nodes.clone(),
-            };
-            log_network_result!(debug "GetValue Fanout: {:?}", fanout_result);
-
-            if let Err(e) = out_tx.send(Ok(OutboundGetValueResult {
-                fanout_result,
-                get_result: GetResult {
-                    opt_value: ctx.value.clone(),
-                    opt_descriptor: ctx.descriptor.clone(),
-                },
-            })) {
-                log_dht!(debug "Sending GetValue result failed: {}", e);
-            }
-        }.instrument(tracing::trace_span!("outbound_get_value result"))))
+                .instrument(tracing::trace_span!("outbound_get_value result")),
+            ),
+    
) .detach(); Ok(out_rx) } #[instrument(level = "trace", target = "dht", skip_all)] - pub(super) fn process_deferred_outbound_get_value_result_inner(&self, inner: &mut StorageManagerInner, res_rx: flume::Receiver>, key: TypedKey, subkey: ValueSubkey, last_seq: ValueSeqNum) { + pub(super) fn process_deferred_outbound_get_value_result_inner( + &self, + inner: &mut StorageManagerInner, + res_rx: flume::Receiver>, + key: TypedKey, + subkey: ValueSubkey, + last_seq: ValueSeqNum, + ) { let this = self.clone(); inner.process_deferred_results( res_rx, @@ -326,7 +339,13 @@ impl StorageManager { } #[instrument(level = "trace", target = "dht", skip_all)] - pub(super) async fn process_outbound_get_value_result(&self, key: TypedKey, subkey: ValueSubkey, opt_last_seq: Option, result: get_value::OutboundGetValueResult) -> Result, VeilidAPIError> { + pub(super) async fn process_outbound_get_value_result( + &self, + key: TypedKey, + subkey: ValueSubkey, + opt_last_seq: Option, + result: get_value::OutboundGetValueResult, + ) -> Result, VeilidAPIError> { // See if we got a value back let Some(get_result_value) = result.get_result.opt_value else { // If we got nothing back then we also had nothing beforehand, return nothing @@ -335,7 +354,7 @@ impl StorageManager { // Keep the list of nodes that returned a value for later reference let mut inner = self.lock().await?; - + inner.process_fanout_results( key, core::iter::once((subkey, &result.fanout_result)), @@ -353,7 +372,7 @@ impl StorageManager { ) .await?; } - Ok(Some(get_result_value.value_data().clone())) + Ok(Some(get_result_value.value_data().clone())) } /// Handle a received 'Get Value' query diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index d673c3c5..b437c16e 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -197,9 +197,10 @@ impl StorageManager { kind: FanoutResultKind::Partial, value_nodes: ctx.value_nodes.clone(), }; - let out=OutboundSetValueResult { + let out = OutboundSetValueResult { fanout_result, - signed_value_data: ctx.value.clone()}; + signed_value_data: ctx.value.clone(), + }; log_dht!(debug "Sending partial SetValue result: {:?}", out); if let Err(e) = out_tx.send(Ok(out)) { @@ -224,59 +225,71 @@ impl StorageManager { }; // Call the fanout in a spawned task - spawn("outbound_set_value fanout", Box::pin(async move { - let fanout_call = FanoutCall::new( - routing_table.clone(), - key, - key_count, - fanout, - timeout_us, - capability_fanout_node_info_filter(vec![CAP_DHT]), - call_routine, - check_done, - ); + spawn( + "outbound_set_value fanout", + Box::pin( + async move { + let fanout_call = FanoutCall::new( + routing_table.clone(), + key, + key_count, + fanout, + timeout_us, + capability_fanout_node_info_filter(vec![CAP_DHT]), + call_routine, + check_done, + ); - let kind = match fanout_call.run(init_fanout_queue).await { - // If we don't finish in the timeout (too much time passed checking for consensus) - TimeoutOr::Timeout => FanoutResultKind::Timeout, - // If we finished with or without consensus (enough nodes returning the same value) - TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished, - // If we ran out of nodes before getting consensus) - TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted, - // Failed - TimeoutOr::Value(Err(e)) => { - // If we finished with an error, return that - log_dht!(debug "SetValue fanout error: {}", e); - if let Err(e) = out_tx.send(Err(e.into())) { - log_dht!(debug 
"Sending SetValue fanout error failed: {}", e); + let kind = match fanout_call.run(init_fanout_queue).await { + // If we don't finish in the timeout (too much time passed checking for consensus) + TimeoutOr::Timeout => FanoutResultKind::Timeout, + // If we finished with or without consensus (enough nodes returning the same value) + TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished, + // If we ran out of nodes before getting consensus) + TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted, + // Failed + TimeoutOr::Value(Err(e)) => { + // If we finished with an error, return that + log_dht!(debug "SetValue fanout error: {}", e); + if let Err(e) = out_tx.send(Err(e.into())) { + log_dht!(debug "Sending SetValue fanout error failed: {}", e); + } + return; + } + }; + + let ctx = context.lock(); + let fanout_result = FanoutResult { + kind, + value_nodes: ctx.value_nodes.clone(), + }; + log_network_result!(debug "SetValue Fanout: {:?}", fanout_result); + + if let Err(e) = out_tx.send(Ok(OutboundSetValueResult { + fanout_result, + signed_value_data: ctx.value.clone(), + })) { + log_dht!(debug "Sending SetValue result failed: {}", e); } - return; } - }; - - let ctx = context.lock(); - let fanout_result = FanoutResult { - kind, - value_nodes: ctx.value_nodes.clone(), - }; - log_network_result!(debug "SetValue Fanout: {:?}", fanout_result); - - if let Err(e) = out_tx.send(Ok(OutboundSetValueResult { - fanout_result, - signed_value_data: ctx.value.clone(), - })) { - log_dht!(debug "Sending SetValue result failed: {}", e); - } - }.instrument(tracing::trace_span!("outbound_set_value fanout routine")))) + .instrument(tracing::trace_span!("outbound_set_value fanout routine")), + ), + ) .detach(); Ok(out_rx) } #[instrument(level = "trace", target = "dht", skip_all)] - pub(super) fn process_deferred_outbound_set_value_result_inner(&self, inner: &mut StorageManagerInner, - res_rx: flume::Receiver>, - key: TypedKey, subkey: ValueSubkey, last_value_data: ValueData, safety_selection: SafetySelection, ) { + pub(super) fn process_deferred_outbound_set_value_result_inner( + &self, + inner: &mut StorageManagerInner, + res_rx: flume::Receiver>, + key: TypedKey, + subkey: ValueSubkey, + last_value_data: ValueData, + safety_selection: SafetySelection, + ) { let this = self.clone(); let last_value_data = Arc::new(Mutex::new(last_value_data)); inner.process_deferred_results( @@ -336,8 +349,14 @@ impl StorageManager { } #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn process_outbound_set_value_result(&self, key: TypedKey, subkey: ValueSubkey, last_value_data: ValueData, safety_selection: SafetySelection, result: set_value::OutboundSetValueResult) -> Result, VeilidAPIError> { - + pub(super) async fn process_outbound_set_value_result( + &self, + key: TypedKey, + subkey: ValueSubkey, + last_value_data: ValueData, + safety_selection: SafetySelection, + result: set_value::OutboundSetValueResult, + ) -> Result, VeilidAPIError> { // Regain the lock after network access let mut inner = self.lock().await?; diff --git a/veilid-core/src/table_store/tests/test_table_store.rs b/veilid-core/src/table_store/tests/test_table_store.rs index 85821339..a17c82f3 100644 --- a/veilid-core/src/table_store/tests/test_table_store.rs +++ b/veilid-core/src/table_store/tests/test_table_store.rs @@ -223,7 +223,15 @@ pub async fn test_protect_unprotect(vcrypto: CryptoSystemVersion, ts: TableStore ); let deks = [dek1, dek2, dek3]; - let passwords = ["", " ", " ", "12345678", "|/\\!@#$%^&*()_+", 
"Ⓜ️", "🔥🔥♾️"]; + let passwords = [ + "", + " ", + " ", + "12345678", + "|/\\!@#$%^&*()_+", + "Ⓜ️", + "🔥🔥♾️", + ]; for dek in deks { for password in passwords { diff --git a/veilid-core/src/tests/native/mod.rs b/veilid-core/src/tests/native/mod.rs index 306881d7..da3b577a 100644 --- a/veilid-core/src/tests/native/mod.rs +++ b/veilid-core/src/tests/native/mod.rs @@ -49,7 +49,7 @@ cfg_if::cfg_if! { rt.block_on(f) } } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-core/src/veilid_api/api.rs b/veilid-core/src/veilid_api/api.rs index 0bcf58e1..79878848 100644 --- a/veilid-core/src/veilid_api/api.rs +++ b/veilid-core/src/veilid_api/api.rs @@ -263,7 +263,7 @@ impl VeilidAPI { } /// Allocate a new private route and specify a specific cryptosystem, stability and sequencing preference. - /// Faster connections may be possible with [Stability::LowLatency], and [Sequencing::NoPreference] at the + /// Faster connections may be possible with [Stability::LowLatency], and [Sequencing::NoPreference] at the /// expense of some loss of messages. /// Returns a route id and a publishable 'blob' with the route encrypted with each crypto kind. /// Those nodes importing the blob will have their choice of which crypto kind to use. diff --git a/veilid-flutter/rust/src/dart_ffi.rs b/veilid-flutter/rust/src/dart_ffi.rs index 724b71fe..a12c73d5 100644 --- a/veilid-flutter/rust/src/dart_ffi.rs +++ b/veilid-flutter/rust/src/dart_ffi.rs @@ -283,7 +283,7 @@ pub extern "C" fn initialize_veilid_core(platform_config: FfiStr) { .with_endpoint(format!("http://{}", grpc_endpoint)); let batch = opentelemetry::runtime::Tokio; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-flutter/rust/src/tools.rs b/veilid-flutter/rust/src/tools.rs index b92fb6e9..6a76f50d 100644 --- a/veilid-flutter/rust/src/tools.rs +++ b/veilid-flutter/rust/src/tools.rs @@ -16,19 +16,19 @@ cfg_if! { pub use async_std::future::timeout; } else if #[cfg(feature="rt-tokio")] { pub use tokio::task::JoinHandle; - + //pub use tokio::time::error::Elapsed as TimeoutError; - + pub fn spawn + Send + 'static, T: Send + 'static>(f: F) -> JoinHandle { GLOBAL_RUNTIME.spawn(f) } - - + + lazy_static::lazy_static! { static ref GLOBAL_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Runtime::new().unwrap(); } } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 33cd160c..cef2d5bc 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -27,7 +27,7 @@ cfg_if! { use tokio::io::AsyncBufReadExt; use tokio::io::AsyncWriteExt; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index 5f9a5100..5c9a9bf1 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -5,7 +5,7 @@ #![recursion_limit = "256"] #[cfg(all(feature = "rt-async-std", windows))] -compile_error! 
{"async-std compilation for windows is currently unsupportedg"} +compile_error!("async-std compilation for windows is currently unsupported"); mod client_api; mod server; diff --git a/veilid-server/src/tools.rs b/veilid-server/src/tools.rs index a1ca8116..6d5a4778 100644 --- a/veilid-server/src/tools.rs +++ b/veilid-server/src/tools.rs @@ -49,6 +49,6 @@ cfg_if! { local.block_on(&rt, f) } } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-server/src/veilid_logs.rs b/veilid-server/src/veilid_logs.rs index 1e0ac8b9..bff8e13a 100644 --- a/veilid-server/src/veilid_logs.rs +++ b/veilid-server/src/veilid_logs.rs @@ -132,7 +132,7 @@ impl VeilidLogs { .with_endpoint(format!("http://{}", grpc_endpoint)); let batch = opentelemetry_sdk::runtime::Tokio; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } diff --git a/veilid-tools/src/ipc/ipc_async_std/windows.rs b/veilid-tools/src/ipc/ipc_async_std/windows.rs index 2a7d26d2..6cb04301 100644 --- a/veilid-tools/src/ipc/ipc_async_std/windows.rs +++ b/veilid-tools/src/ipc/ipc_async_std/windows.rs @@ -1 +1 @@ -compile_error! {"async-std compilation for windows is currently unsupported"} +compile_error!("async-std compilation for windows is currently unsupported"); diff --git a/veilid-tools/src/lib.rs b/veilid-tools/src/lib.rs index 4cb232ac..f6d1f911 100644 --- a/veilid-tools/src/lib.rs +++ b/veilid-tools/src/lib.rs @@ -175,7 +175,7 @@ cfg_if! { #[doc(no_inline)] pub use tokio::task::JoinHandle as LowLevelJoinHandle; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } } diff --git a/veilid-tools/src/must_join_handle.rs b/veilid-tools/src/must_join_handle.rs index e2d1f530..20128550 100644 --- a/veilid-tools/src/must_join_handle.rs +++ b/veilid-tools/src/must_join_handle.rs @@ -27,7 +27,7 @@ impl MustJoinHandle { jh.detach(); } } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } self.completed = true; @@ -52,7 +52,7 @@ impl MustJoinHandle { drop(self.join_handle.take()); self.completed = true; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } @@ -97,7 +97,7 @@ impl Future for MustJoinHandle { } else if #[cfg(target_arch = "wasm32")] { Poll::Ready(t) } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } } diff --git a/veilid-tools/src/network_interfaces/netlink.rs b/veilid-tools/src/network_interfaces/netlink.rs index 409e4fc4..279b80fe 100644 --- a/veilid-tools/src/network_interfaces/netlink.rs +++ b/veilid-tools/src/network_interfaces/netlink.rs @@ -20,7 +20,7 @@ cfg_if! { } else if #[cfg(feature="rt-tokio")] { use netlink_sys::{TokioSocket as RTNetLinkSocket}; } else { - compile_error!("needs executor implementation") + compile_error!("needs executor implementation"); } } use std::convert::TryInto;