checkpoint

This commit is contained in:
John Smith 2022-11-22 22:48:03 -05:00
parent e4406cb022
commit d7e7f3ba1d
4 changed files with 123 additions and 25 deletions

View File

@ -83,6 +83,14 @@ impl PrivateRoute {
}
}
/// Check if this is a stub route
pub fn is_stub(&self) -> bool {
if let PrivateRouteHops::FirstHop(first_hop) = self.hops {
return first_hop.next_hop.is_none();
}
false
}
/// Remove the first unencrypted hop if possible
pub fn pop_first_hop(&mut self) -> Option<RouteNode> {
match &mut self.hops {
@ -149,6 +157,8 @@ pub struct SafetyRoute {
impl SafetyRoute {
pub fn new_stub(public_key: DHTKey, private_route: PrivateRoute) -> Self {
// First hop should have already been popped off for stubbed safety routes since
// we are sending directly to the first hop
assert!(matches!(private_route.hops, PrivateRouteHops::Data(_)));
Self {
public_key,
@ -156,6 +166,9 @@ impl SafetyRoute {
hops: SafetyRouteHops::Private(private_route),
}
}
pub fn is_stub(&self) -> bool {
matches!(self.hops, SafetyRouteHops::Private(_))
}
}
impl fmt::Display for SafetyRoute {

View File

@ -757,26 +757,38 @@ impl RouteSpecStore {
/// Test an allocated route for continuity
pub async fn test_route(&self, key: &DHTKey) -> EyreResult<bool> {
let inner = &mut *self.inner.lock();
let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?;
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
// Target is the last hop
let target = rsd.hop_node_refs.last().unwrap().clone();
let (target, safety_selection) = {
let inner = &mut *self.inner.lock();
let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?;
// Routes with just one hop can be pinged directly
// More than one hop can be pinged across the route with the target being the second to last hop
if rsd.hops.len() == 1 {
let target = rsd.hop_node_refs[0].clone();
let sequencing = rsd.sequencing;
(target, SafetySelection::Unsafe(sequencing))
} else {
let target = rsd.hop_node_refs[rsd.hops.len() - 2].clone();
let hop_count = rsd.hops.len();
let stability = rsd.stability;
let sequencing = rsd.sequencing;
let safety_spec = SafetySpec {
preferred_route: Some(key.clone()),
hop_count,
stability,
sequencing,
};
(target, SafetySelection::Safe(safety_spec))
}
};
// Test with ping to end
let res = match rpc_processor
.rpc_call_status(Destination::Direct {
target,
safety_selection: SafetySelection::Safe(SafetySpec {
preferred_route: Some(key.clone()),
hop_count,
stability,
sequencing,
}),
safety_selection,
})
.await?
{
@ -1100,10 +1112,13 @@ impl RouteSpecStore {
// See if the preferred route is here
if let Some(preferred_route) = safety_spec.preferred_route {
if inner.content.details.contains_key(&preferred_route) {
if let Some(preferred_rsd) = inner.content.details.get(&preferred_route) {
// Only use the preferred route if it doesn't end with the avoid nodes
if !avoid_node_ids.contains(preferred_rsd.hops.last().unwrap()) {
return Ok(Some(preferred_route));
}
}
}
// Select a safety route from the pool or make one if we don't have one that matches
let sr_pubkey = if let Some(sr_pubkey) = Self::first_unpublished_route_inner(

View File

@ -188,7 +188,17 @@ struct RenderedOperation {
node_id: DHTKey, // Destination node id we're sending to
node_ref: NodeRef, // Node to send envelope to (may not be destination node id in case of relay)
hop_count: usize, // Total safety + private route hop count + 1 hop for the initial send
safety_route: Option<DHTKey>, // The safety route used to send the message
private_route: Option<DHTKey>, // The private route used to send the message
}
#[derive(Copy, Clone, Debug)]
enum RPCKind {
Question,
Statement,
Answer
}
/////////////////////////////////////////////////////////////////////
pub struct RPCProcessorInner {
@ -484,7 +494,7 @@ impl RPCProcessor {
) -> Result<NetworkResult<RenderedOperation>, RPCError> {
let routing_table = self.routing_table();
let rss = routing_table.route_spec_store();
let pr_is_stub = private_route.is_stub();
let pr_hop_count = private_route.hop_count;
let pr_pubkey = private_route.public_key;
@ -542,6 +552,16 @@ impl RPCProcessor {
node_id: out_node_id,
node_ref: compiled_route.first_hop,
hop_count: out_hop_count,
safety_route: if compiled_route.safety_route.is_stub() {
None
} else {
Some(compiled_route.safety_route.public_key)
},
private_route: if pr_is_stub {
None
} else {
Some(pr_pubkey)
}
};
Ok(NetworkResult::value(out))
@ -610,6 +630,8 @@ impl RPCProcessor {
node_id,
node_ref,
hop_count: 1,
safety_route: None,
private_route: None,
});
}
SafetySelection::Safe(_) => {
@ -706,7 +728,17 @@ impl RPCProcessor {
}
}
// Issue a question over the network, possibly using an anonymized route
/// Record failure to send to node or route
fn record_send_failure(&self, rpc_kind: RPCKind, send_ts: u64, node_ref: NodeRef, safety_route: Option<DHTKey>, private_route: Option<DHTKey>) {
xxx implement me
}
/// Record success sending to node or route
fn record_send_success(&self, rpc_kind: RPCKind, send_ts: u64, bytes: u64, node_ref: NodeRef, safety_route: Option<DHTKey>, private_route: Option<DHTKey>) {
xxx implement me
}
/// Issue a question over the network, possibly using an anonymized route
#[instrument(level = "debug", skip(self, question), err)]
async fn question(
&self,
@ -729,6 +761,8 @@ impl RPCProcessor {
node_id,
node_ref,
hop_count,
safety_route,
private_route,
} = network_result_try!(self.render_operation(dest.clone(), &operation)?);
// Calculate answer timeout
@ -774,6 +808,11 @@ impl RPCProcessor {
}
}
// Safety route stats
if let Some(sr_pubkey) = safety_route {
//
}
// Pass back waitable reply completion
Ok(NetworkResult::value(WaitableReply {
dest,
@ -807,6 +846,8 @@ impl RPCProcessor {
node_id,
node_ref,
hop_count: _,
safety_route,
private_route,
} = network_result_try!(self.render_operation(dest, &operation)?);
// Send statement
@ -819,17 +860,22 @@ impl RPCProcessor {
.map_err(|e| {
// If we're returning an error, clean up
node_ref
.stats_failed_to_send(send_ts, true);
.stats_failed_to_send(send_ts, false);
RPCError::network(e)
})? => {
// If we couldn't send we're still cleaning up
node_ref
.stats_failed_to_send(send_ts, true);
.stats_failed_to_send(send_ts, false);
}
);
// Successfully sent
node_ref.stats_question_sent(send_ts, bytes, true);
node_ref.stats_question_sent(send_ts, bytes, false);
// Private route stats
xxx
// Safety route stats
safety_route
Ok(NetworkResult::value(()))
}
@ -860,6 +906,8 @@ impl RPCProcessor {
node_id,
node_ref,
hop_count: _,
safety_route,
private_route,
} = network_result_try!(self.render_operation(dest, &operation)?);
// Send the reply
@ -871,7 +919,7 @@ impl RPCProcessor {
.map_err(|e| {
// If we're returning an error, clean up
node_ref
.stats_failed_to_send(send_ts, true);
.stats_failed_to_send(send_ts, false);
RPCError::network(e)
})? => {
// If we couldn't send we're still cleaning up
@ -883,6 +931,11 @@ impl RPCProcessor {
// Reply successfully sent
node_ref.stats_answer_sent(bytes);
// Private route stats
xxxx
// Safety route stats
xxx
Ok(NetworkResult::value(()))
}

View File

@ -757,8 +757,25 @@ impl VeilidAPI {
return Ok(out);
}
async fn debug_route_test(&self, _args: Vec<String>) -> Result<String, VeilidAPIError> {
let out = "xxx".to_string();
async fn debug_route_test(&self, args: Vec<String>) -> Result<String, VeilidAPIError> {
// <route id>
let netman = self.network_manager()?;
let routing_table = netman.routing_table();
let rss = routing_table.route_spec_store();
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_dht_key)?;
let success = rss
.test_route(&route_id)
.await
.map_err(VeilidAPIError::internal)?;
let out = if success {
"SUCCESS".to_owned()
} else {
"FAILED".to_owned()
};
return Ok(out);
}