diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index c9834645..3786410a 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -617,7 +617,7 @@ impl NetworkManager { // Do we know our network class yet? if let Some(network_class) = network_class { - // see if we have any routes that need extending + // see if we have any routes that need testing } // Commit the changes diff --git a/veilid-core/src/routing_table/privacy.rs b/veilid-core/src/routing_table/privacy.rs index c5855a35..a2322baf 100644 --- a/veilid-core/src/routing_table/privacy.rs +++ b/veilid-core/src/routing_table/privacy.rs @@ -85,7 +85,7 @@ impl PrivateRoute { /// Check if this is a stub route pub fn is_stub(&self) -> bool { - if let PrivateRouteHops::FirstHop(first_hop) = self.hops { + if let PrivateRouteHops::FirstHop(first_hop) = &self.hops { return first_hop.next_hop.is_none(); } false diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index e5a5afac..3706681b 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -27,17 +27,26 @@ pub struct KeyPair { secret: DHTKeySecret, } -#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] +#[derive(Clone, Debug, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)] #[archive_attr(repr(C), derive(CheckBytes))] -pub struct RouteSpecDetail { - /// Secret key +pub struct RouteStats { + /// Consecutive failed to send count #[with(Skip)] - pub secret_key: DHTKeySecret, - /// Route hops - pub hops: Vec, - /// Route noderefs + failed_to_send: u32, + /// Questions lost #[with(Skip)] - hop_node_refs: Vec, + questions_lost: u32, + /// Timestamp of when the route was created + created_ts: u64, + /// Timestamp of when the route was last checked for validity + #[with(Skip)] + last_tested_ts: Option, + /// Timestamp of when the route was last sent to + #[with(Skip)] + last_sent_ts: Option, + /// Timestamp of when the route was last received over + #[with(Skip)] + last_received_ts: Option, /// Transfers up and down transfer_stats_down_up: TransferStatsDownUp, /// Latency stats @@ -48,26 +57,104 @@ pub struct RouteSpecDetail { /// Accounting mechanism for the bandwidth across this route #[with(Skip)] transfer_stats_accounting: TransferStatsAccounting, +} + +impl RouteStats { + /// Make new route stats + pub fn new(created_ts: u64) -> Self { + Self { + created_ts, + ..Default::default() + } + } + /// Mark a route as having failed to send + pub fn record_send_failed(&mut self) { + self.failed_to_send += 1; + } + + /// Mark a route as having lost a question + pub fn record_question_lost(&mut self) { + self.questions_lost += 1; + } + + /// Mark a route as having received something + pub fn record_received(&mut self, cur_ts: u64, bytes: u64) { + self.last_received_ts = Some(cur_ts); + self.last_tested_ts = Some(cur_ts); + self.transfer_stats_accounting.add_down(bytes); + } + + /// Mark a route as having been sent to + pub fn record_sent(&mut self, cur_ts: u64, bytes: u64) { + self.last_sent_ts = Some(cur_ts); + self.transfer_stats_accounting.add_up(bytes); + } + + /// Mark a route as having been sent to + pub fn record_latency(&mut self, latency: u64) { + self.latency_stats = self.latency_stats_accounting.record_latency(latency); + } + + /// Mark a route as having been tested + pub fn record_tested(&mut self, cur_ts: u64) { + self.last_tested_ts = Some(cur_ts); + + // Reset question_lost and failed_to_send if we test clean + self.failed_to_send = 0; + self.questions_lost = 0; + } + + /// Roll transfers for these route stats + pub fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { + self.transfer_stats_accounting.roll_transfers( + last_ts, + cur_ts, + &mut self.transfer_stats_down_up, + ) + } + + /// Get the latency stats + pub fn latency_stats(&self) -> &LatencyStats { + &self.latency_stats + } + + /// Get the transfer stats + pub fn transfer_stats(&self) -> &TransferStatsDownUp { + &self.transfer_stats_down_up + } + + /// Reset stats when network restarts + pub fn reset(&mut self) { + self.last_tested_ts = None; + self.last_sent_ts = None; + self.last_received_ts = None; + } +} + +#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct RouteSpecDetail { + /// Secret key + #[with(Skip)] + secret_key: DHTKeySecret, + /// Route hops + hops: Vec, + /// Route noderefs + #[with(Skip)] + hop_node_refs: Vec, /// Published private route, do not reuse for ephemeral routes /// Not serialized because all routes should be re-published when restarting #[with(Skip)] published: bool, - // Can optimize the rendering of this route, using node ids only instead of full peer info - #[with(Skip)] - reachable: bool, - /// Timestamp of when the route was created - created_ts: u64, - /// Timestamp of when the route was last checked for validity - last_checked_ts: Option, - /// Timestamp of when the route was last used for anything - last_used_ts: Option, /// Directions this route is guaranteed to work in #[with(RkyvEnumSet)] directions: DirectionSet, /// Stability preference (prefer reliable nodes over faster) - pub stability: Stability, + stability: Stability, /// Sequencing preference (connection oriented protocols vs datagram) - pub sequencing: Sequencing, + sequencing: Sequencing, + /// Stats + stats: RouteStats, } /// The core representation of the RouteSpecStore that can be serialized @@ -80,15 +167,15 @@ pub struct RouteSpecStoreContent { /// What remote private routes have seen #[derive(Debug, Clone, Default)] -struct RemotePrivateRouteInfo { +pub struct RemotePrivateRouteInfo { // The private route itself private_route: Option, - /// Timestamp of when the route was last used for anything - last_used_ts: u64, - /// The time this remote private route last responded - last_replied_ts: Option, /// Did this remote private route see our node info due to no safety route in use seen_our_node_info: bool, + /// Last time this remote private route was requested for any reason (cache expiration) + last_touched_ts: u64, + /// Stats + stats: RouteStats, } /// Ephemeral data used to help the RouteSpecStore operate efficiently @@ -684,18 +771,11 @@ impl RouteSpecStore { secret_key, hops, hop_node_refs, - transfer_stats_down_up: Default::default(), - latency_stats: Default::default(), - latency_stats_accounting: Default::default(), - transfer_stats_accounting: Default::default(), published: false, - reachable: false, - created_ts: cur_ts, - last_checked_ts: None, - last_used_ts: None, directions, stability, sequencing, + stats: RouteStats::new(cur_ts), }; drop(perm_func); @@ -759,46 +839,72 @@ impl RouteSpecStore { pub async fn test_route(&self, key: &DHTKey) -> EyreResult { let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); - let (target, safety_selection) = { + let dest = { + let private_route = self.assemble_private_route(key, None)?; + let inner = &mut *self.inner.lock(); let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; + let hop_count = rsd.hops.len(); + let stability = rsd.stability; + let sequencing = rsd.sequencing; // Routes with just one hop can be pinged directly // More than one hop can be pinged across the route with the target being the second to last hop if rsd.hops.len() == 1 { - let target = rsd.hop_node_refs[0].clone(); - let sequencing = rsd.sequencing; - (target, SafetySelection::Unsafe(sequencing)) - } else { - let target = rsd.hop_node_refs[rsd.hops.len() - 2].clone(); - let hop_count = rsd.hops.len(); - let stability = rsd.stability; - let sequencing = rsd.sequencing; let safety_spec = SafetySpec { preferred_route: Some(key.clone()), hop_count, stability, sequencing, }; - (target, SafetySelection::Safe(safety_spec)) + let safety_selection = SafetySelection::Safe(safety_spec); + + Destination::PrivateRoute { + private_route, + safety_selection, + } + } else { + let target = rsd.hop_node_refs[rsd.hops.len() - 2].clone(); + let safety_spec = SafetySpec { + preferred_route: Some(key.clone()), + hop_count, + stability, + sequencing, + }; + let safety_selection = SafetySelection::Safe(safety_spec); + + Destination::Direct { + target, + safety_selection, + } } }; // Test with ping to end - let res = match rpc_processor - .rpc_call_status(Destination::Direct { - target, - safety_selection, - }) - .await? - { + let cur_ts = intf::get_timestamp(); + let res = match rpc_processor.rpc_call_status(dest).await? { NetworkResult::Value(v) => v, _ => { + // // Do route stats for single hop route test because it + // // won't get stats for the route since it's done Direct + // if matches!(safety_selection, SafetySelection::Unsafe(_)) { + // self.with_route_stats(cur_ts, &key, |s| s.record_question_lost()); + // } + // Did not error, but did not come back, just return false return Ok(false); } }; + // // Do route stats for single hop route test because it + // // won't get stats for the route since it's done Direct + // if matches!(safety_selection, SafetySelection::Unsafe(_)) { + // self.with_route_stats(cur_ts, &key, |s| { + // s.record_tested(cur_ts); + // s.record_latency(res.latency); + // }); + // } + Ok(true) } @@ -912,7 +1018,8 @@ impl RouteSpecStore { let pr_hopcount = private_route.hop_count as usize; let max_route_hop_count = self.unlocked_inner.max_route_hop_count; - if pr_hopcount > max_route_hop_count { + // Check private route hop count isn't larger than the max route hop count plus one for the 'first hop' header + if pr_hopcount > (max_route_hop_count + 1) { bail!("private route hop count too long"); } // See if we are using a safety route, if not, short circuit this operation @@ -969,10 +1076,6 @@ impl RouteSpecStore { }; let safety_rsd = Self::detail_mut(inner, &sr_pubkey).unwrap(); - // See if we can optimize this compilation yet - // We don't want to include full nodeinfo if we don't have to - let optimize = safety_rsd.reachable; - // xxx implement caching here! // Create hops @@ -989,6 +1092,12 @@ impl RouteSpecStore { blob_data }; + // We can optimize the peer info in this safety route if it has been successfully + // communicated over either via an outbound test, or used as a private route inbound + // and we are replying over the same route as our safety route outbound + let optimize = safety_rsd.stats.last_tested_ts.is_some() + || safety_rsd.stats.last_received_ts.is_some(); + // Encode each hop from inside to outside // skips the outermost hop since that's entering the // safety route and does not include the dialInfo @@ -1177,7 +1286,7 @@ impl RouteSpecStore { pub fn assemble_private_route( &self, key: &DHTKey, - optimize: Option, + optimized: Option, ) -> EyreResult { let inner = &*self.inner.lock(); let routing_table = self.unlocked_inner.routing_table.clone(); @@ -1187,11 +1296,12 @@ impl RouteSpecStore { // See if we can optimize this compilation yet // We don't want to include full nodeinfo if we don't have to - let optimize = optimize.unwrap_or(rsd.reachable); + let optimized = optimized + .unwrap_or(rsd.stats.last_tested_ts.is_some() || rsd.stats.last_received_ts.is_some()); // Make innermost route hop to our own node let mut route_hop = RouteHop { - node: if optimize { + node: if optimized { RouteNode::NodeId(NodeId::new(routing_table.node_id())) } else { RouteNode::PeerInfo(rti.get_own_peer_info(RoutingDomain::PublicInternet)) @@ -1225,7 +1335,7 @@ impl RouteSpecStore { }; route_hop = RouteHop { - node: if optimize { + node: if optimized { // Optimized, no peer info, just the dht key RouteNode::NodeId(NodeId::new(rsd.hops[h])) } else { @@ -1300,22 +1410,22 @@ impl RouteSpecStore { .remote_private_route_cache .entry(pr_pubkey) .and_modify(|rpr| { - if cur_ts - rpr.last_used_ts >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { + if cur_ts - rpr.last_touched_ts >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { // Start fresh if this had expired - rpr.last_used_ts = cur_ts; - rpr.last_replied_ts = None; rpr.seen_our_node_info = false; + rpr.last_touched_ts = cur_ts; + rpr.stats = RouteStats::new(cur_ts); } else { // If not expired, just mark as being used - rpr.last_used_ts = cur_ts; + rpr.last_touched_ts = cur_ts; } }) .or_insert_with(|| RemotePrivateRouteInfo { // New remote private route cache entry private_route: Some(private_route), - last_used_ts: cur_ts, - last_replied_ts: None, seen_our_node_info: false, + last_touched_ts: cur_ts, + stats: RouteStats::new(cur_ts), }); f(rpr) } @@ -1331,7 +1441,8 @@ impl RouteSpecStore { F: FnOnce(&mut RemotePrivateRouteInfo) -> R, { let rpr = inner.cache.remote_private_route_cache.get_mut(key)?; - if cur_ts - rpr.last_used_ts < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { + if cur_ts - rpr.last_touched_ts < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { + rpr.last_touched_ts = cur_ts; return Some(f(rpr)); } inner.cache.remote_private_route_cache.remove(key); @@ -1355,6 +1466,12 @@ impl RouteSpecStore { cur_ts: u64, ) -> EyreResult<()> { let inner = &mut *self.inner.lock(); + // Check for local route. If this is not a remote private route + // then we just skip the recording. We may be running a test and using + // our own local route as the destination private route. + if let Some(_) = Self::detail_mut(inner, key) { + return Ok(()); + } if Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| { rpr.seen_our_node_info = true; }) @@ -1365,30 +1482,25 @@ impl RouteSpecStore { Ok(()) } - /// Mark a remote private route as having replied to a question { - pub fn mark_remote_private_route_replied(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> { + /// Get the route statistics for any route we know about, local or remote + pub fn with_route_stats(&self, cur_ts: u64, key: &DHTKey, f: F) -> Option + where + F: FnOnce(&mut RouteStats) -> R, + { let inner = &mut *self.inner.lock(); - if Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| { - rpr.last_replied_ts = Some(cur_ts); - }) - .is_none() - { - bail!("private route is missing from store: {}", key); + // Check for local route + if let Some(rsd) = Self::detail_mut(inner, key) { + return Some(f(&mut rsd.stats)); + } + // Check for remote route + if let Some(res) = + Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| f(&mut rpr.stats)) + { + return Some(res); } - Ok(()) - } - /// Mark a remote private route as having beed used { - pub fn mark_remote_private_route_used(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> { - let inner = &mut *self.inner.lock(); - if Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| { - rpr.last_used_ts = cur_ts; - }) - .is_none() - { - bail!("private route is missing from store: {}", key); - } - Ok(()) + log_rtab!(debug "route missing for stats: {}", key); + None } /// Clear caches when local our local node info changes @@ -1399,16 +1511,16 @@ impl RouteSpecStore { for (_k, v) in &mut inner.content.details { // Must republish route now v.published = false; - // Route is not known reachable now - v.reachable = false; - // We have yet to check it since local node info changed - v.last_checked_ts = None; + // Restart stats for routes so we test the route again + v.stats.reset(); } // Reset private route cache for (_k, v) in &mut inner.cache.remote_private_route_cache { - v.last_replied_ts = None; + // Our node info has changed v.seen_our_node_info = false; + // Restart stats for routes so we test the route again + v.stats.reset(); } } @@ -1423,78 +1535,17 @@ impl RouteSpecStore { Ok(()) } - /// Mark route as reachable - /// When first deserialized, routes must be re-tested for reachability - /// This can be used to determine if routes need to be sent with full peerinfo or can just use a node id - pub fn mark_route_reachable(&self, key: &DHTKey, reachable: bool) -> EyreResult<()> { - let inner = &mut *self.inner.lock(); - Self::detail_mut(inner, key) - .ok_or_else(|| eyre!("route does not exist"))? - .reachable = reachable; - Ok(()) - } - - /// Mark route as checked - pub fn touch_route_checked(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> { - let inner = &mut *self.inner.lock(); - Self::detail_mut(inner, key) - .ok_or_else(|| eyre!("route does not exist"))? - .last_checked_ts = Some(cur_ts); - Ok(()) - } - - /// Mark route as used - pub fn touch_route_used(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> { - let inner = &mut *self.inner.lock(); - Self::detail_mut(inner, key) - .ok_or_else(|| eyre!("route does not exist"))? - .last_used_ts = Some(cur_ts); - Ok(()) - } - - /// Record latency on the route - pub fn record_latency(&self, key: &DHTKey, latency: u64) -> EyreResult<()> { - let inner = &mut *self.inner.lock(); - - let rsd = Self::detail_mut(inner, key).ok_or_else(|| eyre!("route does not exist"))?; - rsd.latency_stats = rsd.latency_stats_accounting.record_latency(latency); - Ok(()) - } - - /// Get the calculated latency stats - pub fn latency_stats(&self, key: &DHTKey) -> EyreResult { - let inner = &mut *self.inner.lock(); - Ok(Self::detail_mut(inner, key) - .ok_or_else(|| eyre!("route does not exist"))? - .latency_stats - .clone()) - } - - /// Add download transfers to route - pub fn add_down(&self, key: &DHTKey, bytes: u64) -> EyreResult<()> { - let inner = &mut *self.inner.lock(); - let rsd = Self::detail_mut(inner, key).ok_or_else(|| eyre!("route does not exist"))?; - rsd.transfer_stats_accounting.add_down(bytes); - Ok(()) - } - - /// Add upload transfers to route - pub fn add_up(&self, key: &DHTKey, bytes: u64) -> EyreResult<()> { - let inner = &mut *self.inner.lock(); - let rsd = Self::detail_mut(inner, key).ok_or_else(|| eyre!("route does not exist"))?; - rsd.transfer_stats_accounting.add_up(bytes); - Ok(()) - } - /// Process transfer statistics to get averages pub fn roll_transfers(&self, last_ts: u64, cur_ts: u64) { let inner = &mut *self.inner.lock(); + + // Roll transfers for locally allocated routes for rsd in inner.content.details.values_mut() { - rsd.transfer_stats_accounting.roll_transfers( - last_ts, - cur_ts, - &mut rsd.transfer_stats_down_up, - ); + rsd.stats.roll_transfers(last_ts, cur_ts); + } + // Roll transfers for remote private routes + for (_k, v) in inner.cache.remote_private_route_cache.iter_mut() { + v.stats.roll_transfers(last_ts, cur_ts); } } diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs index f651d30e..52bf843c 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks.rs @@ -11,25 +11,31 @@ impl RoutingTable { cur_ts: u64, ) -> EyreResult<()> { // log_rtab!("--- rolling_transfers task"); - let mut inner = self.inner.write(); - let inner = &mut *inner; + { + let inner = &mut *self.inner.write(); - // Roll our own node's transfers - inner.self_transfer_stats_accounting.roll_transfers( - last_ts, - cur_ts, - &mut inner.self_transfer_stats, - ); + // Roll our own node's transfers + inner.self_transfer_stats_accounting.roll_transfers( + last_ts, + cur_ts, + &mut inner.self_transfer_stats, + ); - // Roll all bucket entry transfers - let entries: Vec> = inner - .buckets - .iter() - .flat_map(|b| b.entries().map(|(_k, v)| v.clone())) - .collect(); - for v in entries { - v.with_mut(inner, |_rti, e| e.roll_transfers(last_ts, cur_ts)); + // Roll all bucket entry transfers + let entries: Vec> = inner + .buckets + .iter() + .flat_map(|b| b.entries().map(|(_k, v)| v.clone())) + .collect(); + for v in entries { + v.with_mut(inner, |_rti, e| e.roll_transfers(last_ts, cur_ts)); + } } + + // Roll all route transfers + let rss = self.route_spec_store(); + rss.roll_transfers(last_ts, cur_ts); + Ok(()) } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 90986223..866c6ae1 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -55,6 +55,8 @@ struct RPCMessageHeaderDetailDirect { /// Header details for rpc messages received over only a safety route but not a private route #[derive(Debug, Clone)] struct RPCMessageHeaderDetailSafetyRouted { + /// Remote safety route used + remote_safety_route: DHTKey, /// The sequencing used for this route sequencing: Sequencing, } @@ -62,6 +64,8 @@ struct RPCMessageHeaderDetailSafetyRouted { /// Header details for rpc messages received over a private route #[derive(Debug, Clone)] struct RPCMessageHeaderDetailPrivateRouted { + /// Remote safety route used (or possibly node id the case of no safety route) + remote_safety_route: DHTKey, /// The private route we received the rpc over private_route: DHTKey, // The safety spec for replying to this private routed rpc @@ -162,12 +166,14 @@ where #[derive(Debug)] struct WaitableReply { - dest: Destination, handle: OperationWaitHandle, timeout: u64, node_ref: NodeRef, send_ts: u64, send_data_kind: SendDataKind, + safety_route: Option, + remote_private_route: Option, + reply_private_route: Option, } ///////////////////////////////////////////////////////////////////// @@ -184,19 +190,20 @@ impl Answer { } struct RenderedOperation { - message: Vec, // The rendered operation bytes - node_id: DHTKey, // Destination node id we're sending to + message: Vec, // The rendered operation bytes + node_id: DHTKey, // Destination node id we're sending to node_ref: NodeRef, // Node to send envelope to (may not be destination node id in case of relay) hop_count: usize, // Total safety + private route hop count + 1 hop for the initial send safety_route: Option, // The safety route used to send the message - private_route: Option, // The private route used to send the message + remote_private_route: Option, // The private route used to send the message + reply_private_route: Option, // The private route requested to receive the reply } #[derive(Copy, Clone, Debug)] enum RPCKind { Question, Statement, - Answer + Answer, } ///////////////////////////////////////////////////////////////////// @@ -443,42 +450,28 @@ impl RPCProcessor { .await; match &out { Err(_) | Ok(TimeoutOr::Timeout) => { - waitable_reply.node_ref.stats_question_lost(); + self.record_question_lost( + waitable_reply.send_ts, + waitable_reply.node_ref.clone(), + waitable_reply.safety_route, + waitable_reply.remote_private_route, + waitable_reply.reply_private_route, + ); } Ok(TimeoutOr::Value((rpcreader, _))) => { // Reply received let recv_ts = intf::get_timestamp(); - waitable_reply.node_ref.stats_answer_rcvd( + + // Record answer received + self.record_answer_received( waitable_reply.send_ts, recv_ts, rpcreader.header.body_len, - ); - // Process private route replies - if let Destination::PrivateRoute { - private_route, - safety_selection, - } = &waitable_reply.dest - { - let rss = self.routing_table.route_spec_store(); - - // If we received a reply from a private route, mark it as such - if let Err(e) = - rss.mark_remote_private_route_replied(&private_route.public_key, recv_ts) - { - log_rpc!(error "private route missing: {}", e); - } - - // If we sent to a private route without a safety route - // We need to mark our own node info as having been seen so we can optimize sending it - if let SafetySelection::Unsafe(_) = safety_selection { - if let Err(e) = rss.mark_remote_private_route_seen_our_node_info( - &private_route.public_key, - recv_ts, - ) { - log_rpc!(error "private route missing: {}", e); - } - } - } + waitable_reply.node_ref.clone(), + waitable_reply.safety_route, + waitable_reply.remote_private_route, + waitable_reply.reply_private_route, + ) } }; @@ -489,18 +482,19 @@ impl RPCProcessor { fn wrap_with_route( &self, safety_selection: SafetySelection, - private_route: PrivateRoute, + remote_private_route: PrivateRoute, + reply_private_route: Option, message_data: Vec, ) -> Result, RPCError> { let routing_table = self.routing_table(); let rss = routing_table.route_spec_store(); - let pr_is_stub = private_route.is_stub(); - let pr_hop_count = private_route.hop_count; - let pr_pubkey = private_route.public_key; + let pr_is_stub = remote_private_route.is_stub(); + let pr_hop_count = remote_private_route.hop_count; + let pr_pubkey = remote_private_route.public_key; // Compile the safety route with the private route let compiled_route: CompiledRoute = match rss - .compile_safety_route(safety_selection, private_route) + .compile_safety_route(safety_selection, remote_private_route) .map_err(RPCError::internal)? { Some(cr) => cr, @@ -510,6 +504,8 @@ impl RPCProcessor { )) } }; + let sr_is_stub = compiled_route.safety_route.is_stub(); + let sr_pubkey = compiled_route.safety_route.public_key; // Encrypt routed operation // Xmsg + ENC(Xmsg, DH(PKapr, SKbsr)) @@ -552,16 +548,9 @@ impl RPCProcessor { node_id: out_node_id, node_ref: compiled_route.first_hop, hop_count: out_hop_count, - safety_route: if compiled_route.safety_route.is_stub() { - None - } else { - Some(compiled_route.safety_route.public_key) - }, - private_route: if pr_is_stub { - None - } else { - Some(pr_pubkey) - } + safety_route: if sr_is_stub { None } else { Some(sr_pubkey) }, + remote_private_route: if pr_is_stub { None } else { Some(pr_pubkey) }, + reply_private_route, }; Ok(NetworkResult::value(out)) @@ -587,6 +576,15 @@ impl RPCProcessor { builder_to_vec(msg_builder)? }; + // Get reply private route if we are asking for one to be used in our 'respond to' + let reply_private_route = match operation.kind() { + RPCOperationKind::Question(q) => match q.respond_to() { + RespondTo::Sender => None, + RespondTo::PrivateRoute(pr) => Some(pr.public_key), + }, + RPCOperationKind::Statement(_) | RPCOperationKind::Answer(_) => None, + }; + // To where are we sending the request match dest { Destination::Direct { @@ -623,6 +621,9 @@ impl RPCProcessor { node_ref.set_sequencing(sequencing) } + // Reply private route should be None here, even for questions + assert!(reply_private_route.is_none()); + // If no safety route is being used, and we're not sending to a private // route, we can use a direct envelope instead of routing out = NetworkResult::value(RenderedOperation { @@ -631,7 +632,8 @@ impl RPCProcessor { node_ref, hop_count: 1, safety_route: None, - private_route: None, + remote_private_route: None, + reply_private_route: None, }); } SafetySelection::Safe(_) => { @@ -650,7 +652,12 @@ impl RPCProcessor { PrivateRoute::new_stub(node_id, RouteNode::PeerInfo(peer_info)); // Wrap with safety route - out = self.wrap_with_route(safety_selection, private_route, message)?; + out = self.wrap_with_route( + safety_selection, + private_route, + reply_private_route, + message, + )?; } }; } @@ -661,7 +668,12 @@ impl RPCProcessor { // Send to private route // --------------------- // Reply with 'route' operation - out = self.wrap_with_route(safety_selection, private_route, message)?; + out = self.wrap_with_route( + safety_selection, + private_route, + reply_private_route, + message, + )?; } } @@ -729,13 +741,237 @@ impl RPCProcessor { } /// Record failure to send to node or route - fn record_send_failure(&self, rpc_kind: RPCKind, send_ts: u64, node_ref: NodeRef, safety_route: Option, private_route: Option) { - xxx implement me + fn record_send_failure( + &self, + rpc_kind: RPCKind, + send_ts: u64, + node_ref: NodeRef, + safety_route: Option, + remote_private_route: Option, + ) { + let wants_answer = matches!(rpc_kind, RPCKind::Question); + + // Record for node if this was not sent via a route + if safety_route.is_none() && remote_private_route.is_none() { + node_ref.stats_failed_to_send(send_ts, wants_answer); + return; + } + + // If safety route was in use, record failure to send there + if let Some(sr_pubkey) = &safety_route { + let rss = self.routing_table.route_spec_store(); + rss.with_route_stats(send_ts, sr_pubkey, |s| s.record_send_failed()); + } else { + // If no safety route was in use, then it's the private route's fault if we have one + if let Some(pr_pubkey) = &remote_private_route { + let rss = self.routing_table.route_spec_store(); + rss.with_route_stats(send_ts, pr_pubkey, |s| s.record_send_failed()); + } + } + } + + /// Record question lost to node or route + fn record_question_lost( + &self, + send_ts: u64, + node_ref: NodeRef, + safety_route: Option, + remote_private_route: Option, + private_route: Option, + ) { + // Record for node if this was not sent via a route + if safety_route.is_none() && remote_private_route.is_none() { + node_ref.stats_question_lost(); + return; + } + // Get route spec store + let rss = self.routing_table.route_spec_store(); + + // If safety route was used, record question lost there + if let Some(sr_pubkey) = &safety_route { + let rss = self.routing_table.route_spec_store(); + rss.with_route_stats(send_ts, sr_pubkey, |s| { + s.record_question_lost(); + }); + } + // If remote private route was used, record question lost there + if let Some(rpr_pubkey) = &remote_private_route { + rss.with_route_stats(send_ts, rpr_pubkey, |s| { + s.record_question_lost(); + }); + } + // If private route was used, record question lost there + if let Some(pr_pubkey) = &private_route { + rss.with_route_stats(send_ts, pr_pubkey, |s| { + s.record_question_lost(); + }); + } } /// Record success sending to node or route - fn record_send_success(&self, rpc_kind: RPCKind, send_ts: u64, bytes: u64, node_ref: NodeRef, safety_route: Option, private_route: Option) { - xxx implement me + fn record_send_success( + &self, + rpc_kind: RPCKind, + send_ts: u64, + bytes: u64, + node_ref: NodeRef, + safety_route: Option, + remote_private_route: Option, + ) { + let wants_answer = matches!(rpc_kind, RPCKind::Question); + + // Record for node if this was not sent via a route + if safety_route.is_none() && remote_private_route.is_none() { + node_ref.stats_question_sent(send_ts, bytes, wants_answer); + return; + } + + // Get route spec store + let rss = self.routing_table.route_spec_store(); + + // If safety route was used, record send there + if let Some(sr_pubkey) = &safety_route { + rss.with_route_stats(send_ts, sr_pubkey, |s| { + s.record_sent(send_ts, bytes); + }); + } + + // If remote private route was used, record send there + if let Some(pr_pubkey) = &remote_private_route { + let rss = self.routing_table.route_spec_store(); + rss.with_route_stats(send_ts, pr_pubkey, |s| { + s.record_sent(send_ts, bytes); + }); + } + } + + /// Record answer received from node or route + fn record_answer_received( + &self, + send_ts: u64, + recv_ts: u64, + bytes: u64, + node_ref: NodeRef, + safety_route: Option, + remote_private_route: Option, + reply_private_route: Option, + ) { + // Record stats for remote node if this was direct + if safety_route.is_none() && remote_private_route.is_none() && reply_private_route.is_none() + { + node_ref.stats_answer_rcvd(send_ts, recv_ts, bytes); + return; + } + // Get route spec store + let rss = self.routing_table.route_spec_store(); + + // Get latency for all local routes + let mut total_local_latency = 0u64; + let total_latency = recv_ts.saturating_sub(send_ts); + + // If safety route was used, record route there + if let Some(sr_pubkey) = &safety_route { + rss.with_route_stats(send_ts, sr_pubkey, |s| { + // If we received an answer, the safety route we sent over can be considered tested + s.record_tested(recv_ts); + + // If we used a safety route to send, use our last tested latency + total_local_latency += s.latency_stats().average + }); + } + + // If local private route was used, record route there + if let Some(pr_pubkey) = &reply_private_route { + rss.with_route_stats(send_ts, pr_pubkey, |s| { + // Record received bytes + s.record_received(recv_ts, bytes); + + // If we used a private route to receive, use our last tested latency + total_local_latency += s.latency_stats().average + }); + } + + // If remote private route was used, record there + if let Some(rpr_pubkey) = &remote_private_route { + rss.with_route_stats(send_ts, rpr_pubkey, |s| { + // Record received bytes + s.record_received(recv_ts, bytes); + + // The remote route latency is recorded using the total latency minus the total local latency + let remote_latency = total_latency.saturating_sub(total_local_latency); + s.record_latency(remote_latency); + }); + + // If we sent to a private route without a safety route + // We need to mark our own node info as having been seen so we can optimize sending it + if let Err(e) = rss.mark_remote_private_route_seen_our_node_info(&rpr_pubkey, recv_ts) { + log_rpc!(error "private route missing: {}", e); + } + + // We can't record local route latency if a remote private route was used because + // there is no way other than the prior latency estimation to determine how much time was spent + // in the remote private route + // Instead, we rely on local route testing to give us latency numbers for our local routes + } else { + // If no remote private route was used, then record half the total latency on our local routes + // This is fine because if we sent with a local safety route, + // then we must have received with a local private route too, per the design rules + if let Some(sr_pubkey) = &safety_route { + let rss = self.routing_table.route_spec_store(); + rss.with_route_stats(send_ts, sr_pubkey, |s| { + s.record_latency(total_latency / 2); + }); + } + if let Some(pr_pubkey) = &reply_private_route { + rss.with_route_stats(send_ts, pr_pubkey, |s| { + s.record_latency(total_latency / 2); + }); + } + } + } + + /// Record question or statement received from node or route + fn record_question_received(&self, msg: &RPCMessage) { + let recv_ts = msg.header.timestamp; + let bytes = msg.header.body_len; + + // Process messages based on how they were received + match &msg.header.detail { + // Process direct messages + RPCMessageHeaderDetail::Direct(_) => { + if let Some(sender_nr) = msg.opt_sender_nr.clone() { + sender_nr.stats_question_rcvd(recv_ts, bytes); + return; + } + } + // Process messages that arrived with no private route (private route stub) + RPCMessageHeaderDetail::SafetyRouted(d) => { + let rss = self.routing_table.route_spec_store(); + + // This may record nothing if the remote safety route is not also + // a remote private route that been imported, but that's okay + rss.with_route_stats(recv_ts, &d.remote_safety_route, |s| { + s.record_received(recv_ts, bytes); + }); + } + // Process messages that arrived to our private route + RPCMessageHeaderDetail::PrivateRouted(d) => { + let rss = self.routing_table.route_spec_store(); + + // This may record nothing if the remote safety route is not also + // a remote private route that been imported, but that's okay + // it could also be a node id if no remote safety route was used + // in which case this also will do nothing + rss.with_route_stats(recv_ts, &d.remote_safety_route, |s| { + s.record_received(recv_ts, bytes); + }); + + // Record for our local private route we received over + rss.with_route_stats(recv_ts, &d.private_route, |s| { + s.record_received(recv_ts, bytes); + }); + } + } } /// Issue a question over the network, possibly using an anonymized route @@ -762,7 +998,8 @@ impl RPCProcessor { node_ref, hop_count, safety_route, - private_route, + remote_private_route, + reply_private_route, } = network_result_try!(self.render_operation(dest.clone(), &operation)?); // Calculate answer timeout @@ -781,46 +1018,34 @@ impl RPCProcessor { .await .map_err(|e| { // If we're returning an error, clean up - node_ref - .stats_failed_to_send(send_ts, true); + self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route); RPCError::network(e) })? => { // If we couldn't send we're still cleaning up - node_ref - .stats_failed_to_send(send_ts, true); + self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route); } ); // Successfully sent - node_ref.stats_question_sent(send_ts, bytes, true); - - // Private route stats - if let Destination::PrivateRoute { - private_route, - safety_selection: _, - } = &dest - { - let rss = self.routing_table.route_spec_store(); - if let Err(e) = - rss.mark_remote_private_route_used(&private_route.public_key, intf::get_timestamp()) - { - log_rpc!(error "private route missing: {}", e); - } - } - - // Safety route stats - if let Some(sr_pubkey) = safety_route { - // - } + self.record_send_success( + RPCKind::Question, + send_ts, + bytes, + node_ref.clone(), + safety_route, + remote_private_route, + ); // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { - dest, handle, timeout, node_ref, send_ts, send_data_kind, + safety_route, + remote_private_route, + reply_private_route, })) } @@ -847,7 +1072,8 @@ impl RPCProcessor { node_ref, hop_count: _, safety_route, - private_route, + remote_private_route, + reply_private_route: _, } = network_result_try!(self.render_operation(dest, &operation)?); // Send statement @@ -859,23 +1085,23 @@ impl RPCProcessor { .await .map_err(|e| { // If we're returning an error, clean up - node_ref - .stats_failed_to_send(send_ts, false); + self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route); RPCError::network(e) })? => { // If we couldn't send we're still cleaning up - node_ref - .stats_failed_to_send(send_ts, false); + self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route); } ); // Successfully sent - node_ref.stats_question_sent(send_ts, bytes, false); - - // Private route stats - xxx - // Safety route stats - safety_route + self.record_send_success( + RPCKind::Statement, + send_ts, + bytes, + node_ref, + safety_route, + remote_private_route, + ); Ok(NetworkResult::value(())) } @@ -907,7 +1133,8 @@ impl RPCProcessor { node_ref, hop_count: _, safety_route, - private_route, + remote_private_route, + reply_private_route: _, } = network_result_try!(self.render_operation(dest, &operation)?); // Send the reply @@ -918,23 +1145,23 @@ impl RPCProcessor { .await .map_err(|e| { // If we're returning an error, clean up - node_ref - .stats_failed_to_send(send_ts, false); + self.record_send_failure(RPCKind::Answer, send_ts, node_ref.clone(), safety_route, remote_private_route); RPCError::network(e) })? => { // If we couldn't send we're still cleaning up - node_ref - .stats_failed_to_send(send_ts, false); + self.record_send_failure(RPCKind::Answer, send_ts, node_ref.clone(), safety_route, remote_private_route); } ); // Reply successfully sent - node_ref.stats_answer_sent(bytes); - - // Private route stats - xxxx - // Safety route stats - xxx + self.record_send_success( + RPCKind::Answer, + send_ts, + bytes, + node_ref, + safety_route, + remote_private_route, + ); Ok(NetworkResult::value(())) } @@ -1019,9 +1246,11 @@ impl RPCProcessor { } }; - // Process stats + // Process stats for questions/statements received let kind = match msg.operation.kind() { RPCOperationKind::Question(_) => { + self.record_question_received(&msg); + if let Some(sender_nr) = msg.opt_sender_nr.clone() { sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len); } @@ -1147,12 +1376,14 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, body), err)] pub fn enqueue_safety_routed_message( &self, + remote_safety_route: DHTKey, sequencing: Sequencing, body: Vec, ) -> EyreResult<()> { let msg = RPCMessageEncoded { header: RPCMessageHeader { detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted { + remote_safety_route, sequencing, }), timestamp: intf::get_timestamp(), @@ -1174,6 +1405,7 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, body), err)] pub fn enqueue_private_routed_message( &self, + remote_safety_route: DHTKey, private_route: DHTKey, safety_spec: SafetySpec, body: Vec, @@ -1182,6 +1414,7 @@ impl RPCProcessor { header: RPCMessageHeader { detail: RPCMessageHeaderDetail::PrivateRouted( RPCMessageHeaderDetailPrivateRouted { + remote_safety_route, private_route, safety_spec, }, diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index 4abdd681..5b82d2ae 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -135,7 +135,7 @@ impl RPCProcessor { &self, detail: RPCMessageHeaderDetailDirect, routed_operation: RoutedOperation, - safety_route: &SafetyRoute, + remote_safety_route: &SafetyRoute, ) -> Result, RPCError> { // Get sequencing preference let sequencing = if detail @@ -153,7 +153,7 @@ impl RPCProcessor { let node_id_secret = self.routing_table.node_id_secret(); let dh_secret = self .crypto - .cached_dh(&safety_route.public_key, &node_id_secret) + .cached_dh(&remote_safety_route.public_key, &node_id_secret) .map_err(RPCError::protocol)?; let body = match Crypto::decrypt_aead( &routed_operation.data, @@ -168,7 +168,7 @@ impl RPCProcessor { }; // Pass message to RPC system - self.enqueue_safety_routed_message(sequencing, body) + self.enqueue_safety_routed_message(remote_safety_route.public_key, sequencing, body) .map_err(RPCError::internal)?; Ok(NetworkResult::value(())) @@ -180,7 +180,7 @@ impl RPCProcessor { &self, detail: RPCMessageHeaderDetailDirect, routed_operation: RoutedOperation, - safety_route: &SafetyRoute, + remote_safety_route: &SafetyRoute, private_route: &PrivateRoute, ) -> Result, RPCError> { // Get sender id @@ -204,7 +204,7 @@ impl RPCProcessor { // xxx: punish nodes that send messages that fail to decrypt eventually. How to do this for private routes? let dh_secret = self .crypto - .cached_dh(&safety_route.public_key, &secret_key) + .cached_dh(&remote_safety_route.public_key, &secret_key) .map_err(RPCError::protocol)?; let body = Crypto::decrypt_aead( &routed_operation.data, @@ -217,7 +217,7 @@ impl RPCProcessor { ))?; // Pass message to RPC system - self.enqueue_private_routed_message(private_route.public_key, safety_spec, body) + self.enqueue_private_routed_message(remote_safety_route.public_key, private_route.public_key, safety_spec, body) .map_err(RPCError::internal)?; Ok(NetworkResult::value(())) diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index fa5ff34b..ebade9d2 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -845,46 +845,52 @@ impl VeilidAPI { } pub async fn debug(&self, args: String) -> Result { - let args = args.trim_start(); - if args.is_empty() { - // No arguments runs help command - return self.debug_help("".to_owned()).await; - } - let (arg, rest) = args.split_once(' ').unwrap_or((args, "")); - let rest = rest.trim_start().to_owned(); + let res = { + let args = args.trim_start(); + if args.is_empty() { + // No arguments runs help command + return self.debug_help("".to_owned()).await; + } + let (arg, rest) = args.split_once(' ').unwrap_or((args, "")); + let rest = rest.trim_start().to_owned(); - if arg == "help" { - self.debug_help(rest).await - } else if arg == "buckets" { - self.debug_buckets(rest).await - } else if arg == "dialinfo" { - self.debug_dialinfo(rest).await - } else if arg == "txtrecord" { - self.debug_txtrecord(rest).await - } else if arg == "entries" { - self.debug_entries(rest).await - } else if arg == "entry" { - self.debug_entry(rest).await - } else if arg == "ping" { - self.debug_ping(rest).await - } else if arg == "contact" { - self.debug_contact(rest).await - } else if arg == "nodeinfo" { - self.debug_nodeinfo(rest).await - } else if arg == "purge" { - self.debug_purge(rest).await - } else if arg == "attach" { - self.debug_attach(rest).await - } else if arg == "detach" { - self.debug_detach(rest).await - } else if arg == "config" { - self.debug_config(rest).await - } else if arg == "restart" { - self.debug_restart(rest).await - } else if arg == "route" { - self.debug_route(rest).await - } else { - Ok(">>> Unknown command\n".to_owned()) + if arg == "help" { + self.debug_help(rest).await + } else if arg == "buckets" { + self.debug_buckets(rest).await + } else if arg == "dialinfo" { + self.debug_dialinfo(rest).await + } else if arg == "txtrecord" { + self.debug_txtrecord(rest).await + } else if arg == "entries" { + self.debug_entries(rest).await + } else if arg == "entry" { + self.debug_entry(rest).await + } else if arg == "ping" { + self.debug_ping(rest).await + } else if arg == "contact" { + self.debug_contact(rest).await + } else if arg == "nodeinfo" { + self.debug_nodeinfo(rest).await + } else if arg == "purge" { + self.debug_purge(rest).await + } else if arg == "attach" { + self.debug_attach(rest).await + } else if arg == "detach" { + self.debug_detach(rest).await + } else if arg == "config" { + self.debug_config(rest).await + } else if arg == "restart" { + self.debug_restart(rest).await + } else if arg == "route" { + self.debug_route(rest).await + } else { + Ok(">>> Unknown command\n".to_owned()) + } + }; + if let Ok(res) = &res { + debug!("{}", res); } + res } } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 605fcf90..b56b0277 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -2492,7 +2492,6 @@ pub enum SignalInfo { peer_info: PeerInfo, // Sender's peer info }, // XXX: WebRTC - // XXX: App-level signalling } ///////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/veilid_api/serialize_helpers.rs b/veilid-core/src/veilid_api/serialize_helpers.rs index 680e3a48..91a3b283 100644 --- a/veilid-core/src/veilid_api/serialize_helpers.rs +++ b/veilid-core/src/veilid_api/serialize_helpers.rs @@ -5,7 +5,7 @@ use rkyv::Archive as RkyvArchive; use rkyv::Deserialize as RkyvDeserialize; use rkyv::Serialize as RkyvSerialize; -// XXX: Don't trace these functions as they are used in the transfer of API logs, which will recurse! +// Don't trace these functions as they are used in the transfer of API logs, which will recurse! // #[instrument(level = "trace", ret, err)] pub fn deserialize_json<'a, T: de::Deserialize<'a> + Debug>(