diff --git a/veilid-core/src/routing_table/privacy.rs b/veilid-core/src/routing_table/privacy.rs index 352b716b..c5855a35 100644 --- a/veilid-core/src/routing_table/privacy.rs +++ b/veilid-core/src/routing_table/privacy.rs @@ -83,6 +83,14 @@ impl PrivateRoute { } } + /// Check if this is a stub route + pub fn is_stub(&self) -> bool { + if let PrivateRouteHops::FirstHop(first_hop) = self.hops { + return first_hop.next_hop.is_none(); + } + false + } + /// Remove the first unencrypted hop if possible pub fn pop_first_hop(&mut self) -> Option { match &mut self.hops { @@ -149,6 +157,8 @@ pub struct SafetyRoute { impl SafetyRoute { pub fn new_stub(public_key: DHTKey, private_route: PrivateRoute) -> Self { + // First hop should have already been popped off for stubbed safety routes since + // we are sending directly to the first hop assert!(matches!(private_route.hops, PrivateRouteHops::Data(_))); Self { public_key, @@ -156,6 +166,9 @@ impl SafetyRoute { hops: SafetyRouteHops::Private(private_route), } } + pub fn is_stub(&self) -> bool { + matches!(self.hops, SafetyRouteHops::Private(_)) + } } impl fmt::Display for SafetyRoute { diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 66d80cf6..e5a5afac 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -757,26 +757,38 @@ impl RouteSpecStore { /// Test an allocated route for continuity pub async fn test_route(&self, key: &DHTKey) -> EyreResult { - let inner = &mut *self.inner.lock(); - let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); - // Target is the last hop - let target = rsd.hop_node_refs.last().unwrap().clone(); - let hop_count = rsd.hops.len(); - let stability = rsd.stability; - let sequencing = rsd.sequencing; + let (target, safety_selection) = { + let inner = &mut *self.inner.lock(); + let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; + + // Routes with just one hop can be pinged directly + // More than one hop can be pinged across the route with the target being the second to last hop + if rsd.hops.len() == 1 { + let target = rsd.hop_node_refs[0].clone(); + let sequencing = rsd.sequencing; + (target, SafetySelection::Unsafe(sequencing)) + } else { + let target = rsd.hop_node_refs[rsd.hops.len() - 2].clone(); + let hop_count = rsd.hops.len(); + let stability = rsd.stability; + let sequencing = rsd.sequencing; + let safety_spec = SafetySpec { + preferred_route: Some(key.clone()), + hop_count, + stability, + sequencing, + }; + (target, SafetySelection::Safe(safety_spec)) + } + }; // Test with ping to end let res = match rpc_processor .rpc_call_status(Destination::Direct { target, - safety_selection: SafetySelection::Safe(SafetySpec { - preferred_route: Some(key.clone()), - hop_count, - stability, - sequencing, - }), + safety_selection, }) .await? { @@ -1100,8 +1112,11 @@ impl RouteSpecStore { // See if the preferred route is here if let Some(preferred_route) = safety_spec.preferred_route { - if inner.content.details.contains_key(&preferred_route) { - return Ok(Some(preferred_route)); + if let Some(preferred_rsd) = inner.content.details.get(&preferred_route) { + // Only use the preferred route if it doesn't end with the avoid nodes + if !avoid_node_ids.contains(preferred_rsd.hops.last().unwrap()) { + return Ok(Some(preferred_route)); + } } } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 1a5c1781..90986223 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -184,11 +184,21 @@ impl Answer { } struct RenderedOperation { - message: Vec, // The rendered operation bytes - node_id: DHTKey, // Destination node id we're sending to + message: Vec, // The rendered operation bytes + node_id: DHTKey, // Destination node id we're sending to node_ref: NodeRef, // Node to send envelope to (may not be destination node id in case of relay) hop_count: usize, // Total safety + private route hop count + 1 hop for the initial send + safety_route: Option, // The safety route used to send the message + private_route: Option, // The private route used to send the message } + +#[derive(Copy, Clone, Debug)] +enum RPCKind { + Question, + Statement, + Answer +} + ///////////////////////////////////////////////////////////////////// pub struct RPCProcessorInner { @@ -484,7 +494,7 @@ impl RPCProcessor { ) -> Result, RPCError> { let routing_table = self.routing_table(); let rss = routing_table.route_spec_store(); - + let pr_is_stub = private_route.is_stub(); let pr_hop_count = private_route.hop_count; let pr_pubkey = private_route.public_key; @@ -542,6 +552,16 @@ impl RPCProcessor { node_id: out_node_id, node_ref: compiled_route.first_hop, hop_count: out_hop_count, + safety_route: if compiled_route.safety_route.is_stub() { + None + } else { + Some(compiled_route.safety_route.public_key) + }, + private_route: if pr_is_stub { + None + } else { + Some(pr_pubkey) + } }; Ok(NetworkResult::value(out)) @@ -610,6 +630,8 @@ impl RPCProcessor { node_id, node_ref, hop_count: 1, + safety_route: None, + private_route: None, }); } SafetySelection::Safe(_) => { @@ -706,7 +728,17 @@ impl RPCProcessor { } } - // Issue a question over the network, possibly using an anonymized route + /// Record failure to send to node or route + fn record_send_failure(&self, rpc_kind: RPCKind, send_ts: u64, node_ref: NodeRef, safety_route: Option, private_route: Option) { + xxx implement me + } + + /// Record success sending to node or route + fn record_send_success(&self, rpc_kind: RPCKind, send_ts: u64, bytes: u64, node_ref: NodeRef, safety_route: Option, private_route: Option) { + xxx implement me + } + + /// Issue a question over the network, possibly using an anonymized route #[instrument(level = "debug", skip(self, question), err)] async fn question( &self, @@ -729,6 +761,8 @@ impl RPCProcessor { node_id, node_ref, hop_count, + safety_route, + private_route, } = network_result_try!(self.render_operation(dest.clone(), &operation)?); // Calculate answer timeout @@ -774,6 +808,11 @@ impl RPCProcessor { } } + // Safety route stats + if let Some(sr_pubkey) = safety_route { + // + } + // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { dest, @@ -807,6 +846,8 @@ impl RPCProcessor { node_id, node_ref, hop_count: _, + safety_route, + private_route, } = network_result_try!(self.render_operation(dest, &operation)?); // Send statement @@ -819,17 +860,22 @@ impl RPCProcessor { .map_err(|e| { // If we're returning an error, clean up node_ref - .stats_failed_to_send(send_ts, true); + .stats_failed_to_send(send_ts, false); RPCError::network(e) })? => { // If we couldn't send we're still cleaning up node_ref - .stats_failed_to_send(send_ts, true); + .stats_failed_to_send(send_ts, false); } ); // Successfully sent - node_ref.stats_question_sent(send_ts, bytes, true); + node_ref.stats_question_sent(send_ts, bytes, false); + + // Private route stats + xxx + // Safety route stats + safety_route Ok(NetworkResult::value(())) } @@ -860,6 +906,8 @@ impl RPCProcessor { node_id, node_ref, hop_count: _, + safety_route, + private_route, } = network_result_try!(self.render_operation(dest, &operation)?); // Send the reply @@ -871,7 +919,7 @@ impl RPCProcessor { .map_err(|e| { // If we're returning an error, clean up node_ref - .stats_failed_to_send(send_ts, true); + .stats_failed_to_send(send_ts, false); RPCError::network(e) })? => { // If we couldn't send we're still cleaning up @@ -883,6 +931,11 @@ impl RPCProcessor { // Reply successfully sent node_ref.stats_answer_sent(bytes); + // Private route stats + xxxx + // Safety route stats + xxx + Ok(NetworkResult::value(())) } diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index ef0ecef7..fa5ff34b 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -757,8 +757,25 @@ impl VeilidAPI { return Ok(out); } - async fn debug_route_test(&self, _args: Vec) -> Result { - let out = "xxx".to_string(); + async fn debug_route_test(&self, args: Vec) -> Result { + // + let netman = self.network_manager()?; + let routing_table = netman.routing_table(); + let rss = routing_table.route_spec_store(); + + let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_dht_key)?; + + let success = rss + .test_route(&route_id) + .await + .map_err(VeilidAPIError::internal)?; + + let out = if success { + "SUCCESS".to_owned() + } else { + "FAILED".to_owned() + }; + return Ok(out); }