diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 6214d335..ef6837d1 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -150,8 +150,25 @@ impl RoutingTable { /// Called to initialize the routing table after it is created pub async fn init(&self) -> EyreResult<()> { + debug!("starting routing table init"); + + // Set up routespecstore + debug!("starting route spec store init"); + let route_spec_store = match RouteSpecStore::load(self.clone()).await { + Ok(v) => v, + Err(e) => { + log_rtab!(warn "Error loading route spec store: {}. Resetting.", e); + RouteSpecStore::new(self.clone()) + } + }; + debug!("finished route spec store init"); + let mut inner = self.inner.write(); inner.init(self.clone()); + + inner.route_spec_store = Some(route_spec_store); + + debug!("finished routing table init"); Ok(()) } @@ -169,6 +186,16 @@ impl RoutingTable { error!("kick_buckets_task not stopped: {}", e); } + debug!("saving route spec store"); + let rss = { + let mut inner = self.inner.write(); + inner.route_spec_store.take() + }; + if let Some(rss) = rss { + rss.save().await; + } + debug!("shutting down routing table"); + let mut inner = self.inner.write(); inner.terminate(); *inner = RoutingTableInner::new(self.unlocked_inner.clone()); @@ -192,7 +219,7 @@ impl RoutingTable { } pub fn route_spec_store(&self) -> RouteSpecStore { - self.inner.read().route_spec_store.clone() + self.inner.read().route_spec_store.as_ref().unwrap().clone() } pub fn relay_node(&self, domain: RoutingDomain) -> Option { @@ -521,9 +548,9 @@ impl RoutingTable { pub fn make_inbound_dial_info_entry_filter( routing_domain: RoutingDomain, dial_info_filter: DialInfoFilter, - ) -> impl FnMut(&RoutingTableInner, &BucketEntryInner) -> bool { + ) -> Box bool> { // does it have matching public dial info? - move |_rti, e| { + Box::new(move |_rti, e| { if let Some(ni) = e.node_info(routing_domain) { if ni .first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| { @@ -535,16 +562,16 @@ impl RoutingTable { } } false - } + }) } /// Makes a filter that finds nodes capable of dialing a particular outbound dialinfo - pub fn make_outbound_dial_info_entry_filter<'s>( + pub fn make_outbound_dial_info_entry_filter( routing_domain: RoutingDomain, dial_info: DialInfo, - ) -> impl FnMut(&RoutingTableInner, &'s BucketEntryInner) -> bool { + ) -> Box bool> { // does the node's outbound capabilities match the dialinfo? - move |_rti, e| { + Box::new(move |_rti, e| { if let Some(ni) = e.node_info(routing_domain) { let dif = DialInfoFilter::all() .with_protocol_type_set(ni.outbound_protocols) @@ -554,19 +581,15 @@ impl RoutingTable { } } false - } + }) } /// Make a filter that wraps another filter - pub fn combine_entry_filters<'a, 'b, F, G>( - mut f1: F, - mut f2: G, - ) -> impl FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool - where - F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, - G: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, - { - move |rti, e| { + pub fn combine_entry_filters( + mut f1: Box bool>, + mut f2: Box bool>, + ) -> Box bool> { + Box::new(move |rti, e| { if !f1(rti, e) { return false; } @@ -574,16 +597,16 @@ impl RoutingTable { return false; } true - } + }) } - pub fn find_fast_public_nodes_filtered<'a, 'b, F>( + pub fn find_fast_public_nodes_filtered( &self, node_count: usize, mut entry_filter: F, ) -> Vec where - F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, + F: FnMut(&RoutingTableInner, &BucketEntryInner) -> bool, { self.inner .read() diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index f51f0589..9d49d7fa 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -55,7 +55,7 @@ struct RouteSpecDetail { } /// The core representation of the RouteSpecStore that can be serialized -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct RouteSpecStoreContent { /// All of the routes we have allocated so far details: HashMap, @@ -79,14 +79,25 @@ pub struct RouteSpecStoreInner { /// RouteSpecStore cache cache: RouteSpecStoreCache, } -#[derive(Debug)] + pub struct RouteSpecStoreUnlockedInner { + /// Handle to routing table + routing_table: RoutingTable, /// Maximum number of hops in a route max_route_hop_count: usize, /// Default number of hops in a route default_route_hop_count: usize, } +impl fmt::Debug for RouteSpecStoreUnlockedInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RouteSpecStoreUnlockedInner") + .field("max_route_hop_count", &self.max_route_hop_count) + .field("default_route_hop_count", &self.default_route_hop_count) + .finish() + } +} + /// The routing table's storage for private/safety routes #[derive(Clone, Debug)] pub struct RouteSpecStore { @@ -176,17 +187,16 @@ where heaps_permutation(&mut permutation, hop_count - 1, f) } -xxx get routing table handle into routespecstore -xxx first figure out when new/load get called, does routing table need 'init' or can we just pick the right time to load the cache? what about flushing the cache ? we don't 'save' it yet, that should probably get flushed at the same time as the DH cache. - impl RouteSpecStore { - pub fn new(config: VeilidConfig) -> Self { + pub fn new(routing_table: RoutingTable) -> Self { + let config = routing_table.network_manager().config(); let c = config.get(); Self { unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner { max_route_hop_count: c.network.rpc.max_route_hop_count.into(), default_route_hop_count: c.network.rpc.default_route_hop_count.into(), + routing_table, }), inner: Arc::new(Mutex::new(RouteSpecStoreInner { content: RouteSpecStoreContent { @@ -243,6 +253,7 @@ impl RouteSpecStore { unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner { max_route_hop_count: c.network.rpc.max_route_hop_count.into(), default_route_hop_count: c.network.rpc.default_route_hop_count.into(), + routing_table, }), inner: Arc::new(Mutex::new(inner)), }; @@ -250,17 +261,28 @@ impl RouteSpecStore { Ok(rss) } - pub async fn save(&self, routing_table: RoutingTable) -> EyreResult<()> { - let inner = self.inner.lock(); + pub async fn save(&self) -> EyreResult<()> { + let content = { + let inner = self.inner.lock(); + inner.content.clone() + }; // Save all the fields we care about to the cbor blob in table storage - let table_store = routing_table.network_manager().table_store(); + let table_store = self + .unlocked_inner + .routing_table + .network_manager() + .table_store(); let rsstdb = table_store.open("RouteSpecStore", 1).await?; - rsstdb.store_cbor(0, b"content", &inner.content).await?; + rsstdb.store_cbor(0, b"content", &content).await?; // Keep secrets in protected store as well - let pstore = routing_table.network_manager().protected_store(); - for (k, v) in &inner.content.details { + let pstore = self + .unlocked_inner + .routing_table + .network_manager() + .protected_store(); + for (k, v) in &content.details { if pstore .save_user_secret( &format!("RouteSpecStore_{}", k.encode()), @@ -319,15 +341,28 @@ impl RouteSpecStore { /// Returns None if no route could be allocated at this time pub fn allocate_route( &self, + stability: Stability, + sequencing: Sequencing, + hop_count: usize, + directions: DirectionSet, + ) -> EyreResult> { + let inner = &mut *self.inner.lock(); + let routing_table = self.unlocked_inner.routing_table.clone(); + let rti = &mut *routing_table.inner.write(); + + self.allocate_route_inner(inner, rti, stability, sequencing, hop_count, directions) + } + + fn allocate_route_inner( + &self, + inner: &mut RouteSpecStoreInner, rti: &RoutingTableInner, - routing_table: RoutingTable, stability: Stability, sequencing: Sequencing, hop_count: usize, directions: DirectionSet, ) -> EyreResult> { use core::cmp::Ordering; - let mut inner = self.inner.lock(); if hop_count < 1 { bail!("Not allocating route less than one hop in length"); @@ -537,7 +572,7 @@ impl RouteSpecStore { let hop_node_refs = route_nodes .iter() .map(|v| { - rti.lookup_node_ref(routing_table.clone(), nodes[*v].0) + rti.lookup_node_ref(self.unlocked_inner.routing_table.clone(), nodes[*v].0) .unwrap() }) .collect(); @@ -650,12 +685,12 @@ impl RouteSpecStore { /// Returns Ok(None) if no allocation could happen at this time (not an error) pub fn compile_safety_route( &self, - rti: &mut RoutingTableInner, - routing_table: RoutingTable, safety_selection: SafetySelection, private_route: PrivateRoute, ) -> EyreResult> { let inner = &mut *self.inner.lock(); + let routing_table = self.unlocked_inner.routing_table.clone(); + let rti = &mut *routing_table.inner.write(); let pr_hopcount = private_route.hop_count as usize; let max_route_hop_count = self.unlocked_inner.max_route_hop_count; @@ -726,9 +761,9 @@ impl RouteSpecStore { } else { // No route found, gotta allocate one let sr_pubkey = match self - .allocate_route( + .allocate_route_inner( + inner, rti, - routing_table.clone(), safety_spec.stability, safety_spec.sequencing, safety_spec.hop_count, diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 4c90a59b..579c5c55 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -30,12 +30,11 @@ pub struct RoutingTableInner { /// Peers we have recently communicated with pub(super) recent_peers: LruCache, /// Storage for private/safety RouteSpecs - pub(super) route_spec_store: RouteSpecStore, + pub(super) route_spec_store: Option, } impl RoutingTableInner { pub fn new(unlocked_inner: Arc) -> RoutingTableInner { - let config = unlocked_inner.config.clone(); RoutingTableInner { unlocked_inner, buckets: Vec::new(), @@ -46,7 +45,7 @@ impl RoutingTableInner { self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats: TransferStatsDownUp::default(), recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), - route_spec_store: RouteSpecStore::new(config), + route_spec_store: None, } } @@ -331,12 +330,11 @@ impl RoutingTableInner { let bucket = Bucket::new(routing_table.clone()); self.buckets.push(bucket); } + Ok(()) } - pub fn terminate(&mut self) { - // - } + pub fn terminate(&mut self) {} pub fn configure_local_network_routing_domain( &mut self, @@ -448,15 +446,20 @@ impl RoutingTableInner { min_state: BucketEntryState, mut f: F, ) -> Option { + let mut entryvec = Vec::with_capacity(self.bucket_entry_count); for bucket in &self.buckets { for entry in bucket.entries() { if entry.1.with(self, |_rti, e| e.state(cur_ts) >= min_state) { - if let Some(out) = f(self, *entry.0, entry.1.clone()) { - return Some(out); - } + entryvec.push((*entry.0, entry.1.clone())); } } } + for entry in entryvec { + if let Some(out) = f(self, entry.0, entry.1) { + return Some(out); + } + } + None } @@ -469,15 +472,20 @@ impl RoutingTableInner { min_state: BucketEntryState, mut f: F, ) -> Option { + let mut entryvec = Vec::with_capacity(self.bucket_entry_count); for bucket in &self.buckets { for entry in bucket.entries() { if entry.1.with(self, |_rti, e| e.state(cur_ts) >= min_state) { - if let Some(out) = f(self, *entry.0, entry.1.clone()) { - return Some(out); - } + entryvec.push((*entry.0, entry.1.clone())); } } } + for entry in entryvec { + if let Some(out) = f(self, entry.0, entry.1) { + return Some(out); + } + } + None } @@ -777,7 +785,7 @@ impl RoutingTableInner { out } - pub fn touch_recent_peer(&self, node_id: DHTKey, last_connection: ConnectionDescriptor) { + pub fn touch_recent_peer(&mut self, node_id: DHTKey, last_connection: ConnectionDescriptor) { self.recent_peers .insert(node_id, RecentPeersEntry { last_connection }); } @@ -786,14 +794,14 @@ impl RoutingTableInner { // Find Nodes // Retrieve the fastest nodes in the routing table matching an entry filter - pub fn find_fast_public_nodes_filtered<'a, 'b, F>( + pub fn find_fast_public_nodes_filtered( &self, outer_self: RoutingTable, node_count: usize, mut entry_filter: F, ) -> Vec where - F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, + F: FnMut(&RoutingTableInner, &BucketEntryInner) -> bool, { self.find_fastest_nodes( // count diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index fe177606..b68dac80 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -432,7 +432,7 @@ impl RPCProcessor { // Compile the safety route with the private route let compiled_route: CompiledRoute = match rss - .compile_safety_route(rti, routing_table, safety_selection, private_route) + .compile_safety_route(safety_selection, private_route) .map_err(RPCError::internal)? { Some(cr) => cr, diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 454acb9c..72b5f2f3 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -94,16 +94,16 @@ impl RPCProcessor { routing_domain, dial_info.clone(), ); - let will_validate_dial_info_filter = |_rti, e: &BucketEntryInner| { + let will_validate_dial_info_filter = Box::new(move |_rti, e: &BucketEntryInner| { if let Some(status) = &e.node_status(routing_domain) { status.will_validate_dial_info() } else { true } - }; + }); let filter = RoutingTable::combine_entry_filters( outbound_dial_info_entry_filter, - will_validate_dial_info_filter, + will_validate_dial_info_filter, fuck this shit. do it tomorrow. ); // Find nodes matching filter to redirect this to