mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-01-11 15:29:30 -05:00
more refactor
This commit is contained in:
parent
b3c55f4a6d
commit
bb4dbb7c4a
@ -80,9 +80,15 @@ impl RouteSetSpecDetail {
|
||||
pub fn hop_count(&self) -> usize {
|
||||
self.hop_node_refs.len()
|
||||
}
|
||||
pub fn hop_node_ref(&self, idx: usize) -> Option<NodeRef> {
|
||||
self.hop_node_refs.get(idx).cloned()
|
||||
}
|
||||
pub fn get_stability(&self) -> Stability {
|
||||
self.stability
|
||||
}
|
||||
pub fn get_directions(&self) -> DirectionSet {
|
||||
self.directions
|
||||
}
|
||||
pub fn is_sequencing_match(&self, sequencing: Sequencing) -> bool {
|
||||
match sequencing {
|
||||
Sequencing::NoPreference => true,
|
||||
|
@ -730,52 +730,46 @@ impl RouteSpecStore {
|
||||
/// Don't pick any routes that have failed and haven't been tested yet
|
||||
fn first_available_route_inner<'a>(
|
||||
inner: &'a RouteSpecStoreInner,
|
||||
crypto_kind: CryptoKind,
|
||||
min_hop_count: usize,
|
||||
max_hop_count: usize,
|
||||
stability: Stability,
|
||||
sequencing: Sequencing,
|
||||
directions: DirectionSet,
|
||||
avoid_nodes: &[TypedKey],
|
||||
) -> Option<PublicKey> {
|
||||
) -> Option<RouteId> {
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
|
||||
let mut routes = Vec::new();
|
||||
|
||||
// Get all valid routes, allow routes that need testing
|
||||
// but definitely prefer routes that have been recently tested
|
||||
for detail in &inner.content.details {
|
||||
if detail.1.stability >= stability
|
||||
&& detail.1.is_sequencing_match(sequencing)
|
||||
&& detail.1.hops.len() >= min_hop_count
|
||||
&& detail.1.hops.len() <= max_hop_count
|
||||
&& detail.1.directions.is_superset(directions)
|
||||
&& !detail.1.published
|
||||
for (id, rssd) in inner.content.iter_details() {
|
||||
if rssd.get_stability() >= stability
|
||||
&& rssd.is_sequencing_match(sequencing)
|
||||
&& rssd.hop_count() >= min_hop_count
|
||||
&& rssd.hop_count() <= max_hop_count
|
||||
&& rssd.get_directions().is_superset(directions)
|
||||
&& rssd.get_route_set_keys().kinds().contains(&crypto_kind)
|
||||
&& !rssd.is_published()
|
||||
&& !rssd.contains_nodes(avoid_nodes)
|
||||
{
|
||||
let mut avoid = false;
|
||||
for h in &detail.1.hops {
|
||||
if avoid_nodes.contains(h) {
|
||||
avoid = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !avoid {
|
||||
routes.push(detail);
|
||||
}
|
||||
routes.push((id, rssd));
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the routes by preference
|
||||
routes.sort_by(|a, b| {
|
||||
let a_needs_testing = a.1.stats.needs_testing(cur_ts);
|
||||
let b_needs_testing = b.1.stats.needs_testing(cur_ts);
|
||||
let a_needs_testing = a.1.get_stats().needs_testing(cur_ts);
|
||||
let b_needs_testing = b.1.get_stats().needs_testing(cur_ts);
|
||||
if !a_needs_testing && b_needs_testing {
|
||||
return cmp::Ordering::Less;
|
||||
}
|
||||
if !b_needs_testing && a_needs_testing {
|
||||
return cmp::Ordering::Greater;
|
||||
}
|
||||
let a_latency = a.1.stats.latency_stats().average;
|
||||
let b_latency = b.1.stats.latency_stats().average;
|
||||
let a_latency = a.1.get_stats().latency_stats().average;
|
||||
let b_latency = b.1.get_stats().latency_stats().average;
|
||||
|
||||
a_latency.cmp(&b_latency)
|
||||
});
|
||||
@ -846,15 +840,21 @@ impl RouteSpecStore {
|
||||
safety_selection: SafetySelection,
|
||||
mut private_route: PrivateRoute,
|
||||
) -> EyreResult<Option<CompiledRoute>> {
|
||||
// let profile_start_ts = get_timestamp();
|
||||
|
||||
// let profile_start_ts = get_timestamp();
|
||||
let inner = &mut *self.inner.lock();
|
||||
let routing_table = self.unlocked_inner.routing_table.clone();
|
||||
let rti = &mut *routing_table.inner.write();
|
||||
|
||||
let pr_pubkey = private_route.public_key;
|
||||
// Get useful private route properties
|
||||
let crypto_kind = private_route.crypto_kind();
|
||||
let crypto = routing_table.crypto();
|
||||
let Some(vcrypto) = crypto.get(crypto_kind) else {
|
||||
bail!("crypto not supported for route");
|
||||
};
|
||||
let pr_pubkey = private_route.public_key.key;
|
||||
let pr_hopcount = private_route.hop_count as usize;
|
||||
let max_route_hop_count = self.unlocked_inner.max_route_hop_count;
|
||||
|
||||
// Check private route hop count isn't larger than the max route hop count plus one for the 'first hop' header
|
||||
if pr_hopcount > (max_route_hop_count + 1) {
|
||||
bail!("private route hop count too long");
|
||||
@ -870,12 +870,11 @@ impl RouteSpecStore {
|
||||
};
|
||||
|
||||
let opt_first_hop = match pr_first_hop_node {
|
||||
RouteNode::NodeId(id) => rti.lookup_node_ref(routing_table.clone(), id.key),
|
||||
RouteNode::NodeId(id) => rti.lookup_any_node_ref(routing_table.clone(), id),
|
||||
RouteNode::PeerInfo(pi) => rti.register_node_with_peer_info(
|
||||
routing_table.clone(),
|
||||
RoutingDomain::PublicInternet,
|
||||
pi.node_id.key,
|
||||
pi.signed_node_info.clone(),
|
||||
pi,
|
||||
false,
|
||||
),
|
||||
};
|
||||
@ -892,22 +891,23 @@ impl RouteSpecStore {
|
||||
// Return the compiled safety route
|
||||
//println!("compile_safety_route profile (stub): {} us", (get_timestamp() - profile_start_ts));
|
||||
return Ok(Some(CompiledRoute {
|
||||
safety_route: SafetyRoute::new_stub(routing_table.node_id(), private_route),
|
||||
secret: routing_table.node_id_secret(),
|
||||
safety_route: SafetyRoute::new_stub(routing_table.node_id(crypto_kind), private_route),
|
||||
secret: routing_table.node_id_secret(crypto_kind),
|
||||
first_hop,
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
// If the safety route requested is also the private route, this is a loopback test, just accept it
|
||||
let sr_pubkey = if safety_spec.preferred_route == Some(private_route.public_key) {
|
||||
let opt_private_route_id = inner.content.get_id_by_key(&pr_pubkey);
|
||||
let sr_pubkey = if opt_private_route_id.is_some() && safety_spec.preferred_route == opt_private_route_id {
|
||||
// Private route is also safety route during loopback test
|
||||
private_route.public_key
|
||||
pr_pubkey
|
||||
} else {
|
||||
let Some(avoid_node_id) = private_route.first_hop_node_id() else {
|
||||
bail!("compiled private route should have first hop");
|
||||
};
|
||||
let Some(sr_pubkey) = self.get_route_for_safety_spec_inner(inner, rti, &safety_spec, Direction::Outbound.into(), &[avoid_node_id])? else {
|
||||
let Some(sr_pubkey) = self.get_route_for_safety_spec_inner(inner, rti, crypto_kind, &safety_spec, Direction::Outbound.into(), &[avoid_node_id])? else {
|
||||
// No safety route could be found for this spec
|
||||
return Ok(None);
|
||||
};
|
||||
@ -915,28 +915,33 @@ impl RouteSpecStore {
|
||||
};
|
||||
|
||||
// Look up a few things from the safety route detail we want for the compiled route and don't borrow inner
|
||||
let (optimize, first_hop, secret) = {
|
||||
let safety_rsd = Self::detail(inner, &sr_pubkey).ok_or_else(|| eyre!("route missing"))?;
|
||||
|
||||
// We can optimize the peer info in this safety route if it has been successfully
|
||||
// communicated over either via an outbound test, or used as a private route inbound
|
||||
// and we are replying over the same route as our safety route outbound
|
||||
let optimize = safety_rsd.stats.last_tested_ts.is_some() || safety_rsd.stats.last_received_ts.is_some();
|
||||
|
||||
// Get the first hop noderef of the safety route
|
||||
let mut first_hop = safety_rsd.hop_node_refs.first().unwrap().clone();
|
||||
// Ensure sequencing requirement is set on first hop
|
||||
first_hop.set_sequencing(safety_spec.sequencing);
|
||||
|
||||
// Get the safety route secret key
|
||||
let secret = safety_rsd.secret_key;
|
||||
|
||||
(optimize, first_hop, secret)
|
||||
let Some(safety_route_id) = inner.content.get_id_by_key(&sr_pubkey) else {
|
||||
bail!("route id missing");
|
||||
};
|
||||
let Some(safety_rssd) = inner.content.get_detail(&safety_route_id) else {
|
||||
bail!("route set detail missing");
|
||||
};
|
||||
let Some(safety_rsd) = safety_rssd.get_route_by_key(&sr_pubkey) else {
|
||||
bail!("route detail missing");
|
||||
};
|
||||
|
||||
// We can optimize the peer info in this safety route if it has been successfully
|
||||
// communicated over either via an outbound test, or used as a private route inbound
|
||||
// and we are replying over the same route as our safety route outbound
|
||||
let optimize = safety_rssd.get_stats().last_tested_ts.is_some() || safety_rssd.get_stats().last_received_ts.is_some();
|
||||
|
||||
// Get the first hop noderef of the safety route
|
||||
let mut first_hop = safety_rssd.hop_node_ref(0).unwrap();
|
||||
|
||||
// Ensure sequencing requirement is set on first hop
|
||||
first_hop.set_sequencing(safety_spec.sequencing);
|
||||
|
||||
// Get the safety route secret key
|
||||
let secret = safety_rsd.secret_key;
|
||||
|
||||
// See if we have a cached route we can use
|
||||
if optimize {
|
||||
if let Some(safety_route) = self.lookup_compiled_route_cache(inner, sr_pubkey, pr_pubkey) {
|
||||
if let Some(safety_route) = inner.cache.lookup_compiled_route_cache(sr_pubkey, pr_pubkey) {
|
||||
// Build compiled route
|
||||
let compiled_route = CompiledRoute {
|
||||
safety_route,
|
||||
@ -951,8 +956,6 @@ impl RouteSpecStore {
|
||||
|
||||
// Create hops
|
||||
let hops = {
|
||||
let safety_rsd = Self::detail(inner, &sr_pubkey).ok_or_else(|| eyre!("route missing"))?;
|
||||
|
||||
// start last blob-to-encrypt data off as private route
|
||||
let mut blob_data = {
|
||||
let mut pr_message = ::capnp::message::Builder::new_default();
|
||||
@ -970,18 +973,17 @@ impl RouteSpecStore {
|
||||
// safety route and does not include the dialInfo
|
||||
// (outer hop is a RouteHopData, not a RouteHop).
|
||||
// Each loop mutates 'nonce', and 'blob_data'
|
||||
let mut nonce = Crypto::get_random_nonce();
|
||||
let crypto = routing_table.crypto();
|
||||
let mut nonce = vcrypto.random_nonce();
|
||||
// Forward order (safety route), but inside-out
|
||||
for h in (1..safety_rsd.hops.len()).rev() {
|
||||
// Get blob to encrypt for next hop
|
||||
blob_data = {
|
||||
// Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr))
|
||||
let dh_secret = crypto
|
||||
let dh_secret = vcrypto
|
||||
.cached_dh(&safety_rsd.hops[h], &safety_rsd.secret_key)
|
||||
.wrap_err("dh failed")?;
|
||||
let enc_msg_data =
|
||||
Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
|
||||
vcrypto.encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
|
||||
.wrap_err("encryption failed")?;
|
||||
|
||||
// Make route hop data
|
||||
@ -994,14 +996,14 @@ impl RouteSpecStore {
|
||||
let route_hop = RouteHop {
|
||||
node: if optimize {
|
||||
// Optimized, no peer info, just the dht key
|
||||
RouteNode::NodeId(NodeId::new(safety_rsd.hops[h]))
|
||||
RouteNode::NodeId(safety_rsd.hops[h])
|
||||
} else {
|
||||
// Full peer info, required until we are sure the route has been fully established
|
||||
let node_id = safety_rsd.hops[h];
|
||||
let node_id = TypedKey::new(safety_rsd.crypto_kind, safety_rsd.hops[h]);
|
||||
let pi = rti
|
||||
.with_node_entry(node_id, |entry| {
|
||||
entry.with(rti, |_rti, e| {
|
||||
e.make_peer_info(node_id, RoutingDomain::PublicInternet)
|
||||
e.make_peer_info(RoutingDomain::PublicInternet)
|
||||
})
|
||||
})
|
||||
.flatten();
|
||||
@ -1025,14 +1027,14 @@ impl RouteSpecStore {
|
||||
};
|
||||
|
||||
// Make another nonce for the next hop
|
||||
nonce = Crypto::get_random_nonce();
|
||||
nonce = vcrypto.random_nonce();
|
||||
}
|
||||
|
||||
// Encode first RouteHopData
|
||||
let dh_secret = crypto
|
||||
let dh_secret = vcrypto
|
||||
.cached_dh(&safety_rsd.hops[0], &safety_rsd.secret_key)
|
||||
.map_err(RPCError::map_internal("dh failed"))?;
|
||||
let enc_msg_data = Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
|
||||
let enc_msg_data = vcrypto.encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
|
||||
.map_err(RPCError::map_internal("encryption failed"))?;
|
||||
|
||||
let route_hop_data = RouteHopData {
|
||||
@ -1045,14 +1047,14 @@ impl RouteSpecStore {
|
||||
|
||||
// Build safety route
|
||||
let safety_route = SafetyRoute {
|
||||
public_key: sr_pubkey,
|
||||
public_key: TypedKey::new(crypto_kind, sr_pubkey),
|
||||
hop_count: safety_spec.hop_count as u8,
|
||||
hops,
|
||||
};
|
||||
|
||||
// Add to cache but only if we have an optimized route
|
||||
if optimize {
|
||||
self.add_to_compiled_route_cache(inner, pr_pubkey, safety_route.clone());
|
||||
inner.cache.add_to_compiled_route_cache( pr_pubkey, safety_route.clone());
|
||||
}
|
||||
|
||||
// Build compiled route
|
||||
@ -1090,7 +1092,7 @@ impl RouteSpecStore {
|
||||
// See if the preferred route is here
|
||||
if let Some(preferred_route) = safety_spec.preferred_route {
|
||||
if let Some(preferred_rssd) = inner.content.get_detail(&preferred_route) {
|
||||
// Only use the preferred route if it has the desire crypto kind
|
||||
// Only use the preferred route if it has the desired crypto kind
|
||||
if let Some(preferred_key) = preferred_rssd.get_route_set_keys().get(crypto_kind) {
|
||||
// Only use the preferred route if it doesn't contain the avoid nodes
|
||||
if !preferred_rssd.contains_nodes(avoid_nodes) {
|
||||
@ -1101,8 +1103,9 @@ impl RouteSpecStore {
|
||||
}
|
||||
|
||||
// Select a safety route from the pool or make one if we don't have one that matches
|
||||
let sr_pubkey = if let Some(sr_pubkey) = Self::first_available_route_inner(
|
||||
let sr_route_id = if let Some(sr_route_id) = Self::first_available_route_inner(
|
||||
inner,
|
||||
crypto_kind,
|
||||
safety_spec.hop_count,
|
||||
safety_spec.hop_count,
|
||||
safety_spec.stability,
|
||||
@ -1111,14 +1114,14 @@ impl RouteSpecStore {
|
||||
avoid_nodes,
|
||||
) {
|
||||
// Found a route to use
|
||||
sr_pubkey
|
||||
sr_route_id
|
||||
} else {
|
||||
// No route found, gotta allocate one
|
||||
let sr_pubkey = match self
|
||||
let Some(sr_route_id) = self
|
||||
.allocate_route_inner(
|
||||
inner,
|
||||
rti,
|
||||
crypto_kind, ???
|
||||
&[crypto_kind],
|
||||
safety_spec.stability,
|
||||
safety_spec.sequencing,
|
||||
safety_spec.hop_count,
|
||||
@ -1126,12 +1129,14 @@ impl RouteSpecStore {
|
||||
avoid_nodes,
|
||||
)
|
||||
.map_err(RPCError::internal)?
|
||||
{
|
||||
Some(pk) => pk,
|
||||
None => return Ok(None),
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
sr_pubkey
|
||||
sr_route_id
|
||||
};
|
||||
|
||||
let sr_pubkey = inner.content.get_detail(&sr_route_id).unwrap().get_route_set_keys().get(crypto_kind).unwrap().key;
|
||||
|
||||
Ok(Some(sr_pubkey))
|
||||
}
|
||||
|
||||
@ -1366,7 +1371,6 @@ impl RouteSpecStore {
|
||||
pub fn get_route_id_for_key(&self, key: &PublicKey) -> Option<RouteId>
|
||||
{
|
||||
let inner = &mut *self.inner.lock();
|
||||
|
||||
// Check for local route
|
||||
if let Some(id) = inner.content.get_id_by_key(key) {
|
||||
return Some(id);
|
||||
|
@ -162,9 +162,9 @@ impl RPCProcessor {
|
||||
}
|
||||
SafetySelection::Safe(safety_spec) => {
|
||||
// Sent directly but with a safety route, respond to private route
|
||||
let ck = target.best_node_id().kind;
|
||||
let crypto_kind = target.best_node_id().kind;
|
||||
let Some(pr_key) = rss
|
||||
.get_private_route_for_safety_spec(ck, safety_spec, &target.node_ids())
|
||||
.get_private_route_for_safety_spec(crypto_kind, safety_spec, &target.node_ids())
|
||||
.map_err(RPCError::internal)? else {
|
||||
return Ok(NetworkResult::no_connection_other("no private route for response at this time"));
|
||||
};
|
||||
@ -188,12 +188,12 @@ impl RPCProcessor {
|
||||
}
|
||||
SafetySelection::Safe(safety_spec) => {
|
||||
// Sent via a relay but with a safety route, respond to private route
|
||||
let ck = target.best_node_id().kind;
|
||||
let crypto_kind = target.best_node_id().kind;
|
||||
|
||||
let mut avoid_nodes = relay.node_ids();
|
||||
avoid_nodes.add_all(&target.node_ids());
|
||||
let Some(pr_key) = rss
|
||||
.get_private_route_for_safety_spec(ck, safety_spec, &avoid_nodes)
|
||||
.get_private_route_for_safety_spec(crypto_kind, safety_spec, &avoid_nodes)
|
||||
.map_err(RPCError::internal)? else {
|
||||
return Ok(NetworkResult::no_connection_other("no private route for response at this time"));
|
||||
};
|
||||
@ -246,7 +246,7 @@ impl RPCProcessor {
|
||||
|
||||
// Check for loopback test
|
||||
let opt_private_route_id = rss.get_route_id_for_key(&private_route.public_key.key);
|
||||
let pr_key = if safety_spec.preferred_route == opt_private_route_id
|
||||
let pr_key = if opt_private_route_id.is_some() && safety_spec.preferred_route == opt_private_route_id
|
||||
{
|
||||
// Private route is also safety route during loopback test
|
||||
private_route.public_key.key
|
||||
|
@ -192,13 +192,13 @@ impl RPCProcessor {
|
||||
&routed_operation.signatures,
|
||||
&routed_operation.data,
|
||||
sender_id,
|
||||
|rsd| {
|
||||
|rssd, rsd| {
|
||||
(
|
||||
rsd.secret_key,
|
||||
SafetySpec {
|
||||
preferred_route: Some(pr_pubkey),
|
||||
hop_count: rsd.hop_count(),
|
||||
stability: rsd.get_stability(),
|
||||
preferred_route: rss.get_route_id_for_key(&pr_pubkey),
|
||||
hop_count: rssd.hop_count(),
|
||||
stability: rssd.get_stability(),
|
||||
sequencing: routed_operation.sequencing,
|
||||
},
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user