diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index a139334b..5a771d95 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -757,7 +757,9 @@ impl NetworkManager { Ok(v) => v, }; - receipt_manager.handle_receipt(receipt, None).await + receipt_manager + .handle_receipt(receipt, ReceiptReturned::OutOfBand) + .await } /// Process a received in-band receipt @@ -765,7 +767,7 @@ impl NetworkManager { pub async fn handle_in_band_receipt>( &self, receipt_data: R, - inbound_nr: NodeRef, + inbound_noderef: NodeRef, ) -> NetworkResult<()> { let receipt_manager = self.receipt_manager(); @@ -777,7 +779,27 @@ impl NetworkManager { }; receipt_manager - .handle_receipt(receipt, Some(inbound_nr)) + .handle_receipt(receipt, ReceiptReturned::InBand { inbound_noderef }) + .await + } + + /// Process a received private receipt + #[instrument(level = "trace", skip(self, receipt_data), ret)] + pub async fn handle_private_receipt>( + &self, + receipt_data: R, + ) -> NetworkResult<()> { + let receipt_manager = self.receipt_manager(); + + let receipt = match Receipt::from_signed_data(receipt_data.as_ref()) { + Err(e) => { + return NetworkResult::invalid_message(e.to_string()); + } + Ok(v) => v, + }; + + receipt_manager + .handle_receipt(receipt, ReceiptReturned::Private) .await } @@ -1001,7 +1023,7 @@ impl NetworkManager { // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedOutOfBand => { + ReceiptEvent::ReturnedPrivate | ReceiptEvent::ReturnedOutOfBand => { return Ok(NetworkResult::invalid_message( "reverse connect receipt should be returned in-band", )); @@ -1102,7 +1124,7 @@ impl NetworkManager { // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedOutOfBand => { + ReceiptEvent::ReturnedPrivate | ReceiptEvent::ReturnedOutOfBand => { return Ok(NetworkResult::invalid_message( "hole punch receipt should be returned in-band", )); diff --git a/veilid-core/src/receipt_manager.rs b/veilid-core/src/receipt_manager.rs index 06101a3c..28ba23d2 100644 --- a/veilid-core/src/receipt_manager.rs +++ b/veilid-core/src/receipt_manager.rs @@ -11,10 +11,18 @@ use xx::*; pub enum ReceiptEvent { ReturnedOutOfBand, ReturnedInBand { inbound_noderef: NodeRef }, + ReturnedPrivate, Expired, Cancelled, } +#[derive(Clone, Debug)] +pub enum ReceiptReturned { + OutOfBand, + InBand { inbound_noderef: NodeRef }, + Private, +} + pub trait ReceiptCallback: Send + 'static { fn call( &self, @@ -394,17 +402,17 @@ impl ReceiptManager { pub async fn handle_receipt( &self, receipt: Receipt, - inbound_noderef: Option, + receipt_returned: ReceiptReturned, ) -> NetworkResult<()> { let receipt_nonce = receipt.get_nonce(); let extra_data = receipt.get_extra_data(); log_rpc!(debug "<<== RECEIPT {} <- {}{}", receipt_nonce.encode(), - if let Some(nr) = &inbound_noderef { - nr.to_string() - } else { - "DIRECT".to_owned() + match receipt_returned { + ReceiptReturned::OutOfBand => "OutOfBand".to_owned(), + ReceiptReturned::InBand { ref inbound_noderef } => format!("InBand({})", inbound_noderef), + ReceiptReturned::Private => "Private".to_owned(), }, if extra_data.is_empty() { "".to_owned() @@ -435,10 +443,14 @@ impl ReceiptManager { record_mut.returns_so_far += 1; // Get the receipt event to return - let receipt_event = if let Some(inbound_noderef) = inbound_noderef { - ReceiptEvent::ReturnedInBand { inbound_noderef } - } else { - ReceiptEvent::ReturnedOutOfBand + let receipt_event = match receipt_returned { + ReceiptReturned::OutOfBand => ReceiptEvent::ReturnedOutOfBand, + ReceiptReturned::InBand { + ref inbound_noderef, + } => ReceiptEvent::ReturnedInBand { + inbound_noderef: inbound_noderef.clone(), + }, + ReceiptReturned::Private => ReceiptEvent::ReturnedPrivate, }; let callback_future = Self::perform_callback(receipt_event, &mut record_mut); diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index ba48563c..48199bfc 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -56,11 +56,7 @@ impl Bucket { self.entries.iter() } - pub(super) fn kick( - &mut self, - inner: &mut RoutingTableInner, - bucket_depth: usize, - ) -> Option> { + pub(super) fn kick(&mut self, bucket_depth: usize) -> Option> { // Get number of entries to attempt to purge from bucket let bucket_len = self.entries.len(); @@ -84,8 +80,8 @@ impl Bucket { if a.0 == b.0 { return core::cmp::Ordering::Equal; } - a.1.with(inner, |rti, ea| { - b.1.with(rti, |_rti, eb| { + a.1.with_inner(|ea| { + b.1.with_inner(|eb| { let astate = state_ordering(ea.state(cur_ts)); let bstate = state_ordering(eb.state(cur_ts)); // first kick dead nodes, then unreliable nodes diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 94428da9..f858fbd7 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -675,6 +675,24 @@ impl BucketEntry { let mut inner = self.inner.write(); f(rti, &mut *inner) } + + // Internal inner access for RoutingTableInner only + pub(super) fn with_inner(&self, f: F) -> R + where + F: FnOnce(&BucketEntryInner) -> R, + { + let inner = self.inner.read(); + f(&*inner) + } + + // Internal inner access for RoutingTableInner only + pub(super) fn with_mut_inner(&self, f: F) -> R + where + F: FnOnce(&mut BucketEntryInner) -> R, + { + let mut inner = self.inner.write(); + f(&mut *inner) + } } impl Drop for BucketEntry { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index d11880c5..6214d335 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -191,18 +191,8 @@ impl RoutingTable { self.inner.read().routing_domain_for_address(address) } - pub fn with_route_spec_store_mut(&self, f: F) -> R - where - F: FnOnce(&mut RouteSpecStore, &mut RoutingTableInner) -> R, - { - self.inner.write().with_route_spec_store_mut(f) - } - - pub fn with_route_spec_store(&self, f: F) -> R - where - F: FnOnce(&RouteSpecStore, &RoutingTableInner) -> R, - { - self.inner.read().with_route_spec_store(f) + pub fn route_spec_store(&self) -> RouteSpecStore { + self.inner.read().route_spec_store.clone() } pub fn relay_node(&self, domain: RoutingDomain) -> Option { diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 82a1c6a3..f51f0589 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -72,18 +72,27 @@ pub struct RouteSpecStoreCache { hop_cache: HashSet>, } -/// The routing table's storage for private/safety routes #[derive(Debug)] -pub struct RouteSpecStore { - /// Maximum number of hops in a route - max_route_hop_count: usize, - /// Default number of hops in a route - default_route_hop_count: usize, +pub struct RouteSpecStoreInner { /// Serialize RouteSpecStore content content: RouteSpecStoreContent, /// RouteSpecStore cache cache: RouteSpecStoreCache, } +#[derive(Debug)] +pub struct RouteSpecStoreUnlockedInner { + /// Maximum number of hops in a route + max_route_hop_count: usize, + /// Default number of hops in a route + default_route_hop_count: usize, +} + +/// The routing table's storage for private/safety routes +#[derive(Clone, Debug)] +pub struct RouteSpecStore { + inner: Arc>, + unlocked_inner: Arc, +} fn route_hops_to_hop_cache(hops: &[DHTKey]) -> Vec { let mut cache: Vec = Vec::with_capacity(hops.len() * DHT_KEY_LENGTH); @@ -167,17 +176,24 @@ where heaps_permutation(&mut permutation, hop_count - 1, f) } +xxx get routing table handle into routespecstore +xxx first figure out when new/load get called, does routing table need 'init' or can we just pick the right time to load the cache? what about flushing the cache ? we don't 'save' it yet, that should probably get flushed at the same time as the DH cache. + impl RouteSpecStore { pub fn new(config: VeilidConfig) -> Self { let c = config.get(); Self { - max_route_hop_count: c.network.rpc.max_route_hop_count.into(), - default_route_hop_count: c.network.rpc.default_route_hop_count.into(), - content: RouteSpecStoreContent { - details: HashMap::new(), - }, - cache: Default::default(), + unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner { + max_route_hop_count: c.network.rpc.max_route_hop_count.into(), + default_route_hop_count: c.network.rpc.default_route_hop_count.into(), + }), + inner: Arc::new(Mutex::new(RouteSpecStoreInner { + content: RouteSpecStoreContent { + details: HashMap::new(), + }, + cache: Default::default(), + })), } } @@ -187,18 +203,13 @@ impl RouteSpecStore { // Get cbor blob from table store let table_store = routing_table.network_manager().table_store(); let rsstdb = table_store.open("RouteSpecStore", 1).await?; - let content = rsstdb.load_cbor(0, b"content").await?.unwrap_or_default(); - let mut rss = RouteSpecStore { - max_route_hop_count: c.network.rpc.max_route_hop_count.into(), - default_route_hop_count: c.network.rpc.default_route_hop_count.into(), - content, - cache: Default::default(), - }; + let mut content: RouteSpecStoreContent = + rsstdb.load_cbor(0, b"content").await?.unwrap_or_default(); // Load secrets from pstore let pstore = routing_table.network_manager().protected_store(); let mut dead_keys = Vec::new(); - for (k, v) in &mut rss.content.details { + for (k, v) in &mut content.details { if let Some(secret_key) = pstore .load_user_secret(&format!("RouteSpecStore_{}", k.encode())) .await? @@ -217,23 +228,39 @@ impl RouteSpecStore { } for k in dead_keys { log_rtab!(debug "killing off private route: {}", k.encode()); - rss.content.details.remove(&k); + content.details.remove(&k); } + let mut inner = RouteSpecStoreInner { + content, + cache: Default::default(), + }; + // Rebuild the routespecstore cache - rss.rebuild_cache(); + Self::rebuild_cache(&mut inner); + + let rss = RouteSpecStore { + unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner { + max_route_hop_count: c.network.rpc.max_route_hop_count.into(), + default_route_hop_count: c.network.rpc.default_route_hop_count.into(), + }), + inner: Arc::new(Mutex::new(inner)), + }; + Ok(rss) } pub async fn save(&self, routing_table: RoutingTable) -> EyreResult<()> { + let inner = self.inner.lock(); + // Save all the fields we care about to the cbor blob in table storage let table_store = routing_table.network_manager().table_store(); let rsstdb = table_store.open("RouteSpecStore", 1).await?; - rsstdb.store_cbor(0, b"content", &self.content).await?; + rsstdb.store_cbor(0, b"content", &inner.content).await?; // Keep secrets in protected store as well let pstore = routing_table.network_manager().protected_store(); - for (k, v) in &self.content.details { + for (k, v) in &inner.content.details { if pstore .save_user_secret( &format!("RouteSpecStore_{}", k.encode()), @@ -266,18 +293,24 @@ impl RouteSpecStore { .or_insert(1); } - fn rebuild_cache(&mut self) { - for v in self.content.details.values() { + fn rebuild_cache(inner: &mut RouteSpecStoreInner) { + for v in inner.content.details.values() { let cache_key = route_hops_to_hop_cache(&v.hops); - Self::add_to_cache(&mut self.cache, cache_key, &v); + Self::add_to_cache(&mut inner.cache, cache_key, &v); } } - fn detail(&self, public_key: &DHTKey) -> Option<&RouteSpecDetail> { - self.content.details.get(&public_key) + fn detail<'a>( + inner: &'a RouteSpecStoreInner, + public_key: &DHTKey, + ) -> Option<&'a RouteSpecDetail> { + inner.content.details.get(public_key) } - fn detail_mut(&mut self, public_key: &DHTKey) -> Option<&mut RouteSpecDetail> { - self.content.details.get_mut(&public_key) + fn detail_mut<'a>( + inner: &'a mut RouteSpecStoreInner, + public_key: &DHTKey, + ) -> Option<&'a mut RouteSpecDetail> { + inner.content.details.get_mut(public_key) } /// Create a new route @@ -285,7 +318,7 @@ impl RouteSpecStore { /// The route is not yet tested for its reachability /// Returns None if no route could be allocated at this time pub fn allocate_route( - &mut self, + &self, rti: &RoutingTableInner, routing_table: RoutingTable, stability: Stability, @@ -294,12 +327,13 @@ impl RouteSpecStore { directions: DirectionSet, ) -> EyreResult> { use core::cmp::Ordering; + let mut inner = self.inner.lock(); if hop_count < 1 { bail!("Not allocating route less than one hop in length"); } - if hop_count > self.max_route_hop_count { + if hop_count > self.unlocked_inner.max_route_hop_count { bail!("Not allocating route longer than max route hop count"); } @@ -342,13 +376,13 @@ impl RouteSpecStore { v2: &(DHTKey, Option>)| -> Ordering { // deprioritize nodes that we have already used as end points - let e1_used_end = self + let e1_used_end = inner .cache .used_end_nodes .get(&v1.0) .cloned() .unwrap_or_default(); - let e2_used_end = self + let e2_used_end = inner .cache .used_end_nodes .get(&v2.0) @@ -360,13 +394,13 @@ impl RouteSpecStore { } // deprioritize nodes we have used already anywhere - let e1_used = self + let e1_used = inner .cache .used_nodes .get(&v1.0) .cloned() .unwrap_or_default(); - let e2_used = self + let e2_used = inner .cache .used_nodes .get(&v2.0) @@ -428,7 +462,7 @@ impl RouteSpecStore { cache_key = route_permutation_to_hop_cache(&nodes, permutation); // Skip routes we have already seen - if self.cache.hop_cache.contains(&cache_key) { + if inner.cache.hop_cache.contains(&cache_key) { return false; } @@ -529,10 +563,10 @@ impl RouteSpecStore { }; // Add to cache - Self::add_to_cache(&mut self.cache, cache_key, &rsd); + Self::add_to_cache(&mut inner.cache, cache_key, &rsd); // Keep route in spec store - self.content.details.insert(public_key, rsd); + inner.content.details.insert(public_key, rsd); Ok(Some(public_key)) } @@ -541,19 +575,21 @@ impl RouteSpecStore { where F: FnOnce(&RouteSpecDetail) -> R, { - self.detail(&public_key).map(|rsd| f(rsd)) + let inner = self.inner.lock(); + Self::detail(&*inner, &public_key).map(f) } - pub fn release_route(&mut self, public_key: DHTKey) { - if let Some(detail) = self.content.details.remove(&public_key) { + pub fn release_route(&self, public_key: DHTKey) { + let mut inner = self.inner.lock(); + if let Some(detail) = inner.content.details.remove(&public_key) { // Remove from hop cache let cache_key = route_hops_to_hop_cache(&detail.hops); - if !self.cache.hop_cache.remove(&cache_key) { + if !inner.cache.hop_cache.remove(&cache_key) { panic!("hop cache should have contained cache key"); } // Remove from used nodes cache for h in &detail.hops { - match self.cache.used_nodes.entry(*h) { + match inner.cache.used_nodes.entry(*h) { std::collections::hash_map::Entry::Occupied(mut o) => { *o.get_mut() -= 1; if *o.get() == 0 { @@ -566,7 +602,7 @@ impl RouteSpecStore { } } // Remove from end nodes cache - match self.cache.used_nodes.entry(*detail.hops.last().unwrap()) { + match inner.cache.used_nodes.entry(*detail.hops.last().unwrap()) { std::collections::hash_map::Entry::Occupied(mut o) => { *o.get_mut() -= 1; if *o.get() == 0 { @@ -584,14 +620,16 @@ impl RouteSpecStore { /// Find first matching unpublished route that fits into the selection criteria pub fn first_unpublished_route( - &mut self, + &self, min_hop_count: usize, max_hop_count: usize, stability: Stability, sequencing: Sequencing, directions: DirectionSet, ) -> Option { - for detail in &self.content.details { + let inner = self.inner.lock(); + + for detail in &inner.content.details { if detail.1.stability >= stability && detail.1.sequencing >= sequencing && detail.1.hops.len() >= min_hop_count @@ -611,14 +649,16 @@ impl RouteSpecStore { /// Returns an Err() if the parameters are wrong /// Returns Ok(None) if no allocation could happen at this time (not an error) pub fn compile_safety_route( - &mut self, + &self, rti: &mut RoutingTableInner, routing_table: RoutingTable, safety_selection: SafetySelection, private_route: PrivateRoute, ) -> EyreResult> { + let inner = &mut *self.inner.lock(); + let pr_hopcount = private_route.hop_count as usize; - let max_route_hop_count = self.max_route_hop_count; + let max_route_hop_count = self.unlocked_inner.max_route_hop_count; if pr_hopcount > max_route_hop_count { bail!("private route hop count too long"); } @@ -664,8 +704,7 @@ impl RouteSpecStore { // See if the preferred route is here let opt_safety_rsd: Option<(&mut RouteSpecDetail, DHTKey)> = if let Some(preferred_route) = safety_spec.preferred_route { - self.detail_mut(&preferred_route) - .map(|rsd| (rsd, preferred_route)) + Self::detail_mut(inner, &preferred_route).map(|rsd| (rsd, preferred_route)) } else { // Preferred safety route was not requested None @@ -683,7 +722,7 @@ impl RouteSpecStore { Direction::Outbound.into(), ) { // Found a route to use - (self.detail_mut(&sr_pubkey).unwrap(), sr_pubkey) + (Self::detail_mut(inner, &sr_pubkey).unwrap(), sr_pubkey) } else { // No route found, gotta allocate one let sr_pubkey = match self @@ -700,7 +739,7 @@ impl RouteSpecStore { Some(pk) => pk, None => return Ok(None), }; - (self.detail_mut(&sr_pubkey).unwrap(), sr_pubkey) + (Self::detail_mut(inner, &sr_pubkey).unwrap(), sr_pubkey) } }; @@ -837,14 +876,14 @@ impl RouteSpecStore { /// Assemble private route for publication pub fn assemble_private_route( - &mut self, + &self, rti: &RoutingTableInner, routing_table: RoutingTable, key: &DHTKey, ) -> EyreResult { - let rsd = self - .detail(&key) - .ok_or_else(|| eyre!("route does not exist"))?; + let inner = &*self.inner.lock(); + + let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; // See if we can optimize this compilation yet // We don't want to include full nodeinfo if we don't have to @@ -919,7 +958,8 @@ impl RouteSpecStore { /// When first deserialized, routes must be re-published in order to ensure they remain /// in the RouteSpecStore. pub fn mark_route_published(&mut self, key: &DHTKey) -> EyreResult<()> { - self.detail_mut(&key) + let inner = &mut *self.inner.lock(); + Self::detail_mut(inner, &key) .ok_or_else(|| eyre!("route does not exist"))? .published = true; Ok(()) @@ -929,7 +969,8 @@ impl RouteSpecStore { /// When first deserialized, routes must be re-tested for reachability /// This can be used to determine if routes need to be sent with full peerinfo or can just use a node id pub fn mark_route_reachable(&mut self, key: &DHTKey) -> EyreResult<()> { - self.detail_mut(&key) + let inner = &mut *self.inner.lock(); + Self::detail_mut(inner, &key) .ok_or_else(|| eyre!("route does not exist"))? .published = true; Ok(()) @@ -937,7 +978,8 @@ impl RouteSpecStore { /// Mark route as checked pub fn touch_route_checked(&mut self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> { - self.detail_mut(&key) + let inner = &mut *self.inner.lock(); + Self::detail_mut(inner, &key) .ok_or_else(|| eyre!("route does not exist"))? .last_checked_ts = Some(cur_ts); Ok(()) @@ -945,7 +987,8 @@ impl RouteSpecStore { /// Mark route as used pub fn touch_route_used(&mut self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> { - self.detail_mut(&key) + let inner = &mut *self.inner.lock(); + Self::detail_mut(inner, &key) .ok_or_else(|| eyre!("route does not exist"))? .last_used_ts = Some(cur_ts); Ok(()) @@ -953,20 +996,17 @@ impl RouteSpecStore { /// Record latency on the route pub fn record_latency(&mut self, key: &DHTKey, latency: u64) -> EyreResult<()> { - let lsa = &mut self - .detail_mut(&key) - .ok_or_else(|| eyre!("route does not exist"))? - .latency_stats_accounting; - self.detail_mut(&key) - .ok_or_else(|| eyre!("route does not exist"))? - .latency_stats = lsa.record_latency(latency); + let inner = &mut *self.inner.lock(); + + let rsd = Self::detail_mut(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; + rsd.latency_stats = rsd.latency_stats_accounting.record_latency(latency); Ok(()) } /// Get the calculated latency stats pub fn latency_stats(&mut self, key: &DHTKey) -> EyreResult { - Ok(self - .detail_mut(&key) + let inner = &mut *self.inner.lock(); + Ok(Self::detail_mut(inner, &key) .ok_or_else(|| eyre!("route does not exist"))? .latency_stats .clone()) @@ -974,27 +1014,24 @@ impl RouteSpecStore { /// Add download transfers to route pub fn add_down(&mut self, key: &DHTKey, bytes: u64) -> EyreResult<()> { - let tsa = &mut self - .detail_mut(&key) - .ok_or_else(|| eyre!("route does not exist"))? - .transfer_stats_accounting; - tsa.add_down(bytes); + let inner = &mut *self.inner.lock(); + let rsd = Self::detail_mut(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; + rsd.transfer_stats_accounting.add_down(bytes); Ok(()) } /// Add upload transfers to route pub fn add_up(&mut self, key: &DHTKey, bytes: u64) -> EyreResult<()> { - let tsa = &mut self - .detail_mut(&key) - .ok_or_else(|| eyre!("route does not exist"))? - .transfer_stats_accounting; - tsa.add_up(bytes); + let inner = &mut *self.inner.lock(); + let rsd = Self::detail_mut(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; + rsd.transfer_stats_accounting.add_up(bytes); Ok(()) } /// Process transfer statistics to get averages pub fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { - for rsd in self.content.details.values_mut() { + let inner = &mut *self.inner.lock(); + for rsd in inner.content.details.values_mut() { rsd.transfer_stats_accounting.roll_transfers( last_ts, cur_ts, diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs index b63462a8..6729667a 100644 --- a/veilid-core/src/routing_table/routing_domains.rs +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -116,7 +116,7 @@ impl RoutingDomainDetailCommon { where F: FnOnce(&PeerInfo) -> R, { - let cpi = self.cached_peer_info.lock(); + let mut cpi = self.cached_peer_info.lock(); if cpi.is_none() { // Regenerate peer info let pi = PeerInfo::new( @@ -131,6 +131,7 @@ impl RoutingDomainDetailCommon { dial_info_detail_list: self.dial_info_details.clone(), relay_peer_info: self .relay_node + .as_ref() .and_then(|rn| rn.make_peer_info(self.routing_domain).map(Box::new)), }, NodeId::new(self.node_id), @@ -268,7 +269,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { } // Get the target's inbound relay, it must have one or it is not reachable - if let Some(inbound_relay) = node_b.relay_peer_info { + if let Some(inbound_relay) = &node_b.relay_peer_info { // Note that relay_peer_info could be node_a, in which case a connection already exists // and we shouldn't have even gotten here if inbound_relay.node_id.key == *node_a_id { @@ -348,7 +349,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { } } // If the node B has no direct dial info, it needs to have an inbound relay - else if let Some(inbound_relay) = node_b.relay_peer_info { + else if let Some(inbound_relay) = &node_b.relay_peer_info { // Can we reach the full relay? if first_filtered_dial_info_detail( node_a, @@ -363,7 +364,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { } // If node A can't reach the node by other means, it may need to use its own relay - if let Some(outbound_relay) = node_a.relay_peer_info { + if let Some(outbound_relay) = &node_a.relay_peer_info { return ContactMethod::OutboundRelay(outbound_relay.node_id.key); } diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 10918799..4c90a59b 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -35,6 +35,7 @@ pub struct RoutingTableInner { impl RoutingTableInner { pub fn new(unlocked_inner: Arc) -> RoutingTableInner { + let config = unlocked_inner.config.clone(); RoutingTableInner { unlocked_inner, buckets: Vec::new(), @@ -45,7 +46,7 @@ impl RoutingTableInner { self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats: TransferStatsDownUp::default(), recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), - route_spec_store: RouteSpecStore::new(unlocked_inner.config.clone()), + route_spec_store: RouteSpecStore::new(config), } } @@ -105,20 +106,6 @@ impl RoutingTableInner { } } - pub fn with_route_spec_store_mut(&mut self, f: F) -> R - where - F: FnOnce(&mut RouteSpecStore, &mut RoutingTableInner) -> R, - { - f(&mut self.route_spec_store, self) - } - - pub fn with_route_spec_store(&self, f: F) -> R - where - F: FnOnce(&RouteSpecStore, &RoutingTableInner) -> R, - { - f(&self.route_spec_store, self) - } - pub fn relay_node(&self, domain: RoutingDomain) -> Option { self.with_routing_domain(domain, |rd| rd.common().relay_node()) } @@ -382,8 +369,8 @@ impl RoutingTableInner { "Starting routing table buckets purge. Table currently has {} nodes", self.bucket_entry_count ); - for bucket in &self.buckets { - bucket.kick(self, 0); + for bucket in &mut self.buckets { + bucket.kick(0); } log_rtab!(debug "Routing table buckets purge complete. Routing table now has {} nodes", @@ -399,11 +386,12 @@ impl RoutingTableInner { ); for bucket in &self.buckets { for entry in bucket.entries() { - entry.1.with_mut(self, |_rti, e| { + entry.1.with_mut_inner(|e| { e.clear_last_connections(); }); } } + log_rtab!(debug "Routing table last_connections purge complete. Routing table now has {} nodes", self.bucket_entry_count @@ -416,7 +404,7 @@ impl RoutingTableInner { let bucket = &mut self.buckets[idx]; let bucket_depth = Self::bucket_depth(idx); - if let Some(dead_node_ids) = bucket.kick(self, bucket_depth) { + if let Some(dead_node_ids) = bucket.kick(bucket_depth) { // Remove counts self.bucket_entry_count -= dead_node_ids.len(); log_rtab!(debug "Routing table now has {} nodes", self.bucket_entry_count); diff --git a/veilid-core/src/rpc_processor/coders/private_safety_route.rs b/veilid-core/src/rpc_processor/coders/private_safety_route.rs index 50604970..4ac99409 100644 --- a/veilid-core/src/rpc_processor/coders/private_safety_route.rs +++ b/veilid-core/src/rpc_processor/coders/private_safety_route.rs @@ -52,11 +52,11 @@ pub fn encode_route_hop( let node_builder = builder.reborrow().init_node(); match &route_hop.node { RouteNode::NodeId(ni) => { - let ni_builder = node_builder.init_node_id(); + let mut ni_builder = node_builder.init_node_id(); encode_public_key(&ni.key, &mut ni_builder)?; } RouteNode::PeerInfo(pi) => { - let pi_builder = node_builder.init_peer_info(); + let mut pi_builder = node_builder.init_peer_info(); encode_peer_info(&pi, &mut pi_builder)?; } } diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index d68ba5f4..033b08f7 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -30,16 +30,18 @@ pub enum Destination { impl Destination { pub fn direct(target: NodeRef) -> Self { + let sequencing = target.sequencing(); Self::Direct { target, - safety_selection: SafetySelection::Unsafe(target.sequencing()), + safety_selection: SafetySelection::Unsafe(sequencing), } } pub fn relay(relay: NodeRef, target: DHTKey) -> Self { + let sequencing = relay.sequencing(); Self::Relay { relay, target, - safety_selection: SafetySelection::Unsafe(relay.sequencing()), + safety_selection: SafetySelection::Unsafe(sequencing), } } pub fn private_route(private_route: PrivateRoute, safety_selection: SafetySelection) -> Self { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index e1742359..fe177606 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -425,22 +425,23 @@ impl RPCProcessor { message_data: Vec, ) -> Result, RPCError> { let routing_table = self.routing_table(); + let rss = routing_table.route_spec_store(); + let pr_hop_count = private_route.hop_count; let pr_pubkey = private_route.public_key; - let compiled_route: CompiledRoute = - match self.routing_table().with_route_spec_store_mut(|rss, rti| { - // Compile the safety route with the private route - rss.compile_safety_route(rti, routing_table, safety_selection, private_route) - .map_err(RPCError::internal) - })? { - Some(cr) => cr, - None => { - return Ok(NetworkResult::no_connection_other( - "private route could not be compiled at this time", - )) - } - }; + // Compile the safety route with the private route + let compiled_route: CompiledRoute = match rss + .compile_safety_route(rti, routing_table, safety_selection, private_route) + .map_err(RPCError::internal)? + { + Some(cr) => cr, + None => { + return Ok(NetworkResult::no_connection_other( + "private route could not be compiled at this time", + )) + } + }; // Encrypt routed operation // Xmsg + ENC(Xmsg, DH(PKapr, SKbsr)) @@ -917,7 +918,7 @@ impl RPCProcessor { opt_sender_nr, } } - RPCMessageHeaderDetail::PrivateRoute(detail) => { + RPCMessageHeaderDetail::PrivateRoute(_) => { // Decode the RPC message let operation = { let reader = capnp::message::Reader::new(encoded_msg.data, Default::default()); diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index 6e61c754..0e317d9c 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -31,8 +31,14 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_node_info_update(&self, msg: RPCMessage) -> Result<(), RPCError> { - let sender_node_id = msg.header.envelope.get_sender_id(); - let routing_domain = msg.header.routing_domain; + let detail = match msg.header.detail { + RPCMessageHeaderDetail::Direct(detail) => detail, + RPCMessageHeaderDetail::PrivateRoute(_) => { + return Err(RPCError::protocol("node_info_update must be direct")); + } + }; + let sender_node_id = detail.envelope.get_sender_id(); + let routing_domain = detail.routing_domain; // Get the statement let node_info_update = match msg.operation.into_kind() { diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index 31c19023..667ca81e 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -33,11 +33,23 @@ impl RPCProcessor { // Handle it let network_manager = self.network_manager(); - network_result_value_or_log!(debug - network_manager - .handle_in_band_receipt(receipt, msg.header.peer_noderef) - .await => {} - ); + + match msg.header.detail { + RPCMessageHeaderDetail::Direct(detail) => { + network_result_value_or_log!(debug + network_manager + .handle_in_band_receipt(receipt, detail.peer_noderef) + .await => {} + ); + } + RPCMessageHeaderDetail::PrivateRoute(detail) => { + network_result_value_or_log!(debug + network_manager + .handle_private_receipt(receipt) + .await => {} + ); + } + } Ok(()) } diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index 91d5aebe..b12fa50b 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -170,8 +170,13 @@ impl RPCProcessor { // If the private route public key is our node id, then this was sent via safety route to our node directly // so there will be no signatures to validate let opt_pr_info = if private_route.public_key == self.routing_table.node_id() { - // the private route was a stub to our own node's secret - // return our secret key and an appropriate safety selection + // The private route was a stub + // Return our secret key and an appropriate safety selection + // + // Note: it is important that we never respond with a safety route to questions that come + // in without a private route. Giving away a safety route when the node id is known is + // a privacy violation! + // Get sequencing preference let sequencing = if detail .connection_descriptor @@ -191,46 +196,45 @@ impl RPCProcessor { let sender_id = detail.envelope.get_sender_id(); // Look up the private route and ensure it's one in our spec store - let opt_signatures_valid = self.routing_table.with_route_spec_store(|rss, rti| { - rss.with_route_spec_detail(&private_route.public_key, |rsd| { - // Ensure we have the right number of signatures - if routed_operation.signatures.len() != rsd.hops.len() - 1 { - // Wrong number of signatures - log_rpc!(debug "wrong number of signatures ({} should be {}) for routed operation on private route {}", routed_operation.signatures.len(), rsd.hops.len() - 1, private_route.public_key); - return None; - } - // Validate signatures to ensure the route was handled by the nodes and not messed with - for (hop_n, hop_public_key) in rsd.hops.iter().enumerate() { - // The last hop is not signed, as the whole packet is signed - if hop_n == routed_operation.signatures.len() { - // Verify the node we received the routed operation from is the last hop in our route - if *hop_public_key != sender_id { - log_rpc!(debug "received routed operation from the wrong hop ({} should be {}) on private route {}", hop_public_key.encode(), sender_id.encode(), private_route.public_key); - return None; - } - } else { - // Verify a signature for a hop node along the route - if let Err(e) = verify( - hop_public_key, - &routed_operation.data, - &routed_operation.signatures[hop_n], - ) { - log_rpc!(debug "failed to verify signature for hop {} at {} on private route {}", hop_n, hop_public_key, private_route.public_key); - return None; - } + let rss= self.routing_table.route_spec_store(); + let opt_signatures_valid = rss.with_route_spec_detail(&private_route.public_key, |rsd| { + // Ensure we have the right number of signatures + if routed_operation.signatures.len() != rsd.hops.len() - 1 { + // Wrong number of signatures + log_rpc!(debug "wrong number of signatures ({} should be {}) for routed operation on private route {}", routed_operation.signatures.len(), rsd.hops.len() - 1, private_route.public_key); + return None; + } + // Validate signatures to ensure the route was handled by the nodes and not messed with + for (hop_n, hop_public_key) in rsd.hops.iter().enumerate() { + // The last hop is not signed, as the whole packet is signed + if hop_n == routed_operation.signatures.len() { + // Verify the node we received the routed operation from is the last hop in our route + if *hop_public_key != sender_id { + log_rpc!(debug "received routed operation from the wrong hop ({} should be {}) on private route {}", hop_public_key.encode(), sender_id.encode(), private_route.public_key); + return None; + } + } else { + // Verify a signature for a hop node along the route + if let Err(e) = verify( + hop_public_key, + &routed_operation.data, + &routed_operation.signatures[hop_n], + ) { + log_rpc!(debug "failed to verify signature for hop {} at {} on private route {}", hop_n, hop_public_key, private_route.public_key); + return None; } } - // We got the correct signatures, return a key ans - Some(( - rsd.secret_key, - SafetySelection::Safe(SafetySpec { - preferred_route: Some(private_route.public_key), - hop_count: rsd.hops.len(), - stability: rsd.stability, - sequencing: rsd.sequencing, - }) - )) - }) + } + // We got the correct signatures, return a key ans + Some(( + rsd.secret_key, + SafetySelection::Safe(SafetySpec { + preferred_route: Some(private_route.public_key), + hop_count: rsd.hops.len(), + stability: rsd.stability, + sequencing: rsd.sequencing, + }) + )) }); opt_signatures_valid.ok_or_else(|| { RPCError::protocol("routed operation received on unallocated private route") diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 08cb3713..63fbcb79 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -32,7 +32,7 @@ impl RPCProcessor { // Handle it let network_manager = self.network_manager(); network_result_value_or_log!(debug network_manager - .handle_signal(msg.header.envelope.get_sender_id(), signal.signal_info) + .handle_signal(signal.signal_info) .await .map_err(RPCError::network)? => { return Ok(()); diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 079665d4..feebaa5c 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -103,8 +103,15 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> { - let connection_descriptor = msg.header.connection_descriptor; - let routing_domain = msg.header.routing_domain; + let detail = match &msg.header.detail { + RPCMessageHeaderDetail::Direct(detail) => detail, + RPCMessageHeaderDetail::PrivateRoute(_) => { + return Err(RPCError::protocol("status_q must be direct")); + } + }; + + let connection_descriptor = detail.connection_descriptor; + let routing_domain = detail.routing_domain; // Get the question let status_q = match msg.operation.kind() { diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 9c73baf2..454acb9c 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -34,7 +34,7 @@ impl RPCProcessor { // Wait for receipt match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => { + ReceiptEvent::ReturnedPrivate | ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => { log_net!(debug "validate_dial_info receipt should be returned out-of-band".green()); Ok(false) } @@ -54,6 +54,13 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> Result<(), RPCError> { + let detail = match msg.header.detail { + RPCMessageHeaderDetail::Direct(detail) => detail, + RPCMessageHeaderDetail::PrivateRoute(_) => { + return Err(RPCError::protocol("validate_dial_info must be direct")); + } + }; + // Get the statement let RPCOperationValidateDialInfo { dial_info, @@ -74,8 +81,8 @@ impl RPCProcessor { // Use the address type though, to ensure we reach an ipv6 capable node if this is // an ipv6 address let routing_table = self.routing_table(); - let sender_id = msg.header.envelope.get_sender_id(); - let routing_domain = msg.header.routing_domain; + let sender_id = detail.envelope.get_sender_id(); + let routing_domain = detail.routing_domain; let node_count = { let c = self.config.get(); c.network.dht.max_find_node_count as usize @@ -141,8 +148,6 @@ impl RPCProcessor { .await .map_err(RPCError::network)?; - // tracing::Span::current().record("res", &tracing::field::display(res)); - Ok(()) } }