more private route work

This commit is contained in:
John Smith 2023-02-24 21:02:24 -05:00
parent 4823c979ab
commit 7962d3fe11
11 changed files with 810 additions and 623 deletions

View File

@ -0,0 +1,35 @@
use super::*;
mod remote_private_route_info;
mod route_set_spec_detail;
mod route_spec_store;
mod route_spec_store_cache;
mod route_spec_store_content;
mod route_stats;
pub use remote_private_route_info::*;
pub use route_set_spec_detail::*;
pub use route_spec_store::*;
pub use route_spec_store_cache::*;
pub use route_spec_store_content::*;
pub use route_stats::*;
use crate::veilid_api::*;
use rkyv::{
with::Skip, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
/// The size of the remote private route cache
const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024;
/// Remote private route cache entries expire in 5 minutes if they haven't been used
const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: TimestampDuration = TimestampDuration::new(300_000_000u64);
/// Amount of time a route can remain idle before it gets tested
const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000;
/// The size of the compiled route cache
const COMPILED_ROUTE_CACHE_SIZE: usize = 256;
/// The type of an allocated route set id
pub type RouteSetSpecId = String;
/// Type type of an imported remote route set id
pub type RemotePrivateRouteId = String;

View File

@ -0,0 +1,51 @@
use super::*;
/// What remote private routes have seen
#[derive(Debug, Clone, Default)]
pub struct RemotePrivateRouteInfo {
/// The private routes themselves
private_routes: Vec<PrivateRoute>,
/// Did this remote private route see our node info due to no safety route in use
last_seen_our_node_info_ts: Timestamp,
/// Last time this remote private route was requested for any reason (cache expiration)
last_touched_ts: Timestamp,
/// Stats
stats: RouteStats,
}
impl RemotePrivateRouteInfo {
pub fn new(private_routes: Vec<PrivateRoute>, cur_ts: Timestamp) -> Self {
RemotePrivateRouteInfo {
private_routes,
last_seen_our_node_info_ts: Timestamp::new(0),
last_touched_ts: cur_ts,
stats: RouteStats::new(cur_ts),
}
}
pub fn get_private_routes(&self) -> &[PrivateRoute] {
&self.private_routes
}
pub fn get_stats(&self) -> &RouteStats {
&self.stats
}
pub fn get_stats_mut(&mut self) -> &mut RouteStats {
&mut self.stats
}
// Check to see if this remote private route has expired
pub fn did_expire(&self, cur_ts: Timestamp) -> bool {
cur_ts.saturating_sub(self.last_touched_ts) >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY
}
/// Start fresh if this had expired
pub fn unexpire(&mut self, cur_ts: Timestamp) {
self.last_seen_our_node_info_ts = Timestamp::new(0);
self.last_touched_ts = cur_ts;
self.stats = RouteStats::new(cur_ts);
}
/// Note when this was last used
pub fn touch(&mut self, cur_ts: Timestamp) {
self.last_touched_ts = cur_ts;
}
}

View File

@ -0,0 +1,112 @@
use super::*;
#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct RouteSpecDetail {
/// Crypto kind
pub crypto_kind: CryptoKind,
/// Secret key
#[with(Skip)]
pub secret_key: SecretKey,
/// Route hops (node id keys)
pub hops: Vec<PublicKey>,
}
#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct RouteSetSpecDetail {
/// Route set per crypto kind
route_set: BTreeMap<PublicKey, RouteSpecDetail>,
/// Route noderefs
#[with(Skip)]
hop_node_refs: Vec<NodeRef>,
/// Published private route, do not reuse for ephemeral routes
/// Not serialized because all routes should be re-published when restarting
#[with(Skip)]
published: bool,
/// Directions this route is guaranteed to work in
#[with(RkyvEnumSet)]
directions: DirectionSet,
/// Stability preference (prefer reliable nodes over faster)
stability: Stability,
/// Sequencing capability (connection oriented protocols vs datagram)
can_do_sequenced: bool,
/// Stats
stats: RouteStats,
}
impl RouteSetSpecDetail {
pub fn get_route_by_key(&self, key: &PublicKey) -> Option<&RouteSpecDetail> {
self.route_set.get(key)
}
pub fn get_route_by_key_mut(&mut self, key: &PublicKey) -> Option<&mut RouteSpecDetail> {
self.route_set.get_mut(key)
}
pub fn get_route_set_keys(&self) -> TypedKeySet {
let mut tks = TypedKeySet::new();
for (k, v) in &self.route_set {
tks.add(TypedKey::new(v.crypto_kind, *k));
}
tks
}
pub fn iter_route_set(
&self,
) -> alloc::collections::btree_map::Iter<PublicKey, RouteSpecDetail> {
self.route_set.iter()
}
pub fn get_stats(&self) -> &RouteStats {
&self.stats
}
pub fn get_stats_mut(&mut self) -> &mut RouteStats {
&mut self.stats
}
pub fn is_published(&self) -> bool {
self.published
}
pub fn set_published(&mut self, published: bool) {
self.published = self.published;
}
pub fn hop_count(&self) -> usize {
self.hop_node_refs.len()
}
pub fn get_stability(&self) -> Stability {
self.stability
}
pub fn is_sequencing_match(&self, sequencing: Sequencing) -> bool {
match sequencing {
Sequencing::NoPreference => true,
Sequencing::PreferOrdered => true,
Sequencing::EnsureOrdered => self.can_do_sequenced,
}
}
/// Generate a key for the cache that can be used to uniquely identify this route's contents
pub fn make_cache_key(&self) -> Vec<u8> {
let hops = &self.hop_node_refs;
let mut cache: Vec<u8> = Vec::with_capacity(hops.len() * PUBLIC_KEY_LENGTH);
for hop in hops {
cache.extend_from_slice(&hop.best_node_id().key.bytes);
}
cache
}
/// Generate a user-facing identifier for this allocated route
pub fn make_id(&self) -> RouteSetSpecId {
let mut idbytes = [0u8; 16];
for (pk, _) in self.route_set.iter() {
for (i, x) in pk.bytes.iter().enumerate() {
idbytes[i % 16] ^= *x;
}
}
let id = format!(
"{:08x}-{:04x}-{:04x}-{:04x}-{:08x}{:04x}",
u32::from_be_bytes(idbytes[0..4].try_into().expect("32 bits")),
u16::from_be_bytes(idbytes[4..6].try_into().expect("16 bits")),
u16::from_be_bytes(idbytes[6..8].try_into().expect("16 bits")),
u16::from_be_bytes(idbytes[8..10].try_into().expect("16 bits")),
u32::from_be_bytes(idbytes[10..14].try_into().expect("32 bits")),
u16::from_be_bytes(idbytes[14..16].try_into().expect("16 bits"))
);
id
}
}

View File

@ -1,371 +1,4 @@
use super::*;
use crate::veilid_api::*;
use rkyv::{
with::Skip, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
/// The size of the remote private route cache
const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024;
/// Remote private route cache entries expire in 5 minutes if they haven't been used
const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: TimestampDuration = TimestampDuration::new(300_000_000u64);
/// Amount of time a route can remain idle before it gets tested
const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000;
/// The size of the compiled route cache
const COMPILED_ROUTE_CACHE_SIZE: usize = 256;
// Compiled route key for caching
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct CompiledRouteCacheKey {
sr_pubkey: PublicKey,
pr_pubkey: PublicKey,
}
/// Compiled route (safety route + private route)
#[derive(Clone, Debug)]
pub struct CompiledRoute {
/// The safety route attached to the private route
pub safety_route: SafetyRoute,
/// The secret used to encrypt the message payload
pub secret: SecretKey,
/// The node ref to the first hop in the compiled route
pub first_hop: NodeRef,
}
#[derive(Clone, Debug, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct RouteStats {
/// Consecutive failed to send count
#[with(Skip)]
pub failed_to_send: u32,
/// Questions lost
#[with(Skip)]
pub questions_lost: u32,
/// Timestamp of when the route was created
pub created_ts: Timestamp,
/// Timestamp of when the route was last checked for validity
#[with(Skip)]
pub last_tested_ts: Option<Timestamp>,
/// Timestamp of when the route was last sent to
#[with(Skip)]
pub last_sent_ts: Option<Timestamp>,
/// Timestamp of when the route was last received over
#[with(Skip)]
pub last_received_ts: Option<Timestamp>,
/// Transfers up and down
pub transfer_stats_down_up: TransferStatsDownUp,
/// Latency stats
pub latency_stats: LatencyStats,
/// Accounting mechanism for this route's RPC latency
#[with(Skip)]
latency_stats_accounting: LatencyStatsAccounting,
/// Accounting mechanism for the bandwidth across this route
#[with(Skip)]
transfer_stats_accounting: TransferStatsAccounting,
}
impl RouteStats {
/// Make new route stats
pub fn new(created_ts: Timestamp) -> Self {
Self {
created_ts,
..Default::default()
}
}
/// Mark a route as having failed to send
pub fn record_send_failed(&mut self) {
self.failed_to_send += 1;
}
/// Mark a route as having lost a question
pub fn record_question_lost(&mut self) {
self.questions_lost += 1;
}
/// Mark a route as having received something
pub fn record_received(&mut self, cur_ts: Timestamp, bytes: ByteCount) {
self.last_received_ts = Some(cur_ts);
self.last_tested_ts = Some(cur_ts);
self.transfer_stats_accounting.add_down(bytes);
}
/// Mark a route as having been sent to
pub fn record_sent(&mut self, cur_ts: Timestamp, bytes: ByteCount) {
self.last_sent_ts = Some(cur_ts);
self.transfer_stats_accounting.add_up(bytes);
}
/// Mark a route as having been sent to
pub fn record_latency(&mut self, latency: TimestampDuration) {
self.latency_stats = self.latency_stats_accounting.record_latency(latency);
}
/// Mark a route as having been tested
pub fn record_tested(&mut self, cur_ts: Timestamp) {
self.last_tested_ts = Some(cur_ts);
// Reset question_lost and failed_to_send if we test clean
self.failed_to_send = 0;
self.questions_lost = 0;
}
/// Roll transfers for these route stats
pub fn roll_transfers(&mut self, last_ts: Timestamp, cur_ts: Timestamp) {
self.transfer_stats_accounting.roll_transfers(
last_ts,
cur_ts,
&mut self.transfer_stats_down_up,
)
}
/// Get the latency stats
pub fn latency_stats(&self) -> &LatencyStats {
&self.latency_stats
}
/// Get the transfer stats
pub fn transfer_stats(&self) -> &TransferStatsDownUp {
&self.transfer_stats_down_up
}
/// Reset stats when network restarts
pub fn reset(&mut self) {
self.last_tested_ts = None;
self.last_sent_ts = None;
self.last_received_ts = None;
}
/// Check if a route needs testing
pub fn needs_testing(&self, cur_ts: Timestamp) -> bool {
// Has the route had any failures lately?
if self.questions_lost > 0 || self.failed_to_send > 0 {
// If so, always test
return true;
}
// Has the route been tested within the idle time we'd want to check things?
// (also if we've received successfully over the route, this will get set)
if let Some(last_tested_ts) = self.last_tested_ts {
if cur_ts.saturating_sub(last_tested_ts)
> TimestampDuration::new(ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64)
{
return true;
}
} else {
// If this route has never been tested, it needs to be
return true;
}
false
}
}
#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct RouteSpecDetail {
/// Crypto kind
pub crypto_kind: CryptoKind,
/// Secret key
#[with(Skip)]
pub secret_key: SecretKey,
/// Route hops (node id keys)
pub hops: Vec<PublicKey>,
}
#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct RouteSetSpecDetail {
/// Route set per crypto kind
route_set: BTreeMap<PublicKey, RouteSpecDetail>,
/// Route noderefs
#[with(Skip)]
hop_node_refs: Vec<NodeRef>,
/// Published private route, do not reuse for ephemeral routes
/// Not serialized because all routes should be re-published when restarting
#[with(Skip)]
published: bool,
/// Directions this route is guaranteed to work in
#[with(RkyvEnumSet)]
directions: DirectionSet,
/// Stability preference (prefer reliable nodes over faster)
stability: Stability,
/// Sequencing capability (connection oriented protocols vs datagram)
can_do_sequenced: bool,
/// Stats
stats: RouteStats,
}
impl RouteSetSpecDetail {
pub fn get_route_by_key(&self, key: PublicKey) -> Option<&RouteSpecDetail> {
self.route_set.get(&key)
}
pub fn get_route_by_key_mut(&mut self, key: PublicKey) -> Option<&mut RouteSpecDetail> {
self.route_set.get_mut(&key)
}
pub fn get_route_set_keys(&self) -> TypedKeySet {
let mut tks = TypedKeySet::new();
for (k, v) in &self.route_set {
tks.add(TypedKey::new(v.crypto_kind, *k));
}
tks
}
pub fn get_stats(&self) -> &RouteStats {
&self.stats
}
pub fn get_stats_mut(&mut self) -> &mut RouteStats {
&mut self.stats
}
pub fn is_published(&self) -> bool {
self.published
}
pub fn hop_count(&self) -> usize {
self.hop_node_refs.len()
}
pub fn get_stability(&self) -> Stability {
self.stability
}
pub fn is_sequencing_match(&self, sequencing: Sequencing) -> bool {
match sequencing {
Sequencing::NoPreference => true,
Sequencing::PreferOrdered => true,
Sequencing::EnsureOrdered => {
self.can_do_sequenced
}
}
}
}
/// The core representation of the RouteSpecStore that can be serialized
#[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C, align(8)), derive(CheckBytes))]
pub struct RouteSpecStoreContent {
/// All of the route sets we have allocated so far indexed by key
id_by_key: HashMap<PublicKey, String>,
/// All of the route sets we have allocated so far
details: HashMap<String, RouteSetSpecDetail>,
}
impl RouteSpecStoreContent {
pub fn add_detail(&mut self, detail: RouteSetSpecDetail) -> String {
// generate unique key string
let mut idbytes = [0u8; 16];
for (pk, _) in &detail.route_set {
for (i, x) in pk.bytes.iter().enumerate() {
idbytes[i % 16] ^= *x;
}
}
let id = format!("{:08x}-{:04x}-{:04x}-{:04x}-{:08x}{:04x}",
u32::from_be_bytes(idbytes[0..4].try_into().expect("32 bits")),
u16::from_be_bytes(idbytes[4..6].try_into().expect("16 bits")),
u16::from_be_bytes(idbytes[6..8].try_into().expect("16 bits")),
u16::from_be_bytes(idbytes[8..10].try_into().expect("16 bits")),
u32::from_be_bytes(idbytes[10..14].try_into().expect("32 bits")),
u16::from_be_bytes(idbytes[14..16].try_into().expect("16 bits")));
// also store in id by key table
for (pk, _) in &detail.route_set {
self.id_by_key.insert(*pk, id.clone());
}
self.details.insert(id.clone(), detail);
id
}
pub fn remove_detail(&mut self, id: &String) {
let detail = self.details.remove(id).unwrap();
for (pk, _) in &detail.route_set {
self.id_by_key.remove(&pk).unwrap();
}
}
pub fn get_detail(&self, id: &String) -> Option<&RouteSetSpecDetail> {
self.details.get(id)
}
pub fn get_detail_mut(&mut self, id: &String) -> Option<&mut RouteSetSpecDetail> {
self.details.get_mut(id)
}
pub fn get_id_by_key(&self, key: &PublicKey) -> Option<String> {
self.id_by_key.get(key).cloned()
}
}
/// What remote private routes have seen
#[derive(Debug, Clone, Default)]
pub struct RemotePrivateRouteInfo {
/// The private routes themselves
private_routes: HashMap<PublicKey, PrivateRoute>,
/// Did this remote private route see our node info due to no safety route in use
last_seen_our_node_info_ts: Timestamp,
/// Last time this remote private route was requested for any reason (cache expiration)
last_touched_ts: Timestamp,
/// Stats
stats: RouteStats,
}
impl RemotePrivateRouteInfo {
pub fn get_stats(&self) -> &RouteStats {
&self.stats
}
pub fn get_stats_mut(&mut self) -> &mut RouteStats {
&mut self.stats
}
}
/// Ephemeral data used to help the RouteSpecStore operate efficiently
#[derive(Debug)]
pub struct RouteSpecStoreCache {
/// How many times nodes have been used
used_nodes: HashMap<TypedKey, usize>,
/// How many times nodes have been used at the terminal point of a route
used_end_nodes: HashMap<TypedKey, usize>,
/// Route spec hop cache, used to quickly disqualify routes
hop_cache: HashSet<Vec<u8>>,
/// Remote private routes we've imported and statistics
remote_private_route_set_cache: LruCache<String, RemotePrivateRouteInfo>,
/// Remote private routes indexed by public key
remote_private_routes_by_key: HashMap<PublicKey, String>,
/// Compiled route cache
compiled_route_cache: LruCache<CompiledRouteCacheKey, SafetyRoute>,
/// List of dead allocated routes
dead_routes: Vec<PublicKey>,
/// List of dead remote routes
dead_remote_routes: Vec<PublicKey>,
}
impl RouteSpecStoreCache {
pub fn get_used_node_count(&self, node_ids: &TypedKeySet) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self
.used_nodes
.get(&k)
.cloned()
.unwrap_or_default()
})
}
pub fn get_used_end_node_count(&self, node_ids: &TypedKeySet) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self
.used_end_nodes
.get(&k)
.cloned()
.unwrap_or_default()
})
}
}
impl Default for RouteSpecStoreCache {
fn default() -> Self {
Self {
used_nodes: Default::default(),
used_end_nodes: Default::default(),
hop_cache: Default::default(),
remote_private_route_set_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE),
remote_private_routes_by_key: HashMap::new(),
compiled_route_cache: LruCache::new(COMPILED_ROUTE_CACHE_SIZE),
dead_routes: Default::default(),
dead_remote_routes: Default::default(),
}
}
}
#[derive(Debug)]
pub struct RouteSpecStoreInner {
@ -400,23 +33,6 @@ pub struct RouteSpecStore {
unlocked_inner: Arc<RouteSpecStoreUnlockedInner>,
}
fn route_hops_to_hop_cache(hops: &[NodeRef]) -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(hops.len() * PUBLIC_KEY_LENGTH);
for hop in hops {
cache.extend_from_slice(&hop.best_node_id().key.bytes);
}
cache
}
/// get the hop cache key for a particular route permutation
fn route_permutation_to_hop_cache(rti: &RoutingTableInner, nodes: &[NodeRef], perm: &[usize]) -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(perm.len() * PUBLIC_KEY_LENGTH);
for n in perm {
cache.extend_from_slice(&nodes[*n].locked(rti).best_node_id().key.bytes)
}
cache
}
/// number of route permutations is the number of unique orderings
/// for a set of nodes, given that the first node is fixed
fn _get_route_permutation_count(hop_count: usize) -> usize {
@ -526,7 +142,7 @@ impl RouteSpecStore {
// Look up all route hop noderefs since we can't serialize those
let mut dead_ids = Vec::new();
for (rsid, rssd) in &mut content.details {
for (rsid, rssd) in content.iter_details_mut() {
// Get first route since they all should resolve
let Some((pk, rsd)) = rssd.route_set.first_key_value() else {
dead_ids.push(rsid.clone());
@ -555,7 +171,7 @@ impl RouteSpecStore {
// Ensure we got secret keys for all the public keys
let mut got_secret_key_ids = HashSet::new();
for (rsid, rssd) in &mut content.details {
for (rsid, rssd) in content.iter_details_mut() {
let mut found_all = true;
for (pk, rsd) in &mut rssd.route_set {
if let Some(sk) = secret_key_map.get(pk) {
@ -571,7 +187,7 @@ impl RouteSpecStore {
}
// If we missed any, nuke those route ids
let dead_ids:Vec<String> = content.details.keys().filter_map(|id| {
let dead_ids:Vec<String> = content.keys().filter_map(|id| {
if !got_secret_key_ids.contains(id) {
Some(id.clone())
} else {
@ -589,8 +205,11 @@ impl RouteSpecStore {
};
// Rebuild the routespecstore cache
Self::rebuild_cache(&mut inner);
for (_, rssd) in inner.content.iter_details() {
inner.cache.add_to_cache(&rssd);
}
// Return the loaded RouteSpecStore
let rss = RouteSpecStore {
unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner {
max_route_hop_count,
@ -603,6 +222,7 @@ impl RouteSpecStore {
Ok(rss)
}
#[instrument(level = "trace", skip(self), err)]
pub async fn save(&self) -> EyreResult<()> {
let content = {
@ -628,7 +248,7 @@ impl RouteSpecStore {
.protected_store();
let mut out: HashMap<PublicKey, SecretKey> = HashMap::new();
for (rsid, rssd) in &content.details {
for (rsid, rssd) in content.iter_details() {
for (pk, rsd) in &rssd.route_set {
out.insert(*pk, rsd.secret_key);
}
@ -641,17 +261,9 @@ impl RouteSpecStore {
#[instrument(level = "trace", skip(self))]
pub fn send_route_update(&self) {
let update_callback = self.unlocked_inner.routing_table.update_callback();
let (dead_routes, dead_remote_routes) = {
let mut inner = self.inner.lock();
if inner.cache.dead_routes.is_empty() && inner.cache.dead_remote_routes.is_empty() {
// Nothing to do
return;
}
let dead_routes = core::mem::take(&mut inner.cache.dead_routes);
let dead_remote_routes = core::mem::take(&mut inner.cache.dead_remote_routes);
(dead_routes, dead_remote_routes)
inner.cache.take_dead_routes()
};
let update = VeilidUpdate::Route(VeilidStateRoute {
@ -659,35 +271,11 @@ impl RouteSpecStore {
dead_remote_routes,
});
let update_callback = self.unlocked_inner.routing_table.update_callback();
update_callback(update);
}
fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec<u8>, rssd: &RouteSetSpecDetail) {
if !cache.hop_cache.insert(cache_key) {
panic!("route should never be inserted twice");
}
for (pk, rsd) in &rssd.route_set {
for h in &rsd.hops {
cache
.used_nodes
.entry(TypedKey::new(rsd.crypto_kind, *h))
.and_modify(|e| *e += 1)
.or_insert(1);
}
cache
.used_end_nodes
.entry(TypedKey::new(rsd.crypto_kind, *rsd.hops.last().unwrap()))
.and_modify(|e| *e += 1)
.or_insert(1);
}
}
fn rebuild_cache(inner: &mut RouteSpecStoreInner) {
for rssd in inner.content.details.values() {
let cache_key = route_hops_to_hop_cache(&rssd.hop_node_refs);
Self::add_to_cache(&mut inner.cache, cache_key, &rssd);
}
}
/// Purge the route spec store
pub async fn purge(&self) -> EyreResult<()> {
@ -918,11 +506,20 @@ impl RouteSpecStore {
// Now go through nodes and try to build a route we haven't seen yet
let perm_func = Box::new(|permutation: &[usize]| {
// Get the route cache key
/// Get the hop cache key for a particular route permutation
/// uses the same algorithm as RouteSetSpecDetail::make_cache_key
fn route_permutation_to_hop_cache(rti: &RoutingTableInner, nodes: &[NodeRef], perm: &[usize]) -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(perm.len() * PUBLIC_KEY_LENGTH);
for n in perm {
cache.extend_from_slice(&nodes[*n].locked(rti).best_node_id().key.bytes)
}
cache
}
let cache_key = route_permutation_to_hop_cache(rti, &nodes, permutation);
// Skip routes we have already seen
if inner.cache.hop_cache.contains(&cache_key) {
if inner.cache.contains_route(&cache_key) {
return None;
}
@ -1020,18 +617,16 @@ impl RouteSpecStore {
}
// Keep this route
let route_nodes = permutation.to_vec();
Some((route_nodes, cache_key, can_do_sequenced))
Some((route_nodes, can_do_sequenced))
}) as PermFunc;
let mut route_nodes: Vec<usize> = Vec::new();
let mut cache_key: Vec<u8> = Vec::new();
let mut can_do_sequenced: bool = true;
for start in 0..(nodes.len() - hop_count) {
// Try the permutations available starting with 'start'
if let Some((rn, ck, cds)) = with_route_permutations(hop_count, start, &perm_func) {
route_nodes = rn;
cache_key = ck;
can_do_sequenced = cds;
break;
}
@ -1072,7 +667,7 @@ impl RouteSpecStore {
drop(perm_func);
// Add to cache
Self::add_to_cache(&mut inner.cache, cache_key, &rssd);
inner.cache.add_to_cache(&rssd);
// Keep route in spec store
let id = inner.content.add_detail(rssd);
@ -1108,7 +703,7 @@ impl RouteSpecStore {
log_rpc!(debug "route detail does not exist: {:?}", rsid);
return None;
};
let Some(rsd) = rssd.route_set.get(&public_key.key) else {
let Some(rsd) = rssd.get_route_by_key(&public_key.key) else {
log_rpc!(debug "route set {:?} does not have key: {:?}", rsid, public_key.key);
return None;
};
@ -1145,14 +740,21 @@ impl RouteSpecStore {
async fn test_allocated_route(&self, id: &String) -> EyreResult<bool> {
// Make loopback route to test with
let dest = {
xxx figure out how to pick best crypto for the private route
let private_route = self.assemble_private_route(id, None)?;
let inner = &mut *self.inner.lock();
let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?;
// Get best route from set
// Match the private route's hop length for safety route length
let hop_count = rsd.hops.len();
let (key, hop_count) = {
let inner = &mut *self.inner.lock();
let Some(rssd) = inner.content.get_detail(id) else {
bail!("route id not allocated");
};
let Some(tkey) = rssd.get_route_set_keys().best() else {
bail!("route does not have best key");
};
(tkey.key, rssd.hop_count())
};
let private_route = self.assemble_private_route(&key, None)?;
// Always test routes with safety routes that are more likely to succeed
let stability = Stability::Reliable;
// Routes can test with whatever sequencing they were allocated with
@ -1186,11 +788,14 @@ impl RouteSpecStore {
}
#[instrument(level = "trace", skip(self), ret, err)]
async fn test_remote_route(&self, key: &TypedKey) -> EyreResult<bool> {
async fn test_remote_route(&self, id: &String) -> EyreResult<bool> {
// Make private route test
let dest = {
// Get best remote route from imported set
// Get the route to test
let private_route = match self.peek_remote_private_route(key) {
let private_route = match self.peek_remote_private_route(id) {
Some(pr) => pr,
None => return Ok(false),
};
@ -1241,72 +846,39 @@ impl RouteSpecStore {
/// Release an allocated route that is no longer in use
#[instrument(level = "trace", skip(self), ret)]
fn release_allocated_route(&self, public_key: &PublicKey) -> bool {
fn release_allocated_route(&self, id: &String) -> bool {
let mut inner = self.inner.lock();
let Some(detail) = inner.content.details.remove(public_key) else {
let Some(rssd) = inner.content.remove_detail(id) else {
return false;
};
// Mark it as dead for the update
inner.cache.dead_routes.push(*public_key);
// Remove from hop cache
let cache_key = route_hops_to_hop_cache(&detail.hops);
if !inner.cache.hop_cache.remove(&cache_key) {
if !inner.cache.remove_from_cache(&rssd) {
panic!("hop cache should have contained cache key");
}
// Remove from used nodes cache
for h in &detail.hops {
match inner.cache.used_nodes.entry(*h) {
std::collections::hash_map::Entry::Occupied(mut o) => {
*o.get_mut() -= 1;
if *o.get() == 0 {
o.remove();
}
}
std::collections::hash_map::Entry::Vacant(_) => {
panic!("used_nodes cache should have contained hop");
}
}
}
// Remove from end nodes cache
match inner
.cache
.used_end_nodes
.entry(*detail.hops.last().unwrap())
{
std::collections::hash_map::Entry::Occupied(mut o) => {
*o.get_mut() -= 1;
if *o.get() == 0 {
o.remove();
}
}
std::collections::hash_map::Entry::Vacant(_) => {
panic!("used_end_nodes cache should have contained hop");
}
}
true
}
/// Release an allocated or remote route that is no longer in use
#[instrument(level = "trace", skip(self), ret)]
pub fn release_route(&self, key: &PublicKey) -> bool {
pub fn release_route(&self, id: &String) -> bool {
let is_remote = {
let inner = &mut *self.inner.lock();
// Release from compiled route cache if it's used there
self.invalidate_compiled_route_cache(inner, key);
self.invalidate_compiled_route_cache(inner, id);
// Check to see if this is a remote route
let cur_ts = get_aligned_timestamp();
Self::with_peek_remote_private_route(inner, cur_ts, key, |_| {}).is_some()
Self::with_peek_remote_private_route(inner, cur_ts, id, |_| {}).is_some()
};
if is_remote {
self.release_remote_private_route(key)
self.release_remote_private_route(id)
} else {
self.release_allocated_route(key)
self.release_allocated_route(id)
}
}
@ -1869,14 +1441,16 @@ impl RouteSpecStore {
}
/// Import a remote private route for compilation
/// returns a route set id
#[instrument(level = "trace", skip(self, blob), ret, err)]
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> EyreResult<TypedKeySet> {
// decode the pr blob
let private_routes = RouteSpecStore::blob_to_private_routes(blob)?;
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> EyreResult<String> {
// decode the pr blob
let private_routes = RouteSpecStore::blob_to_private_routes(self.unlocked_inner.routing_table.crypto(), blob)?;
let mut out = TypedKeySet::new();
let inner = &mut *self.inner.lock();
// validate the private routes
for private_route in private_routes {
// ensure private route has first hop
@ -1885,37 +1459,26 @@ impl RouteSpecStore {
}
// ensure this isn't also an allocated route
if Self::detail(inner, &private_route.public_key.key).is_some() {
if inner.content.get_id_by_key(&private_route.public_key.key).is_some() {
bail!("should not import allocated route");
}
// store the private route in our cache
let cur_ts = get_aligned_timestamp();
let key = Self::with_create_remote_private_route(inner, cur_ts, private_route, |r| {
r.private_route.as_ref().unwrap().public_key.clone()
});
out.add(key);
}
Ok(out)
let cur_ts = get_aligned_timestamp();
let id = inner.cache.import_remote_private_route(cur_ts, private_routes);
Ok(id)
}
/// Release a remote private route that is no longer in use
#[instrument(level = "trace", skip(self), ret)]
fn release_remote_private_route(&self, key: &PublicKey) -> bool {
fn release_remote_private_route(&self, id: &String) -> bool {
let inner = &mut *self.inner.lock();
if inner.cache.remote_private_route_cache.remove(key).is_some() {
// Mark it as dead for the update
inner.cache.dead_remote_routes.push(*key);
true
} else {
false
}
inner.cache.remove_remote_private_route(id)
}
/// Retrieve an imported remote private route by its public key
pub fn get_remote_private_route(&self, key: &PublicKey) -> Option<PrivateRoute> {
pub fn get_remote_private_route(&self, id: &String) -> Option<PrivateRoute> {
let inner = &mut *self.inner.lock();
let cur_ts = get_aligned_timestamp();
Self::with_get_remote_private_route(inner, cur_ts, key, |r| {
@ -1924,7 +1487,8 @@ impl RouteSpecStore {
}
/// Retrieve an imported remote private route by its public key but don't 'touch' it
pub fn peek_remote_private_route(&self, key: &PublicKey) -> Option<PrivateRoute> {
pub fn peek_remote_private_route(&self, id: &String) -> Option<PrivateRoute> {
xx fix these
let inner = &mut *self.inner.lock();
let cur_ts = get_aligned_timestamp();
Self::with_peek_remote_private_route(inner, cur_ts, key, |r| {
@ -1932,99 +1496,6 @@ impl RouteSpecStore {
})
}
// get or create a remote private route cache entry
fn with_create_remote_private_route<F, R>(
inner: &mut RouteSpecStoreInner,
cur_ts: Timestamp,
private_route: PrivateRoute,
f: F,
) -> R
where
F: FnOnce(&mut RemotePrivateRouteInfo) -> R,
{
let pr_pubkey = private_route.public_key.key;
let rpr = inner
.cache
.remote_private_route_cache
.entry(pr_pubkey)
.and_modify(|rpr| {
if cur_ts.saturating_sub(rpr.last_touched_ts) >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY {
// Start fresh if this had expired
rpr.last_seen_our_node_info_ts = Timestamp::new(0);
rpr.last_touched_ts = cur_ts;
rpr.stats = RouteStats::new(cur_ts);
} else {
// If not expired, just mark as being used
rpr.last_touched_ts = cur_ts;
}
})
.or_insert_with(|| RemotePrivateRouteInfo {
// New remote private route cache entry
private_route: Some(private_route),
last_seen_our_node_info_ts: Timestamp::new(0),
last_touched_ts: cur_ts,
stats: RouteStats::new(cur_ts),
});
let out = f(rpr);
// Ensure we LRU out items
if inner.cache.remote_private_route_cache.len()
> inner.cache.remote_private_route_cache.capacity()
{
let (dead_k, _) = inner.cache.remote_private_route_cache.remove_lru().unwrap();
// Mark it as dead for the update
inner.cache.dead_remote_routes.push(dead_k);
}
out
}
// get a remote private route cache entry
fn with_get_remote_private_route<F, R>(
inner: &mut RouteSpecStoreInner,
cur_ts: Timestamp,
key: &PublicKey,
f: F,
) -> Option<R>
where
F: FnOnce(&mut RemotePrivateRouteInfo) -> R,
{
let rpr = inner.cache.remote_private_route_cache.get_mut(key)?;
if cur_ts.saturating_sub(rpr.last_touched_ts) < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY {
rpr.last_touched_ts = cur_ts;
return Some(f(rpr));
}
inner.cache.remote_private_route_cache.remove(key);
inner.cache.dead_remote_routes.push(*key);
None
}
// peek a remote private route cache entry
fn with_peek_remote_private_route<F, R>(
inner: &mut RouteSpecStoreInner,
cur_ts: Timestamp,
key: &PublicKey,
f: F,
) -> Option<R>
where
F: FnOnce(&mut RemotePrivateRouteInfo) -> R,
{
match inner.cache.remote_private_route_cache.entry(*key) {
hashlink::lru_cache::Entry::Occupied(mut o) => {
let rpr = o.get_mut();
if cur_ts.saturating_sub(rpr.last_touched_ts) < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY {
return Some(f(rpr));
}
o.remove();
inner.cache.dead_remote_routes.push(*key);
None
}
hashlink::lru_cache::Entry::Vacant(_) => None,
}
}
/// Check to see if this remote (not ours) private route has seen our current node info yet
/// This happens when you communicate with a private route without a safety route
pub fn has_remote_private_route_seen_our_node_info(&self, key: &PublicKey) -> bool {
@ -2119,28 +1590,21 @@ impl RouteSpecStore {
let inner = &mut *self.inner.lock();
// Clean up local allocated routes
for (_k, v) in &mut inner.content.details {
// Must republish route now
v.published = false;
// Restart stats for routes so we test the route again
v.stats.reset();
}
inner.content.reset_details();
// Reset private route cache
for (_k, v) in &mut inner.cache.remote_private_route_cache {
// Restart stats for routes so we test the route again
v.stats.reset();
}
inner.cache.reset_remote_private_routes();
}
/// Mark route as published
/// When first deserialized, routes must be re-published in order to ensure they remain
/// in the RouteSpecStore.
pub fn mark_route_published(&self, key: &PublicKey, published: bool) -> EyreResult<()> {
pub fn mark_route_published(&self, id: &String, published: bool) -> EyreResult<()> {
let inner = &mut *self.inner.lock();
Self::detail_mut(inner, key)
.ok_or_else(|| eyre!("route does not exist"))?
.published = published;
let Some(rssd) = inner.content.get_detail_mut(id) else {
bail!("route does not exist");
};
rssd.set_published(published);
Ok(())
}
@ -2149,8 +1613,8 @@ impl RouteSpecStore {
let inner = &mut *self.inner.lock();
// Roll transfers for locally allocated routes
for rsd in inner.content.details.values_mut() {
rsd.stats.roll_transfers(last_ts, cur_ts);
for rssd in inner.content.details.values_mut() {
rssd.stats.roll_transfers(last_ts, cur_ts);
}
// Roll transfers for remote private routes
for (_k, v) in inner.cache.remote_private_route_cache.iter_mut() {
@ -2216,6 +1680,7 @@ impl RouteSpecStore {
let private_route = decode_private_route(&pr_reader, crypto).wrap_err("failed to decode private route")?;
out.push(private_route);
}
Ok(out)
}
}

View File

@ -0,0 +1,325 @@
use super::*;
// Compiled route key for caching
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct CompiledRouteCacheKey {
sr_pubkey: PublicKey,
pr_pubkey: PublicKey,
}
/// Compiled route (safety route + private route)
#[derive(Clone, Debug)]
pub struct CompiledRoute {
/// The safety route attached to the private route
pub safety_route: SafetyRoute,
/// The secret used to encrypt the message payload
pub secret: SecretKey,
/// The node ref to the first hop in the compiled route
pub first_hop: NodeRef,
}
/// Ephemeral data used to help the RouteSpecStore operate efficiently
#[derive(Debug)]
pub struct RouteSpecStoreCache {
/// How many times nodes have been used
used_nodes: HashMap<PublicKey, usize>,
/// How many times nodes have been used at the terminal point of a route
used_end_nodes: HashMap<PublicKey, usize>,
/// Route spec hop cache, used to quickly disqualify routes
hop_cache: HashSet<Vec<u8>>,
/// Remote private routes we've imported and statistics
remote_private_route_set_cache: LruCache<RemotePrivateRouteId, RemotePrivateRouteInfo>,
/// Remote private routes indexed by public key
remote_private_routes_by_key: HashMap<PublicKey, RemotePrivateRouteId>,
/// Compiled route cache
compiled_route_cache: LruCache<CompiledRouteCacheKey, SafetyRoute>,
/// List of dead allocated routes
dead_routes: Vec<RouteSetSpecId>,
/// List of dead remote routes
dead_remote_routes: Vec<RemotePrivateRouteId>,
}
impl RouteSpecStoreCache {
/// add an allocated route set to our cache via its cache key
pub fn add_to_cache(&mut self, rssd: &RouteSetSpecDetail) {
let cache_key = rssd.make_cache_key();
if !self.hop_cache.insert(cache_key) {
panic!("route should never be inserted twice");
}
for (pk, rsd) in rssd.iter_route_set() {
for h in &rsd.hops {
self.used_nodes
.entry(*h)
.and_modify(|e| *e += 1)
.or_insert(1);
}
self.used_end_nodes
.entry(*rsd.hops.last().unwrap())
.and_modify(|e| *e += 1)
.or_insert(1);
}
}
/// checks if an allocated route is in our cache
pub fn contains_route(&self, cache_key: &Vec<u8>) -> bool {
self.hop_cache.contains(cache_key)
}
/// removes an allocated route set from our cache
pub fn remove_from_cache(&mut self, rssd: &RouteSetSpecDetail) -> bool {
let cache_key = rssd.make_cache_key();
// Remove from hop cache
if !self.hop_cache.remove(&cache_key) {
return false;
}
for (pk, rsd) in rssd.iter_route_set() {
for h in &rsd.hops {
// Remove from used nodes cache
match self.used_nodes.entry(*h) {
std::collections::hash_map::Entry::Occupied(mut o) => {
*o.get_mut() -= 1;
if *o.get() == 0 {
o.remove();
}
}
std::collections::hash_map::Entry::Vacant(_) => {
panic!("used_nodes cache should have contained hop");
}
}
}
// Remove from end nodes cache
match self.used_end_nodes.entry(*rsd.hops.last().unwrap()) {
std::collections::hash_map::Entry::Occupied(mut o) => {
*o.get_mut() -= 1;
if *o.get() == 0 {
o.remove();
}
}
std::collections::hash_map::Entry::Vacant(_) => {
panic!("used_end_nodes cache should have contained hop");
}
}
}
// Mark it as dead for the update
self.dead_routes.push(rssd.make_id());
true
}
/// calculate how many times a node with a particular node id set has been used anywhere in the path of our allocated routes
pub fn get_used_node_count(&self, node_ids: &TypedKeySet) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self.used_nodes.get(&k.key).cloned().unwrap_or_default()
})
}
/// calculate how many times a node with a particular node id set has been used at the end of the path of our allocated routes
pub fn get_used_end_node_count(&self, node_ids: &TypedKeySet) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self.used_end_nodes.get(&k.key).cloned().unwrap_or_default()
})
}
/// generate unique remote private route set id for a remote private route set
fn make_remote_private_route_id(private_routes: &[PrivateRoute]) -> String {
let mut idbytes = [0u8; 16];
for (pk, _) in &rprinfo.private_routes {
for (i, x) in pk.bytes.iter().enumerate() {
idbytes[i % 16] ^= *x;
}
}
let id = format!(
"{:08x}-{:04x}-{:04x}-{:04x}-{:08x}{:04x}",
u32::from_be_bytes(idbytes[0..4].try_into().expect("32 bits")),
u16::from_be_bytes(idbytes[4..6].try_into().expect("16 bits")),
u16::from_be_bytes(idbytes[6..8].try_into().expect("16 bits")),
u16::from_be_bytes(idbytes[8..10].try_into().expect("16 bits")),
u32::from_be_bytes(idbytes[10..14].try_into().expect("32 bits")),
u16::from_be_bytes(idbytes[14..16].try_into().expect("16 bits"))
);
id
}
/// add remote private route to caches
/// returns a remote private route set id
fn add_remote_private_route(
&mut self,
rprinfo: RemotePrivateRouteInfo,
) -> RemotePrivateRouteId {
let id = Self::make_remote_private_route_id(rprinfo.get_private_routes());
// also store in id by key table
for (pk, _) in rprinfo.get_private_routes() {
self.remote_private_routes_by_key.insert(*pk, id.clone());
}
self.remote_private_route_set_cache
.insert(id.clone(), rprinfo, |dead_id, dead_rpri| {
// If anything LRUs out, remove from the by-key table
for (dead_pk, _) in dead_rpri.get_private_routes() {
self.remote_private_routes_by_key.remove(&dead_pk).unwrap();
}
self.dead_remote_routes.push(dead_id);
});
id
}
/// remote private route cache accessor
fn get_remote_private_route(
&mut self,
id: &RemotePrivateRouteId,
) -> Option<&RemotePrivateRouteInfo> {
self.remote_private_route_set_cache.get(id)
}
/// mutable remote private route cache accessor
fn get_remote_private_route_mut(
&mut self,
id: &RemotePrivateRouteId,
) -> Option<&mut RemotePrivateRouteInfo> {
self.remote_private_route_set_cache.get_mut(id)
}
/// mutable remote private route cache accessor without lru action
fn peek_remote_private_route_mut(
&mut self,
id: &RemotePrivateRouteId,
) -> Option<&mut RemotePrivateRouteInfo> {
self.remote_private_route_set_cache.peek_mut(id)
}
/// look up a remote private route id by one of the route public keys
pub fn get_remote_private_route_id_by_key(
&self,
key: &PublicKey,
) -> Option<RemotePrivateRouteId> {
self.remote_private_routes_by_key.get(key).cloned()
}
/// get or create a remote private route cache entry
/// may LRU and/or expire other cache entries to make room for the new one
/// or update an existing entry with the same private route set
/// returns the route set id
pub fn import_remote_private_route(
&mut self,
cur_ts: Timestamp,
private_routes: Vec<PrivateRoute>,
) -> RemotePrivateRouteId {
// get id for this route set
let id = RouteSpecStoreCache::make_remote_private_route_id(&private_routes);
let rpri = if let Some(rpri) = self.get_remote_private_route_mut(&id) {
if rpri.did_expire(cur_ts) {
// Start fresh if this had expired
rpri.unexpire(cur_ts);
} else {
// If not expired, just mark as being used
rpri.touch(cur_ts);
}
} else {
let rpri = RemotePrivateRouteInfo {
// New remote private route cache entry
private_routes,
last_seen_our_node_info_ts: Timestamp::new(0),
last_touched_ts: cur_ts,
stats: RouteStats::new(cur_ts),
};
let new_id = self.add_remote_private_route(rpri);
assert_eq!(id, new_id);
if self.get_remote_private_route_mut(&id).is_none() {
bail!("remote private route should exist");
};
};
id
}
/// remove a remote private route from the cache
pub fn remove_remote_private_route(&mut self, id: &RemotePrivateRouteId) -> bool {
let Some(rprinfo) = self.remote_private_route_set_cache.remove(id) else {
return false;
};
for (pk, _) in rprinfo.get_private_routes() {
self.remote_private_routes_by_key.remove(&pk).unwrap();
}
self.dead_remote_routes.push(id.clone());
true
}
/// get an existing remote private route cache entry
/// will LRU entries and may expire entries and not return them if they are stale
/// calls a callback with the remote private route info if returned
pub fn with_get_remote_private_route<F, R>(
&mut self,
cur_ts: Timestamp,
id: &RemotePrivateRouteId,
f: F,
) -> Option<R>
where
F: FnOnce(&mut RemotePrivateRouteInfo) -> R,
{
if let Some(rpri) = self.get_remote_private_route_mut(&id) {
if !rpri.did_expire(cur_ts) {
rpri.touch(cur_ts);
return Some(f(rpri));
}
}
self.remove_remote_private_route(&id);
None
}
// peek a remote private route cache entry
// will not LRU entries but may expire entries and not return them if they are stale
/// calls a callback with the remote private route info if returned
pub fn with_peek_remote_private_route<F, R>(
&mut self,
cur_ts: Timestamp,
id: &RemotePrivateRouteId,
f: F,
) -> Option<R>
where
F: FnOnce(&mut RemotePrivateRouteInfo) -> R,
{
if let Some(rpri) = self.peek_remote_private_route_mut(&id) {
if !rpri.did_expire(cur_ts) {
rpri.touch(cur_ts);
return Some(f(rpri));
}
}
self.remove_remote_private_route(&id);
None
}
/// Take the dead local and remote routes so we can update clients
pub fn take_dead_routes(&mut self) -> (Vec<RouteSetSpecId>, Vec<RemotePrivateRouteId>) {
if self.dead_routes.is_empty() && self.dead_remote_routes.is_empty() {
// Nothing to do
return;
}
let dead_routes = core::mem::take(&mut self.dead_routes);
let dead_remote_routes = core::mem::take(&mut self.dead_remote_routes);
(dead_routes, dead_remote_routes)
}
/// Clean up imported remote routes
/// Resets statistics for when our node info changes
pub fn reset_details(&mut self) {
for (_k, v) in self.remote_private_route_cache {
// Restart stats for routes so we test the route again
v.stats.reset();
}
}
}
impl Default for RouteSpecStoreCache {
fn default() -> Self {
Self {
used_nodes: Default::default(),
used_end_nodes: Default::default(),
hop_cache: Default::default(),
remote_private_route_set_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE),
remote_private_routes_by_key: HashMap::new(),
compiled_route_cache: LruCache::new(COMPILED_ROUTE_CACHE_SIZE),
dead_routes: Default::default(),
dead_remote_routes: Default::default(),
}
}
}

View File

@ -0,0 +1,68 @@
use super::*;
/// The core representation of the RouteSpecStore that can be serialized
#[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C, align(8)), derive(CheckBytes))]
pub struct RouteSpecStoreContent {
/// All of the route sets we have allocated so far indexed by key
id_by_key: HashMap<PublicKey, RouteSetSpecId>,
/// All of the route sets we have allocated so far
details: HashMap<RouteSetSpecId, RouteSetSpecDetail>,
}
impl RouteSpecStoreContent {
pub fn add_detail(&mut self, detail: RouteSetSpecDetail) -> RouteSetSpecId {
// generate unique key string
let id = detail.make_id();
assert!(!self.details.contains_key(&id));
// also store in id by key table
for (pk, _) in detail.iter_route_set() {
self.id_by_key.insert(*pk, id.clone());
}
self.details.insert(id.clone(), detail);
id
}
pub fn remove_detail(&mut self, id: &RouteSetSpecId) -> Option<RouteSetSpecDetail> {
let detail = self.details.remove(id)?;
for (pk, _) in detail.iter_route_set() {
self.id_by_key.remove(&pk).unwrap();
}
Some(detail)
}
pub fn get_detail(&self, id: &RouteSetSpecId) -> Option<&RouteSetSpecDetail> {
self.details.get(id)
}
pub fn get_detail_mut(&mut self, id: &RouteSetSpecId) -> Option<&mut RouteSetSpecDetail> {
self.details.get_mut(id)
}
pub fn get_id_by_key(&self, key: &PublicKey) -> Option<RouteSetSpecId> {
self.id_by_key.get(key).cloned()
}
pub fn iter_ids(&self) -> std::collections::hash_map::Keys<RouteSetSpecId, RouteSetSpecDetail> {
self.details.keys()
}
pub fn iter_details(
&self,
) -> std::collections::hash_map::Iter<RouteSetSpecId, RouteSetSpecDetail> {
self.details.iter()
}
pub fn iter_details_mut(
&mut self,
) -> std::collections::hash_map::IterMut<RouteSetSpecId, RouteSetSpecDetail> {
self.details.iter_mut()
}
/// Clean up local allocated routes
/// Resets publication status and statistics for when our node info changes
/// Routes must be republished
pub fn reset_details(&mut self) {
for (_k, v) in &mut self.details {
// Must republish route now
v.set_published(false);
// Restart stats for routes so we test the route again
v.get_stats_mut().reset();
}
}
}

View File

@ -0,0 +1,129 @@
use super::*;
#[derive(Clone, Debug, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct RouteStats {
/// Consecutive failed to send count
#[with(Skip)]
pub failed_to_send: u32,
/// Questions lost
#[with(Skip)]
pub questions_lost: u32,
/// Timestamp of when the route was created
pub created_ts: Timestamp,
/// Timestamp of when the route was last checked for validity
#[with(Skip)]
pub last_tested_ts: Option<Timestamp>,
/// Timestamp of when the route was last sent to
#[with(Skip)]
pub last_sent_ts: Option<Timestamp>,
/// Timestamp of when the route was last received over
#[with(Skip)]
pub last_received_ts: Option<Timestamp>,
/// Transfers up and down
pub transfer_stats_down_up: TransferStatsDownUp,
/// Latency stats
pub latency_stats: LatencyStats,
/// Accounting mechanism for this route's RPC latency
#[with(Skip)]
latency_stats_accounting: LatencyStatsAccounting,
/// Accounting mechanism for the bandwidth across this route
#[with(Skip)]
transfer_stats_accounting: TransferStatsAccounting,
}
impl RouteStats {
/// Make new route stats
pub fn new(created_ts: Timestamp) -> Self {
Self {
created_ts,
..Default::default()
}
}
/// Mark a route as having failed to send
pub fn record_send_failed(&mut self) {
self.failed_to_send += 1;
}
/// Mark a route as having lost a question
pub fn record_question_lost(&mut self) {
self.questions_lost += 1;
}
/// Mark a route as having received something
pub fn record_received(&mut self, cur_ts: Timestamp, bytes: ByteCount) {
self.last_received_ts = Some(cur_ts);
self.last_tested_ts = Some(cur_ts);
self.transfer_stats_accounting.add_down(bytes);
}
/// Mark a route as having been sent to
pub fn record_sent(&mut self, cur_ts: Timestamp, bytes: ByteCount) {
self.last_sent_ts = Some(cur_ts);
self.transfer_stats_accounting.add_up(bytes);
}
/// Mark a route as having been sent to
pub fn record_latency(&mut self, latency: TimestampDuration) {
self.latency_stats = self.latency_stats_accounting.record_latency(latency);
}
/// Mark a route as having been tested
pub fn record_tested(&mut self, cur_ts: Timestamp) {
self.last_tested_ts = Some(cur_ts);
// Reset question_lost and failed_to_send if we test clean
self.failed_to_send = 0;
self.questions_lost = 0;
}
/// Roll transfers for these route stats
pub fn roll_transfers(&mut self, last_ts: Timestamp, cur_ts: Timestamp) {
self.transfer_stats_accounting.roll_transfers(
last_ts,
cur_ts,
&mut self.transfer_stats_down_up,
)
}
/// Get the latency stats
pub fn latency_stats(&self) -> &LatencyStats {
&self.latency_stats
}
/// Get the transfer stats
pub fn transfer_stats(&self) -> &TransferStatsDownUp {
&self.transfer_stats_down_up
}
/// Reset stats when network restarts
pub fn reset(&mut self) {
self.last_tested_ts = None;
self.last_sent_ts = None;
self.last_received_ts = None;
}
/// Check if a route needs testing
pub fn needs_testing(&self, cur_ts: Timestamp) -> bool {
// Has the route had any failures lately?
if self.questions_lost > 0 || self.failed_to_send > 0 {
// If so, always test
return true;
}
// Has the route been tested within the idle time we'd want to check things?
// (also if we've received successfully over the route, this will get set)
if let Some(last_tested_ts) = self.last_tested_ts {
if cur_ts.saturating_sub(last_tested_ts)
> TimestampDuration::new(ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64)
{
return true;
}
} else {
// If this route has never been tested, it needs to be
return true;
}
false
}
}

View File

@ -862,7 +862,9 @@ impl RoutingTableInner {
pub fn touch_recent_peer(&mut self, node_id: TypedKey, last_connection: ConnectionDescriptor) {
self.recent_peers
.insert(node_id, RecentPeersEntry { last_connection });
.insert(node_id, RecentPeersEntry { last_connection }, |_k, _v| {
// do nothing on lru eviction
});
}
//////////////////////////////////////////////////////////////////////

View File

@ -21,8 +21,8 @@ pub enum Destination {
},
/// Send to private route (privateroute)
PrivateRoute {
/// A private route to send to
private_route: PrivateRoute,
/// A private route set id to send to
private_route: String,
/// Require safety route or not
safety_selection: SafetySelection,
},

View File

@ -4,8 +4,8 @@ use super::*;
#[derive(Clone, Debug)]
pub enum Target {
NodeId(PublicKey),
PrivateRoute(PublicKey),
NodeId(PublicKey), // Node by any of its public keys
PrivateRoute(String), // Private route by its route set id
}
pub struct RoutingContextInner {}
@ -118,17 +118,17 @@ impl RoutingContext {
safety_selection: self.unlocked_inner.safety_selection,
})
}
Target::PrivateRoute(pr) => {
Target::PrivateRoute(rsid) => {
// Get remote private route
let rss = self.api.routing_table()?.route_spec_store();
let Some(private_route) = rss
.get_remote_private_route(&pr)
.get_remote_private_route(&rsid)
else {
apibail_key_not_found!(pr);
};
Ok(rpc_processor::Destination::PrivateRoute {
private_route,
private_route: rsid,
safety_selection: self.unlocked_inner.safety_selection,
})
}

View File

@ -512,8 +512,8 @@ impl SafetySelection {
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct SafetySpec {
/// preferred safety route if it still exists
pub preferred_route: Option<PublicKey>,
/// preferred safety route set id if it still exists
pub preferred_route: Option<String>,
/// must be greater than 0
pub hop_count: usize,
/// prefer reliability over speed