more refactor

This commit is contained in:
John Smith 2023-02-22 11:15:26 -05:00
parent ae991334d3
commit ecc69bff27
5 changed files with 159 additions and 22 deletions

View File

@ -540,10 +540,21 @@ impl RoutingTable {
&self, &self,
routing_domain_set: RoutingDomainSet, routing_domain_set: RoutingDomainSet,
min_state: BucketEntryState, min_state: BucketEntryState,
crypto_kinds: &[CryptoKind],
) -> usize { ) -> usize {
self.inner self.inner
.read() .read()
.get_entry_count(routing_domain_set, min_state) .get_entry_count(routing_domain_set, min_state, crypto_kinds)
}
pub fn get_entry_count_per_crypto_kind(
&self,
routing_domain_set: RoutingDomainSet,
min_state: BucketEntryState,
) -> BTreeMap<CryptoKind, usize> {
self.inner
.read()
.get_entry_count_per_crypto_kind(routing_domain_set, min_state)
} }
pub fn get_nodes_needing_ping( pub fn get_nodes_needing_ping(
@ -774,14 +785,19 @@ impl RoutingTable {
.find_fast_public_nodes_filtered(self.clone(), node_count, filters) .find_fast_public_nodes_filtered(self.clone(), node_count, filters)
} }
/// Retrieve up to N of each type of protocol capable nodes /// Retrieve up to N of each type of protocol capable nodes for a single crypto kind
pub fn find_bootstrap_nodes_filtered(&self, max_per_type: usize) -> Vec<NodeRef> { fn find_bootstrap_nodes_filtered_per_crypto_kind(
&self,
crypto_kind: CryptoKind,
max_per_type: usize,
) -> Vec<NodeRef> {
let protocol_types = vec![ let protocol_types = vec![
ProtocolType::UDP, ProtocolType::UDP,
ProtocolType::TCP, ProtocolType::TCP,
ProtocolType::WS, ProtocolType::WS,
ProtocolType::WSS, ProtocolType::WSS,
]; ];
let protocol_types_len = protocol_types.len(); let protocol_types_len = protocol_types.len();
let mut nodes_proto_v4 = vec![0usize, 0usize, 0usize, 0usize]; let mut nodes_proto_v4 = vec![0usize, 0usize, 0usize, 0usize];
let mut nodes_proto_v6 = vec![0usize, 0usize, 0usize, 0usize]; let mut nodes_proto_v6 = vec![0usize, 0usize, 0usize, 0usize];
@ -795,6 +811,11 @@ impl RoutingTable {
return false; return false;
} }
// Ensure crypto kind is supported
if !e.crypto_kinds().contains(&crypto_kind) {
return false;
}
// does it have some dial info we need? // does it have some dial info we need?
let filter = |n: &NodeInfo| { let filter = |n: &NodeInfo| {
let mut keep = false; let mut keep = false;
@ -840,6 +861,27 @@ impl RoutingTable {
) )
} }
/// Retrieve up to N of each type of protocol capable nodes for all crypto kinds
pub fn find_bootstrap_nodes_filtered(&self, max_per_type: usize) -> Vec<NodeRef> {
let mut out =
self.find_bootstrap_nodes_filtered_per_crypto_kind(VALID_CRYPTO_KINDS[0], max_per_type);
// Merge list of nodes so we don't have duplicates
for crypto_kind in &VALID_CRYPTO_KINDS[1..] {
let nrs =
self.find_bootstrap_nodes_filtered_per_crypto_kind(*crypto_kind, max_per_type);
'nrloop: for nr in nrs {
for nro in out {
if nro.same_entry(&nr) {
continue 'nrloop;
}
}
out.push(nr);
}
}
out
}
pub fn find_peers_with_sort_and_filter<C, T, O>( pub fn find_peers_with_sort_and_filter<C, T, O>(
&self, &self,
node_count: usize, node_count: usize,

View File

@ -2,7 +2,7 @@ use super::*;
use weak_table::PtrWeakHashSet; use weak_table::PtrWeakHashSet;
const RECENT_PEERS_TABLE_SIZE: usize = 64; const RECENT_PEERS_TABLE_SIZE: usize = 64;
pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
@ -18,6 +18,8 @@ pub struct RoutingTableInner {
pub(super) buckets: BTreeMap<CryptoKind, Vec<Bucket>>, pub(super) buckets: BTreeMap<CryptoKind, Vec<Bucket>>,
/// A weak set of all the entries we have in the buckets for faster iteration /// A weak set of all the entries we have in the buckets for faster iteration
pub(super) all_entries: PtrWeakHashSet<Weak<BucketEntry>>, pub(super) all_entries: PtrWeakHashSet<Weak<BucketEntry>>,
/// A rough count of the entries in the table per routing domain and crypto kind
pub(super) live_entry_count: EntryCounts,
/// The public internet routing domain /// The public internet routing domain
pub(super) public_internet_routing_domain: PublicInternetRoutingDomainDetail, pub(super) public_internet_routing_domain: PublicInternetRoutingDomainDetail,
/// The dial info we use on the local network /// The dial info we use on the local network
@ -42,6 +44,7 @@ impl RoutingTableInner {
public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(),
local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(), local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(),
all_entries: PtrWeakHashSet::new(), all_entries: PtrWeakHashSet::new(),
live_entry_count: BTreeMap::new(),
self_latency_stats_accounting: LatencyStatsAccounting::new(), self_latency_stats_accounting: LatencyStatsAccounting::new(),
self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats_accounting: TransferStatsAccounting::new(),
self_transfer_stats: TransferStatsDownUp::default(), self_transfer_stats: TransferStatsDownUp::default(),
@ -409,18 +412,45 @@ impl RoutingTableInner {
} }
} }
/// Build the counts of entries per routing domain and crypto kind and cache them
pub fn refresh_cached_entry_counts(&mut self) -> EntryCounts {
self.live_entry_count.clear();
let cur_ts = get_aligned_timestamp();
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
entry.with_inner(|e| {
if let Some(rd) = e.best_routing_domain(rti, RoutingDomainSet::all()) {
for crypto_kind in e.crypto_kinds() {
self.live_entry_count
.entry((rd, crypto_kind))
.and_modify(|x| *x += 1)
.or_insert(1);
}
}
});
Option::<()>::None
});
self.live_entry_count.clone()
}
/// Return the last cached entry counts
pub fn cached_entry_counts(&self) -> EntryCounts {
self.live_entry_count.clone()
}
/// Count entries that match some criteria
pub fn get_entry_count( pub fn get_entry_count(
&self, &self,
routing_domain_set: RoutingDomainSet, routing_domain_set: RoutingDomainSet,
min_state: BucketEntryState, min_state: BucketEntryState,
crypto_kinds: &[CryptoKind], xxx finish this and peer minimum refresh and bootstrap tick, then both routines crypto_kinds: &[CryptoKind],
) -> usize { ) -> usize {
let mut count = 0usize; let mut count = 0usize;
let cur_ts = get_aligned_timestamp(); let cur_ts = get_aligned_timestamp();
self.with_entries(cur_ts, min_state, |rti, e| { self.with_entries(cur_ts, min_state, |rti, e| {
if e.with_inner(|e| e.best_routing_domain(rti, routing_domain_set)) if e.with_inner(|e| {
.is_some() e.best_routing_domain(rti, routing_domain_set).is_some()
{ && !common_crypto_kinds(&e.crypto_kinds(), crypto_kinds).is_empty()
}) {
count += 1; count += 1;
} }
Option::<()>::None Option::<()>::None
@ -428,6 +458,33 @@ impl RoutingTableInner {
count count
} }
/// Count entries per crypto kind that match some criteria
pub fn get_entry_count_per_crypto_kind(
&self,
routing_domain_set: RoutingDomainSet,
min_state: BucketEntryState,
) -> BTreeMap<CryptoKind, usize> {
let mut counts = BTreeMap::new();
let cur_ts = get_aligned_timestamp();
self.with_entries(cur_ts, min_state, |rti, e| {
if let Some(crypto_kinds) = e.with_inner(|e| {
if e.best_routing_domain(rti, routing_domain_set).is_some() {
Some(e.crypto_kinds())
} else {
None
}
}) {
// Got crypto kinds, add to map
for ck in crypto_kinds {
counts.entry(ck).and_modify(|x| *x += 1).or_insert(1);
}
}
Option::<()>::None
});
counts
}
/// Iterate entries with a filter
pub fn with_entries<T, F: FnMut(&RoutingTableInner, Arc<BucketEntry>) -> Option<T>>( pub fn with_entries<T, F: FnMut(&RoutingTableInner, Arc<BucketEntry>) -> Option<T>>(
&self, &self,
cur_ts: Timestamp, cur_ts: Timestamp,
@ -445,6 +502,7 @@ impl RoutingTableInner {
None None
} }
/// Iterate entries with a filter mutably
pub fn with_entries_mut<T, F: FnMut(&mut RoutingTableInner, Arc<BucketEntry>) -> Option<T>>( pub fn with_entries_mut<T, F: FnMut(&mut RoutingTableInner, Arc<BucketEntry>) -> Option<T>>(
&mut self, &mut self,
cur_ts: Timestamp, cur_ts: Timestamp,
@ -615,7 +673,7 @@ impl RoutingTableInner {
new_entry.with_mut_inner(|e| update_func(self, e)); new_entry.with_mut_inner(|e| update_func(self, e));
// Kick the bucket // Kick the bucket
log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable)); log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable, &VALID_CRYPTO_KINDS));
Some(nr) Some(nr)
} }

View File

@ -294,6 +294,9 @@ impl RoutingTable {
log_rtab!(debug "--- bootstrap_task"); log_rtab!(debug "--- bootstrap_task");
// Get counts by crypto kind
let entry_count = self.inner.read().cached_entry_counts();
// See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism // See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism
let mut bootstrap_dialinfos = Vec::<DialInfo>::new(); let mut bootstrap_dialinfos = Vec::<DialInfo>::new();
for b in &bootstrap { for b in &bootstrap {
@ -341,6 +344,14 @@ impl RoutingTable {
{ {
// Add this our futures to process in parallel // Add this our futures to process in parallel
for crypto_kind in VALID_CRYPTO_KINDS { for crypto_kind in VALID_CRYPTO_KINDS {
// Do we need to bootstrap this crypto kind?
let eckey = (RoutingDomain::PublicInternet, crypto_kind);
let cnt = entry_count.get(&eckey).copied().unwrap_or_default();
if cnt != 0 {
continue;
}
// Bootstrap this crypto kind
let nr = nr.clone(); let nr = nr.clone();
let routing_table = self.clone(); let routing_table = self.clone();
unord.push( unord.push(

View File

@ -134,21 +134,30 @@ impl RoutingTable {
self.unlocked_inner.kick_buckets_task.tick().await?; self.unlocked_inner.kick_buckets_task.tick().await?;
} }
// See how many live PublicInternet entries we have // Refresh entry counts
let live_public_internet_entry_count = self.get_entry_count( let entry_counts = {
RoutingDomain::PublicInternet.into(), let mut inner = self.inner.write();
BucketEntryState::Unreliable, inner.refresh_cached_entry_counts()
); };
let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize);
// If none, then add the bootstrap nodes to it // Figure out which tables need bootstrap or peer minimum refresh
if live_public_internet_entry_count == 0 { let mut needs_bootstrap = false;
let mut needs_peer_minimum_refresh = false;
for ck in VALID_CRYPTO_KINDS {
let eckey = (RoutingDomain::PublicInternet, ck);
let cnt = entry_counts.get(&eckey).copied().unwrap_or_default();
if cnt == 0 {
needs_bootstrap = true;
} else if cnt < min_peer_count {
needs_peer_minimum_refresh = true;
}
}
if needs_bootstrap {
self.unlocked_inner.bootstrap_task.tick().await?; self.unlocked_inner.bootstrap_task.tick().await?;
} }
// If we still don't have enough peers, find nodes until we do if needs_peer_minimum_refresh {
else if !self.unlocked_inner.bootstrap_task.is_running()
&& live_public_internet_entry_count < min_peer_count
{
self.unlocked_inner.peer_minimum_refresh_task.tick().await?; self.unlocked_inner.peer_minimum_refresh_task.tick().await?;
} }

View File

@ -16,6 +16,9 @@ impl RoutingTable {
self, self,
stop_token: StopToken, stop_token: StopToken,
) -> EyreResult<()> { ) -> EyreResult<()> {
// Get counts by crypto kind
let entry_count = self.inner.read().cached_entry_counts();
let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize);
// For the PublicInternet routing domain, get list of all peers we know about // For the PublicInternet routing domain, get list of all peers we know about
@ -24,11 +27,21 @@ impl RoutingTable {
let mut ord = FuturesOrdered::new(); let mut ord = FuturesOrdered::new();
for crypto_kind in VALID_CRYPTO_KINDS { for crypto_kind in VALID_CRYPTO_KINDS {
// Do we need to peer minimum refresh this crypto kind?
let eckey = (RoutingDomain::PublicInternet, crypto_kind);
let cnt = entry_count.get(&eckey).copied().unwrap_or_default();
if cnt == 0 || cnt > min_peer_count {
// If we have enough nodes, skip it
// If we have zero nodes, bootstrap will get it
continue;
}
let routing_table = self.clone(); let routing_table = self.clone();
let mut filters = VecDeque::new(); let mut filters = VecDeque::new();
let filter = Box::new( let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| { move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Keep only the entries that contain the crypto kind we're looking for
if let Some(entry) = opt_entry { if let Some(entry) = opt_entry {
entry.with_inner(|e| e.crypto_kinds().contains(&crypto_kind)) entry.with_inner(|e| e.crypto_kinds().contains(&crypto_kind))
} else { } else {
@ -49,8 +62,12 @@ impl RoutingTable {
for nr in noderefs { for nr in noderefs {
let routing_table = self.clone(); let routing_table = self.clone();
ord.push_back( ord.push_back(
async move { routing_table.reverse_find_node(nr, false).await } async move {
.instrument(Span::current()), routing_table
.reverse_find_node(crypto_kind, nr, false)
.await
}
.instrument(Span::current()),
); );
} }
} }