use super::*;
use core::sync::atomic::Ordering;

/// Routing Table Bucket
/// Stores a map of public keys to entries, which may be in multiple routing tables per crypto kind
/// Keeps entries at a particular 'dht distance' from this crypto kind's node id
/// Helps to keep managed lists at particular distances so we can evict nodes by priority,
/// where the priority comes from the liveness and age of the entry (older is better)
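///
/// A minimal usage sketch (illustrative only; `kind`, `node_key`, and `BUCKET_DEPTH` are
/// placeholder values assumed to come from the surrounding routing table code):
/// ```ignore
/// let mut bucket = Bucket::new(kind);
/// let _entry = bucket.add_new_entry(node_key);
/// // Once the bucket holds more than BUCKET_DEPTH entries, kick() evicts the
/// // lowest-priority entries and returns the node ids that were removed
/// if let Some(dead_ids) = bucket.kick(BUCKET_DEPTH) {
///     // the routing table can then clean up state for dead_ids
/// }
/// ```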
pub struct Bucket {
    /// Map of keys to entries for this bucket
    entries: BTreeMap<PublicKey, Arc<BucketEntry>>,
    /// The crypto kind in use for the public keys in this bucket
    kind: CryptoKind,
}

pub(super) type EntriesIter<'a> =
    alloc::collections::btree_map::Iter<'a, PublicKey, Arc<BucketEntry>>;

#[derive(Debug, Serialize, Deserialize)]
struct SerializedBucketEntryData {
    key: PublicKey,
    value: u32, // index into serialized entries list
}

#[derive(Debug, Serialize, Deserialize)]
struct SerializedBucketData {
    entries: Vec<SerializedBucketEntryData>,
}

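/// Ordering of entry states for kick priority: dead entries sort lowest, then unreliable, then reliable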
fn state_ordering(state: BucketEntryState) -> usize {
    match state {
        BucketEntryState::Dead => 0,
        BucketEntryState::Unreliable => 1,
        BucketEntryState::Reliable => 2,
    }
}

impl Bucket {
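    /// Create a new, empty bucket for the given crypto kind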
    pub fn new(kind: CryptoKind) -> Self {
        Self {
            entries: BTreeMap::new(),
            kind,
        }
    }
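
    /// Restore this bucket's entries from serialized bucket data, resolving each
    /// serialized entry index against the shared `all_entries` list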
    pub(super) fn load_bucket(
        &mut self,
        data: Vec<u8>,
        all_entries: &[Arc<BucketEntry>],
    ) -> EyreResult<()> {
        let bucket_data: SerializedBucketData = deserialize_json_bytes(&data)?;

        for e in bucket_data.entries {
            self.entries
                .insert(e.key, all_entries[e.value as usize].clone());
        }
        Ok(())
    }
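
    /// Serialize this bucket's entries. Each unique `BucketEntry` is appended to
    /// `all_entries` once and referenced by index, with `entry_map` deduplicating
    /// entries that appear in more than one bucket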
    pub(super) fn save_bucket(
        &self,
        all_entries: &mut Vec<Arc<BucketEntry>>,
        entry_map: &mut HashMap<*const BucketEntry, u32>,
    ) -> Vec<u8> {
        let mut entries = Vec::new();
        for (k, v) in &self.entries {
            let entry_index = entry_map.entry(Arc::as_ptr(v)).or_insert_with(|| {
                let entry_index = all_entries.len();
                all_entries.push(v.clone());
                entry_index as u32
            });
            entries.push(SerializedBucketEntryData {
                key: *k,
                value: *entry_index,
            });
        }
        let bucket_data = SerializedBucketData { entries };
        serialize_json_bytes(bucket_data)
    }

    /// Create a new entry with a node_id of this crypto kind and return it
    pub(super) fn add_new_entry(&mut self, node_id_key: PublicKey) -> Arc<BucketEntry> {
        log_rtab!("Node added: {}:{}", self.kind, node_id_key);

        // Add new entry
        let entry = Arc::new(BucketEntry::new(TypedKey::new(self.kind, node_id_key)));
        self.entries.insert(node_id_key, entry.clone());

        // Return the new entry
        entry
    }

    /// Add an existing entry with a new node_id for this crypto kind
    pub(super) fn add_existing_entry(&mut self, node_id_key: PublicKey, entry: Arc<BucketEntry>) {
        log_rtab!("Existing node added: {}:{}", self.kind, node_id_key);

        // Add existing entry
        self.entries.insert(node_id_key, entry);
    }

    /// Remove an entry with a node_id for this crypto kind from the bucket
    pub(super) fn remove_entry(&mut self, node_id_key: &PublicKey) {
        log_rtab!("Node removed: {}:{}", self.kind, node_id_key);

        // Remove the entry
        self.entries.remove(node_id_key);
    }
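
    /// Get the entry for a public key in this bucket, if one exists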
    pub(super) fn entry(&self, key: &PublicKey) -> Option<Arc<BucketEntry>> {
        self.entries.get(key).cloned()
    }
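
    /// Iterate over all entries in this bucket, in key order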
    pub(super) fn entries(&self) -> EntriesIter {
        self.entries.iter()
    }
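
    /// Attempt to shrink this bucket to `bucket_depth` entries, returning the set of
    /// node ids that were evicted, if any. Dead entries are kicked first, then
    /// unreliable ones, then the most recently added; entries with outstanding
    /// references are never evicted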
    pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<PublicKey>> {
        // Get number of entries to attempt to purge from bucket
        let bucket_len = self.entries.len();

        // Don't bother kicking bucket unless it is full
        if bucket_len <= bucket_depth {
            return None;
        }

        // Try to purge the newest entries that overflow the bucket
        let mut dead_node_ids: BTreeSet<PublicKey> = BTreeSet::new();
        let mut extra_entries = bucket_len - bucket_depth;

        // Get the sorted list of entries by their kick order
        let mut sorted_entries: Vec<(PublicKey, Arc<BucketEntry>)> =
            self.entries.iter().map(|(k, v)| (*k, v.clone())).collect();
        let cur_ts = get_aligned_timestamp();
        sorted_entries.sort_by(|a, b| -> core::cmp::Ordering {
            if a.0 == b.0 {
                return core::cmp::Ordering::Equal;
            }
            a.1.with_inner(|ea| {
                b.1.with_inner(|eb| {
                    let astate = state_ordering(ea.state(cur_ts));
                    let bstate = state_ordering(eb.state(cur_ts));
                    // first kick dead nodes, then unreliable nodes
                    if astate < bstate {
                        return core::cmp::Ordering::Less;
                    }
                    if astate > bstate {
                        return core::cmp::Ordering::Greater;
                    }
                    // then kick by time added, most recent nodes are kicked first
                    let ata = ea.peer_stats().time_added;
                    let bta = eb.peer_stats().time_added;
                    bta.cmp(&ata)
                })
            })
        });

        for entry in sorted_entries {
            // If we're not evicting more entries, exit, noting this may be the newest entry
            if extra_entries == 0 {
                break;
            }
            extra_entries -= 1;

            // if this entry has references we can't drop it yet
            if entry.1.ref_count.load(Ordering::Acquire) > 0 {
                continue;
            }

            // if no references, let's evict it
            dead_node_ids.insert(entry.0);
        }

        // Now purge the dead node ids
        for id in &dead_node_ids {
            // Remove the entry
            self.remove_entry(id);
        }

        if !dead_node_ids.is_empty() {
            Some(dead_node_ids)
        } else {
            None
        }
    }
}