From 4823c979ab06a562afe30ea6f3e4d6059d7bc006 Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 23 Feb 2023 21:07:46 -0500 Subject: [PATCH] routing work --- veilid-core/proto/veilid.capnp | 2 +- veilid-core/src/crypto/crypto_system.rs | 8 +- veilid-core/src/crypto/dh_cache.rs | 2 +- veilid-core/src/crypto/mod.rs | 11 +- .../src/network_manager/connection_table.rs | 4 +- .../src/routing_table/route_spec_store.rs | 428 +++++++++++------- .../coders/operations/operation_route.rs | 8 +- veilid-core/src/rpc_processor/rpc_app_call.rs | 16 +- 8 files changed, 288 insertions(+), 191 deletions(-) diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index c7940275..bb727a7d 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -263,7 +263,7 @@ struct PeerInfo @0xfe2d722d5d3c4bcb { struct RoutedOperation @0xcbcb8535b839e9dd { sequencing @0 :Sequencing; # sequencing preference to use to pass the message along - signatures @1 :List(TypedSignature); # signatures from nodes that have handled the private route + signatures @1 :List(Signature); # signatures from nodes that have handled the private route nonce @2 :Nonce; # nonce Xmsg data @3 :Data; # operation encrypted with ENC(Xmsg,DH(PKapr,SKbsr)) } diff --git a/veilid-core/src/crypto/crypto_system.rs b/veilid-core/src/crypto/crypto_system.rs index 4a3465c2..12a50c38 100644 --- a/veilid-core/src/crypto/crypto_system.rs +++ b/veilid-core/src/crypto/crypto_system.rs @@ -33,7 +33,7 @@ pub trait CryptoSystem { fn validate_hash_reader( &self, reader: &mut dyn std::io::Read, - dht_key: &PublicKey, + key: &PublicKey, ) -> Result; // Distance Metric @@ -42,13 +42,13 @@ pub trait CryptoSystem { // Authentication fn sign( &self, - dht_key: &PublicKey, - dht_key_secret: &SecretKey, + key: &PublicKey, + secret: &SecretKey, data: &[u8], ) -> Result; fn verify( &self, - dht_key: &PublicKey, + key: &PublicKey, data: &[u8], signature: &Signature, ) -> Result<(), VeilidAPIError>; diff --git a/veilid-core/src/crypto/dh_cache.rs b/veilid-core/src/crypto/dh_cache.rs index 47382a50..17c5d4ae 100644 --- a/veilid-core/src/crypto/dh_cache.rs +++ b/veilid-core/src/crypto/dh_cache.rs @@ -40,6 +40,6 @@ pub fn bytes_to_cache(bytes: &[u8], cache: &mut DHCache) { let v = DHCacheValue { shared_secret: SharedSecret::new(d[64..96].try_into().expect("asdf")), }; - cache.insert(k, v); + cache.insert(k, v, |_k, _v| {}); } } diff --git a/veilid-core/src/crypto/mod.rs b/veilid-core/src/crypto/mod.rs index e7ae9ecc..affa5da9 100644 --- a/veilid-core/src/crypto/mod.rs +++ b/veilid-core/src/crypto/mod.rs @@ -269,10 +269,13 @@ impl Crypto { secret: &SecretKey, ) -> Result { Ok( - match self.inner.lock().dh_cache.entry(DHCacheKey { - key: *key, - secret: *secret, - }) { + match self.inner.lock().dh_cache.entry( + DHCacheKey { + key: *key, + secret: *secret, + }, + |_k, _v| {}, + ) { Entry::Occupied(e) => e.get().shared_secret, Entry::Vacant(e) => { let shared_secret = vcrypto.compute_dh(key, secret)?; diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index 82d13ae3..7fe2a18d 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -136,7 +136,9 @@ impl ConnectionTable { }; // Add the connection to the table - let res = inner.conn_by_id[protocol_index].insert(id, network_connection); + let res = inner.conn_by_id[protocol_index].insert(id, network_connection, |_k, _v| { + // never lrus, unbounded + }); assert!(res.is_none()); // if we have reached the maximum number of connections per protocol type diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index e751e4d0..f82d5faa 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -164,12 +164,19 @@ impl RouteStats { #[archive_attr(repr(C), derive(CheckBytes))] pub struct RouteSpecDetail { /// Crypto kind - crypto_kind: CryptoKind, + pub crypto_kind: CryptoKind, /// Secret key #[with(Skip)] - secret_key: SecretKey, + pub secret_key: SecretKey, /// Route hops (node id keys) - hops: Vec, + pub hops: Vec, +} + +#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct RouteSetSpecDetail { + /// Route set per crypto kind + route_set: BTreeMap, /// Route noderefs #[with(Skip)] hop_node_refs: Vec, @@ -188,9 +195,19 @@ pub struct RouteSpecDetail { stats: RouteStats, } -impl RouteSpecDetail { - pub fn get_crypto_kind(&self) -> CryptoKind { - self.crypto_kind +impl RouteSetSpecDetail { + pub fn get_route_by_key(&self, key: PublicKey) -> Option<&RouteSpecDetail> { + self.route_set.get(&key) + } + pub fn get_route_by_key_mut(&mut self, key: PublicKey) -> Option<&mut RouteSpecDetail> { + self.route_set.get_mut(&key) + } + pub fn get_route_set_keys(&self) -> TypedKeySet { + let mut tks = TypedKeySet::new(); + for (k, v) in &self.route_set { + tks.add(TypedKey::new(v.crypto_kind, *k)); + } + tks } pub fn get_stats(&self) -> &RouteStats { &self.stats @@ -202,10 +219,7 @@ impl RouteSpecDetail { self.published } pub fn hop_count(&self) -> usize { - self.hops.len() - } - pub fn get_secret_key(&self) -> SecretKey { - self.secret_key + self.hop_node_refs.len() } pub fn get_stability(&self) -> Stability { self.stability @@ -225,15 +239,60 @@ impl RouteSpecDetail { #[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)] #[archive_attr(repr(C, align(8)), derive(CheckBytes))] pub struct RouteSpecStoreContent { - /// All of the routes we have allocated so far - details: HashMap, + /// All of the route sets we have allocated so far indexed by key + id_by_key: HashMap, + /// All of the route sets we have allocated so far + details: HashMap, +} + +impl RouteSpecStoreContent { + pub fn add_detail(&mut self, detail: RouteSetSpecDetail) -> String { + + // generate unique key string + let mut idbytes = [0u8; 16]; + for (pk, _) in &detail.route_set { + for (i, x) in pk.bytes.iter().enumerate() { + idbytes[i % 16] ^= *x; + } + } + let id = format!("{:08x}-{:04x}-{:04x}-{:04x}-{:08x}{:04x}", + u32::from_be_bytes(idbytes[0..4].try_into().expect("32 bits")), + u16::from_be_bytes(idbytes[4..6].try_into().expect("16 bits")), + u16::from_be_bytes(idbytes[6..8].try_into().expect("16 bits")), + u16::from_be_bytes(idbytes[8..10].try_into().expect("16 bits")), + u32::from_be_bytes(idbytes[10..14].try_into().expect("32 bits")), + u16::from_be_bytes(idbytes[14..16].try_into().expect("16 bits"))); + + // also store in id by key table + for (pk, _) in &detail.route_set { + self.id_by_key.insert(*pk, id.clone()); + } + self.details.insert(id.clone(), detail); + + id + } + pub fn remove_detail(&mut self, id: &String) { + let detail = self.details.remove(id).unwrap(); + for (pk, _) in &detail.route_set { + self.id_by_key.remove(&pk).unwrap(); + } + } + pub fn get_detail(&self, id: &String) -> Option<&RouteSetSpecDetail> { + self.details.get(id) + } + pub fn get_detail_mut(&mut self, id: &String) -> Option<&mut RouteSetSpecDetail> { + self.details.get_mut(id) + } + pub fn get_id_by_key(&self, key: &PublicKey) -> Option { + self.id_by_key.get(key).cloned() + } } /// What remote private routes have seen #[derive(Debug, Clone, Default)] pub struct RemotePrivateRouteInfo { - /// The private route itself - private_route: Option, + /// The private routes themselves + private_routes: HashMap, /// Did this remote private route see our node info due to no safety route in use last_seen_our_node_info_ts: Timestamp, /// Last time this remote private route was requested for any reason (cache expiration) @@ -260,8 +319,10 @@ pub struct RouteSpecStoreCache { used_end_nodes: HashMap, /// Route spec hop cache, used to quickly disqualify routes hop_cache: HashSet>, - /// Has a remote private route responded to a question and when - remote_private_route_cache: LruCache, + /// Remote private routes we've imported and statistics + remote_private_route_set_cache: LruCache, + /// Remote private routes indexed by public key + remote_private_routes_by_key: HashMap, /// Compiled route cache compiled_route_cache: LruCache, /// List of dead allocated routes @@ -270,13 +331,35 @@ pub struct RouteSpecStoreCache { dead_remote_routes: Vec, } +impl RouteSpecStoreCache { + pub fn get_used_node_count(&self, node_ids: &TypedKeySet) -> usize { + node_ids.iter().fold(0usize, |acc, k| { + acc + self + .used_nodes + .get(&k) + .cloned() + .unwrap_or_default() + }) + } + pub fn get_used_end_node_count(&self, node_ids: &TypedKeySet) -> usize { + node_ids.iter().fold(0usize, |acc, k| { + acc + self + .used_end_nodes + .get(&k) + .cloned() + .unwrap_or_default() + }) + } +} + impl Default for RouteSpecStoreCache { fn default() -> Self { Self { used_nodes: Default::default(), used_end_nodes: Default::default(), hop_cache: Default::default(), - remote_private_route_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE), + remote_private_route_set_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE), + remote_private_routes_by_key: HashMap::new(), compiled_route_cache: LruCache::new(COMPILED_ROUTE_CACHE_SIZE), dead_routes: Default::default(), dead_remote_routes: Default::default(), @@ -317,19 +400,19 @@ pub struct RouteSpecStore { unlocked_inner: Arc, } -fn route_hops_to_hop_cache(hops: &[PublicKey]) -> Vec { +fn route_hops_to_hop_cache(hops: &[NodeRef]) -> Vec { let mut cache: Vec = Vec::with_capacity(hops.len() * PUBLIC_KEY_LENGTH); for hop in hops { - cache.extend_from_slice(&hop.bytes); + cache.extend_from_slice(&hop.best_node_id().key.bytes); } cache } /// get the hop cache key for a particular route permutation -fn route_permutation_to_hop_cache(nodes: &[PeerInfo], perm: &[usize]) -> Vec { +fn route_permutation_to_hop_cache(rti: &RoutingTableInner, nodes: &[NodeRef], perm: &[usize]) -> Vec { let mut cache: Vec = Vec::with_capacity(perm.len() * PUBLIC_KEY_LENGTH); for n in perm { - cache.extend_from_slice(&nodes[*n].node_id.key.bytes) + cache.extend_from_slice(&nodes[*n].locked(rti).best_node_id().key.bytes) } cache } @@ -416,6 +499,7 @@ impl RouteSpecStore { }), inner: Arc::new(Mutex::new(RouteSpecStoreInner { content: RouteSpecStoreContent { + id_by_key: HashMap::new(), details: HashMap::new(), }, cache: Default::default(), @@ -441,39 +525,62 @@ impl RouteSpecStore { rsstdb.load_rkyv(0, b"content")?.unwrap_or_default(); // Look up all route hop noderefs since we can't serialize those - let mut dead_keys = Vec::new(); - for (k, rsd) in &mut content.details { + let mut dead_ids = Vec::new(); + for (rsid, rssd) in &mut content.details { + // Get first route since they all should resolve + let Some((pk, rsd)) = rssd.route_set.first_key_value() else { + dead_ids.push(rsid.clone()); + continue; + }; + // Go through first route for h in &rsd.hops { let Some(nr) = routing_table.lookup_node_ref(TypedKey::new(rsd.crypto_kind, *h)) else { - dead_keys.push(*k); + dead_ids.push(rsid.clone()); break; }; - rsd.hop_node_refs.push(nr); + rssd.hop_node_refs.push(nr); } } - for k in dead_keys { - log_rtab!(debug "no entry, killing off private route: {}", k.encode()); - content.details.remove(&k); + for id in dead_ids { + log_rtab!(debug "no entry, killing off private route: {}", id); + content.remove_detail(&id); } // Load secrets from pstore let pstore = routing_table.network_manager().protected_store(); - let out: Vec = pstore + let secret_key_map: HashMap = pstore .load_user_secret_rkyv("RouteSpecStore") .await? .unwrap_or_default(); - let mut dead_keys = Vec::new(); - for KeyPair { key, secret } in out { - if let Some(rsd) = content.details.get_mut(&key) { - rsd.secret_key = secret; - } else { - dead_keys.push(key); + // Ensure we got secret keys for all the public keys + let mut got_secret_key_ids = HashSet::new(); + for (rsid, rssd) in &mut content.details { + let mut found_all = true; + for (pk, rsd) in &mut rssd.route_set { + if let Some(sk) = secret_key_map.get(pk) { + rsd.secret_key = *sk; + } else { + found_all = false; + break; + } + } + if found_all { + got_secret_key_ids.insert(rsid.clone()); } } - for k in dead_keys { - log_rtab!(debug "killing off private route: {}", k.encode()); - content.details.remove(&k); + + // If we missed any, nuke those route ids + let dead_ids:Vec = content.details.keys().filter_map(|id| { + if !got_secret_key_ids.contains(id) { + Some(id.clone()) + } else { + None + } + }).collect(); + for id in dead_ids { + log_rtab!(debug "missing secret key, killing off private route: {}", id); + content.remove_detail(&id); } let mut inner = RouteSpecStoreInner { @@ -504,6 +611,7 @@ impl RouteSpecStore { }; // Save all the fields we care about to the frozen blob in table storage + // This skips #[with(Skip)] saving the secret keys, we save them in the protected store instead let table_store = self .unlocked_inner .routing_table @@ -519,12 +627,11 @@ impl RouteSpecStore { .network_manager() .protected_store(); - let mut out: Vec = Vec::with_capacity(content.details.len()); - for (k, v) in &content.details { - out.push(KeyPair { - key: *k, - secret: v.secret_key, - }); + let mut out: HashMap = HashMap::new(); + for (rsid, rssd) in &content.details { + for (pk, rsd) in &rssd.route_set { + out.insert(*pk, rsd.secret_key); + } } let _ = pstore.save_user_secret_rkyv("RouteSpecStore", &out).await?; // ignore if this previously existed or not @@ -555,44 +662,33 @@ impl RouteSpecStore { update_callback(update); } - fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec, rsd: &RouteSpecDetail) { + fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec, rssd: &RouteSetSpecDetail) { if !cache.hop_cache.insert(cache_key) { panic!("route should never be inserted twice"); } - for h in &rsd.hops { + for (pk, rsd) in &rssd.route_set { + for h in &rsd.hops { + cache + .used_nodes + .entry(TypedKey::new(rsd.crypto_kind, *h)) + .and_modify(|e| *e += 1) + .or_insert(1); + } cache - .used_nodes - .entry(*h) + .used_end_nodes + .entry(TypedKey::new(rsd.crypto_kind, *rsd.hops.last().unwrap())) .and_modify(|e| *e += 1) .or_insert(1); } - cache - .used_end_nodes - .entry(*rsd.hops.last().unwrap()) - .and_modify(|e| *e += 1) - .or_insert(1); } fn rebuild_cache(inner: &mut RouteSpecStoreInner) { - for v in inner.content.details.values() { - let cache_key = route_hops_to_hop_cache(&v.hops); - Self::add_to_cache(&mut inner.cache, cache_key, &v); + for rssd in inner.content.details.values() { + let cache_key = route_hops_to_hop_cache(&rssd.hop_node_refs); + Self::add_to_cache(&mut inner.cache, cache_key, &rssd); } } - fn detail<'a>( - inner: &'a RouteSpecStoreInner, - route_spec_key: &PublicKey, - ) -> Option<&'a RouteSpecDetail> { - inner.content.details.get(route_spec_key) - } - fn detail_mut<'a>( - inner: &'a mut RouteSpecStoreInner, - route_spec_key: &PublicKey, - ) -> Option<&'a mut RouteSpecDetail> { - inner.content.details.get_mut(route_spec_key) - } - /// Purge the route spec store pub async fn purge(&self) -> EyreResult<()> { { @@ -607,7 +703,7 @@ impl RouteSpecStore { /// Prefers nodes that are not currently in use by another route /// The route is not yet tested for its reachability /// Returns None if no route could be allocated at this time - /// Returns Some list of public keys for the requested set of crypto kinds + /// Returns Some route id string #[instrument(level = "trace", skip(self), ret, err)] pub fn allocate_route( &self, @@ -617,7 +713,7 @@ impl RouteSpecStore { hop_count: usize, directions: DirectionSet, avoid_nodes: &[TypedKey], - ) -> EyreResult> { + ) -> EyreResult> { let inner = &mut *self.inner.lock(); let routing_table = self.unlocked_inner.routing_table.clone(); let rti = &mut *routing_table.inner.write(); @@ -645,7 +741,7 @@ impl RouteSpecStore { hop_count: usize, directions: DirectionSet, avoid_nodes: &[TypedKey], - ) -> EyreResult> { + ) -> EyreResult> { use core::cmp::Ordering; if hop_count < 1 { @@ -666,7 +762,7 @@ impl RouteSpecStore { // Get list of all nodes, and sort them for selection let cur_ts = get_aligned_timestamp(); let filter = Box::new( - move |rti: &RoutingTableInner, entry: Option>| -> bool { + |rti: &RoutingTableInner, entry: Option>| -> bool { // Exclude our own node from routes if entry.is_none() { return false; @@ -674,14 +770,14 @@ impl RouteSpecStore { let entry = entry.unwrap(); // Exclude our relay if we have one - if let Some(own_relay_nr) = opt_own_relay_nr { + if let Some(own_relay_nr) = &opt_own_relay_nr { if own_relay_nr.same_bucket_entry(&entry) { return false; } } // Process node info exclusions - let keep = entry.with(rti, |_rti, e| { + let keep = entry.with_inner(|e| { // Exclude nodes that don't have our requested crypto kinds let common_ck = e.common_crypto_kinds(crypto_kinds); @@ -712,7 +808,7 @@ impl RouteSpecStore { return false; } // Exclude nodes whose relay is our own relay if we have one - if let Some(own_relay_nr) = opt_own_relay_nr { + if let Some(own_relay_nr) = &opt_own_relay_nr { if relay_ids.contains_any(&own_relay_nr.node_ids()) { return false; } @@ -725,7 +821,7 @@ impl RouteSpecStore { } // Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route - entry.with(rti, |_rti, e| { + entry.with_inner(|e| { let node_info_ok = if let Some(sni) = e.signed_node_info(RoutingDomain::PublicInternet) { sni.has_sequencing_matched_dial_info(sequencing) @@ -748,42 +844,24 @@ impl RouteSpecStore { entry1: &Option>, entry2: &Option>| -> Ordering { - - - // xxx also sort my most overlapping crypto kinds - + + // Our own node is filtered out + let entry1 = entry1.unwrap(); + let entry2 = entry2.unwrap(); + let entry1_node_ids = entry1.with_inner(|e| e.node_ids()); + let entry2_node_ids = entry2.with_inner(|e| e.node_ids()); // deprioritize nodes that we have already used as end points - let e1_used_end = inner - .cache - .used_end_nodes - .get(&v1.0) - .cloned() - .unwrap_or_default(); - let e2_used_end = inner - .cache - .used_end_nodes - .get(&v2.0) - .cloned() - .unwrap_or_default(); + let e1_used_end = inner.cache.get_used_end_node_count(&entry1_node_ids); + let e2_used_end = inner.cache.get_used_end_node_count(&entry2_node_ids); let cmp_used_end = e1_used_end.cmp(&e2_used_end); if !matches!(cmp_used_end, Ordering::Equal) { return cmp_used_end; } // deprioritize nodes we have used already anywhere - let e1_used = inner - .cache - .used_nodes - .get(&v1.0) - .cloned() - .unwrap_or_default(); - let e2_used = inner - .cache - .used_nodes - .get(&v2.0) - .cloned() - .unwrap_or_default(); + let e1_used = inner.cache.get_used_node_count(&entry1_node_ids); + let e2_used = inner.cache.get_used_node_count(&entry2_node_ids); let cmp_used = e1_used.cmp(&e2_used); if !matches!(cmp_used, Ordering::Equal) { return cmp_used; @@ -793,14 +871,12 @@ impl RouteSpecStore { // ensureordered will be taken care of by filter // and nopreference doesn't care if matches!(sequencing, Sequencing::PreferOrdered) { - let cmp_seq = v1.1.as_ref().unwrap().with(rti, |rti, e1| { - v2.1.as_ref() - .unwrap() - .with(rti, |_rti, e2| { - let e1_can_do_ordered = e1.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); - let e2_can_do_ordered = e2.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); - e2_can_do_ordered.cmp(&e1_can_do_ordered) - }) + let cmp_seq = entry1.with_inner(|e1| { + entry2.with_inner(|e2| { + let e1_can_do_ordered = e1.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); + let e2_can_do_ordered = e2.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); + e2_can_do_ordered.cmp(&e1_can_do_ordered) + }) }); if !matches!(cmp_seq, Ordering::Equal) { return cmp_seq; @@ -808,37 +884,28 @@ impl RouteSpecStore { } // always prioritize reliable nodes, but sort by oldest or fastest - let cmpout = v1.1.as_ref().unwrap().with(rti, |rti, e1| { - v2.1.as_ref() - .unwrap() - .with(rti, |_rti, e2| match stability { - Stability::LowLatency => { - BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2) - } - Stability::Reliable => { - BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2) - } - }) + let cmpout = entry1.with_inner(|e1| { + entry2.with_inner(|e2| match stability { + Stability::LowLatency => { + BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2) + } + Stability::Reliable => { + BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2) + } + }) }); cmpout }; + + let routing_table = self.unlocked_inner.routing_table.clone(); let transform = - |rti: &RoutingTableInner, k: PublicKey, v: Option>| -> PeerInfo { - // Return the peerinfo for that key - v.unwrap().with(rti, |_rti, e| { - e.make_peer_info(k, RoutingDomain::PublicInternet.into()) - .unwrap() - .clone() - }) + |rti: &RoutingTableInner, entry: Option>| -> NodeRef { + NodeRef::new(routing_table.clone(), entry.unwrap(), None) }; // Pull the whole routing table in sorted order - let node_count = rti.get_entry_count( - RoutingDomain::PublicInternet.into(), - BucketEntryState::Unreliable, - ); let nodes = - rti.find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform); + rti.find_peers_with_sort_and_filter(usize::MAX, cur_ts, filters, compare, transform); // If we couldn't find enough nodes, wait until we have more nodes in the routing table if nodes.len() < hop_count { @@ -846,10 +913,13 @@ impl RouteSpecStore { return Ok(None); } + // Get peer info for everything + let nodes_pi: Vec = nodes.iter().map(|nr| nr.locked(rti).make_peer_info(RoutingDomain::PublicInternet).unwrap()).collect(); + // Now go through nodes and try to build a route we haven't seen yet let perm_func = Box::new(|permutation: &[usize]| { // Get the route cache key - let cache_key = route_permutation_to_hop_cache(&nodes, permutation); + let cache_key = route_permutation_to_hop_cache(rti, &nodes, permutation); // Skip routes we have already seen if inner.cache.hop_cache.contains(&cache_key) { @@ -857,15 +927,16 @@ impl RouteSpecStore { } // Ensure the route doesn't contain both a node and its relay - let mut seen_nodes: HashSet = HashSet::new(); + let mut seen_nodes: HashSet = HashSet::new(); for n in permutation { - let node = nodes.get(*n).unwrap(); - if !seen_nodes.insert(node.node_id.key) { + let node = nodes.get(*n).unwrap().locked(rti); + if !seen_nodes.insert(node.best_node_id()) { // Already seen this node, should not be in the route twice return None; } - if let Some(relay_id) = node.signed_node_info.relay_id() { - if !seen_nodes.insert(relay_id.key) { + if let Some(relay) = node.relay(RoutingDomain::PublicInternet) { + let relay_id = relay.locked(rti).best_node_id(); + if !seen_nodes.insert(relay_id) { // Already seen this node, should not be in the route twice return None; } @@ -878,7 +949,7 @@ impl RouteSpecStore { let mut previous_node = &our_peer_info; let mut reachable = true; for n in permutation { - let current_node = nodes.get(*n).unwrap(); + let current_node = nodes_pi.get(*n).unwrap(); let cm = rti.get_contact_method( RoutingDomain::PublicInternet, previous_node, @@ -915,7 +986,7 @@ impl RouteSpecStore { let mut next_node = &our_peer_info; let mut reachable = true; for n in permutation.iter().rev() { - let current_node = nodes.get(*n).unwrap(); + let current_node = nodes_pi.get(*n).unwrap(); let cm = rti.get_contact_method( RoutingDomain::PublicInternet, next_node, @@ -970,23 +1041,26 @@ impl RouteSpecStore { return Ok(None); } - // Got a unique route, lets build the detail, register it, and return it - let hops: Vec = route_nodes.iter().map(|v| nodes[*v].node_id.key).collect(); - let hop_node_refs = hops + // Got a unique route, lets build the details, register it, and return it + let hop_node_refs:Vec = route_nodes .iter() - .map(|k| { - rti.lookup_node_ref(self.unlocked_inner.routing_table.clone(), *k) - .unwrap() - }) + .map(|k| nodes[*k].clone()) .collect(); + let mut route_set = BTreeMap::::new(); + for crypto_kind in crypto_kinds.iter().copied() { + let vcrypto = self.unlocked_inner.routing_table.crypto().get(crypto_kind).unwrap(); + let (public_key, secret_key) = vcrypto.generate_keypair(); + let hops: Vec = route_nodes.iter().map(|v| nodes[*v].node_ids().get(crypto_kind).unwrap().key).collect(); - let (public_key, secret_key) = generate_secret(); + route_set.insert(public_key, RouteSpecDetail { + crypto_kind, + secret_key, + hops, + }); + } - - - let rsd = RouteSpecDetail { - secret_key, - hops, + let rssd = RouteSetSpecDetail { + route_set, hop_node_refs, published: false, directions, @@ -998,18 +1072,19 @@ impl RouteSpecStore { drop(perm_func); // Add to cache - Self::add_to_cache(&mut inner.cache, cache_key, &rsd); + Self::add_to_cache(&mut inner.cache, cache_key, &rssd); // Keep route in spec store - inner.content.details.insert(public_key, rsd); + let id = inner.content.add_detail(rssd); - Ok(Some(public_key)) + Ok(Some(id)) } + /// validate data using a private route's key and signature chain #[instrument(level = "trace", skip(self, data, callback), ret)] pub fn with_signature_validated_route( &self, - public_key: &PublicKey, + public_key: &TypedKey, signatures: &[Signature], data: &[u8], last_hop_id: PublicKey, @@ -1019,8 +1094,22 @@ impl RouteSpecStore { R: fmt::Debug, { let inner = &*self.inner.lock(); - let Some(rsd) = Self::detail(inner, &public_key) else { - log_rpc!(debug "route does not exist: {:?}", public_key); + let crypto = self.unlocked_inner.routing_table.crypto(); + let Some(vcrypto) = crypto.get(public_key.kind) else { + log_rpc!(debug "can't handle route with public key: {:?}", public_key); + return None; + }; + + let Some(rsid) = inner.content.get_id_by_key(&public_key.key) else { + log_rpc!(debug "route id does not exist: {:?}", public_key.key); + return None; + }; + let Some(rssd) = inner.content.get_detail(&rsid) else { + log_rpc!(debug "route detail does not exist: {:?}", rsid); + return None; + }; + let Some(rsd) = rssd.route_set.get(&public_key.key) else { + log_rpc!(debug "route set {:?} does not have key: {:?}", rsid, public_key.key); return None; }; @@ -1042,7 +1131,7 @@ impl RouteSpecStore { } } else { // Verify a signature for a hop node along the route - if let Err(e) = verify(hop_public_key, data, &signatures[hop_n]) { + if let Err(e) = vcrypto.verify(hop_public_key, data, &signatures[hop_n]) { log_rpc!(debug "failed to verify signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e); return None; } @@ -1053,10 +1142,11 @@ impl RouteSpecStore { } #[instrument(level = "trace", skip(self), ret, err)] - async fn test_allocated_route(&self, key: &TypedKey) -> EyreResult { + async fn test_allocated_route(&self, id: &String) -> EyreResult { // Make loopback route to test with let dest = { - let private_route = self.assemble_private_route(key, None)?; + xxx figure out how to pick best crypto for the private route + let private_route = self.assemble_private_route(id, None)?; let inner = &mut *self.inner.lock(); let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; @@ -1281,7 +1371,7 @@ impl RouteSpecStore { /// List all allocated routes pub fn list_allocated_routes(&self, mut filter: F) -> Vec where - F: FnMut(&PublicKey, &RouteSpecDetail) -> Option, + F: FnMut(&PublicKey, &RouteSetSpecDetail) -> Option, { let inner = self.inner.lock(); let mut out = Vec::with_capacity(inner.content.details.len()); @@ -1492,7 +1582,7 @@ impl RouteSpecStore { // (outer hop is a RouteHopData, not a RouteHop). // Each loop mutates 'nonce', and 'blob_data' let mut nonce = Crypto::get_random_nonce(); - let crypto = routing_table.network_manager().crypto(); + let crypto = routing_table.crypto(); // Forward order (safety route), but inside-out for h in (1..safety_rsd.hops.len()).rev() { // Get blob to encrypt for next hop @@ -1780,7 +1870,7 @@ impl RouteSpecStore { /// Import a remote private route for compilation #[instrument(level = "trace", skip(self, blob), ret, err)] - pub fn import_remote_private_route(&self, blob: Vec) -> EyreResult { xxx continue here, maybe formalize 'private route set' as having its own non-key identifier for both remote and local routes... just a uuid map to typedkeyset? + pub fn import_remote_private_route(&self, blob: Vec) -> EyreResult { // decode the pr blob let private_routes = RouteSpecStore::blob_to_private_routes(blob)?; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs index e98c16de..e0090c70 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs @@ -3,7 +3,7 @@ use super::*; #[derive(Debug, Clone)] pub struct RoutedOperation { pub sequencing: Sequencing, - pub signatures: Vec, + pub signatures: Vec, pub nonce: Nonce, pub data: Vec, } @@ -22,14 +22,14 @@ impl RoutedOperation { reader: &veilid_capnp::routed_operation::Reader, ) -> Result { let sigs_reader = reader.get_signatures().map_err(RPCError::protocol)?; - let mut signatures = Vec::::with_capacity( + let mut signatures = Vec::::with_capacity( sigs_reader .len() .try_into() .map_err(RPCError::map_internal("too many signatures"))?, ); for s in sigs_reader.iter() { - let sig = decode_typed_signature(&s)?; + let sig = decode_signature512(&s)?; signatures.push(sig); } @@ -61,7 +61,7 @@ impl RoutedOperation { ); for (i, sig) in self.signatures.iter().enumerate() { let mut sig_builder = sigs_builder.reborrow().get(i as u32); - encode_typed_signature(sig, &mut sig_builder); + encode_signature512(sig, &mut sig_builder); } let mut n_builder = builder.reborrow().init_nonce(); encode_nonce(&self.nonce, &mut n_builder); diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index d0887981..de3b8fba 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -44,9 +44,6 @@ impl RPCProcessor { &self, msg: RPCMessage, ) -> Result, RPCError> { - // Get the crypto kind used to send this question - let crypto_kind = msg.header.crypto_kind(); - // Get the question let app_call_q = match msg.operation.kind() { RPCOperationKind::Question(q) => match q.detail() { @@ -56,15 +53,20 @@ impl RPCProcessor { _ => panic!("not a question"), }; + // Get the crypto kind used to send this question + let crypto_kind = msg.header.crypto_kind(); + + // Get the sender node id this came from + let sender = msg + .opt_sender_nr + .as_ref() + .map(|nr| nr.node_ids().get(crypto_kind).unwrap().key); + // Register a waiter for this app call let id = msg.operation.op_id(); let handle = self.unlocked_inner.waiting_app_call_table.add_op_waiter(id); // Pass the call up through the update callback - let sender = msg - .opt_sender_nr - .as_ref() - .map(|nr| nr.node_ids().get(crypto_kind).unwrap().key); let message = app_call_q.message.clone(); (self.unlocked_inner.update_callback)(VeilidUpdate::AppCall(VeilidAppCall { sender,