route work

This commit is contained in:
John Smith 2022-10-28 22:26:21 -04:00
parent be55a42878
commit d335b56571
5 changed files with 246 additions and 23 deletions

View File

@ -136,22 +136,22 @@ struct RouteHop {
nodeId @0 :NodeID; # node id only for established routes nodeId @0 :NodeID; # node id only for established routes
peerInfo @1 :PeerInfo; # full peer info for this hop to establish the route peerInfo @1 :PeerInfo; # full peer info for this hop to establish the route
} }
nextHop @2 :RouteHopData; # Optional: next hop in encrypted blob nextHop @2 :RouteHopData; # Next hop in encrypted blob
# Null means no next hop, at destination (only used in private route, safety routes must enclose a stub private route)
} }
struct PrivateRoute { struct PrivateRoute {
publicKey @0 :RoutePublicKey; # private route public key (unique per private route) publicKey @0 :RoutePublicKey; # private route public key (unique per private route)
hopCount @1 :UInt8; # Count of hops left in the private route hopCount @1 :UInt8; # Count of hops left in the private route (for timeout calculation purposes only)
firstHop @2 :RouteHop; # Optional: first hop in the private route firstHop @2 :RouteHop; # Optional: first hop in the private route, if empty, this is the last hop and payload should be decrypted and processed.
} }
struct SafetyRoute { struct SafetyRoute {
publicKey @0 :RoutePublicKey; # safety route public key (unique per safety route) publicKey @0 :RoutePublicKey; # safety route public key (unique per safety route)
hopCount @1 :UInt8; # Count of hops left in the safety route hopCount @1 :UInt8; # Count of hops left in the safety route (for timeout calculation purposes only)
hops :union { hops :union {
data @2 :RouteHopData; # safety route has more hops data @2 :RouteHopData; # safety route has more hops
private @3 :PrivateRoute; # safety route has ended and private route follows private @3 :PrivateRoute; # safety route has ended and private route follows
xxx find better representation for privateroute stub (going straight to node)
} }
} }

View File

@ -2,7 +2,6 @@ use super::*;
use crate::veilid_api::*; use crate::veilid_api::*;
use serde::*; use serde::*;
/// Compiled route (safety route + private route) /// Compiled route (safety route + private route)
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct CompiledRoute { pub struct CompiledRoute {
@ -771,7 +770,7 @@ impl RouteSpecStore {
RouteNode::PeerInfo(pi.unwrap()) RouteNode::PeerInfo(pi.unwrap())
} }
}, },
next_hop: Some(route_hop_data), next_hop: route_hop_data,
}; };
// Make next blob from route hop // Make next blob from route hop

View File

@ -60,10 +60,10 @@ pub fn encode_route_hop(
encode_peer_info(&pi, &mut pi_builder)?; encode_peer_info(&pi, &mut pi_builder)?;
} }
} }
if let Some(rhd) = &route_hop.next_hop {
let mut rhd_builder = builder.reborrow().init_next_hop(); let mut rhd_builder = builder.reborrow().init_next_hop();
encode_route_hop_data(rhd, &mut rhd_builder)?; encode_route_hop_data(&route_hop.next_hop, &mut rhd_builder)?;
}
Ok(()) Ok(())
} }
@ -83,14 +83,10 @@ pub fn decode_route_hop(reader: &veilid_capnp::route_hop::Reader) -> Result<Rout
} }
}; };
let next_hop = if reader.has_next_hop() { let rhd_reader = reader
let rhd_reader = reader .get_next_hop()
.get_next_hop() .map_err(RPCError::map_protocol("invalid next hop in route hop"))?;
.map_err(RPCError::map_protocol("invalid next hop in route hop"))?; let next_hop = decode_route_hop_data(&rhd_reader)?;
Some(decode_route_hop_data(&rhd_reader)?)
} else {
None
};
Ok(RouteHop { node, next_hop }) Ok(RouteHop { node, next_hop })
} }

View File

@ -1,15 +1,241 @@
use super::*; use super::*;
impl RPCProcessor { impl RPCProcessor {
// xxx do not process latency for routed messages async fn process_route_safety_route_hop(
&self,
route: &RPCOperationRoute,
route_hop: RouteHop,
) -> Result<(), RPCError> {
// Get next hop node ref
let next_hop_nr = match route_hop.node {
RouteNode::NodeId(id) => {
//
self.routing_table
.lookup_node_ref(id.key)
.ok_or_else(|| RPCError::network(format!("node hop {} not found", id.key)))
}
RouteNode::PeerInfo(pi) => {
//
self.routing_table
.register_node_with_signed_node_info(
RoutingDomain::PublicInternet,
pi.node_id.key,
pi.signed_node_info,
false,
)
.ok_or_else(|| {
RPCError::network(format!(
"node hop {} could not be registered",
pi.node_id.key
))
})
}
}?;
// Pass along the route
let next_hop_route = RPCOperationRoute {
safety_route: SafetyRoute {
public_key: route.safety_route.public_key,
hop_count: route.safety_route.hop_count - 1,
hops: SafetyRouteHops::Data(route_hop.next_hop),
},
operation: route.operation,
};
let next_hop_route_stmt = RPCStatement::new(RPCStatementDetail::Route(next_hop_route));
// Send the next route statement
network_result_try!(
self.statement(Destination::direct(next_hop_nr), next_hop_route_stmt)
.await?
);
Ok(())
}
async fn process_route_safety_route_private_route_hop(
&self,
route: &RPCOperationRoute,
private_route: &PrivateRoute,
) -> Result<(), RPCError> {
//
let route_hop = private_route.first_hop.unwrap();
// Pass along the route
let next_hop_route = RPCOperationRoute {
safety_route: SafetyRoute {
public_key: route.safety_route.public_key,
hop_count: 0,
hops: SafetyRouteHops::PrivateRoute(Private),
},
operation: route.operation,
};
let next_hop_route_stmt = RPCStatement::new(RPCStatementDetail::Route(next_hop_route));
// Send the next route statement
network_result_try!(
self.statement(Destination::direct(next_hop_nr), next_hop_route_stmt)
.await?
);
Ok(())
}
async fn process_routed_operation(
&self,
route: &RPCOperationRoute,
private_route: &PrivateRoute,
) -> Result<(), RPCError> {
//
Ok(())
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_route(&self, msg: RPCMessage) -> Result<(), RPCError> { pub(crate) async fn process_route(&self, msg: RPCMessage) -> Result<(), RPCError> {
// xxx do not process latency for routed messages // xxx do not process latency for routed messages
// tracing::Span::current().record("res", &tracing::field::display(res)); // tracing::Span::current().record("res", &tracing::field::display(res));
xxx continue here // Get the statement
let route = match msg.operation.kind() {
RPCOperationKind::Statement(s) => match s.detail() {
RPCStatementDetail::Route(s) => s,
_ => panic!("not a route statement"),
},
_ => panic!("not a statement"),
};
Err(RPCError::unimplemented("process_route")) // See what kind of safety route we have going on here
match &route.safety_route.hops {
// There is a safety route hop
SafetyRouteHops::Data(d) => {
// See if this is last hop in safety route, if so, we're decoding a PrivateRoute not a RouteHop
let (blob_tag, blob_data) = if let Some(b) = d.blob.last() {
(*b, &d.blob[0..d.blob.len() - 1])
} else {
return Err(RPCError::protocol("no bytes in blob"));
};
// Decrypt the blob with DEC(nonce, DH(the SR's public key, this hop's secret)
let node_id_secret = self.routing_table.node_id_secret();
let dh_secret = self
.crypto
.cached_dh(&route.safety_route.public_key, &node_id_secret)
.map_err(RPCError::protocol)?;
let dec_blob_data = Crypto::decrypt_aead(blob_data, &d.nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
let dec_blob_reader = capnp::message::Reader::new(
RPCMessageData {
contents: dec_blob_data,
},
Default::default(),
);
// Decode the blob appropriately
if blob_tag == 0 {
// PrivateRoute
let private_route = {
let pr_reader = dec_blob_reader
.get_root::<veilid_capnp::private_route::Reader>()
.map_err(RPCError::protocol)?;
decode_private_route(&pr_reader)?
};
// Make sure hop count makes sense
if route.safety_route.hop_count as usize != 0 {
return Err(RPCError::protocol(
"Safety hop count should be zero if switched to private route",
));
}
// Get the next hop node ref
if private_route.first_hop.is_some() {
// Make sure hop count makes sense
if private_route.hop_count as usize
> self.unlocked_inner.max_route_hop_count
{
return Err(RPCError::protocol(
"Private route hop count too high to process",
));
}
if private_route.hop_count == 0 {
return Err(RPCError::protocol(
"Private route hop count should not be zero if there are more hops",
));
}
// Switching to private route from safety route
self.process_route_safety_route_private_route_hop(route, &private_route)
.await?;
} else {
// Make sure hop count makes sense
if private_route.hop_count != 0 {
return Err(RPCError::protocol(
"Private route hop count should be zero if we are at the end",
));
}
// Private route was a stub, process routed operation
self.process_routed_operation(route, &private_route).await?;
}
} else if blob_tag == 1 {
// RouteHop
let route_hop = {
let rh_reader = dec_blob_reader
.get_root::<veilid_capnp::route_hop::Reader>()
.map_err(RPCError::protocol)?;
decode_route_hop(&rh_reader)?
};
// Make sure hop count makes sense
if route.safety_route.hop_count as usize
> self.unlocked_inner.max_route_hop_count
{
return Err(RPCError::protocol(
"Safety route hop count too high to process",
));
}
if route.safety_route.hop_count == 0 {
return Err(RPCError::protocol(
"Safety route hop count should not be zero if there are more hops",
));
}
self.process_route_safety_route_hop(route, route_hop)
.await?;
} else {
return Err(RPCError::protocol("invalid blob tag"));
}
}
// Safety route has ended, now do private route
SafetyRouteHops::Private(private_route) => {
if private_route.first_hop.is_some() {
// Make sure hop count makes sense
if private_route.hop_count as usize > self.unlocked_inner.max_route_hop_count {
return Err(RPCError::protocol(
"Private route hop count too high to process",
));
}
if private_route.hop_count == 0 {
return Err(RPCError::protocol(
"Private route hop count should not be zero if there are more hops",
));
}
// There are some hops left
self.process_route_safety_route_private_route_hop(route, private_route)
.await?;
} else {
// Make sure hop count makes sense
if private_route.hop_count != 0 {
return Err(RPCError::protocol(
"Private route hop count should be zero if we are at the end",
));
}
// No hops left, time to process the routed operation
self.process_routed_operation(route, private_route).await?;
}
}
}
Ok(())
} }
} }

View File

@ -30,7 +30,7 @@ impl fmt::Display for RouteNode {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RouteHop { pub struct RouteHop {
pub node: RouteNode, pub node: RouteNode,
pub next_hop: Option<RouteHopData>, pub next_hop: RouteHopData,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -81,7 +81,9 @@ impl fmt::Display for PrivateRoute {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum SafetyRouteHops { pub enum SafetyRouteHops {
/// Has >= 1 safety route hops
Data(RouteHopData), Data(RouteHopData),
/// Has 0 safety route hops
Private(PrivateRoute), Private(PrivateRoute),
} }