diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index 81e5e65a..da9d9cb6 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -405,7 +405,22 @@ reply - reply to an AppCall not handled directly by the server self.inner_mut().ui.set_config(config.config) } pub fn update_route(&mut self, route: veilid_core::VeilidStateRoute) { - //self.inner_mut().ui.set_config(config.config) + let mut out = String::new(); + if !route.dead_routes.is_empty() { + out.push_str(&format!("Dead routes: {:?}", route.dead_routes)); + } + if !route.dead_remote_routes.is_empty() { + if !out.is_empty() { + out.push_str("\n"); + } + out.push_str(&format!( + "Dead remote routes: {:?}", + route.dead_remote_routes + )); + } + if !out.is_empty() { + self.inner().ui.add_node_event(out); + } } pub fn update_log(&mut self, log: veilid_core::VeilidLog) { diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index ba3adccf..f9a22da3 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -283,6 +283,14 @@ impl NetworkManager { .connection_manager .clone() } + pub fn update_callback(&self) -> UpdateCallback { + self.unlocked_inner + .update_callback + .read() + .as_ref() + .unwrap() + .clone() + } #[instrument(level = "debug", skip_all, err)] pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 1e8e5ea5..c680eb1a 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -127,6 +127,9 @@ impl RoutingTable { pub fn rpc_processor(&self) -> RPCProcessor { self.network_manager().rpc_processor() } + pub fn update_callback(&self) -> UpdateCallback { + self.network_manager().update_callback() + } pub fn with_config(&self, f: F) -> R where F: FnOnce(&VeilidConfigInner) -> R, diff --git a/veilid-core/src/routing_table/privacy.rs b/veilid-core/src/routing_table/privacy.rs index a2322baf..e902622f 100644 --- a/veilid-core/src/routing_table/privacy.rs +++ b/veilid-core/src/routing_table/privacy.rs @@ -116,6 +116,18 @@ impl PrivateRoute { PrivateRouteHops::Empty => return None, } } + + pub fn first_hop_node_id(&self) -> Option { + let PrivateRouteHops::FirstHop(pr_first_hop) = &self.hops else { + return None; + }; + + // Get the safety route to use from the spec + Some(match &pr_first_hop.node { + RouteNode::NodeId(n) => n.key, + RouteNode::PeerInfo(p) => p.node_id.key, + }) + } } impl fmt::Display for PrivateRoute { diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 5723fbcc..e0167291 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -8,6 +8,8 @@ use rkyv::{ const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024; /// Remote private route cache entries expire in 5 minutes if they haven't been used const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: u64 = 300_000_000u64; +/// Amount of time a route can remain idle before it gets tested +const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000; /// Compiled route (safety route + private route) #[derive(Clone, Debug)] @@ -32,25 +34,25 @@ pub struct KeyPair { pub struct RouteStats { /// Consecutive failed to send count #[with(Skip)] - failed_to_send: u32, + pub failed_to_send: u32, /// Questions lost #[with(Skip)] - questions_lost: u32, + pub questions_lost: u32, /// Timestamp of when the route was created - created_ts: u64, + pub created_ts: u64, /// Timestamp of when the route was last checked for validity #[with(Skip)] - last_tested_ts: Option, + pub last_tested_ts: Option, /// Timestamp of when the route was last sent to #[with(Skip)] - last_sent_ts: Option, + pub last_sent_ts: Option, /// Timestamp of when the route was last received over #[with(Skip)] - last_received_ts: Option, + pub last_received_ts: Option, /// Transfers up and down - transfer_stats_down_up: TransferStatsDownUp, + pub transfer_stats_down_up: TransferStatsDownUp, /// Latency stats - latency_stats: LatencyStats, + pub latency_stats: LatencyStats, /// Accounting mechanism for this route's RPC latency #[with(Skip)] latency_stats_accounting: LatencyStatsAccounting, @@ -129,6 +131,28 @@ impl RouteStats { self.last_sent_ts = None; self.last_received_ts = None; } + + /// Check if a route needs testing + pub fn needs_testing(&self, cur_ts: u64) -> bool { + // Has the route had any failures lately? + if self.questions_lost > 0 || self.failed_to_send > 0 { + // If so, always test + return true; + } + + // Has the route been tested within the idle time we'd want to check things? + // (also if we've received successfully over the route, this will get set) + if let Some(last_tested_ts) = self.last_tested_ts { + if cur_ts.saturating_sub(last_tested_ts) > (ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64) { + return true; + } + } else { + // If this route has never been tested, it needs to be + return true; + } + + false + } } #[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] @@ -157,6 +181,15 @@ pub struct RouteSpecDetail { stats: RouteStats, } +impl RouteSpecDetail { + pub fn get_stats(&self) -> &RouteStats { + &self.stats + } + pub fn get_stats_mut(&mut self) -> &mut RouteStats { + &mut self.stats + } +} + /// The core representation of the RouteSpecStore that can be serialized #[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)] #[archive_attr(repr(C), derive(CheckBytes))] @@ -178,6 +211,15 @@ pub struct RemotePrivateRouteInfo { stats: RouteStats, } +impl RemotePrivateRouteInfo { + pub fn get_stats(&self) -> &RouteStats { + &self.stats + } + pub fn get_stats_mut(&mut self) -> &mut RouteStats { + &mut self.stats + } +} + /// Ephemeral data used to help the RouteSpecStore operate efficiently #[derive(Debug)] pub struct RouteSpecStoreCache { @@ -189,6 +231,10 @@ pub struct RouteSpecStoreCache { hop_cache: HashSet>, /// Has a remote private route responded to a question and when remote_private_route_cache: LruCache, + /// List of dead allocated routes + dead_routes: Vec, + /// List of dead remote routes + dead_remote_routes: Vec, } impl Default for RouteSpecStoreCache { @@ -198,6 +244,8 @@ impl Default for RouteSpecStoreCache { used_end_nodes: Default::default(), hop_cache: Default::default(), remote_private_route_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE), + dead_routes: Default::default(), + dead_remote_routes: Default::default(), } } } @@ -341,6 +389,7 @@ impl RouteSpecStore { } } + #[instrument(level = "trace", skip(routing_table), err)] pub async fn load(routing_table: RoutingTable) -> EyreResult { let (max_route_hop_count, default_route_hop_count) = { let config = routing_table.network_manager().config(); @@ -413,6 +462,7 @@ impl RouteSpecStore { Ok(rss) } + #[instrument(level = "trace", skip(self), err)] pub async fn save(&self) -> EyreResult<()> { let content = { let inner = self.inner.lock(); @@ -448,6 +498,29 @@ impl RouteSpecStore { Ok(()) } + #[instrument(level = "trace", skip(self))] + pub fn send_route_update(&self) { + let update_callback = self.unlocked_inner.routing_table.update_callback(); + + let (dead_routes, dead_remote_routes) = { + let mut inner = self.inner.lock(); + if inner.cache.dead_routes.is_empty() && inner.cache.dead_remote_routes.is_empty() { + // Nothing to do + return; + } + let dead_routes = core::mem::take(&mut inner.cache.dead_routes); + let dead_remote_routes = core::mem::take(&mut inner.cache.dead_remote_routes); + (dead_routes, dead_remote_routes) + }; + + let update = VeilidUpdate::Route(VeilidStateRoute { + dead_routes, + dead_remote_routes, + }); + + update_callback(update); + } + fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec, rsd: &RouteSpecDetail) { if !cache.hop_cache.insert(cache_key) { panic!("route should never be inserted twice"); @@ -500,6 +573,7 @@ impl RouteSpecStore { /// Prefers nodes that are not currently in use by another route /// The route is not yet tested for its reachability /// Returns None if no route could be allocated at this time + #[instrument(level = "trace", skip(self), ret, err)] pub fn allocate_route( &self, stability: Stability, @@ -523,6 +597,7 @@ impl RouteSpecStore { ) } + #[instrument(level = "trace", skip(self, inner, rti), ret, err)] fn allocate_route_inner( &self, inner: &mut RouteSpecStoreInner, @@ -789,6 +864,7 @@ impl RouteSpecStore { Ok(Some(public_key)) } + #[instrument(level = "trace", skip(self, data), ret, err)] pub fn validate_signatures( &self, public_key: &DHTKey, @@ -835,10 +911,8 @@ impl RouteSpecStore { ))) } - /// Test an allocated route for continuity - pub async fn test_route(&self, key: &DHTKey) -> EyreResult { - let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); - + #[instrument(level = "trace", skip(self), ret, err)] + async fn test_allocated_route(&self, key: &DHTKey) -> EyreResult { // Make loopback route to test with let dest = { let private_route = self.assemble_private_route(key, None)?; @@ -864,6 +938,7 @@ impl RouteSpecStore { }; // Test with double-round trip ping to self + let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); let _res = match rpc_processor.rpc_call_status(dest).await? { NetworkResult::Value(v) => v, _ => { @@ -875,13 +950,71 @@ impl RouteSpecStore { Ok(true) } - /// Release an allocated route that is no longer in use - pub fn release_route(&self, public_key: DHTKey) -> EyreResult<()> { - let mut inner = self.inner.lock(); - let Some(detail) = inner.content.details.remove(&public_key) else { - bail!("can't release route that was never allocated"); + #[instrument(level = "trace", skip(self), ret, err)] + async fn test_remote_route(&self, key: &DHTKey) -> EyreResult { + // Make private route test + let dest = { + // Get the route to test + let private_route = match self.peek_remote_private_route(key) { + Some(pr) => pr, + None => return Ok(false), + }; + + // Get a safety route that is good enough + let safety_spec = SafetySpec { + preferred_route: None, + hop_count: self.unlocked_inner.default_route_hop_count, + stability: Stability::LowLatency, + sequencing: Sequencing::NoPreference, + }; + + let safety_selection = SafetySelection::Safe(safety_spec); + + Destination::PrivateRoute { + private_route, + safety_selection, + } }; + // Test with double-round trip ping to self + let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); + let _res = match rpc_processor.rpc_call_status(dest).await? { + NetworkResult::Value(v) => v, + _ => { + // Did not error, but did not come back, just return false + return Ok(false); + } + }; + + Ok(true) + } + + /// Test an allocated route for continuity + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn test_route(&self, key: &DHTKey) -> EyreResult { + let is_remote = { + let inner = &mut *self.inner.lock(); + let cur_ts = intf::get_timestamp(); + Self::with_peek_remote_private_route(inner, cur_ts, key, |_| {}).is_some() + }; + if is_remote { + self.test_remote_route(key).await + } else { + self.test_allocated_route(key).await + } + } + + /// Release an allocated route that is no longer in use + #[instrument(level = "trace", skip(self), ret)] + fn release_allocated_route(&self, public_key: &DHTKey) -> bool { + let mut inner = self.inner.lock(); + let Some(detail) = inner.content.details.remove(public_key) else { + return false; + }; + + // Mark it as dead for the update + inner.cache.dead_routes.push(*public_key); + // Remove from hop cache let cache_key = route_hops_to_hop_cache(&detail.hops); if !inner.cache.hop_cache.remove(&cache_key) { @@ -917,11 +1050,27 @@ impl RouteSpecStore { panic!("used_end_nodes cache should have contained hop"); } } - Ok(()) + true + } + + /// Release an allocated or remote route that is no longer in use + #[instrument(level = "trace", skip(self), ret)] + pub fn release_route(&self, key: &DHTKey) -> bool { + let is_remote = { + let inner = &mut *self.inner.lock(); + let cur_ts = intf::get_timestamp(); + Self::with_peek_remote_private_route(inner, cur_ts, key, |_| {}).is_some() + }; + if is_remote { + self.release_remote_private_route(key) + } else { + self.release_allocated_route(key) + } } /// Find first matching unpublished route that fits into the selection criteria - fn first_unpublished_route_inner<'a>( + /// Don't pick any routes that have failed and haven't been tested yet + fn first_available_route_inner<'a>( inner: &'a RouteSpecStoreInner, min_hop_count: usize, max_hop_count: usize, @@ -930,6 +1079,7 @@ impl RouteSpecStore { directions: DirectionSet, avoid_node_ids: &[DHTKey], ) -> Option { + let cur_ts = intf::get_timestamp(); for detail in &inner.content.details { if detail.1.stability >= stability && detail.1.sequencing >= sequencing @@ -937,6 +1087,7 @@ impl RouteSpecStore { && detail.1.hops.len() <= max_hop_count && detail.1.directions.is_subset(directions) && !detail.1.published + && !detail.1.stats.needs_testing(cur_ts) { let mut avoid = false; for h in &detail.1.hops { @@ -953,19 +1104,47 @@ impl RouteSpecStore { None } - /// List all routes - pub fn list_routes(&self) -> Vec { + /// List all allocated routes + pub fn list_allocated_routes(&self, mut filter: F) -> Vec + where + F: FnMut(&DHTKey, &RouteSpecDetail) -> Option, + { let inner = self.inner.lock(); let mut out = Vec::with_capacity(inner.content.details.len()); for detail in &inner.content.details { - out.push(*detail.0); + if let Some(x) = filter(detail.0, detail.1) { + out.push(x); + } + } + out + } + + /// List all allocated routes + pub fn list_remote_routes(&self, mut filter: F) -> Vec + where + F: FnMut(&DHTKey, &RemotePrivateRouteInfo) -> Option, + { + let inner = self.inner.lock(); + let mut out = Vec::with_capacity(inner.cache.remote_private_route_cache.len()); + for info in &inner.cache.remote_private_route_cache { + if let Some(x) = filter(info.0, info.1) { + out.push(x); + } } out } /// Get the debug description of a route pub fn debug_route(&self, key: &DHTKey) -> Option { - let inner = &*self.inner.lock(); + let inner = &mut *self.inner.lock(); + let cur_ts = intf::get_timestamp(); + // If this is a remote route, print it + if let Some(s) = + Self::with_peek_remote_private_route(inner, cur_ts, key, |rpi| format!("{:#?}", rpi)) + { + return Some(s); + } + // Otherwise check allocated routes Self::detail(inner, key).map(|rsd| format!("{:#?}", rsd)) } @@ -1028,19 +1207,13 @@ impl RouteSpecStore { } }; - let PrivateRouteHops::FirstHop(pr_first_hop) = &private_route.hops else { - bail!("compiled private route should have first hop"); - }; - // If the safety route requested is also the private route, this is a loopback test, just accept it let sr_pubkey = if safety_spec.preferred_route == Some(private_route.public_key) { // Private route is also safety route during loopback test private_route.public_key } else { - // Get the safety route to use from the spec - let avoid_node_id = match &pr_first_hop.node { - RouteNode::NodeId(n) => n.key, - RouteNode::PeerInfo(p) => p.node_id.key, + let Some(avoid_node_id) = private_route.first_hop_node_id() else { + bail!("compiled private route should have first hop"); }; let Some(sr_pubkey) = self.get_route_for_safety_spec_inner(inner, rti, &safety_spec, Direction::Outbound.into(), &[avoid_node_id])? else { // No safety route could be found for this spec @@ -1176,6 +1349,7 @@ impl RouteSpecStore { } /// Get a route that matches a particular safety spec + #[instrument(level = "trace", skip(self, inner, rti), ret, err)] fn get_route_for_safety_spec_inner( &self, inner: &mut RouteSpecStoreInner, @@ -1204,7 +1378,7 @@ impl RouteSpecStore { } // Select a safety route from the pool or make one if we don't have one that matches - let sr_pubkey = if let Some(sr_pubkey) = Self::first_unpublished_route_inner( + let sr_pubkey = if let Some(sr_pubkey) = Self::first_available_route_inner( inner, safety_spec.hop_count, safety_spec.hop_count, @@ -1238,6 +1412,7 @@ impl RouteSpecStore { } /// Get a private sroute to use for the answer to question + #[instrument(level = "trace", skip(self), ret, err)] pub fn get_private_route_for_safety_spec( &self, safety_spec: &SafetySpec, @@ -1257,6 +1432,7 @@ impl RouteSpecStore { } /// Assemble private route for publication + #[instrument(level = "trace", skip(self), err)] pub fn assemble_private_route( &self, key: &DHTKey, @@ -1341,30 +1517,59 @@ impl RouteSpecStore { } /// Import a remote private route for compilation + #[instrument(level = "trace", skip(self, blob), ret, err)] pub fn import_remote_private_route(&self, blob: Vec) -> EyreResult { // decode the pr blob let private_route = RouteSpecStore::blob_to_private_route(blob)?; - // store the private route in our cache - let inner = &mut *self.inner.lock(); - let cur_ts = intf::get_timestamp(); + // ensure private route has first hop + if !matches!(private_route.hops, PrivateRouteHops::FirstHop(_)) { + bail!("private route must have first hop"); + } + // ensure this isn't also an allocated route + let inner = &mut *self.inner.lock(); + if Self::detail(inner, &private_route.public_key).is_some() { + bail!("should not import allocated route"); + } + + // store the private route in our cache + let cur_ts = intf::get_timestamp(); let key = Self::with_create_remote_private_route(inner, cur_ts, private_route, |r| { r.private_route.as_ref().unwrap().public_key.clone() }); Ok(key) } + /// Release a remote private route that is no longer in use + #[instrument(level = "trace", skip(self), ret)] + fn release_remote_private_route(&self, key: &DHTKey) -> bool { + let inner = &mut *self.inner.lock(); + if inner.cache.remote_private_route_cache.remove(key).is_some() { + // Mark it as dead for the update + inner.cache.dead_remote_routes.push(*key); + true + } else { + false + } + } + /// Retrieve an imported remote private route by its public key - pub fn get_remote_private_route(&self, key: &DHTKey) -> EyreResult { + pub fn get_remote_private_route(&self, key: &DHTKey) -> Option { let inner = &mut *self.inner.lock(); let cur_ts = intf::get_timestamp(); - let Some(pr) = Self::with_get_remote_private_route(inner, cur_ts, key, |r| { + Self::with_get_remote_private_route(inner, cur_ts, key, |r| { r.private_route.as_ref().unwrap().clone() - }) else { - bail!("remote private route not found"); - }; - Ok(pr) + }) + } + + /// Retrieve an imported remote private route by its public key but don't 'touch' it + pub fn peek_remote_private_route(&self, key: &DHTKey) -> Option { + let inner = &mut *self.inner.lock(); + let cur_ts = intf::get_timestamp(); + Self::with_peek_remote_private_route(inner, cur_ts, key, |r| { + r.private_route.as_ref().unwrap().clone() + }) } // get or create a remote private route cache entry @@ -1401,7 +1606,19 @@ impl RouteSpecStore { last_touched_ts: cur_ts, stats: RouteStats::new(cur_ts), }); - f(rpr) + + let out = f(rpr); + + // Ensure we LRU out items + if inner.cache.remote_private_route_cache.len() + > inner.cache.remote_private_route_cache.capacity() + { + let (dead_k, _) = inner.cache.remote_private_route_cache.remove_lru().unwrap(); + // Mark it as dead for the update + inner.cache.dead_remote_routes.push(dead_k); + } + + out } // get a remote private route cache entry @@ -1420,16 +1637,41 @@ impl RouteSpecStore { return Some(f(rpr)); } inner.cache.remote_private_route_cache.remove(key); + inner.cache.dead_remote_routes.push(*key); None } + // peek a remote private route cache entry + fn with_peek_remote_private_route( + inner: &mut RouteSpecStoreInner, + cur_ts: u64, + key: &DHTKey, + f: F, + ) -> Option + where + F: FnOnce(&mut RemotePrivateRouteInfo) -> R, + { + match inner.cache.remote_private_route_cache.entry(*key) { + hashlink::lru_cache::Entry::Occupied(mut o) => { + let rpr = o.get_mut(); + if cur_ts - rpr.last_touched_ts < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { + return Some(f(rpr)); + } + o.remove(); + inner.cache.dead_remote_routes.push(*key); + None + } + hashlink::lru_cache::Entry::Vacant(_) => None, + } + } + /// Check to see if this remote (not ours) private route has seen our node info yet /// This returns true if we have sent non-safety-route node info to the /// private route and gotten a response before pub fn has_remote_private_route_seen_our_node_info(&self, key: &DHTKey) -> bool { let inner = &mut *self.inner.lock(); let cur_ts = intf::get_timestamp(); - Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| rpr.seen_our_node_info) + Self::with_peek_remote_private_route(inner, cur_ts, key, |rpr| rpr.seen_our_node_info) .unwrap_or_default() } @@ -1468,7 +1710,7 @@ impl RouteSpecStore { } // Check for remote route if let Some(res) = - Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| f(&mut rpr.stats)) + Self::with_peek_remote_private_route(inner, cur_ts, key, |rpr| f(&mut rpr.stats)) { return Some(res); } @@ -1478,6 +1720,7 @@ impl RouteSpecStore { } /// Clear caches when local our local node info changes + #[instrument(level = "trace", skip(self))] pub fn reset(&self) { let inner = &mut *self.inner.lock(); diff --git a/veilid-core/src/routing_table/tasks/private_route_management.rs b/veilid-core/src/routing_table/tasks/private_route_management.rs index e1770a1e..3718366f 100644 --- a/veilid-core/src/routing_table/tasks/private_route_management.rs +++ b/veilid-core/src/routing_table/tasks/private_route_management.rs @@ -1,33 +1,97 @@ use super::super::*; use crate::xx::*; -use futures_util::stream::{FuturesOrdered, StreamExt}; +use futures_util::stream::{FuturesUnordered, StreamExt}; +use futures_util::FutureExt; use stop_token::future::FutureExt as StopFutureExt; impl RoutingTable { - // Keep private routes assigned and accessible - #[instrument(level = "trace", skip(self), err)] + /// Keep private routes assigned and accessible + #[instrument(level = "trace", skip(self, stop_token), err)] pub(crate) async fn private_route_management_task_routine( self, - _stop_token: StopToken, + stop_token: StopToken, _last_ts: u64, cur_ts: u64, ) -> EyreResult<()> { // Get our node's current node info and network class and do the right thing - let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); - let network_class = self.get_network_class(RoutingDomain::PublicInternet); + let network_class = self + .get_network_class(RoutingDomain::PublicInternet) + .unwrap_or(NetworkClass::Invalid); - // Get routing domain editor - let mut editor = self.edit_routing_domain(RoutingDomain::PublicInternet); - - // Do we know our network class yet? - if let Some(network_class) = network_class { - - // see if we have any routes that need testing + // If we don't know our network class then don't do this yet + if network_class == NetworkClass::Invalid { + return Ok(()); } - // Commit the changes - editor.commit().await; + // Collect any routes that need that need testing + let rss = self.route_spec_store(); + let mut routes_needing_testing = rss.list_allocated_routes(|k, v| { + let stats = v.get_stats(); + if stats.needs_testing(cur_ts) { + return Some(*k); + } else { + return None; + } + }); + let mut remote_routes_needing_testing = rss.list_remote_routes(|k, v| { + let stats = v.get_stats(); + if stats.needs_testing(cur_ts) { + return Some(*k); + } else { + return None; + } + }); + routes_needing_testing.append(&mut remote_routes_needing_testing); + + // Test all the routes that need testing at the same time + #[derive(Default, Debug)] + struct TestRouteContext { + failed: bool, + dead_routes: Vec, + } + + if !routes_needing_testing.is_empty() { + let mut unord = FuturesUnordered::new(); + let ctx = Arc::new(Mutex::new(TestRouteContext::default())); + for r in routes_needing_testing { + let rss = rss.clone(); + let ctx = ctx.clone(); + unord.push( + async move { + let success = match rss.test_route(&r).await { + Ok(v) => v, + Err(e) => { + log_rtab!(error "test route failed: {}", e); + ctx.lock().failed = true; + return; + } + }; + if success { + // Route is okay, leave it alone + return; + } + // Route test failed + ctx.lock().dead_routes.push(r); + } + .instrument(Span::current()) + .boxed(), + ); + } + + // Wait for test_route futures to complete in parallel + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + + // Process failed routes + let ctx = &mut *ctx.lock(); + for r in &ctx.dead_routes { + log_rtab!(debug "Dead route: {}", &r); + rss.release_route(r); + } + } + + // Send update (also may send updates for released routes done by other parts of the program) + rss.send_route_update(); Ok(()) } diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index dd63776a..3c119cff 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -205,7 +205,7 @@ impl RPCProcessor { private_route, safety_selection, } => { - let PrivateRouteHops::FirstHop(pr_first_hop) = &private_route.hops else { + let Some(avoid_node_id) = private_route.first_hop_node_id() else { return Err(RPCError::internal("destination private route must have first hop")); }; @@ -238,11 +238,6 @@ impl RPCProcessor { private_route.public_key } else { // Get the privat route to respond to that matches the safety route spec we sent the request with - let avoid_node_id = match &pr_first_hop.node { - RouteNode::NodeId(n) => n.key, - RouteNode::PeerInfo(p) => p.node_id.key, - }; - let Some(pr_key) = rss .get_private_route_for_safety_spec(safety_spec, &[avoid_node_id]) .map_err(RPCError::internal)? else { diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index ebade9d2..4119e3ef 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -34,13 +34,13 @@ fn get_route_id(rss: RouteSpecStore) -> impl Fn(&str) -> Option { return move |text: &str| { match DHTKey::try_decode(text).ok() { Some(key) => { - let routes = rss.list_routes(); + let routes = rss.list_allocated_routes(|k, _| Some(*k)); if routes.contains(&key) { return Some(key); } } None => { - let routes = rss.list_routes(); + let routes = rss.list_allocated_routes(|k, _| Some(*k)); for r in routes { let rkey = r.encode(); if rkey.starts_with(text) { @@ -126,14 +126,11 @@ fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option { - // Remove imported route - dc.imported_routes.remove(n); - info!("removed dead imported route {}", n); - return None; - } - Ok(v) => v, + let Some(private_route) = rss.get_remote_private_route(&pr_pubkey) else { + // Remove imported route + dc.imported_routes.remove(n); + info!("removed dead imported route {}", n); + return None; }; Some(Destination::private_route( private_route, @@ -636,11 +633,9 @@ impl VeilidAPI { let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_dht_key)?; // Release route - let out = match rss.release_route(route_id) { - Ok(()) => format!("Released"), - Err(e) => { - format!("Route release failed: {}", e) - } + let out = match rss.release_route(&route_id) { + true => "Released".to_owned(), + false => "Route does not exist".to_owned(), }; Ok(out) @@ -730,7 +725,7 @@ impl VeilidAPI { let routing_table = netman.routing_table(); let rss = routing_table.route_spec_store(); - let routes = rss.list_routes(); + let routes = rss.list_allocated_routes(|k, _| Some(*k)); let mut out = format!("Routes: (count = {}):\n", routes.len()); for r in routes { out.push_str(&format!("{}\n", r.encode())); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index c8988a4a..6a9ce931 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -2789,8 +2789,7 @@ impl VeilidAPI { .await .map_err(VeilidAPIError::no_connection)? { - rss.release_route(pr_pubkey) - .map_err(VeilidAPIError::generic)?; + rss.release_route(&pr_pubkey); return Err(VeilidAPIError::generic("allocated route failed to test")); } let private_route = rss @@ -2799,8 +2798,7 @@ impl VeilidAPI { let blob = match RouteSpecStore::private_route_to_blob(&private_route) { Ok(v) => v, Err(e) => { - rss.release_route(pr_pubkey) - .map_err(VeilidAPIError::generic)?; + rss.release_route(&pr_pubkey); return Err(VeilidAPIError::internal(e)); } }; diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 8e17b7de..f33ce133 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -129,9 +129,12 @@ impl RoutingContext { Target::PrivateRoute(pr) => { // Get remote private route let rss = self.api.routing_table()?.route_spec_store(); - let private_route = rss - .get_remote_private_route(&pr) - .map_err(|_| VeilidAPIError::KeyNotFound { key: pr })?; + let Some(private_route) = rss + .get_remote_private_route(&pr) + else { + return Err(VeilidAPIError::KeyNotFound { key: pr }); + }; + Ok(rpc_processor::Destination::PrivateRoute { private_route, safety_selection: self.unlocked_inner.safety_selection, diff --git a/veilid-flutter/lib/veilid.dart b/veilid-flutter/lib/veilid.dart index 4c539782..77c26268 100644 --- a/veilid-flutter/lib/veilid.dart +++ b/veilid-flutter/lib/veilid.dart @@ -1266,6 +1266,10 @@ abstract class VeilidUpdate { { return VeilidUpdateConfig(state: VeilidStateConfig.fromJson(json)); } + case "Route": + { + return VeilidUpdateRoute(state: VeilidStateRoute.fromJson(json)); + } default: { throw VeilidAPIExceptionInternal( @@ -1380,6 +1384,19 @@ class VeilidUpdateConfig implements VeilidUpdate { } } +class VeilidUpdateRoute implements VeilidUpdate { + final VeilidStateRoute state; + // + VeilidUpdateRoute({required this.state}); + + @override + Map get json { + var jsonRep = state.json; + jsonRep['kind'] = "Route"; + return jsonRep; + } +} + ////////////////////////////////////// /// VeilidStateAttachment @@ -1444,7 +1461,28 @@ class VeilidStateConfig { : config = jsonDecode(json['config']); Map get json { - return {'config': jsonEncode(config)}; + return {'config': config}; + } +} + +////////////////////////////////////// +/// VeilidStateRoute + +class VeilidStateRoute { + final List deadRoutes; + final List deadRemoteRoutes; + + VeilidStateRoute({ + required this.deadRoutes, + required this.deadRemoteRoutes, + }); + + VeilidStateRoute.fromJson(Map json) + : deadRoutes = jsonDecode(json['dead_routes']), + deadRemoteRoutes = jsonDecode(json['dead_remote_routes']); + + Map get json { + return {'dead_routes': deadRoutes, 'dead_remote_routes': deadRemoteRoutes}; } }