cleaning up

This commit is contained in:
John Smith 2023-02-28 21:11:26 -05:00
parent 5a4c2cb37e
commit 615158d54e
23 changed files with 270 additions and 229 deletions

View File

@ -17,13 +17,17 @@ pub fn compare_crypto_kind(a: &CryptoKind, b: &CryptoKind) -> cmp::Ordering {
let b_idx = VALID_CRYPTO_KINDS.iter().position(|k| k == b);
if let Some(a_idx) = a_idx {
if let Some(b_idx) = b_idx {
// Both are valid, prefer better crypto kind
a_idx.cmp(&b_idx)
} else {
// A is valid, B is not
cmp::Ordering::Less
}
} else if let Some(b_idx) = b_idx {
} else if b_idx.is_some() {
// B is valid, A is not
cmp::Ordering::Greater
} else {
// Both are invalid, so use lex comparison
a.cmp(b)
}
}
@ -66,19 +70,9 @@ impl KeyPair {
}
}
#[derive(
Clone,
Copy,
Debug,
Serialize,
Deserialize,
PartialEq,
Eq,
Hash,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
xxx make default template version here for secretkey
and put Vec<TypedKey<SecretKey>> in settings
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes, Hash, PartialEq, Eq))]
pub struct TypedKey {
pub kind: CryptoKind,
@ -123,6 +117,23 @@ impl FromStr for TypedKey {
Ok(Self { kind, key })
}
}
impl<'de> Deserialize<'de> for TypedKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = <String as Deserialize>::deserialize(deserializer)?;
FromStr::from_str(&s).map_err(serde::de::Error::custom)
}
}
impl Serialize for TypedKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(self)
}
}
#[derive(
Clone,
@ -139,6 +150,7 @@ impl FromStr for TypedKey {
RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes, Hash, PartialEq, Eq))]
#[serde(from = "Vec<TypedKey>", into = "Vec<TypedKey>")]
pub struct TypedKeySet {
items: Vec<TypedKey>,
}
@ -192,12 +204,12 @@ impl TypedKeySet {
}
self.items.sort()
}
pub fn remove(&self, kind: CryptoKind) {
pub fn remove(&mut self, kind: CryptoKind) {
if let Some(idx) = self.items.iter().position(|x| x.kind == kind) {
self.items.remove(idx);
}
}
pub fn remove_all(&self, kinds: &[CryptoKind]) {
pub fn remove_all(&mut self, kinds: &[CryptoKind]) {
for k in kinds {
self.remove(*k);
}
@ -290,6 +302,18 @@ impl From<TypedKey> for TypedKeySet {
tks
}
}
impl From<Vec<TypedKey>> for TypedKeySet {
fn from(x: Vec<TypedKey>) -> Self {
let mut tks = TypedKeySet::with_capacity(x.len());
tks.add_all(&x);
tks
}
}
impl Into<Vec<TypedKey>> for TypedKeySet {
fn into(self) -> Vec<TypedKey> {
self.items
}
}
#[derive(
Clone,

View File

@ -269,7 +269,7 @@ impl CryptoSystem for CryptoSystemVLD0 {
shared_secret: &SharedSecret,
) {
let mut cipher = XChaCha20::new(&shared_secret.bytes.into(), &nonce.bytes.into());
cipher.apply_keystream_b2b(in_buf, &mut out_buf).unwrap();
cipher.apply_keystream_b2b(in_buf, out_buf).unwrap();
}
fn crypt_no_auth_aligned_8(

View File

@ -8,8 +8,6 @@ use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as
/// Helps to keep managed lists at particular distances so we can evict nodes by priority
/// where the priority comes from liveness and age of the entry (older is better)
pub struct Bucket {
/// handle to the routing table
routing_table: RoutingTable,
/// Map of keys to entries for this bucket
entries: BTreeMap<PublicKey, Arc<BucketEntry>>,
/// The crypto kind in use for the public keys in this bucket
@ -40,9 +38,8 @@ fn state_ordering(state: BucketEntryState) -> usize {
}
impl Bucket {
pub fn new(routing_table: RoutingTable, kind: CryptoKind) -> Self {
pub fn new(kind: CryptoKind) -> Self {
Self {
routing_table,
entries: BTreeMap::new(),
kind,
}

View File

@ -262,11 +262,11 @@ impl BucketEntryInner {
}
// Update the envelope version support we have to use
let mut envelope_support = signed_node_info.node_info().envelope_support.clone();
self.set_envelope_support(envelope_support);
let envelope_support = signed_node_info.node_info().envelope_support.clone();
// Update the signed node info
*opt_current_sni = Some(Box::new(signed_node_info));
self.set_envelope_support(envelope_support);
self.updated_since_last_network_change = true;
self.touch_last_seen(get_aligned_timestamp());
}
@ -466,7 +466,7 @@ impl BucketEntryInner {
self.envelope_support.sort();
}
pub fn set_envelope_support(&mut self, envelope_support: Vec<u8>) {
pub fn set_envelope_support(&mut self, mut envelope_support: Vec<u8>) {
envelope_support.dedup();
envelope_support.sort();
self.envelope_support = envelope_support;

View File

@ -65,7 +65,9 @@ pub struct RoutingTableHealth {
pub local_network_ready: bool,
}
pub(super) struct RoutingTableUnlockedInner {
pub type BucketIndex = (CryptoKind, usize);
pub struct RoutingTableUnlockedInner {
// Accessors
config: VeilidConfig,
network_manager: NetworkManager,
@ -73,7 +75,7 @@ pub(super) struct RoutingTableUnlockedInner {
/// The current node's public DHT keys and secrets
node_id_keypairs: BTreeMap<CryptoKind, KeyPair>,
/// Buckets to kick on our next kick task
kick_queue: Mutex<BTreeSet<(CryptoKind, usize)>>,
kick_queue: Mutex<BTreeSet<BucketIndex>>,
/// Background process for computing statistics
rolling_transfers_task: TickTask<EyreReport>,
/// Background process to purge dead routing table entries when necessary
@ -146,7 +148,7 @@ impl RoutingTableUnlockedInner {
}
pub fn matches_own_node_id_key(&self, node_id_key: &PublicKey) -> bool {
for (ck, v) in &self.node_id_keypairs {
for (_ck, v) in &self.node_id_keypairs {
if v.key == *node_id_key {
return true;
}
@ -154,7 +156,7 @@ impl RoutingTableUnlockedInner {
false
}
pub fn calculate_bucket_index(&self, node_id: &TypedKey) -> (CryptoKind, usize) {
pub fn calculate_bucket_index(&self, node_id: &TypedKey) -> BucketIndex {
let crypto = self.crypto();
let self_node_id = self.node_id_keypairs.get(&node_id.kind).unwrap().key;
let vcrypto = crypto.get(node_id.kind).unwrap();
@ -230,7 +232,7 @@ impl RoutingTable {
// Set up routing buckets
{
let mut inner = self.inner.write();
inner.init_buckets(self.clone());
inner.init_buckets();
}
// Load bucket entries from table db if possible
@ -238,7 +240,7 @@ impl RoutingTable {
if let Err(e) = self.load_buckets().await {
log_rtab!(debug "Error loading buckets from storage: {:#?}. Resetting.", e);
let mut inner = self.inner.write();
inner.init_buckets(self.clone());
inner.init_buckets();
}
// Set up routespecstore
@ -895,7 +897,7 @@ impl RoutingTable {
let nrs =
self.find_bootstrap_nodes_filtered_per_crypto_kind(*crypto_kind, max_per_type);
'nrloop: for nr in nrs {
for nro in out {
for nro in &out {
if nro.same_entry(&nr) {
continue 'nrloop;
}

View File

@ -36,6 +36,24 @@ pub struct RouteSetSpecDetail {
}
impl RouteSetSpecDetail {
pub fn new(
cur_ts: Timestamp,
route_set: BTreeMap<PublicKey, RouteSpecDetail>,
hop_node_refs: Vec<NodeRef>,
directions: DirectionSet,
stability: Stability,
can_do_sequenced: bool,
) -> Self {
Self {
route_set,
hop_node_refs,
published: false,
directions,
stability,
can_do_sequenced,
stats: RouteStats::new(cur_ts),
}
}
pub fn get_route_by_key(&self, key: &PublicKey) -> Option<&RouteSpecDetail> {
self.route_set.get(key)
}
@ -61,7 +79,7 @@ impl RouteSetSpecDetail {
self.route_set.iter()
}
pub fn iter_route_set_mut(
&self,
&mut self,
) -> alloc::collections::btree_map::IterMut<PublicKey, RouteSpecDetail> {
self.route_set.iter_mut()
}
@ -75,7 +93,7 @@ impl RouteSetSpecDetail {
self.published
}
pub fn set_published(&mut self, published: bool) {
self.published = self.published;
self.published = published;
}
pub fn hop_count(&self) -> usize {
self.hop_node_refs.len()
@ -97,7 +115,7 @@ impl RouteSetSpecDetail {
}
}
pub fn contains_nodes(&self, nodes: &[TypedKey]) -> bool {
for h in self.hop_node_refs {
for h in &self.hop_node_refs {
if h.node_ids().contains_any(nodes) {
return true;
}

View File

@ -46,10 +46,7 @@ impl RouteSpecStore {
routing_table,
}),
inner: Arc::new(Mutex::new(RouteSpecStoreInner {
content: RouteSpecStoreContent {
id_by_key: HashMap::new(),
details: HashMap::new(),
},
content: RouteSpecStoreContent::new(),
cache: Default::default(),
})),
}
@ -199,7 +196,7 @@ impl RouteSpecStore {
// Get list of all nodes, and sort them for selection
let cur_ts = get_aligned_timestamp();
let filter = Box::new(
|rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> bool {
|_rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> bool {
// Exclude our own node from routes
if entry.is_none() {
return false;
@ -277,14 +274,14 @@ impl RouteSpecStore {
},
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);
let compare = |rti: &RoutingTableInner,
let compare = |_rti: &RoutingTableInner,
entry1: &Option<Arc<BucketEntry>>,
entry2: &Option<Arc<BucketEntry>>|
-> Ordering {
// Our own node is filtered out
let entry1 = entry1.unwrap();
let entry2 = entry2.unwrap();
let entry1 = entry1.as_ref().unwrap().clone();
let entry2 = entry2.as_ref().unwrap().clone();
let entry1_node_ids = entry1.with_inner(|e| e.node_ids());
let entry2_node_ids = entry2.with_inner(|e| e.node_ids());
@ -336,7 +333,7 @@ impl RouteSpecStore {
let routing_table = self.unlocked_inner.routing_table.clone();
let transform =
|rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> NodeRef {
|_rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> NodeRef {
NodeRef::new(routing_table.clone(), entry.unwrap(), None)
};
@ -503,15 +500,14 @@ impl RouteSpecStore {
});
}
let rssd = RouteSetSpecDetail {
let rssd = RouteSetSpecDetail::new(
cur_ts,
route_set,
hop_node_refs,
published: false,
directions,
stability,
can_do_sequenced,
stats: RouteStats::new(cur_ts),
};
);
drop(perm_func);
@ -1162,7 +1158,7 @@ impl RouteSpecStore {
)?)
}
fn assemble_private_route_inner(&self, inner: &RouteSpecStoreInner, key: &PublicKey, rsd: &RouteSpecDetail, optimized: bool) -> EyreResult<PrivateRoute>
fn assemble_private_route_inner(&self, key: &PublicKey, rsd: &RouteSpecDetail, optimized: bool) -> EyreResult<PrivateRoute>
{
let routing_table = self.unlocked_inner.routing_table.clone();
let rti = &*routing_table.inner.read();
@ -1271,7 +1267,7 @@ impl RouteSpecStore {
let rsd = rssd.get_route_by_key(key).expect("route key index is broken");
self.assemble_private_route_inner(inner, key, rsd, optimized)
self.assemble_private_route_inner(key, rsd, optimized)
}
@ -1295,7 +1291,7 @@ impl RouteSpecStore {
let mut out = Vec::new();
for (key, rsd) in rssd.iter_route_set() {
out.push(self.assemble_private_route_inner(inner, key, rsd, optimized)?);
out.push(self.assemble_private_route_inner(key, rsd, optimized)?);
}
Ok(out)
}
@ -1315,7 +1311,7 @@ impl RouteSpecStore {
// validate the private routes
let inner = &mut *self.inner.lock();
for private_route in private_routes {
for private_route in &private_routes {
// ensure private route has first hop
if !matches!(private_route.hops, PrivateRouteHops::FirstHop(_)) {
@ -1340,33 +1336,6 @@ impl RouteSpecStore {
inner.cache.remove_remote_private_route(id)
}
/// Check if a remote private route id is valid
// #[instrument(level = "trace", skip(self), ret)]
// pub fn is_valid_remote_private_route(&self, id: &RouteId) -> bool {
// let inner = &mut *self.inner.lock();
// let cur_ts = get_aligned_timestamp();
// inner.cache.peek_remote_private_route_mut(cur_ts, id).is_some()
// }
// /// Retrieve an imported remote private route by its public key
// pub fn get_remote_private_route(&self, id: &String) -> Option<PrivateRoute> {
// let inner = &mut *self.inner.lock();
// let cur_ts = get_aligned_timestamp();
// Self::with_get_remote_private_route(inner, cur_ts, key, |r| {
// r.private_route.as_ref().unwrap().clone()
// })
// }
// /// Retrieve an imported remote private route by its public key but don't 'touch' it
// fn peek_remote_private_route(&self, id: &String) -> Option<PrivateRoute> {
// let inner = &mut *self.inner.lock();
// let cur_ts = get_aligned_timestamp();
// inner.cache.with_peek_remote_private_route(cur_ts, id, f)
// Self::with_peek_remote_private_route(inner, cur_ts, key, |r| {
// r.private_route.as_ref().unwrap().clone()
// })
// }
/// Get a route id for a route's public key
pub fn get_route_id_for_key(&self, key: &PublicKey) -> Option<RouteId>
{
@ -1524,8 +1493,6 @@ impl RouteSpecStore {
/// Convert private route list to binary blob
pub fn private_routes_to_blob(private_routes: &[PrivateRoute]) -> EyreResult<Vec<u8>> {
let mut pr_message = ::capnp::message::Builder::new_default();
let mut pr_builder = pr_message.init_root::<veilid_capnp::private_route::Builder>();
let mut buffer = vec![];
@ -1539,6 +1506,9 @@ impl RouteSpecStore {
// Serialize stream of private routes
for private_route in private_routes {
let mut pr_message = ::capnp::message::Builder::new_default();
let mut pr_builder = pr_message.init_root::<veilid_capnp::private_route::Builder>();
encode_private_route(private_route, &mut pr_builder)
.wrap_err("failed to encode private route")?;
@ -1563,7 +1533,7 @@ impl RouteSpecStore {
}
// Deserialize stream of private routes
let pr_slice = &blob[1..];
let mut pr_slice = &blob[1..];
let mut out = Vec::with_capacity(pr_count);
for _ in 0..pr_count {
let reader = capnp::serialize_packed::read_message(
@ -1577,7 +1547,7 @@ impl RouteSpecStore {
.get_root::<veilid_capnp::private_route::Reader>()
.map_err(RPCError::internal)
.wrap_err("failed to make reader for private_route")?;
let private_route = decode_private_route(&pr_reader, crypto).wrap_err("failed to decode private route")?;
let private_route = decode_private_route(&pr_reader, crypto.clone()).wrap_err("failed to decode private route")?;
out.push(private_route);
}

View File

@ -46,7 +46,7 @@ impl RouteSpecStoreCache {
if !self.hop_cache.insert(cache_key) {
panic!("route should never be inserted twice");
}
for (pk, rsd) in rssd.iter_route_set() {
for (_pk, rsd) in rssd.iter_route_set() {
for h in &rsd.hops {
self.used_nodes
.entry(*h)
@ -137,19 +137,25 @@ impl RouteSpecStoreCache {
self.remote_private_routes_by_key
.insert(private_route.public_key.key, id.clone());
}
let mut dead = None;
self.remote_private_route_set_cache
.insert(id, rprinfo, |dead_id, dead_rpri| {
// If anything LRUs out, remove from the by-key table
// Follow the same logic as 'remove_remote_private_route' here
for dead_private_route in dead_rpri.get_private_routes() {
self.remote_private_routes_by_key
.remove(&dead_private_route.public_key.key)
.unwrap();
self.invalidate_compiled_route_cache(&dead_private_route.public_key.key);
}
self.dead_remote_routes.push(dead_id);
dead = Some((dead_id, dead_rpri));
});
if let Some((dead_id, dead_rpri)) = dead {
// If anything LRUs out, remove from the by-key table
// Follow the same logic as 'remove_remote_private_route' here
for dead_private_route in dead_rpri.get_private_routes() {
self.remote_private_routes_by_key
.remove(&dead_private_route.public_key.key)
.unwrap();
self.invalidate_compiled_route_cache(&dead_private_route.public_key.key);
}
self.dead_remote_routes.push(dead_id);
}
id
}
@ -172,7 +178,7 @@ impl RouteSpecStoreCache {
cur_ts: Timestamp,
id: &RouteId,
) -> Option<&RemotePrivateRouteInfo> {
if let Some(rpri) = self.remote_private_route_set_cache.get(id) {
if let Some(rpri) = self.remote_private_route_set_cache.get_mut(id) {
if !rpri.did_expire(cur_ts) {
rpri.touch(cur_ts);
return Some(rpri);
@ -238,13 +244,9 @@ impl RouteSpecStoreCache {
rpri.touch(cur_ts);
}
} else {
let rpri = RemotePrivateRouteInfo {
// New remote private route cache entry
private_routes,
last_seen_our_node_info_ts: Timestamp::new(0),
last_touched_ts: cur_ts,
stats: RouteStats::new(cur_ts),
};
// New remote private route cache entry
let rpri = RemotePrivateRouteInfo::new(private_routes, cur_ts);
self.add_remote_private_route(id, rpri);
if self.peek_remote_private_route_mut(cur_ts, &id).is_none() {
panic!("remote private route should exist");
@ -268,7 +270,7 @@ impl RouteSpecStoreCache {
}
/// Stores a compiled 'safety + private' route so we don't have to compile it again later
pub fn add_to_compiled_route_cache(&self, pr_pubkey: PublicKey, safety_route: SafetyRoute) {
pub fn add_to_compiled_route_cache(&mut self, pr_pubkey: PublicKey, safety_route: SafetyRoute) {
let key = CompiledRouteCacheKey {
sr_pubkey: safety_route.public_key.key,
pr_pubkey,
@ -286,7 +288,7 @@ impl RouteSpecStoreCache {
/// Looks up an existing compiled route from the safety and private route components
pub fn lookup_compiled_route_cache(
&self,
&mut self,
sr_pubkey: PublicKey,
pr_pubkey: PublicKey,
) -> Option<SafetyRoute> {
@ -298,7 +300,7 @@ impl RouteSpecStoreCache {
}
/// When routes are dropped, they should be removed from the compiled route cache
fn invalidate_compiled_route_cache(&self, dead_key: &PublicKey) {
fn invalidate_compiled_route_cache(&mut self, dead_key: &PublicKey) {
let mut dead_entries = Vec::new();
for (k, _v) in self.compiled_route_cache.iter() {
if k.sr_pubkey == *dead_key || k.pr_pubkey == *dead_key {
@ -325,14 +327,14 @@ impl RouteSpecStoreCache {
/// Resets statistics for when our node info changes
pub fn reset_remote_private_routes(&mut self) {
// Restart stats for routes so we test the route again
for (_k, v) in self.remote_private_route_set_cache {
for (_k, v) in self.remote_private_route_set_cache.iter_mut() {
v.get_stats_mut().reset();
}
}
/// Roll transfer statistics
pub fn roll_transfers(&mut self, last_ts: Timestamp, cur_ts: Timestamp) {
for (_k, v) in self.remote_private_route_set_cache {
for (_k, v) in self.remote_private_route_set_cache.iter_mut() {
v.get_stats_mut().roll_transfers(last_ts, cur_ts);
}
}

View File

@ -11,6 +11,13 @@ pub struct RouteSpecStoreContent {
}
impl RouteSpecStoreContent {
pub fn new() -> Self {
Self {
id_by_key: HashMap::new(),
details: HashMap::new(),
}
}
pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStoreContent> {
// Deserialize what we can
let table_store = routing_table.network_manager().table_store();
@ -99,11 +106,11 @@ impl RouteSpecStoreContent {
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
rsstdb.store_rkyv(0, b"content", self).await?;
// // Keep secrets in protected store as well
// Keep secrets in protected store as well
let pstore = routing_table.network_manager().protected_store();
let mut out: HashMap<PublicKey, SecretKey> = HashMap::new();
for (rsid, rssd) in self.details.iter() {
for (_rsid, rssd) in self.details.iter() {
for (pk, rsd) in rssd.iter_route_set() {
out.insert(*pk, rsd.secret_key);
}

View File

@ -287,7 +287,8 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
// No common crypto kinds between these nodes, can't contact
return ContactMethod::Unreachable;
};
let node_a_id = peer_a.node_ids.get(best_ck).unwrap();
//let node_a_id = peer_a.node_ids.get(best_ck).unwrap();
let node_b_id = peer_b.node_ids.get(best_ck).unwrap();
// Get the best match dial info for node B if we have it

View File

@ -298,8 +298,8 @@ impl RoutingTableInner {
.with_dial_info_filter(dif)
}
fn bucket_depth(index: usize) -> usize {
match index {
fn bucket_depth(bucket_index: BucketIndex) -> usize {
match bucket_index.1 {
0 => 256,
1 => 128,
2 => 64,
@ -312,13 +312,13 @@ impl RoutingTableInner {
}
}
pub fn init_buckets(&mut self, routing_table: RoutingTable) {
pub fn init_buckets(&mut self) {
// Size the buckets (one per bit), one bucket set per crypto kind
self.buckets.clear();
for ck in VALID_CRYPTO_KINDS {
let ckbuckets = Vec::with_capacity(PUBLIC_KEY_LENGTH * 8);
let mut ckbuckets = Vec::with_capacity(PUBLIC_KEY_LENGTH * 8);
for _ in 0..PUBLIC_KEY_LENGTH * 8 {
let bucket = Bucket::new(routing_table.clone(), ck);
let bucket = Bucket::new(ck);
ckbuckets.push(bucket);
}
self.buckets.insert(ck, ckbuckets);
@ -356,7 +356,7 @@ impl RoutingTableInner {
self.bucket_entry_count()
);
for ck in VALID_CRYPTO_KINDS {
for bucket in &mut self.buckets[&ck] {
for bucket in self.buckets.get_mut(&ck).unwrap().iter_mut() {
bucket.kick(0);
}
}
@ -393,15 +393,15 @@ impl RoutingTableInner {
/// Attempt to settle buckets and remove entries down to the desired number
/// which may not be possible due extant NodeRefs
pub fn kick_bucket(&mut self, kind: CryptoKind, idx: usize) {
let bucket = &mut self.buckets[&kind][idx];
let bucket_depth = Self::bucket_depth(idx);
pub fn kick_bucket(&mut self, bucket_index: BucketIndex) {
let bucket = self.get_bucket_mut(bucket_index);
let bucket_depth = Self::bucket_depth(bucket_index);
if let Some(dead_node_ids) = bucket.kick(bucket_depth) {
if let Some(_dead_node_ids) = bucket.kick(bucket_depth) {
// Remove expired entries
self.all_entries.remove_expired();
log_rtab!(debug "Bucket {}:{} kicked Routing table now has {} nodes", kind, idx, self.bucket_entry_count());
log_rtab!(debug "Bucket {}:{} kicked Routing table now has {} nodes", bucket_index.0, bucket_index.1, self.bucket_entry_count());
// Now purge the routing table inner vectors
//let filter = |k: &DHTKey| dead_node_ids.contains(k);
@ -416,11 +416,11 @@ impl RoutingTableInner {
pub fn refresh_cached_entry_counts(&mut self) -> EntryCounts {
self.live_entry_count.clear();
let cur_ts = get_aligned_timestamp();
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
self.with_entries_mut(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
entry.with_inner(|e| {
if let Some(rd) = e.best_routing_domain(rti, RoutingDomainSet::all()) {
for crypto_kind in e.crypto_kinds() {
self.live_entry_count
rti.live_entry_count
.entry((rd, crypto_kind))
.and_modify(|x| *x += 1)
.or_insert(1);
@ -491,7 +491,7 @@ impl RoutingTableInner {
min_state: BucketEntryState,
mut f: F,
) -> Option<T> {
for entry in self.all_entries {
for entry in &self.all_entries {
if entry.with_inner(|e| e.state(cur_ts) >= min_state) {
if let Some(out) = f(self, entry) {
return Some(out);
@ -509,14 +509,17 @@ impl RoutingTableInner {
min_state: BucketEntryState,
mut f: F,
) -> Option<T> {
for entry in self.all_entries {
let mut entries = Vec::with_capacity(self.all_entries.len());
for entry in self.all_entries.iter() {
if entry.with_inner(|e| e.state(cur_ts) >= min_state) {
if let Some(out) = f(self, entry) {
return Some(out);
}
entries.push(entry);
}
}
for entry in entries {
if let Some(out) = f(self, entry) {
return Some(out);
}
}
None
}
@ -541,6 +544,7 @@ impl RoutingTableInner {
// If we need a ping via the normal timing mechanism, then do it
// or if this node is our own relay, then we keep it alive
let is_our_relay = opt_relay
.as_ref()
.map(|nr| nr.same_bucket_entry(&entry))
.unwrap_or(false);
if e.needs_ping(cur_ts, is_our_relay) {
@ -574,8 +578,24 @@ impl RoutingTableInner {
node_refs
}
fn get_bucket_mut(&mut self, bucket_index: BucketIndex) -> &mut Bucket {
self.buckets
.get_mut(&bucket_index.0)
.unwrap()
.get_mut(bucket_index.1)
.unwrap()
}
fn get_bucket(&self, bucket_index: BucketIndex) -> &Bucket {
self.buckets
.get(&bucket_index.0)
.unwrap()
.get(bucket_index.1)
.unwrap()
}
// Update buckets with new node ids we may have learned belong to this entry
fn update_bucket_entries(&self, entry: Arc<BucketEntry>, node_ids: &[TypedKey]) {
fn update_bucket_entries(&mut self, entry: Arc<BucketEntry>, node_ids: &[TypedKey]) {
entry.with_mut_inner(|e| {
let existing_node_ids = e.node_ids();
for node_id in node_ids {
@ -583,19 +603,19 @@ impl RoutingTableInner {
// Add new node id to entry
if let Some(old_node_id) = e.add_node_id(*node_id) {
// Remove any old node id for this crypto kind
let (kind, idx) = self.unlocked_inner.calculate_bucket_index(&old_node_id);
let bucket = &mut self.buckets[&kind][idx];
let bucket_index = self.unlocked_inner.calculate_bucket_index(&old_node_id);
let bucket = self.get_bucket_mut(bucket_index);
bucket.remove_entry(&old_node_id.key);
self.unlocked_inner.kick_queue.lock().insert((kind, idx));
self.unlocked_inner.kick_queue.lock().insert(bucket_index);
}
// Bucket the entry appropriately
let (kind, idx) = self.unlocked_inner.calculate_bucket_index(node_id);
let bucket = &mut self.buckets[&kind][idx];
let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id);
let bucket = self.get_bucket_mut(bucket_index);
bucket.add_existing_entry(node_id.key, entry.clone());
// Kick bucket
self.unlocked_inner.kick_queue.lock().insert((kind, idx));
self.unlocked_inner.kick_queue.lock().insert(bucket_index);
}
}
})
@ -627,8 +647,8 @@ impl RoutingTableInner {
log_rtab!(error "can't look up node id with invalid crypto kind");
return None;
}
let (kind, idx) = self.unlocked_inner.calculate_bucket_index(node_id);
let bucket = &self.buckets[&kind][idx];
let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id);
let bucket = self.get_bucket(bucket_index);
if let Some(entry) = bucket.entry(&node_id.key) {
// Best entry is the first one in sorted order that exists from the node id list
// Everything else that matches will be overwritten in the bucket and the
@ -644,10 +664,10 @@ impl RoutingTableInner {
// If the entry does exist already, update it
if let Some(best_entry) = best_entry {
// Update the entry with all of the node ids
self.update_bucket_entries(best_entry, node_ids);
self.update_bucket_entries(best_entry.clone(), node_ids);
// Make a noderef to return
let nr = NodeRef::new(outer_self.clone(), best_entry, None);
let nr = NodeRef::new(outer_self.clone(), best_entry.clone(), None);
// Update the entry with the update func
best_entry.with_mut_inner(|e| update_func(self, e));
@ -658,16 +678,16 @@ impl RoutingTableInner {
// If no entry exists yet, add the first entry to a bucket, possibly evicting a bucket member
let first_node_id = node_ids[0];
let (kind, idx) = self.unlocked_inner.calculate_bucket_index(&first_node_id);
let bucket = &mut self.buckets[&kind][idx];
let bucket_entry = self.unlocked_inner.calculate_bucket_index(&first_node_id);
let bucket = self.get_bucket_mut(bucket_entry);
let new_entry = bucket.add_new_entry(first_node_id.key);
self.unlocked_inner.kick_queue.lock().insert((kind, idx));
self.unlocked_inner.kick_queue.lock().insert(bucket_entry);
// Update the other bucket entries with the remaining node ids
self.update_bucket_entries(new_entry, node_ids);
self.update_bucket_entries(new_entry.clone(), node_ids);
// Make node ref to return
let nr = NodeRef::new(outer_self.clone(), new_entry, None);
let nr = NodeRef::new(outer_self.clone(), new_entry.clone(), None);
// Update the entry with the update func
new_entry.with_mut_inner(|e| update_func(self, e));
@ -684,9 +704,9 @@ impl RoutingTableInner {
outer_self: RoutingTable,
node_id_key: PublicKey,
) -> Option<NodeRef> {
VALID_CRYPTO_KINDS
.iter()
.find_map(|ck| self.lookup_node_ref(outer_self, TypedKey::new(*ck, node_id_key)))
VALID_CRYPTO_KINDS.iter().find_map(|ck| {
self.lookup_node_ref(outer_self.clone(), TypedKey::new(*ck, node_id_key))
})
}
/// Resolve an existing routing table entry and return a reference to it
@ -700,8 +720,8 @@ impl RoutingTableInner {
return None;
}
let (kind, idx) = self.unlocked_inner.calculate_bucket_index(&node_id);
let bucket = &self.buckets[&kind][idx];
let bucket_index = self.unlocked_inner.calculate_bucket_index(&node_id);
let bucket = self.get_bucket(bucket_index);
bucket
.entry(&node_id.key)
.map(|e| NodeRef::new(outer_self, e, None))
@ -738,12 +758,9 @@ impl RoutingTableInner {
log_rtab!(error "can't look up node id with invalid crypto kind");
return None;
}
let (kind, idx) = self.unlocked_inner.calculate_bucket_index(&node_id);
let bucket = &self.buckets[&kind][idx];
if let Some(e) = bucket.entry(&node_id.key) {
return Some(f(e));
}
None
let bucket_entry = self.unlocked_inner.calculate_bucket_index(&node_id);
let bucket = self.get_bucket(bucket_entry);
bucket.entry(&node_id.key).map(f)
}
/// Shortcut function to add a node to our routing table if it doesn't exist
@ -826,7 +843,7 @@ impl RoutingTableInner {
let mut dead_entry_count: usize = 0;
let cur_ts = get_aligned_timestamp();
for entry in self.all_entries {
for entry in self.all_entries.iter() {
match entry.with_inner(|e| e.state(cur_ts)) {
BucketEntryState::Reliable => {
reliable_entry_count += 1;
@ -877,20 +894,21 @@ impl RoutingTableInner {
node_count: usize,
mut filters: VecDeque<RoutingTableEntryFilter>,
) -> Vec<NodeRef> {
let public_node_filter = Box::new(|rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
let entry = v.unwrap();
entry.with_inner(|e| {
// skip nodes on local network
if e.node_info(RoutingDomain::LocalNetwork).is_some() {
return false;
}
// skip nodes not on public internet
if e.node_info(RoutingDomain::PublicInternet).is_none() {
return false;
}
true
})
}) as RoutingTableEntryFilter;
let public_node_filter =
Box::new(|_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
let entry = v.unwrap();
entry.with_inner(|e| {
// skip nodes on local network
if e.node_info(RoutingDomain::LocalNetwork).is_some() {
return false;
}
// skip nodes not on public internet
if e.node_info(RoutingDomain::PublicInternet).is_none() {
return false;
}
true
})
}) as RoutingTableEntryFilter;
filters.push_front(public_node_filter);
self.find_fastest_nodes(
@ -1001,7 +1019,7 @@ impl RoutingTableInner {
// Add filter to remove dead nodes always
let filter_dead = Box::new(
move |rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
move |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
if let Some(entry) = &v {
// always filter out dead nodes
if entry.with_inner(|e| e.state(cur_ts) == BucketEntryState::Dead) {
@ -1018,7 +1036,7 @@ impl RoutingTableInner {
filters.push_front(filter_dead);
// Fastest sort
let sort = |rti: &RoutingTableInner,
let sort = |_rti: &RoutingTableInner,
a_entry: &Option<Arc<BucketEntry>>,
b_entry: &Option<Arc<BucketEntry>>| {
// same nodes are always the same
@ -1084,7 +1102,7 @@ impl RoutingTableInner {
&self,
node_count: usize,
node_id: TypedKey,
filters: VecDeque<RoutingTableEntryFilter>,
mut filters: VecDeque<RoutingTableEntryFilter>,
transform: T,
) -> Vec<O>
where
@ -1099,7 +1117,7 @@ impl RoutingTableInner {
// Filter to ensure entries support the crypto kind in use
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
move |_rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
if let Some(entry) = opt_entry {
entry.with_inner(|e| e.crypto_kinds().contains(&crypto_kind))
} else {
@ -1111,7 +1129,7 @@ impl RoutingTableInner {
// Closest sort
// Distance is done using the node id's distance metric which may vary based on crypto system
let sort = |rti: &RoutingTableInner,
let sort = |_rti: &RoutingTableInner,
a_entry: &Option<Arc<BucketEntry>>,
b_entry: &Option<Arc<BucketEntry>>| {
// same nodes are always the same

View File

@ -12,7 +12,7 @@ pub struct BootstrapRecord {
dial_info_details: Vec<DialInfoDetail>,
}
impl BootstrapRecord {
pub fn merge(&mut self, other: &BootstrapRecord) {
pub fn merge(&mut self, other: BootstrapRecord) {
self.node_ids.add_all(&other.node_ids);
for x in other.envelope_support {
if !self.envelope_support.contains(&x) {
@ -20,9 +20,9 @@ impl BootstrapRecord {
self.envelope_support.sort();
}
}
for did in &other.dial_info_details {
if !self.dial_info_details.contains(did) {
self.dial_info_details.push(did.clone());
for did in other.dial_info_details {
if !self.dial_info_details.contains(&did) {
self.dial_info_details.push(did);
}
}
}
@ -48,7 +48,7 @@ impl RoutingTable {
let mut envelope_support = Vec::new();
for ess in records[1].split(",") {
let ess = ess.trim();
let es = match records[1].parse::<u8>() {
let es = match ess.parse::<u8>() {
Ok(v) => v,
Err(e) => {
bail!(
@ -63,7 +63,7 @@ impl RoutingTable {
envelope_support.sort();
// Node Id
let node_ids = TypedKeySet::new();
let mut node_ids = TypedKeySet::new();
for node_id_str in records[2].split(",") {
let node_id_str = node_id_str.trim();
let node_id = match TypedKey::from_str(&node_id_str) {
@ -76,6 +76,7 @@ impl RoutingTable {
);
}
};
node_ids.add(node_id);
}
// If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node
@ -220,7 +221,7 @@ impl RoutingTable {
if mbr.node_ids.contains_any(&bsrec.node_ids) {
// Merge record, pop this one out
let mbr = merged_bootstrap_records.remove(mbi);
bsrec.merge(&mbr);
bsrec.merge(mbr);
} else {
// No overlap, go to next record
mbi += 1;

View File

@ -10,13 +10,13 @@ impl RoutingTable {
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let kick_queue: Vec<(CryptoKind, usize)> =
let kick_queue: Vec<BucketIndex> =
core::mem::take(&mut *self.unlocked_inner.kick_queue.lock())
.into_iter()
.collect();
let mut inner = self.inner.write();
for (ck, idx) in kick_queue {
inner.kick_bucket(ck, idx)
for bucket_index in kick_queue {
inner.kick_bucket(bucket_index)
}
Ok(())
}

View File

@ -40,7 +40,7 @@ impl RoutingTable {
let mut filters = VecDeque::new();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
move |_rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Keep only the entries that contain the crypto kind we're looking for
if let Some(entry) = opt_entry {
entry.with_inner(|e| e.crypto_kinds().contains(&crypto_kind))

View File

@ -36,8 +36,8 @@ impl RoutingTable {
for nr in node_refs {
// If this is our relay, let's check for NAT keepalives
let mut did_pings = false;
if let Some(relay_nr) = opt_relay_nr {
if nr.same_entry(&relay_nr) {
if let Some(relay_nr) = &opt_relay_nr {
if nr.same_entry(relay_nr) {
// Relay nodes get pinged over all protocols we have inbound dialinfo for
// This is so we can preserve the inbound NAT mappings at our router
for did in &dids {

View File

@ -15,7 +15,7 @@ pub fn encode_peer_info(
for (i, nid) in peer_info.node_ids.iter().enumerate() {
encode_typed_key(
nid,
&mut nids_builder.get(
&mut nids_builder.reborrow().get(
i.try_into()
.map_err(RPCError::map_invalid_format("out of bound error"))?,
),
@ -39,7 +39,7 @@ pub fn decode_peer_info(
.reborrow()
.get_signed_node_info()
.map_err(RPCError::protocol)?;
let node_ids = TypedKeySet::with_capacity(nids_reader.len() as usize);
let mut node_ids = TypedKeySet::with_capacity(nids_reader.len() as usize);
for nid_reader in nids_reader.iter() {
node_ids.add(decode_typed_key(&nid_reader)?);
}

View File

@ -22,7 +22,7 @@ pub fn encode_signed_direct_node_info(
for (i, typed_signature) in signed_direct_node_info.signatures.iter().enumerate() {
encode_typed_signature(
typed_signature,
&mut sigs_builder.get(
&mut sigs_builder.reborrow().get(
i.try_into()
.map_err(RPCError::map_invalid_format("out of bound error"))?,
),

View File

@ -18,7 +18,7 @@ pub fn encode_signed_relayed_node_info(
for (i, typed_key) in signed_relayed_node_info.relay_ids.iter().enumerate() {
encode_typed_key(
typed_key,
&mut rids_builder.get(
&mut rids_builder.reborrow().get(
i.try_into()
.map_err(RPCError::map_invalid_format("out of bound error"))?,
),
@ -42,7 +42,7 @@ pub fn encode_signed_relayed_node_info(
for (i, typed_signature) in signed_relayed_node_info.signatures.iter().enumerate() {
encode_typed_signature(
typed_signature,
&mut sigs_builder.get(
&mut sigs_builder.reborrow().get(
i.try_into()
.map_err(RPCError::map_invalid_format("out of bound error"))?,
),
@ -81,7 +81,7 @@ pub fn decode_signed_relayed_node_info(
.reborrow()
.get_relay_info()
.map_err(RPCError::protocol)?;
let relay_info = decode_signed_direct_node_info(&ri_reader, crypto, &mut relay_ids)?;
let relay_info = decode_signed_direct_node_info(&ri_reader, crypto.clone(), &mut relay_ids)?;
// Ensure the relay info for the node has a superset of the crypto kinds of the node it is relaying
if common_crypto_kinds(

View File

@ -14,6 +14,6 @@ pub fn decode_typed_key(typed_key: &veilid_capnp::typed_key::Reader) -> Result<T
pub fn encode_typed_key(typed_key: &TypedKey, builder: &mut veilid_capnp::typed_key::Builder) {
builder.set_kind(u32::from_be_bytes(typed_key.kind.0));
let mut key_builder = builder.init_key();
let mut key_builder = builder.reborrow().init_key();
encode_key256(&typed_key.key, &mut key_builder);
}

View File

@ -19,6 +19,6 @@ pub fn encode_typed_signature(
builder: &mut veilid_capnp::typed_signature::Builder,
) {
builder.set_kind(u32::from_be_bytes(typed_signature.kind.0));
let mut sig_builder = builder.init_signature();
let mut sig_builder = builder.reborrow().init_signature();
encode_signature512(&typed_signature.signature, &mut sig_builder);
}

View File

@ -621,7 +621,7 @@ impl RPCProcessor {
// Get the actual destination node id accounting for relays
let (node_ref, destination_node_ref) = if let Destination::Relay {
relay: _,
target: ref target,
ref target,
safety_selection: _,
} = dest
{
@ -1392,7 +1392,7 @@ impl RPCProcessor {
}
#[instrument(level = "trace", skip(self, body), err)]
pub fn enqueue_safety_routed_message(
fn enqueue_safety_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,
remote_safety_route: PublicKey,
@ -1423,7 +1423,7 @@ impl RPCProcessor {
}
#[instrument(level = "trace", skip(self, body), err)]
pub fn enqueue_private_routed_message(
fn enqueue_private_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,
remote_safety_route: PublicKey,

View File

@ -4,7 +4,7 @@ use clap::{Arg, ArgMatches, Command};
use std::ffi::OsStr;
use std::path::Path;
use std::str::FromStr;
use veilid_core::{SecretKey, TypedKey};
use veilid_core::{SecretKey, TypedKeySet};
fn do_clap_matches(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> {
let matches = Command::new("veilid-server")
@ -236,19 +236,14 @@ pub fn process_command_line() -> EyreResult<(Settings, ArgMatches)> {
settingsrw.logging.terminal.enabled = false;
// Split or get secret
let (k, s) = if let Some((k, s)) = v.split_once(':') {
let k =
TypedKey::try_decode(k).wrap_err("failed to decode node id from command line")?;
let s = SecretKey::try_decode(s)?;
(k, s)
} else {
let k = TypedKey::try_decode(v)?;
let buffer = rpassword::prompt_password("Enter secret key (will not echo): ")
.wrap_err("invalid secret key")?;
let buffer = buffer.trim().to_string();
let s = SecretKey::try_decode(&buffer)?;
(k, s)
};
let tks =
TypedKeySet::from_str(v).wrap_err("failed to decode node id set from command line")?;
let buffer = rpassword::prompt_password("Enter secret key (will not echo): ")
.wrap_err("invalid secret key")?;
let buffer = buffer.trim().to_string();
let s = SecretKey::try_decode(&buffer)?;
settingsrw.core.network.node_id = Some(k);
settingsrw.core.network.node_id_secret = Some(s);
}

View File

@ -64,10 +64,10 @@ core:
client_whitelist_timeout_ms: 300000
reverse_connection_receipt_time_ms: 5000
hole_punch_receipt_time_ms: 5000
node_id: null
node_id_secret: null
bootstrap: ['bootstrap.dev.veilid.net']
routing_table:
node_id: null
node_id_secret: null
bootstrap: ['bootstrap.dev.veilid.net']
limit_over_attached: 64
limit_fully_attached: 32
limit_attached_strong: 16
@ -516,6 +516,9 @@ pub struct Dht {
#[derive(Debug, Deserialize, Serialize)]
pub struct RoutingTable {
pub node_id: Option<veilid_core::TypedKeySet>,
pub node_id_secret: Option<veilid_core::SecretKey>,
pub bootstrap: Vec<String>,
pub limit_over_attached: u32,
pub limit_fully_attached: u32,
pub limit_attached_strong: u32,
@ -534,9 +537,6 @@ pub struct Network {
pub client_whitelist_timeout_ms: u32,
pub reverse_connection_receipt_time_ms: u32,
pub hole_punch_receipt_time_ms: u32,
pub node_id: Option<veilid_core::TypedKey>,
pub node_id_secret: Option<veilid_core::SecretKey>,
pub bootstrap: Vec<String>,
pub routing_table: RoutingTable,
pub rpc: Rpc,
pub dht: Dht,
@ -902,9 +902,9 @@ impl Settings {
set_config_value!(inner.core.network.client_whitelist_timeout_ms, value);
set_config_value!(inner.core.network.reverse_connection_receipt_time_ms, value);
set_config_value!(inner.core.network.hole_punch_receipt_time_ms, value);
set_config_value!(inner.core.network.node_id, value);
set_config_value!(inner.core.network.node_id_secret, value);
set_config_value!(inner.core.network.bootstrap, value);
set_config_value!(inner.core.network.routing_table.node_id, value);
set_config_value!(inner.core.network.routing_table.node_id_secret, value);
set_config_value!(inner.core.network.routing_table.bootstrap, value);
set_config_value!(inner.core.network.routing_table.limit_over_attached, value);
set_config_value!(inner.core.network.routing_table.limit_fully_attached, value);
set_config_value!(
@ -1056,9 +1056,15 @@ impl Settings {
"network.hole_punch_receipt_time_ms" => {
Ok(Box::new(inner.core.network.hole_punch_receipt_time_ms))
}
"network.node_id" => Ok(Box::new(inner.core.network.node_id)),
"network.node_id_secret" => Ok(Box::new(inner.core.network.node_id_secret)),
"network.bootstrap" => Ok(Box::new(inner.core.network.bootstrap.clone())),
"network.routing_table.node_id" => {
Ok(Box::new(inner.core.network.routing_table.node_id))
}
"network.routing_table.node_id_secret" => {
Ok(Box::new(inner.core.network.routing_table.node_id_secret))
}
"network.routing_table.bootstrap" => {
Ok(Box::new(inner.core.network.routing_table.bootstrap.clone()))
}
"network.routing_table.limit_over_attached" => Ok(Box::new(
inner.core.network.routing_table.limit_over_attached,
)),
@ -1415,11 +1421,11 @@ mod tests {
assert_eq!(s.core.network.client_whitelist_timeout_ms, 300_000u32);
assert_eq!(s.core.network.reverse_connection_receipt_time_ms, 5_000u32);
assert_eq!(s.core.network.hole_punch_receipt_time_ms, 5_000u32);
assert_eq!(s.core.network.node_id, None);
assert_eq!(s.core.network.node_id_secret, None);
assert_eq!(s.core.network.routing_table.node_id, None);
assert_eq!(s.core.network.routing_table.node_id_secret, None);
//
assert_eq!(
s.core.network.bootstrap,
s.core.network.routing_table.bootstrap,
vec!["bootstrap.dev.veilid.net".to_owned()]
);
//