refactor checkpoint

This commit is contained in:
John Smith 2022-10-09 14:59:01 -04:00
parent 1fdcd5ae45
commit 338dc6b39d
24 changed files with 1122 additions and 564 deletions

View File

@ -16,15 +16,21 @@ use serde::{Deserialize, Serialize};
//////////////////////////////////////////////////////////////////////
/// Length of a DHT key in bytes
#[allow(dead_code)]
pub const DHT_KEY_LENGTH: usize = 32;
/// Length of a DHT key in bytes after encoding to base64url
#[allow(dead_code)]
pub const DHT_KEY_LENGTH_ENCODED: usize = 43;
/// Length of a DHT secret in bytes
#[allow(dead_code)]
pub const DHT_KEY_SECRET_LENGTH: usize = 32;
/// Length of a DHT secret in bytes after encoding to base64url
#[allow(dead_code)]
pub const DHT_KEY_SECRET_LENGTH_ENCODED: usize = 43;
/// Length of a DHT signature in bytes
#[allow(dead_code)]
/// Length of a DHT signature in bytes after encoding to base64url
pub const DHT_SIGNATURE_LENGTH: usize = 64;
#[allow(dead_code)]
pub const DHT_SIGNATURE_LENGTH_ENCODED: usize = 86;

View File

@ -38,6 +38,7 @@ use xx::*;
////////////////////////////////////////////////////////////////////////////////////////
pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1;
pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1;
pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE;
pub const IPADDR_TABLE_SIZE: usize = 1024;
pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes
@ -148,11 +149,6 @@ struct NetworkManagerInner {
BTreeMap<PublicAddressCheckCacheKey, LruCache<IpAddr, SocketAddress>>,
public_address_inconsistencies_table:
BTreeMap<PublicAddressCheckCacheKey, HashMap<IpAddr, u64>>,
protocol_config: Option<ProtocolConfig>,
public_inbound_dial_info_filter: Option<DialInfoFilter>,
local_inbound_dial_info_filter: Option<DialInfoFilter>,
public_outbound_dial_info_filter: Option<DialInfoFilter>,
local_outbound_dial_info_filter: Option<DialInfoFilter>,
}
struct NetworkManagerUnlockedInner {
@ -163,6 +159,7 @@ struct NetworkManagerUnlockedInner {
// Background processes
rolling_transfers_task: TickTask<EyreReport>,
relay_management_task: TickTask<EyreReport>,
private_route_management_task: TickTask<EyreReport>,
bootstrap_task: TickTask<EyreReport>,
peer_minimum_refresh_task: TickTask<EyreReport>,
ping_validator_task: TickTask<EyreReport>,
@ -186,11 +183,6 @@ impl NetworkManager {
client_whitelist: LruCache::new_unbounded(),
public_address_check_cache: BTreeMap::new(),
public_address_inconsistencies_table: BTreeMap::new(),
protocol_config: None,
public_inbound_dial_info_filter: None,
local_inbound_dial_info_filter: None,
public_outbound_dial_info_filter: None,
local_outbound_dial_info_filter: None,
}
}
fn new_unlocked_inner(config: VeilidConfig) -> NetworkManagerUnlockedInner {
@ -201,6 +193,7 @@ impl NetworkManager {
update_callback: RwLock::new(None),
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS),
private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS),
bootstrap_task: TickTask::new(1),
peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms),
ping_validator_task: TickTask::new(1),
@ -248,6 +241,23 @@ impl NetworkManager {
)
});
}
// Set private route management tick task
{
let this2 = this.clone();
this.unlocked_inner
.private_route_management_task
.set_routine(move |s, l, t| {
Box::pin(
this2
.clone()
.private_route_management_task_routine(s, l, t)
.instrument(trace_span!(
parent: None,
"private route management task routine"
)),
)
});
}
// Set bootstrap tick task
{
let this2 = this.clone();
@ -434,41 +444,6 @@ impl NetworkManager {
return Err(e);
}
// Store copy of protocol config and dial info filters
{
let pc = self.net().get_protocol_config().unwrap();
let mut inner = self.inner.lock();
inner.public_inbound_dial_info_filter = Some(
DialInfoFilter::all()
.with_protocol_type_set(pc.inbound)
.with_address_type_set(pc.family_global),
);
inner.local_inbound_dial_info_filter = Some(
DialInfoFilter::all()
.with_protocol_type_set(pc.inbound)
.with_address_type_set(pc.family_local),
);
inner.public_outbound_dial_info_filter = Some(
DialInfoFilter::all()
.with_protocol_type_set(pc.outbound)
.with_address_type_set(pc.family_global),
);
inner.local_outbound_dial_info_filter = Some(
DialInfoFilter::all()
.with_protocol_type_set(pc.outbound)
.with_address_type_set(pc.family_local),
);
inner.protocol_config = Some(pc);
}
// Inform routing table entries that our dial info has changed
for rd in RoutingDomain::all() {
self.send_node_info_updates(rd, true).await;
}
// Inform api clients that things have changed
self.send_network_update();
@ -527,12 +502,7 @@ impl NetworkManager {
// reset the state
debug!("resetting network manager state");
{
let mut inner = self.inner.lock();
inner.public_inbound_dial_info_filter = None;
inner.local_inbound_dial_info_filter = None;
inner.public_outbound_dial_info_filter = None;
inner.local_outbound_dial_info_filter = None;
inner.protocol_config = None;
*self.inner.lock() = NetworkManager::new_inner();
}
// send update
@ -640,15 +610,6 @@ impl NetworkManager {
Ok(())
}
// Return what network class we are in
pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option<NetworkClass> {
if let Some(components) = self.unlocked_inner.components.read().as_ref() {
components.net.get_network_class(routing_domain)
} else {
None
}
}
// Get our node's capabilities
fn generate_public_internet_node_status(&self) -> PublicInternetNodeStatus {
let node_info = self
@ -694,58 +655,6 @@ impl NetworkManager {
}
}
// Return what protocols we have enabled
pub fn get_protocol_config(&self) -> ProtocolConfig {
let inner = self.inner.lock();
inner.protocol_config.as_ref().unwrap().clone()
}
// Return a dial info filter for what we can receive
pub fn get_inbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter {
let inner = self.inner.lock();
match routing_domain {
RoutingDomain::PublicInternet => inner
.public_inbound_dial_info_filter
.as_ref()
.unwrap()
.clone(),
RoutingDomain::LocalNetwork => inner
.local_inbound_dial_info_filter
.as_ref()
.unwrap()
.clone(),
}
}
pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter {
let dif = self.get_inbound_dial_info_filter(routing_domain);
NodeRefFilter::new()
.with_routing_domain(routing_domain)
.with_dial_info_filter(dif)
}
// Return a dial info filter for what we can send out
pub fn get_outbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter {
let inner = self.inner.lock();
match routing_domain {
RoutingDomain::PublicInternet => inner
.public_outbound_dial_info_filter
.as_ref()
.unwrap()
.clone(),
RoutingDomain::LocalNetwork => inner
.local_outbound_dial_info_filter
.as_ref()
.unwrap()
.clone(),
}
}
pub fn get_outbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter {
let dif = self.get_outbound_dial_info_filter(routing_domain);
NodeRefFilter::new()
.with_routing_domain(routing_domain)
.with_dial_info_filter(dif)
}
// Generates a multi-shot/normal receipt
#[instrument(level = "trace", skip(self, extra_data, callback), err)]
pub fn generate_receipt<D: AsRef<[u8]>>(
@ -890,7 +799,7 @@ impl NetworkManager {
};
// Get the udp direct dialinfo for the hole punch
let outbound_nrf = self
let outbound_nrf = routing_table
.get_outbound_node_ref_filter(RoutingDomain::PublicInternet)
.with_protocol_type(ProtocolType::UDP);
peer_nr.set_filter(Some(outbound_nrf));
@ -1027,7 +936,10 @@ impl NetworkManager {
#[instrument(level = "trace", skip(self), ret)]
fn get_contact_method_public(&self, target_node_ref: NodeRef) -> ContactMethod {
// Scope noderef down to protocols we can do outbound
let public_outbound_nrf = self.get_outbound_node_ref_filter(RoutingDomain::PublicInternet);
let routing_table = self.routing_table();
let public_outbound_nrf =
routing_table.get_outbound_node_ref_filter(RoutingDomain::PublicInternet);
let target_node_ref = target_node_ref.filtered_clone(public_outbound_nrf.clone());
// Get the best match internet dial info if we have it
@ -1047,16 +959,14 @@ impl NetworkManager {
// Can we reach the inbound relay?
if inbound_relay_nr.first_filtered_dial_info_detail().is_some() {
// Can we receive anything inbound ever?
let our_network_class = self
let our_network_class = routing_table
.get_network_class(RoutingDomain::PublicInternet)
.unwrap_or(NetworkClass::Invalid);
if matches!(our_network_class, NetworkClass::InboundCapable) {
let routing_table = self.routing_table();
///////// Reverse connection
// Get the best match dial info for an reverse inbound connection
let reverse_dif = self
let reverse_dif = routing_table
.get_inbound_dial_info_filter(RoutingDomain::PublicInternet)
.filtered(
&target_node_ref
@ -1090,7 +1000,7 @@ impl NetworkManager {
udp_target_nr.first_filtered_dial_info_detail()
{
// Does the self node have a direct udp dialinfo the target can reach?
let inbound_udp_dif = self
let inbound_udp_dif = routing_table
.get_inbound_dial_info_filter(RoutingDomain::PublicInternet)
.filtered(
&target_node_ref
@ -1151,7 +1061,10 @@ impl NetworkManager {
#[instrument(level = "trace", skip(self), ret)]
fn get_contact_method_local(&self, target_node_ref: NodeRef) -> ContactMethod {
// Scope noderef down to protocols we can do outbound
let local_outbound_nrf = self.get_outbound_node_ref_filter(RoutingDomain::LocalNetwork);
let routing_table = self.routing_table();
let local_outbound_nrf =
routing_table.get_outbound_node_ref_filter(RoutingDomain::LocalNetwork);
let target_node_ref = target_node_ref.filtered_clone(local_outbound_nrf);
// Get the best matching local direct dial info if we have it
@ -1865,7 +1778,7 @@ impl NetworkManager {
let mut bad_public_address_detection_punishment: Option<
Box<dyn FnOnce() + Send + 'static>,
> = None;
let public_internet_network_class = net
let public_internet_network_class = routing_table
.get_network_class(RoutingDomain::PublicInternet)
.unwrap_or(NetworkClass::Invalid);
let needs_public_address_detection =

View File

@ -40,11 +40,9 @@ struct NetworkInner {
/// such as dhcp release or change of address or interfaces being added or removed
network_needs_restart: bool,
/// the calculated protocol configuration for inbound/outbound protocols
protocol_config: Option<ProtocolConfig>,
protocol_config: ProtocolConfig,
/// set of statically configured protocols with public dialinfo
static_public_dialinfo: ProtocolTypeSet,
/// network class per routing domain
network_class: [Option<NetworkClass>; RoutingDomain::count()],
/// join handles for all the low level network background tasks
join_handles: Vec<MustJoinHandle<()>>,
/// stop source for shutting down the low level network background tasks
@ -120,9 +118,8 @@ impl Network {
needs_public_dial_info_check: false,
doing_public_dial_info_check: false,
public_dial_info_check_punishment: None,
protocol_config: None,
protocol_config: Default::default(),
static_public_dialinfo: ProtocolTypeSet::empty(),
network_class: [None, None],
join_handles: Vec::new(),
stop_source: None,
udp_port: 0u16,
@ -620,7 +617,7 @@ impl Network {
/////////////////////////////////////////////////////////////////
pub fn get_protocol_config(&self) -> Option<ProtocolConfig> {
pub fn get_protocol_config(&self) -> ProtocolConfig {
self.inner.lock().protocol_config
}
@ -734,7 +731,8 @@ impl Network {
family_local,
}
};
inner.protocol_config = Some(protocol_config);
inner.protocol_config = protocol_config;
protocol_config
};
@ -771,27 +769,37 @@ impl Network {
// that we have ports available to us
self.free_bound_first_ports();
// If we have static public dialinfo, upgrade our network class
// set up the routing table's network config
// if we have static public dialinfo, upgrade our network class
editor_public_internet.setup_network(
protocol_config.inbound,
protocol_config.outbound,
protocol_config.family_global,
);
editor_local_network.setup_network(
protocol_config.inbound,
protocol_config.outbound,
protocol_config.family_local,
);
let detect_address_changes = {
let c = self.config.get();
c.network.detect_address_changes
};
if !detect_address_changes {
let mut inner = self.inner.lock();
let inner = self.inner.lock();
if !inner.static_public_dialinfo.is_empty() {
inner.network_class[RoutingDomain::PublicInternet as usize] =
Some(NetworkClass::InboundCapable);
editor_public_internet.set_network_class(Some(NetworkClass::InboundCapable));
}
}
info!("network started");
self.inner.lock().network_started = true;
// commit routing table edits
editor_public_internet.commit().await;
editor_local_network.commit().await;
info!("network started");
self.inner.lock().network_started = true;
Ok(())
}
@ -873,11 +881,6 @@ impl Network {
inner.doing_public_dial_info_check
}
pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option<NetworkClass> {
let inner = self.inner.lock();
inner.network_class[routing_domain as usize]
}
//////////////////////////////////////////
#[instrument(level = "trace", skip(self), err)]
@ -939,6 +942,7 @@ impl Network {
// If we need to figure out our network class, tick the task for it
if detect_address_changes {
let public_internet_network_class = self
.routing_table()
.get_network_class(RoutingDomain::PublicInternet)
.unwrap_or(NetworkClass::Invalid);
let needs_public_dial_info_check = self.needs_public_dial_info_check();

View File

@ -125,7 +125,7 @@ impl DiscoveryContext {
RoutingDomain::PublicInternet,
dial_info_filter.clone(),
);
let disallow_relays_filter = move |e: &BucketEntryInner| {
let disallow_relays_filter = move |_rti, e: &BucketEntryInner| {
if let Some(n) = e.node_info(RoutingDomain::PublicInternet) {
n.relay_peer_info.is_none()
} else {
@ -610,12 +610,14 @@ impl Network {
_l: u64,
_t: u64,
) -> EyreResult<()> {
let routing_table = self.routing_table();
// Figure out if we can optimize TCP/WS checking since they are often on the same port
let (protocol_config, existing_network_class, tcp_same_port) = {
let inner = self.inner.lock();
let protocol_config = inner.protocol_config.unwrap_or_default();
let protocol_config = inner.protocol_config;
let existing_network_class =
inner.network_class[RoutingDomain::PublicInternet as usize];
routing_table.get_network_class(RoutingDomain::PublicInternet);
let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP)
&& protocol_config.inbound.contains(ProtocolType::WS)
{
@ -625,7 +627,6 @@ impl Network {
};
(protocol_config, existing_network_class, tcp_same_port)
};
let routing_table = self.routing_table();
// Process all protocol and address combinations
let mut futures = FuturesUnordered::new();
@ -849,17 +850,16 @@ impl Network {
// Is the network class different?
if existing_network_class != new_network_class {
self.inner.lock().network_class[RoutingDomain::PublicInternet as usize] =
new_network_class;
editor.set_network_class(new_network_class);
changed = true;
log_net!(debug "PublicInternet network class changed to {:?}", new_network_class);
}
} else if existing_network_class.is_some() {
// Network class could not be determined
editor.clear_dial_info_details();
self.inner.lock().network_class[RoutingDomain::PublicInternet as usize] = None;
editor.set_network_class(None);
changed = true;
log_net!(debug "network class cleared");
log_net!(debug "PublicInternet network class cleared");
}
// Punish nodes that told us our public address had changed when it didn't

View File

@ -495,8 +495,8 @@ impl NetworkManager {
// even the unreliable ones, and ask them to find nodes close to our node too
let noderefs = routing_table.find_fastest_nodes(
min_peer_count,
|_k, _v| true,
|k: DHTKey, v: Option<Arc<BucketEntry>>| {
|_rti, _k, _v| true,
|_rti, k: DHTKey, v: Option<Arc<BucketEntry>>| {
NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None)
},
);
@ -525,7 +525,7 @@ impl NetworkManager {
// Get our node's current node info and network class and do the right thing
let routing_table = self.routing_table();
let node_info = routing_table.get_own_node_info(RoutingDomain::PublicInternet);
let network_class = self.get_network_class(RoutingDomain::PublicInternet);
let network_class = routing_table.get_network_class(RoutingDomain::PublicInternet);
// Get routing domain editor
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
@ -594,6 +594,34 @@ impl NetworkManager {
Ok(())
}
// Keep private routes assigned and accessible
#[instrument(level = "trace", skip(self), err)]
pub(super) async fn private_route_management_task_routine(
self,
_stop_token: StopToken,
_last_ts: u64,
cur_ts: u64,
) -> EyreResult<()> {
// Get our node's current node info and network class and do the right thing
let routing_table = self.routing_table();
let node_info = routing_table.get_own_node_info(RoutingDomain::PublicInternet);
let network_class = routing_table.get_network_class(RoutingDomain::PublicInternet);
// Get routing domain editor
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
// Do we know our network class yet?
if let Some(network_class) = network_class {
// see if we have any routes that need extending
}
// Commit the changes
editor.commit().await;
Ok(())
}
// Compute transfer statistics for the low level network
#[instrument(level = "trace", skip(self), err)]
pub(super) async fn rolling_transfers_task_routine(

View File

@ -12,7 +12,7 @@ use std::io;
struct NetworkInner {
network_started: bool,
network_needs_restart: bool,
protocol_config: Option<ProtocolConfig>,
protocol_config: ProtocolConfig,
}
struct NetworkUnlockedInner {
@ -34,7 +34,7 @@ impl Network {
NetworkInner {
network_started: false,
network_needs_restart: false,
protocol_config: None, //join_handle: None,
protocol_config: Default::default(),
}
}
@ -247,7 +247,7 @@ impl Network {
pub async fn startup(&self) -> EyreResult<()> {
// get protocol config
self.inner.lock().protocol_config = Some({
self.inner.lock().protocol_config = {
let c = self.config.get();
let inbound = ProtocolTypeSet::new();
let mut outbound = ProtocolTypeSet::new();
@ -269,7 +269,7 @@ impl Network {
family_global,
family_local,
}
});
};
self.inner.lock().network_started = true;
Ok(())
@ -337,7 +337,7 @@ impl Network {
};
}
pub fn get_protocol_config(&self) -> Option<ProtocolConfig> {
pub fn get_protocol_config(&self) -> ProtocolConfig {
self.inner.lock().protocol_config.clone()
}

View File

@ -48,13 +48,6 @@ impl Bucket {
// newest_entry is updated by kick_bucket()
}
pub(super) fn roll_transfers(&self, last_ts: u64, cur_ts: u64) {
// Called every ROLLING_TRANSFERS_INTERVAL_SECS
for (_k, v) in &self.entries {
v.with_mut(|e| e.roll_transfers(last_ts, cur_ts));
}
}
pub(super) fn entry(&self, key: &DHTKey) -> Option<Arc<BucketEntry>> {
self.entries.get(key).cloned()
}
@ -63,7 +56,11 @@ impl Bucket {
self.entries.iter()
}
pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<DHTKey>> {
pub(super) fn kick(
&self,
inner: &mut RoutingTableInner,
bucket_depth: usize,
) -> Option<BTreeSet<DHTKey>> {
// Get number of entries to attempt to purge from bucket
let bucket_len = self.entries.len();
@ -87,8 +84,8 @@ impl Bucket {
if a.0 == b.0 {
return core::cmp::Ordering::Equal;
}
a.1.with(|ea| {
b.1.with(|eb| {
a.1.with(inner, |rti, ea| {
b.1.with(rti, |_rti, eb| {
let astate = state_ordering(ea.state(cur_ts));
let bstate = state_ordering(eb.state(cur_ts));
// first kick dead nodes, then unreliable nodes

View File

@ -132,6 +132,28 @@ impl BucketEntryInner {
}
}
// Less is more reliable then older
pub fn cmp_oldest_reliable(cur_ts: u64, e1: &Self, e2: &Self) -> std::cmp::Ordering {
// Reverse compare so most reliable is at front
let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts));
if ret != std::cmp::Ordering::Equal {
return ret;
}
// Lower timestamp to the front, recent or no timestamp is at the end
if let Some(e1_ts) = &e1.peer_stats.rpc_stats.first_consecutive_seen_ts {
if let Some(e2_ts) = &e2.peer_stats.rpc_stats.first_consecutive_seen_ts {
e1_ts.cmp(&e2_ts)
} else {
std::cmp::Ordering::Less
}
} else if e2.peer_stats.rpc_stats.first_consecutive_seen_ts.is_some() {
std::cmp::Ordering::Greater
} else {
std::cmp::Ordering::Equal
}
}
pub fn sort_fastest_reliable_fn(cur_ts: u64) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering {
move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2)
}
@ -645,20 +667,26 @@ impl BucketEntry {
}
}
pub(super) fn with<F, R>(&self, f: F) -> R
// Note, that this requires -also- holding the RoutingTable read lock, as an
// immutable reference to RoutingTableInner must be passed in to get this
// This ensures that an operation on the routing table can not change entries
// while it is being read from
pub(super) fn with<F, R>(&self, rti: &RoutingTableInner, f: F) -> R
where
F: FnOnce(&BucketEntryInner) -> R,
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> R,
{
let inner = self.inner.read();
f(&*inner)
f(rti, &*inner)
}
pub(super) fn with_mut<F, R>(&self, f: F) -> R
// Note, that this requires -also- holding the RoutingTable write lock, as a
// mutable reference to RoutingTableInner must be passed in to get this
pub(super) fn with_mut<F, R>(&self, rti: &mut RoutingTableInner, f: F) -> R
where
F: FnOnce(&mut BucketEntryInner) -> R,
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> R,
{
let mut inner = self.inner.write();
f(&mut *inner)
f(rti, &mut *inner)
}
}

View File

@ -102,6 +102,7 @@ impl RoutingTable {
pub fn debug_info_entries(&self, limit: usize, min_state: BucketEntryState) -> String {
let inner = self.inner.read();
let inner = &*inner;
let cur_ts = intf::get_timestamp();
let mut out = String::new();
@ -114,14 +115,14 @@ impl RoutingTable {
let filtered_entries: Vec<(&DHTKey, &Arc<BucketEntry>)> = inner.buckets[b]
.entries()
.filter(|e| {
let state = e.1.with(|e| e.state(cur_ts));
let state = e.1.with(inner, |_rti, e| e.state(cur_ts));
state >= min_state
})
.collect();
if !filtered_entries.is_empty() {
out += &format!(" Bucket #{}:\n", b);
for e in filtered_entries {
let state = e.1.with(|e| e.state(cur_ts));
let state = e.1.with(inner, |_rti, e| e.state(cur_ts));
out += &format!(
" {} [{}]\n",
e.0.encode(),
@ -161,6 +162,7 @@ impl RoutingTable {
pub fn debug_info_buckets(&self, min_state: BucketEntryState) -> String {
let inner = self.inner.read();
let inner = &*inner;
let cur_ts = intf::get_timestamp();
let mut out = String::new();
@ -175,7 +177,7 @@ impl RoutingTable {
while c < COLS {
let mut cnt = 0;
for e in inner.buckets[b].entries() {
if e.1.with(|e| e.state(cur_ts) >= min_state) {
if e.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) {
cnt += 1;
}
}

View File

@ -17,9 +17,9 @@ impl RoutingTable {
pub fn make_inbound_dial_info_entry_filter(
routing_domain: RoutingDomain,
dial_info_filter: DialInfoFilter,
) -> impl FnMut(&BucketEntryInner) -> bool {
) -> impl FnMut(&RoutingTableInner, &BucketEntryInner) -> bool {
// does it have matching public dial info?
move |e| {
move |_rti, e| {
if let Some(ni) = e.node_info(routing_domain) {
if ni
.first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| {
@ -35,12 +35,12 @@ impl RoutingTable {
}
// Makes a filter that finds nodes capable of dialing a particular outbound dialinfo
pub fn make_outbound_dial_info_entry_filter(
pub fn make_outbound_dial_info_entry_filter<'s>(
routing_domain: RoutingDomain,
dial_info: DialInfo,
) -> impl FnMut(&BucketEntryInner) -> bool {
) -> impl FnMut(&RoutingTableInner, &'s BucketEntryInner) -> bool {
// does the node's outbound capabilities match the dialinfo?
move |e| {
move |_rti, e| {
if let Some(ni) = e.node_info(routing_domain) {
let dif = DialInfoFilter::all()
.with_protocol_type_set(ni.outbound_protocols)
@ -54,19 +54,19 @@ impl RoutingTable {
}
// Make a filter that wraps another filter
pub fn combine_entry_filters<F, G>(
pub fn combine_entry_filters<'a, 'b, F, G>(
mut f1: F,
mut f2: G,
) -> impl FnMut(&BucketEntryInner) -> bool
) -> impl FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool
where
F: FnMut(&BucketEntryInner) -> bool,
G: FnMut(&BucketEntryInner) -> bool,
F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool,
G: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool,
{
move |e| {
if !f1(e) {
move |rti, e| {
if !f1(rti, e) {
return false;
}
if !f2(e) {
if !f2(rti, e) {
return false;
}
true
@ -74,21 +74,21 @@ impl RoutingTable {
}
// Retrieve the fastest nodes in the routing table matching an entry filter
pub fn find_fast_public_nodes_filtered<F>(
pub fn find_fast_public_nodes_filtered<'r, 'e, F>(
&self,
node_count: usize,
mut entry_filter: F,
) -> Vec<NodeRef>
where
F: FnMut(&BucketEntryInner) -> bool,
F: FnMut(&'r RoutingTableInner, &'e BucketEntryInner) -> bool,
{
self.find_fastest_nodes(
// count
node_count,
// filter
|_k: DHTKey, v: Option<Arc<BucketEntry>>| {
|rti, _k: DHTKey, v: Option<Arc<BucketEntry>>| {
let entry = v.unwrap();
entry.with(|e| {
entry.with(rti, |rti, e| {
// skip nodes on local network
if e.node_info(RoutingDomain::LocalNetwork).is_some() {
return false;
@ -98,11 +98,11 @@ impl RoutingTable {
return false;
}
// skip nodes that dont match entry filter
entry_filter(e)
entry_filter(rti, e)
})
},
// transform
|k: DHTKey, v: Option<Arc<BucketEntry>>| {
|_rti, k: DHTKey, v: Option<Arc<BucketEntry>>| {
NodeRef::new(self.clone(), k, v.unwrap().clone(), None)
},
)
@ -123,9 +123,9 @@ impl RoutingTable {
// count
protocol_types.len() * 2 * max_per_type,
// filter
move |_k: DHTKey, v: Option<Arc<BucketEntry>>| {
move |rti, _k: DHTKey, v: Option<Arc<BucketEntry>>| {
let entry = v.unwrap();
entry.with(|e| {
entry.with(rti, |_rti, e| {
// skip nodes on our local network here
if e.has_node_info(RoutingDomain::LocalNetwork.into()) {
return false;
@ -164,20 +164,21 @@ impl RoutingTable {
})
},
// transform
|k: DHTKey, v: Option<Arc<BucketEntry>>| {
|_rti, k: DHTKey, v: Option<Arc<BucketEntry>>| {
NodeRef::new(self.clone(), k, v.unwrap().clone(), None)
},
)
}
pub fn filter_has_valid_signed_node_info(
&self,
pub fn filter_has_valid_signed_node_info_inner(
inner: &RoutingTableInner,
routing_domain: RoutingDomain,
has_valid_own_node_info: bool,
v: Option<Arc<BucketEntry>>,
) -> bool {
match v {
None => self.has_valid_own_node_info(routing_domain),
Some(entry) => entry.with(|e| {
None => has_valid_own_node_info,
Some(entry) => entry.with(inner, |_rti, e| {
e.signed_node_info(routing_domain.into())
.map(|sni| sni.has_valid_signature())
.unwrap_or(false)
@ -185,15 +186,18 @@ impl RoutingTable {
}
}
pub fn transform_to_peer_info(
&self,
pub fn transform_to_peer_info_inner(
inner: &RoutingTableInner,
routing_domain: RoutingDomain,
own_peer_info: PeerInfo,
k: DHTKey,
v: Option<Arc<BucketEntry>>,
) -> PeerInfo {
match v {
None => self.get_own_peer_info(routing_domain),
Some(entry) => entry.with(|e| e.make_peer_info(k, routing_domain).unwrap()),
None => own_peer_info,
Some(entry) => entry.with(inner, |_rti, e| {
e.make_peer_info(k, routing_domain).unwrap()
}),
}
}
@ -206,14 +210,16 @@ impl RoutingTable {
mut transform: T,
) -> Vec<O>
where
F: FnMut(DHTKey, Option<Arc<BucketEntry>>) -> bool,
F: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
C: FnMut(
&RoutingTableInner,
&(DHTKey, Option<Arc<BucketEntry>>),
&(DHTKey, Option<Arc<BucketEntry>>),
) -> core::cmp::Ordering,
T: FnMut(DHTKey, Option<Arc<BucketEntry>>) -> O,
T: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
{
let inner = self.inner.read();
let inner = &*inner;
let self_node_id = self.unlocked_inner.node_id;
// collect all the nodes for sorting
@ -221,27 +227,32 @@ impl RoutingTable {
Vec::<(DHTKey, Option<Arc<BucketEntry>>)>::with_capacity(inner.bucket_entry_count + 1);
// add our own node (only one of there with the None entry)
if filter(self_node_id, None) {
if filter(inner, self_node_id, None) {
nodes.push((self_node_id, None));
}
// add all nodes from buckets
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
// Apply filter
if filter(k, Some(v.clone())) {
nodes.push((k, Some(v.clone())));
}
Option::<()>::None
});
Self::with_entries(
&*inner,
cur_ts,
BucketEntryState::Unreliable,
|rti, k, v| {
// Apply filter
if filter(rti, k, Some(v.clone())) {
nodes.push((k, Some(v.clone())));
}
Option::<()>::None
},
);
// sort by preference for returning nodes
nodes.sort_by(compare);
nodes.sort_by(|a, b| compare(inner, a, b));
// return transformed vector for filtered+sorted nodes
let cnt = usize::min(node_count, nodes.len());
let mut out = Vec::<O>::with_capacity(cnt);
for node in nodes {
let val = transform(node.0, node.1);
let val = transform(inner, node.0, node.1);
out.push(val);
}
@ -255,21 +266,21 @@ impl RoutingTable {
transform: T,
) -> Vec<O>
where
F: FnMut(DHTKey, Option<Arc<BucketEntry>>) -> bool,
T: FnMut(DHTKey, Option<Arc<BucketEntry>>) -> O,
F: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
T: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
{
let cur_ts = intf::get_timestamp();
let out = self.find_peers_with_sort_and_filter(
node_count,
cur_ts,
// filter
|k, v| {
|rti, k, v| {
if let Some(entry) = &v {
// always filter out dead nodes
if entry.with(|e| e.state(cur_ts) == BucketEntryState::Dead) {
if entry.with(rti, |_rti, e| e.state(cur_ts) == BucketEntryState::Dead) {
false
} else {
filter(k, v)
filter(rti, k, v)
}
} else {
// always filter out self peer, as it is irrelevant to the 'fastest nodes' search
@ -277,7 +288,7 @@ impl RoutingTable {
}
},
// sort
|(a_key, a_entry), (b_key, b_entry)| {
|rti, (a_key, a_entry), (b_key, b_entry)| {
// same nodes are always the same
if a_key == b_key {
return core::cmp::Ordering::Equal;
@ -292,8 +303,8 @@ impl RoutingTable {
// reliable nodes come first
let ae = a_entry.as_ref().unwrap();
let be = b_entry.as_ref().unwrap();
ae.with(|ae| {
be.with(|be| {
ae.with(rti, |rti, ae| {
be.with(rti, |_rti, be| {
let ra = ae.check_reliable(cur_ts);
let rb = be.check_reliable(cur_ts);
if ra != rb {
@ -337,8 +348,8 @@ impl RoutingTable {
mut transform: T,
) -> Vec<O>
where
F: FnMut(DHTKey, Option<Arc<BucketEntry>>) -> bool,
T: FnMut(DHTKey, Option<Arc<BucketEntry>>) -> O,
F: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
T: FnMut(&RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
{
let cur_ts = intf::get_timestamp();
let node_count = {
@ -351,7 +362,7 @@ impl RoutingTable {
// filter
filter,
// sort
|(a_key, a_entry), (b_key, b_entry)| {
|rti, (a_key, a_entry), (b_key, b_entry)| {
// same nodes are always the same
if a_key == b_key {
return core::cmp::Ordering::Equal;
@ -360,10 +371,10 @@ impl RoutingTable {
// reliable nodes come first, pessimistically treating our own node as unreliable
let ra = a_entry
.as_ref()
.map_or(false, |x| x.with(|x| x.check_reliable(cur_ts)));
.map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts)));
let rb = b_entry
.as_ref()
.map_or(false, |x| x.with(|x| x.check_reliable(cur_ts)));
.map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts)));
if ra != rb {
if ra {
return core::cmp::Ordering::Less;
@ -420,9 +431,7 @@ impl RoutingTable {
fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool {
// Get all our outbound protocol/address types
let outbound_dif = self
.network_manager()
.get_outbound_dial_info_filter(RoutingDomain::PublicInternet);
let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet);
let mapped_port_info = self.get_low_level_port_info();
move |e: &BucketEntryInner| {
@ -481,9 +490,9 @@ impl RoutingTable {
let mut best_inbound_relay: Option<(DHTKey, Arc<BucketEntry>)> = None;
// Iterate all known nodes for candidates
Self::with_entries(inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
Self::with_entries(inner, cur_ts, BucketEntryState::Unreliable, |rti, k, v| {
let v2 = v.clone();
v.with(|e| {
v.with(rti, |rti, e| {
// Ensure we have the node's status
if let Some(node_status) = e.node_status(routing_domain) {
// Ensure the node will relay
@ -491,7 +500,7 @@ impl RoutingTable {
// Compare against previous candidate
if let Some(best_inbound_relay) = best_inbound_relay.as_mut() {
// Less is faster
let better = best_inbound_relay.1.with(|best| {
let better = best_inbound_relay.1.with(rti, |_rti, best| {
BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best)
== std::cmp::Ordering::Less
});

View File

@ -3,6 +3,7 @@ mod bucket_entry;
mod debug;
mod find_nodes;
mod node_ref;
mod route_spec_store;
mod routing_domain_editor;
mod routing_domains;
mod stats_accounting;
@ -19,6 +20,7 @@ pub use debug::*;
pub use find_nodes::*;
use hashlink::LruCache;
pub use node_ref::*;
pub use route_spec_store::*;
pub use routing_domain_editor::*;
pub use routing_domains::*;
pub use stats_accounting::*;
@ -41,7 +43,7 @@ struct RoutingTableInner {
/// The public internet routing domain
public_internet_routing_domain: PublicInternetRoutingDomainDetail,
/// The dial info we use on the local network
local_network_routing_domain: LocalInternetRoutingDomainDetail,
local_network_routing_domain: LocalNetworkRoutingDomainDetail,
/// Interim accounting mechanism for this node's RPC latency to any other node
self_latency_stats_accounting: LatencyStatsAccounting,
/// Interim accounting mechanism for the total bandwidth to/from this node
@ -50,6 +52,8 @@ struct RoutingTableInner {
self_transfer_stats: TransferStatsDownUp,
/// Peers we have recently communicated with
recent_peers: LruCache<DHTKey, RecentPeersEntry>,
/// Storage for private/safety RouteSpecs
route_spec_store: RouteSpecStore,
}
#[derive(Clone, Debug, Default)]
@ -90,12 +94,13 @@ impl RoutingTable {
RoutingTableInner {
buckets: Vec::new(),
public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(),
local_network_routing_domain: LocalInternetRoutingDomainDetail::default(),
local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(),
bucket_entry_count: 0,
self_latency_stats_accounting: LatencyStatsAccounting::new(),
self_transfer_stats_accounting: TransferStatsAccounting::new(),
self_transfer_stats: TransferStatsDownUp::default(),
recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
route_spec_store: RouteSpecStore::new(),
}
}
fn new_unlocked_inner(
@ -214,17 +219,21 @@ impl RoutingTable {
pub fn relay_node(&self, domain: RoutingDomain) -> Option<NodeRef> {
let inner = self.inner.read();
Self::with_routing_domain(&*inner, domain, |rd| rd.relay_node())
Self::with_routing_domain(&*inner, domain, |rd| rd.common().relay_node())
}
pub fn has_dial_info(&self, domain: RoutingDomain) -> bool {
let inner = self.inner.read();
Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details().is_empty())
Self::with_routing_domain(&*inner, domain, |rd| {
!rd.common().dial_info_details().is_empty()
})
}
pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec<DialInfoDetail> {
let inner = self.inner.read();
Self::with_routing_domain(&*inner, domain, |rd| rd.dial_info_details().clone())
Self::with_routing_domain(&*inner, domain, |rd| {
rd.common().dial_info_details().clone()
})
}
pub fn first_filtered_dial_info_detail(
@ -235,7 +244,7 @@ impl RoutingTable {
let inner = self.inner.read();
for routing_domain in routing_domain_set {
let did = Self::with_routing_domain(&*inner, routing_domain, |rd| {
for did in rd.dial_info_details() {
for did in rd.common().dial_info_details() {
if did.matches_filter(filter) {
return Some(did.clone());
}
@ -258,7 +267,7 @@ impl RoutingTable {
let mut ret = Vec::new();
for routing_domain in routing_domain_set {
Self::with_routing_domain(&*inner, routing_domain, |rd| {
for did in rd.dial_info_details() {
for did in rd.common().dial_info_details() {
if did.matches_filter(filter) {
ret.push(did.clone());
}
@ -321,8 +330,8 @@ impl RoutingTable {
fn reset_all_seen_our_node_info(inner: &mut RoutingTableInner, routing_domain: RoutingDomain) {
let cur_ts = intf::get_timestamp();
Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| {
v.with_mut(|e| {
Self::with_entries_mut(inner, cur_ts, BucketEntryState::Dead, |rti, _, v| {
v.with_mut(rti, |_rti, e| {
e.set_seen_our_node_info(routing_domain, false);
});
Option::<()>::None
@ -331,50 +340,83 @@ impl RoutingTable {
fn reset_all_updated_since_last_network_change(inner: &mut RoutingTableInner) {
let cur_ts = intf::get_timestamp();
Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| {
v.with_mut(|e| e.set_updated_since_last_network_change(false));
Self::with_entries_mut(inner, cur_ts, BucketEntryState::Dead, |rti, _, v| {
v.with_mut(rti, |_rti, e| {
e.set_updated_since_last_network_change(false)
});
Option::<()>::None
});
}
/// Return a copy of our node's peerinfo
pub fn get_own_peer_info(&self, routing_domain: RoutingDomain) -> PeerInfo {
PeerInfo::new(
NodeId::new(self.node_id()),
self.get_own_signed_node_info(routing_domain),
)
let inner = &*self.inner.read();
Self::with_routing_domain(inner, routing_domain, |rdd| {
rdd.common().with_peer_info(|pi| pi.clone())
})
}
/// Return a copy of our node's signednodeinfo
pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedNodeInfo {
let node_id = NodeId::new(self.node_id());
let secret = self.node_id_secret();
SignedNodeInfo::with_secret(self.get_own_node_info(routing_domain), node_id, &secret)
.unwrap()
let inner = &*self.inner.read();
Self::with_routing_domain(inner, routing_domain, |rdd| {
rdd.common()
.with_peer_info(|pi| pi.signed_node_info.clone())
})
}
/// Return a copy of our node's nodeinfo
pub fn get_own_node_info(&self, routing_domain: RoutingDomain) -> NodeInfo {
let netman = self.network_manager();
let relay_node = self.relay_node(routing_domain);
let pc = netman.get_protocol_config();
NodeInfo {
network_class: netman
.get_network_class(routing_domain)
.unwrap_or(NetworkClass::Invalid),
outbound_protocols: pc.outbound,
address_types: pc.family_global,
min_version: MIN_VERSION,
max_version: MAX_VERSION,
dial_info_detail_list: self.dial_info_details(routing_domain),
relay_peer_info: relay_node
.and_then(|rn| rn.make_peer_info(routing_domain).map(Box::new)),
}
let inner = &*self.inner.read();
Self::with_routing_domain(inner, routing_domain, |rdd| {
rdd.common()
.with_peer_info(|pi| pi.signed_node_info.node_info.clone())
})
}
/// Return our currently registered network class
pub fn has_valid_own_node_info(&self, routing_domain: RoutingDomain) -> bool {
let netman = self.network_manager();
let nc = netman
.get_network_class(routing_domain)
.unwrap_or(NetworkClass::Invalid);
!matches!(nc, NetworkClass::Invalid)
let inner = &*self.inner.read();
Self::with_routing_domain(inner, routing_domain, |rdd| {
rdd.common().has_valid_own_node_info()
})
}
/// Return the domain's currently registered network class
pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option<NetworkClass> {
let inner = &*self.inner.read();
Self::with_routing_domain(inner, routing_domain, |rdd| rdd.common().network_class())
}
/// Return the domain's filter for what we can receivein the form of a dial info filter
pub fn get_inbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter {
let inner = &*self.inner.read();
Self::with_routing_domain(inner, routing_domain, |rdd| {
rdd.common().inbound_dial_info_filter()
})
}
/// Return the domain's filter for what we can receive in the form of a node ref filter
pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter {
let dif = self.get_inbound_dial_info_filter(routing_domain);
NodeRefFilter::new()
.with_routing_domain(routing_domain)
.with_dial_info_filter(dif)
}
/// Return the domain's filter for what we can send out in the form of a dial info filter
pub fn get_outbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter {
let inner = &*self.inner.read();
Self::with_routing_domain(inner, routing_domain, |rdd| {
rdd.common().outbound_dial_info_filter()
})
}
/// Return the domain's filter for what we can receive in the form of a node ref filter
pub fn get_outbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter {
let dif = self.get_outbound_dial_info_filter(routing_domain);
NodeRefFilter::new()
.with_routing_domain(routing_domain)
.with_dial_info_filter(dif)
}
fn bucket_depth(index: usize) -> usize {
@ -434,8 +476,8 @@ impl RoutingTable {
// If the local network topology has changed, nuke the existing local node info and let new local discovery happen
if changed {
let cur_ts = intf::get_timestamp();
Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_rti, e| {
e.with_mut(|e| {
Self::with_entries_mut(&mut *inner, cur_ts, BucketEntryState::Dead, |rti, _, e| {
e.with_mut(rti, |_rti, e| {
e.clear_signed_node_info(RoutingDomain::LocalNetwork);
e.set_seen_our_node_info(RoutingDomain::LocalNetwork, false);
e.set_updated_since_last_network_change(false);
@ -449,12 +491,13 @@ impl RoutingTable {
// should only be performed when there are no node_refs (detached)
pub fn purge_buckets(&self) {
let mut inner = self.inner.write();
let inner = &mut *inner;
log_rtab!(
"Starting routing table buckets purge. Table currently has {} nodes",
inner.bucket_entry_count
);
for bucket in &mut inner.buckets {
bucket.kick(0);
for bucket in &inner.buckets {
bucket.kick(inner, 0);
}
log_rtab!(debug
"Routing table buckets purge complete. Routing table now has {} nodes",
@ -465,13 +508,14 @@ impl RoutingTable {
// Attempt to remove last_connections from entries
pub fn purge_last_connections(&self) {
let mut inner = self.inner.write();
let inner = &mut *inner;
log_rtab!(
"Starting routing table last_connections purge. Table currently has {} nodes",
inner.bucket_entry_count
);
for bucket in &mut inner.buckets {
for bucket in &inner.buckets {
for entry in bucket.entries() {
entry.1.with_mut(|e| {
entry.1.with_mut(inner, |_rti, e| {
e.clear_last_connections();
});
}
@ -488,7 +532,7 @@ impl RoutingTable {
let bucket = &mut inner.buckets[idx];
let bucket_depth = Self::bucket_depth(idx);
if let Some(dead_node_ids) = bucket.kick(bucket_depth) {
if let Some(dead_node_ids) = bucket.kick(inner, bucket_depth) {
// Remove counts
inner.bucket_entry_count -= dead_node_ids.len();
log_rtab!(debug "Routing table now has {} nodes", inner.bucket_entry_count);
@ -524,8 +568,8 @@ impl RoutingTable {
) -> usize {
let mut count = 0usize;
let cur_ts = intf::get_timestamp();
Self::with_entries(inner, cur_ts, min_state, |_, e| {
if e.with(|e| e.best_routing_domain(routing_domain_set))
Self::with_entries(inner, cur_ts, min_state, |rti, _, e| {
if e.with(rti, |_rti, e| e.best_routing_domain(routing_domain_set))
.is_some()
{
count += 1;
@ -535,7 +579,7 @@ impl RoutingTable {
count
}
fn with_entries<T, F: FnMut(DHTKey, Arc<BucketEntry>) -> Option<T>>(
fn with_entries<T, F: FnMut(&RoutingTableInner, DHTKey, Arc<BucketEntry>) -> Option<T>>(
inner: &RoutingTableInner,
cur_ts: u64,
min_state: BucketEntryState,
@ -543,8 +587,29 @@ impl RoutingTable {
) -> Option<T> {
for bucket in &inner.buckets {
for entry in bucket.entries() {
if entry.1.with(|e| e.state(cur_ts) >= min_state) {
if let Some(out) = f(*entry.0, entry.1.clone()) {
if entry.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) {
if let Some(out) = f(inner, *entry.0, entry.1.clone()) {
return Some(out);
}
}
}
}
None
}
fn with_entries_mut<
T,
F: FnMut(&mut RoutingTableInner, DHTKey, Arc<BucketEntry>) -> Option<T>,
>(
inner: &mut RoutingTableInner,
cur_ts: u64,
min_state: BucketEntryState,
mut f: F,
) -> Option<T> {
for bucket in &inner.buckets {
for entry in bucket.entries() {
if entry.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) {
if let Some(out) = f(inner, *entry.0, entry.1.clone()) {
return Some(out);
}
}
@ -561,18 +626,23 @@ impl RoutingTable {
) -> Vec<NodeRef> {
let inner = self.inner.read();
let mut node_refs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
// Only update nodes that haven't seen our node info yet
if all || !v.with(|e| e.has_seen_our_node_info(routing_domain)) {
node_refs.push(NodeRef::new(
self.clone(),
k,
v,
Some(NodeRefFilter::new().with_routing_domain(routing_domain)),
));
}
Option::<()>::None
});
Self::with_entries(
&*inner,
cur_ts,
BucketEntryState::Unreliable,
|rti, k, v| {
// Only update nodes that haven't seen our node info yet
if all || !v.with(rti, |_rti, e| e.has_seen_our_node_info(routing_domain)) {
node_refs.push(NodeRef::new(
self.clone(),
k,
v,
Some(NodeRefFilter::new().with_routing_domain(routing_domain)),
));
}
Option::<()>::None
},
);
node_refs
}
@ -585,35 +655,45 @@ impl RoutingTable {
// Collect relay nodes
let opt_relay_id = Self::with_routing_domain(&*inner, routing_domain, |rd| {
rd.relay_node().map(|rn| rn.node_id())
rd.common().relay_node().map(|rn| rn.node_id())
});
// Collect all entries that are 'needs_ping' and have some node info making them reachable somehow
let mut node_refs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
if v.with(|e| {
e.has_node_info(routing_domain.into())
&& e.needs_ping(cur_ts, opt_relay_id == Some(k))
}) {
node_refs.push(NodeRef::new(
self.clone(),
k,
v,
Some(NodeRefFilter::new().with_routing_domain(routing_domain)),
));
}
Option::<()>::None
});
Self::with_entries(
&*inner,
cur_ts,
BucketEntryState::Unreliable,
|rti, k, v| {
if v.with(rti, |_rti, e| {
e.has_node_info(routing_domain.into())
&& e.needs_ping(cur_ts, opt_relay_id == Some(k))
}) {
node_refs.push(NodeRef::new(
self.clone(),
k,
v,
Some(NodeRefFilter::new().with_routing_domain(routing_domain)),
));
}
Option::<()>::None
},
);
node_refs
}
pub fn get_all_nodes(&self, cur_ts: u64) -> Vec<NodeRef> {
let inner = self.inner.read();
let mut node_refs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
node_refs.push(NodeRef::new(self.clone(), k, v, None));
Option::<()>::None
});
Self::with_entries(
&*inner,
cur_ts,
BucketEntryState::Unreliable,
|_rti, k, v| {
node_refs.push(NodeRef::new(self.clone(), k, v, None));
Option::<()>::None
},
);
node_refs
}
@ -627,7 +707,7 @@ impl RoutingTable {
// in a locked fashion as to ensure the bucket entry state is always valid
pub fn create_node_ref<F>(&self, node_id: DHTKey, update_func: F) -> Option<NodeRef>
where
F: FnOnce(&mut BucketEntryInner),
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner),
{
// Ensure someone isn't trying register this node itself
if node_id == self.node_id() {
@ -637,6 +717,7 @@ impl RoutingTable {
// Lock this entire operation
let mut inner = self.inner.write();
let inner = &mut *inner;
// Look up existing entry
let idx = self.find_bucket_index(node_id);
@ -657,7 +738,7 @@ impl RoutingTable {
// Update the entry
let entry = bucket.entry(&node_id).unwrap();
entry.with_mut(update_func);
entry.with_mut(inner, update_func);
// Kick the bucket
self.unlocked_inner.kick_queue.lock().insert(idx);
@ -669,9 +750,7 @@ impl RoutingTable {
// Update the entry
let bucket = &mut inner.buckets[idx];
let entry = bucket.entry(&node_id).unwrap();
entry.with_mut(|e| {
update_func(e);
});
entry.with_mut(inner, update_func);
nr
}
@ -731,7 +810,7 @@ impl RoutingTable {
}
}
self.create_node_ref(node_id, |e| {
self.create_node_ref(node_id, |_rti, e| {
e.update_signed_node_info(routing_domain, signed_node_info);
})
.map(|mut nr| {
@ -750,7 +829,7 @@ impl RoutingTable {
descriptor: ConnectionDescriptor,
timestamp: u64,
) -> Option<NodeRef> {
let out = self.create_node_ref(node_id, |e| {
let out = self.create_node_ref(node_id, |_rti, e| {
// this node is live because it literally just connected to us
e.touch_last_seen(timestamp);
});
@ -783,9 +862,10 @@ impl RoutingTable {
let mut health = RoutingTableHealth::default();
let cur_ts = intf::get_timestamp();
let inner = self.inner.read();
let inner = &*inner;
for bucket in &inner.buckets {
for (_, v) in bucket.entries() {
match v.with(|e| e.state(cur_ts)) {
match v.with(inner, |_rti, e| e.state(cur_ts)) {
BucketEntryState::Reliable => {
health.reliable_entry_count += 1;
}

View File

@ -103,7 +103,7 @@ impl NodeRef {
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T,
{
let inner = &*self.routing_table.inner.read();
self.entry.with(|e| f(inner, e))
self.entry.with(inner, f)
}
pub(super) fn operate_mut<T, F>(&self, f: F) -> T
@ -111,7 +111,7 @@ impl NodeRef {
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T,
{
let inner = &mut *self.routing_table.inner.write();
self.entry.with_mut(|e| f(inner, e))
self.entry.with_mut(inner, f)
}
// Filtering

View File

@ -0,0 +1,334 @@
use super::*;
use crate::veilid_api::*;
use serde::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RouteSpecDetail {
/// The actual route spec
#[serde(with = "arc_serialize")]
route_spec: Arc<RouteSpec>,
/// Transfers up and down
transfer_stats_down_up: TransferStatsDownUp,
/// Latency stats
latency_stats: LatencyStats,
/// Accounting mechanism for this route's RPC latency
#[serde(skip)]
latency_stats_accounting: LatencyStatsAccounting,
/// Accounting mechanism for the bandwidth across this route
#[serde(skip)]
transfer_stats_accounting: TransferStatsAccounting,
/// Published private route, do not reuse for ephemeral routes
#[serde(skip)]
published: bool,
/// Timestamp of when the route was created
timestamp: u64,
}
/// The core representation of the RouteSpecStore that can be serialized
#[derive(Debug, Serialize, Deserialize)]
pub struct RouteSpecStoreContent {
/// All of the routes we have allocated so far
details: HashMap<DHTKey, RouteSpecDetail>,
}
/// Ephemeral data used to help the RouteSpecStore operate efficiently
#[derive(Debug, Default)]
pub struct RouteSpecStoreCache {
/// The fastest routes by latency
fastest_routes: Vec<DHTKey>,
/// The most reliable routes by node lifetime longevity
reliable_routes: Vec<DHTKey>,
/// How many times nodes have been used
used_nodes: HashMap<DHTKey, usize>,
/// How many times nodes have been used at the terminal point of a route
used_end_nodes: HashMap<DHTKey, usize>,
/// Route spec hop cache, used to quickly disqualify routes
hop_cache: HashSet<Vec<u8>>,
}
#[derive(Debug)]
pub struct RouteSpecStore {
/// Serialize RouteSpecStore content
content: RouteSpecStoreContent,
/// RouteSpecStore cache
cache: RouteSpecStoreCache,
}
fn route_spec_to_hop_cache(spec: Arc<RouteSpec>) -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(spec.hops.len() * DHT_KEY_LENGTH);
for hop in spec.hops {
cache.extend_from_slice(&hop.dial_info.node_id.key.bytes);
}
cache
}
fn node_sublist_to_hop_cache(
nodes: &[(DHTKey, Arc<BucketEntry>)],
start: usize,
len: usize,
) -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(len * DHT_KEY_LENGTH);
for node in &nodes[start..start + len] {
cache.extend_from_slice(&node.0.bytes)
}
cache
}
impl RouteSpecStore {
pub fn new() -> Self {
Self {
content: RouteSpecStoreContent {
details: HashMap::new(),
},
cache: Default::default(),
}
}
pub fn from_cbor(
routing_table: RoutingTable,
cbor: &[u8],
) -> Result<RouteSpecStore, VeilidAPIError> {
let content: RouteSpecStoreContent = serde_cbor::from_slice(cbor)
.map_err(|e| VeilidAPIError::parse_error("invalid route spec store content", e))?;
let rss = RouteSpecStore {
content,
cache: Default::default(),
};
rss.rebuild_cache();
Ok(rss)
}
pub fn to_cbor(&self) -> Vec<u8> {
serde_cbor::to_vec(&self.content).unwrap()
}
fn rebuild_cache(&mut self) {
//
}
fn detail_mut(&mut self, spec: Arc<RouteSpec>) -> &mut RouteSpecDetail {
self.content.details.get_mut(&spec.public_key).unwrap()
}
/// Create a new route
/// Prefers nodes that are not currently in use by another route
/// The route is not yet tested for its reachability
/// Returns None if no route could be allocated at this time
pub fn allocate_route(
&mut self,
routing_table: RoutingTable,
reliable: bool,
hop_count: usize,
) -> Option<Arc<RouteSpec>> {
use core::cmp::Ordering;
let max_route_hop_count = {
let config = routing_table.network_manager().config();
let c = config.get();
let max_route_hop_count = c.network.rpc.max_route_hop_count;
max_route_hop_count.into()
};
if hop_count < 2 {
log_rtab!(error "Not allocating route less than two hops in length");
return None;
}
if hop_count > max_route_hop_count {
log_rtab!(error "Not allocating route longer than max route hop count");
return None;
}
// Get list of all nodes, and sort them for selection
let cur_ts = intf::get_timestamp();
let dial_info_sort = if reliable {
Some(DialInfoDetail::reliable_sort)
} else {
None
};
let filter = |rti, k: DHTKey, v: Option<Arc<BucketEntry>>| -> bool {
// Exclude our own node from routes
if v.is_none() {
return false;
}
let v = v.unwrap();
// Exclude nodes on our local network
let on_local_network = v.with(rti, |_rti, e| {
e.node_info(RoutingDomain::LocalNetwork).is_some()
});
if on_local_network {
return false;
}
// Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route
v.with(rti, |_rti, e| {
let node_info_ok = if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) {
ni.has_any_dial_info()
} else {
false
};
let node_status_ok = if let Some(ns) = e.node_status(RoutingDomain::PublicInternet)
{
ns.will_route()
} else {
false
};
node_info_ok && node_status_ok
})
};
let compare = |rti,
v1: &(DHTKey, Option<Arc<BucketEntry>>),
v2: &(DHTKey, Option<Arc<BucketEntry>>)|
-> Ordering {
// deprioritize nodes that we have already used as end points
let e1_used_end = self
.cache
.used_end_nodes
.get(&v1.0)
.cloned()
.unwrap_or_default();
let e2_used_end = self
.cache
.used_end_nodes
.get(&v2.0)
.cloned()
.unwrap_or_default();
let cmp_used_end = e1_used_end.cmp(&e2_used_end);
if !matches!(cmp_used_end, Ordering::Equal) {
return cmp_used_end;
}
// deprioritize nodes we have used already anywhere
let e1_used = self
.cache
.used_nodes
.get(&v1.0)
.cloned()
.unwrap_or_default();
let e2_used = self
.cache
.used_nodes
.get(&v2.0)
.cloned()
.unwrap_or_default();
let cmp_used = e1_used.cmp(&e2_used);
if !matches!(cmp_used, Ordering::Equal) {
return cmp_used;
}
// always prioritize reliable nodes, but sort by oldest or fastest
let cmpout = v1.1.unwrap().with(rti, |rti, e1| {
v2.1.unwrap().with(rti, |_rti, e2| {
if reliable {
BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2)
} else {
BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2)
}
})
});
cmpout
};
let transform = |rti, k: DHTKey, v: Option<Arc<BucketEntry>>| -> (DHTKey, NodeInfo) {
// Return the key and the nodeinfo for that key
(
k,
v.unwrap().with(rti, |_rti, e| {
e.node_info(RoutingDomain::PublicInternet.into())
.unwrap()
.clone()
}),
)
};
// Pull the whole routing table in sorted order
let node_count = routing_table.get_entry_count(
RoutingDomain::PublicInternet.into(),
BucketEntryState::Unreliable,
);
let mut nodes = routing_table
.find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform);
// If we couldn't find enough nodes, wait until we have more nodes in the routing table
if nodes.len() < hop_count {
log_rtab!(debug "Not enough nodes to construct route at this time. Try again later.");
return None;
}
// Now go through nodes and try to build a route we haven't seen yet
let mut route_nodes = None;
for start in 0..(nodes.len() - hop_count) {
// Get the route cache key
let key = node_sublist_to_hop_cache(&nodes, start, hop_count);
// try each route until we find a unique one
if !self.cache.hop_cache.contains(&key) {
route_nodes = Some(&nodes[start..start + hop_count]);
break;
}
}
if route_nodes.is_none() {
return None;
}
let route_node = route_nodes.unwrap();
// Got a unique route, lets build the detail, register it, and return it
let hops: Vec<RouteHopSpec> = route_node
.into_iter()
.map(|v| RouteHopSpec {
dial_info: NodeDialInfo {
node_id: NodeId::new(v.0),
dial_info: xxx,
},
})
.collect();
let (public_key, secret_key) = generate_secret();
let route_spec = Arc::new(RouteSpec {
public_key,
secret_key,
hops,
});
let rsd = RouteSpecDetail {
route_spec,
transfer_stats_down_up: Default::default(),
latency_stats: Default::default(),
latency_stats_accounting: Default::default(),
transfer_stats_accounting: Default::default(),
published: false,
timestamp: cur_ts,
};
None
}
pub fn release_route(&mut self, spec: Arc<RouteSpec>) {}
pub fn best_route(&mut self, reliable: bool) -> Arc<RouteSpec> {}
/// Mark route as published
/// When first deserialized, routes must be re-published in order to ensure they remain
/// in the RouteSpecStore.
pub fn publish_route(&mut self, spec: Arc<RouteSpec>) {
//compile private route here?
}
pub fn record_latency(
&mut self,
spec: Arc<RouteSpec>,
latency: u64,
) -> veilid_api::LatencyStats {
}
pub fn add_down(&mut self, spec: Arc<RouteSpec>, bytes: u64) {
self.current_transfer.down += bytes;
}
pub fn add_up(&mut self, spec: Arc<RouteSpec>, bytes: u64) {}
pub fn roll_transfers(&mut self) {
//
}
}

View File

@ -3,8 +3,24 @@ use super::*;
enum RoutingDomainChange {
ClearDialInfoDetails,
ClearRelayNode,
SetRelayNode { relay_node: NodeRef },
AddDialInfoDetail { dial_info_detail: DialInfoDetail },
SetRelayNode {
relay_node: NodeRef,
},
AddDialInfoDetail {
dial_info_detail: DialInfoDetail,
},
SetupNode {
node_id: DHTKey,
node_id_secret: DHTKeySecret,
},
SetupNetwork {
outbound_protocols: ProtocolTypeSet,
inbound_protocols: ProtocolTypeSet,
address_types: AddressTypeSet,
},
SetNetworkClass {
network_class: Option<NetworkClass>,
},
}
pub struct RoutingDomainEditor {
@ -67,9 +83,40 @@ impl RoutingDomainEditor {
Ok(())
}
#[instrument(level = "debug", skip(self))]
pub fn setup_node(&mut self, node_id: DHTKey, node_id_secret: DHTKeySecret) {
self.changes.push(RoutingDomainChange::SetupNode {
node_id,
node_id_secret,
})
}
#[instrument(level = "debug", skip(self))]
pub fn setup_network(
&mut self,
outbound_protocols: ProtocolTypeSet,
inbound_protocols: ProtocolTypeSet,
address_types: AddressTypeSet,
) {
self.changes.push(RoutingDomainChange::SetupNetwork {
outbound_protocols,
inbound_protocols,
address_types,
})
}
#[instrument(level = "debug", skip(self))]
pub fn set_network_class(&mut self, network_class: Option<NetworkClass>) {
self.changes
.push(RoutingDomainChange::SetNetworkClass { network_class })
}
#[instrument(level = "debug", skip(self))]
pub async fn commit(self) {
// No locking if we have nothing to do
if self.changes.is_empty() {
return;
}
let mut changed = false;
{
let node_id = self.routing_table.node_id();
@ -81,17 +128,17 @@ impl RoutingDomainEditor {
match change {
RoutingDomainChange::ClearDialInfoDetails => {
debug!("[{:?}] cleared dial info details", self.routing_domain);
detail.clear_dial_info_details();
detail.common_mut().clear_dial_info_details();
changed = true;
}
RoutingDomainChange::ClearRelayNode => {
debug!("[{:?}] cleared relay node", self.routing_domain);
detail.set_relay_node(None);
detail.common_mut().set_relay_node(None);
changed = true;
}
RoutingDomainChange::SetRelayNode { relay_node } => {
debug!("[{:?}] set relay node: {}", self.routing_domain, relay_node);
detail.set_relay_node(Some(relay_node));
detail.common_mut().set_relay_node(Some(relay_node));
changed = true;
}
RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => {
@ -99,7 +146,9 @@ impl RoutingDomainEditor {
"[{:?}] add dial info detail: {:?}",
self.routing_domain, dial_info_detail
);
detail.add_dial_info_detail(dial_info_detail.clone());
detail
.common_mut()
.add_dial_info_detail(dial_info_detail.clone());
info!(
"{:?} Dial Info: {}",
@ -112,8 +161,68 @@ impl RoutingDomainEditor {
);
changed = true;
}
RoutingDomainChange::SetupNode {
node_id,
node_id_secret,
} => {
debug!(
"[{:?}] setup node: {}",
self.routing_domain,
node_id.encode()
);
detail.common_mut().setup_node(node_id, node_id_secret);
changed = true;
}
RoutingDomainChange::SetupNetwork {
outbound_protocols,
inbound_protocols,
address_types,
} => {
let old_outbound_protocols = detail.common().outbound_protocols();
let old_inbound_protocols = detail.common().inbound_protocols();
let old_address_types = detail.common().address_types();
let this_changed = old_outbound_protocols != outbound_protocols
|| old_inbound_protocols != inbound_protocols
|| old_address_types != address_types;
debug!(
"[{:?}] setup network: {:?} {:?} {:?}",
self.routing_domain,
outbound_protocols,
inbound_protocols,
address_types
);
detail.common_mut().setup_network(
outbound_protocols,
inbound_protocols,
address_types,
);
if this_changed {
changed = true;
}
}
RoutingDomainChange::SetNetworkClass { network_class } => {
let old_network_class = detail.common().network_class();
let this_changed = old_network_class != network_class;
debug!(
"[{:?}] set network class: {:?}",
self.routing_domain, network_class,
);
detail.common_mut().set_network_class(network_class);
if this_changed {
changed = true;
}
}
}
}
if changed {
detail.common_mut().clear_cache()
}
});
if changed {
RoutingTable::reset_all_seen_our_node_info(inner, self.routing_domain);

View File

@ -1,62 +1,204 @@
use super::*;
#[derive(Debug)]
pub struct RoutingDomainDetailCommon {
routing_domain: RoutingDomain,
node_id: DHTKey,
node_id_secret: DHTKeySecret,
network_class: Option<NetworkClass>,
outbound_protocols: ProtocolTypeSet,
inbound_protocols: ProtocolTypeSet,
address_types: AddressTypeSet,
relay_node: Option<NodeRef>,
dial_info_details: Vec<DialInfoDetail>,
// caches
cached_peer_info: Mutex<Option<PeerInfo>>,
}
impl RoutingDomainDetailCommon {
pub fn new(routing_domain: RoutingDomain) -> Self {
Self {
routing_domain,
node_id: Default::default(),
node_id_secret: Default::default(),
network_class: Default::default(),
outbound_protocols: Default::default(),
inbound_protocols: Default::default(),
address_types: Default::default(),
relay_node: Default::default(),
dial_info_details: Default::default(),
cached_peer_info: Mutex::new(Default::default()),
}
}
// Set from routing table
pub(super) fn setup_node(&mut self, node_id: DHTKey, node_id_secret: DHTKeySecret) {
self.node_id = node_id;
self.node_id_secret = node_id_secret;
self.clear_cache();
}
// Set from network manager
pub(super) fn setup_network(
&mut self,
outbound_protocols: ProtocolTypeSet,
inbound_protocols: ProtocolTypeSet,
address_types: AddressTypeSet,
) {
self.outbound_protocols = outbound_protocols;
self.inbound_protocols = inbound_protocols;
self.address_types = address_types;
}
pub fn node_id(&self) -> DHTKey {
self.node_id
}
pub fn node_id_secret(&self) -> DHTKeySecret {
self.node_id_secret
}
pub(super) fn set_network_class(&mut self, network_class: Option<NetworkClass>) {
self.network_class = network_class;
}
pub fn network_class(&self) -> Option<NetworkClass> {
self.network_class
}
pub fn outbound_protocols(&self) -> ProtocolTypeSet {
self.outbound_protocols
}
pub fn inbound_protocols(&self) -> ProtocolTypeSet {
self.inbound_protocols
}
pub fn address_types(&self) -> AddressTypeSet {
self.address_types
}
pub fn relay_node(&self) -> Option<NodeRef> {
self.relay_node.clone()
}
pub(super) fn set_relay_node(&mut self, opt_relay_node: Option<NodeRef>) {
self.relay_node = opt_relay_node.map(|nr| {
nr.filtered_clone(NodeRefFilter::new().with_routing_domain(self.routing_domain))
})
}
pub fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
&self.dial_info_details
}
pub(super) fn clear_dial_info_details(&mut self) {
self.dial_info_details.clear();
}
pub(super) fn add_dial_info_detail(&mut self, did: DialInfoDetail) {
self.dial_info_details.push(did);
self.dial_info_details.sort();
}
pub fn has_valid_own_node_info(&self) -> bool {
self.network_class.unwrap_or(NetworkClass::Invalid) != NetworkClass::Invalid
}
pub fn with_peer_info<F, R>(&self, f: F) -> R
where
F: FnOnce(&PeerInfo) -> R,
{
let cpi = self.cached_peer_info.lock();
if cpi.is_none() {
// Regenerate peer info
let pi = PeerInfo::new(
NodeId::new(self.node_id),
SignedNodeInfo::with_secret(
NodeInfo {
network_class: self.network_class.unwrap_or(NetworkClass::Invalid),
outbound_protocols: self.outbound_protocols,
address_types: self.address_types,
min_version: MIN_VERSION,
max_version: MAX_VERSION,
dial_info_detail_list: self.dial_info_details.clone(),
relay_peer_info: self
.relay_node
.and_then(|rn| rn.make_peer_info(self.routing_domain).map(Box::new)),
},
NodeId::new(self.node_id),
&self.node_id_secret,
)
.unwrap(),
);
// Cache the peer info
*cpi = Some(pi);
}
f(cpi.as_ref().unwrap())
}
pub fn inbound_dial_info_filter(&self) -> DialInfoFilter {
DialInfoFilter::all()
.with_protocol_type_set(self.inbound_protocols)
.with_address_type_set(self.address_types)
}
pub fn outbound_dial_info_filter(&self) -> DialInfoFilter {
DialInfoFilter::all()
.with_protocol_type_set(self.outbound_protocols)
.with_address_type_set(self.address_types)
}
pub(super) fn clear_cache(&self) {
*self.cached_peer_info.lock() = None;
}
}
/// General trait for all routing domains
pub trait RoutingDomainDetail {
// Common accessors
fn common(&self) -> &RoutingDomainDetailCommon;
fn common_mut(&mut self) -> &mut RoutingDomainDetailCommon;
// Per-domain accessors
fn can_contain_address(&self, address: Address) -> bool;
fn relay_node(&self) -> Option<NodeRef>;
fn set_relay_node(&mut self, opt_relay_node: Option<NodeRef>);
fn dial_info_details(&self) -> &Vec<DialInfoDetail>;
fn clear_dial_info_details(&mut self);
fn add_dial_info_detail(&mut self, did: DialInfoDetail);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Public Internet routing domain internals
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct PublicInternetRoutingDomainDetail {
/// An optional node we relay through for this domain
relay_node: Option<NodeRef>,
/// The dial infos on this domain we can be reached by
dial_info_details: Vec<DialInfoDetail>,
/// Common implementation for all routing domains
common: RoutingDomainDetailCommon,
}
impl Default for PublicInternetRoutingDomainDetail {
fn default() -> Self {
Self {
common: RoutingDomainDetailCommon::new(RoutingDomain::PublicInternet),
}
}
}
impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
fn common(&self) -> &RoutingDomainDetailCommon {
&self.common
}
fn common_mut(&mut self) -> &mut RoutingDomainDetailCommon {
&mut self.common
}
fn can_contain_address(&self, address: Address) -> bool {
address.is_global()
}
fn relay_node(&self) -> Option<NodeRef> {
self.relay_node.clone()
}
fn set_relay_node(&mut self, opt_relay_node: Option<NodeRef>) {
self.relay_node = opt_relay_node.map(|nr| {
nr.filtered_clone(
NodeRefFilter::new().with_routing_domain(RoutingDomain::PublicInternet),
)
})
}
fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
&self.dial_info_details
}
fn clear_dial_info_details(&mut self) {
self.dial_info_details.clear();
}
fn add_dial_info_detail(&mut self, did: DialInfoDetail) {
self.dial_info_details.push(did);
self.dial_info_details.sort();
}
}
/// Local Network routing domain internals
#[derive(Debug, Default)]
pub struct LocalInternetRoutingDomainDetail {
/// An optional node we relay through for this domain
relay_node: Option<NodeRef>,
/// The dial infos on this domain we can be reached by
dial_info_details: Vec<DialInfoDetail>,
#[derive(Debug)]
pub struct LocalNetworkRoutingDomainDetail {
/// The local networks this domain will communicate with
local_networks: Vec<(IpAddr, IpAddr)>,
/// Common implementation for all routing domains
common: RoutingDomainDetailCommon,
}
impl LocalInternetRoutingDomainDetail {
impl Default for LocalNetworkRoutingDomainDetail {
fn default() -> Self {
Self {
local_networks: Default::default(),
common: RoutingDomainDetailCommon::new(RoutingDomain::LocalNetwork),
}
}
}
impl LocalNetworkRoutingDomainDetail {
pub fn set_local_networks(&mut self, mut local_networks: Vec<(IpAddr, IpAddr)>) -> bool {
local_networks.sort();
if local_networks == self.local_networks {
@ -67,7 +209,13 @@ impl LocalInternetRoutingDomainDetail {
}
}
impl RoutingDomainDetail for LocalInternetRoutingDomainDetail {
impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
fn common(&self) -> &RoutingDomainDetailCommon {
&self.common
}
fn common_mut(&mut self) -> &mut RoutingDomainDetailCommon {
&mut self.common
}
fn can_contain_address(&self, address: Address) -> bool {
let ip = address.to_ip_addr();
for localnet in &self.local_networks {
@ -77,22 +225,4 @@ impl RoutingDomainDetail for LocalInternetRoutingDomainDetail {
}
false
}
fn relay_node(&self) -> Option<NodeRef> {
self.relay_node.clone()
}
fn set_relay_node(&mut self, opt_relay_node: Option<NodeRef>) {
self.relay_node = opt_relay_node.map(|nr| {
nr.filtered_clone(NodeRefFilter::new().with_routing_domain(RoutingDomain::LocalNetwork))
});
}
fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
&self.dial_info_details
}
fn clear_dial_info_details(&mut self) {
self.dial_info_details.clear();
}
fn add_dial_info_detail(&mut self, did: DialInfoDetail) {
self.dial_info_details.push(did);
self.dial_info_details.sort();
}
}

View File

@ -22,8 +22,13 @@ impl RoutingTable {
);
// Roll all bucket entry transfers
for b in &mut inner.buckets {
b.roll_transfers(last_ts, cur_ts);
let entries: Vec<Arc<BucketEntry>> = inner
.buckets
.iter()
.flat_map(|b| b.entries().map(|(_k, v)| v.clone()))
.collect();
for v in entries {
v.with_mut(inner, |_rti, e| e.roll_transfers(last_ts, cur_ts));
}
Ok(())
}

View File

@ -7,8 +7,8 @@ pub enum Destination {
Direct {
/// The node to send to
target: NodeRef,
/// An optional safety route specification to send from for sender privacy
safety_route_spec: Option<Arc<SafetyRouteSpec>>,
/// Require safety route or not
safety: bool,
},
/// Send to node for relay purposes
Relay {
@ -16,15 +16,17 @@ pub enum Destination {
relay: NodeRef,
/// The final destination the relay should send to
target: DHTKey,
/// An optional safety route specification to send from for sender privacy
safety_route_spec: Option<Arc<SafetyRouteSpec>>,
/// Require safety route or not
safety: bool,
},
/// Send to private route (privateroute)
PrivateRoute {
/// A private route to send to
private_route: PrivateRoute,
/// An optional safety route specification to send from for sender privacy
safety_route_spec: Option<Arc<SafetyRouteSpec>>,
/// Require safety route or not
safety: bool,
/// Prefer reliability or not
reliable: bool,
},
}
@ -32,115 +34,47 @@ impl Destination {
pub fn direct(target: NodeRef) -> Self {
Self::Direct {
target,
safety_route_spec: None,
safety: false,
}
}
pub fn relay(relay: NodeRef, target: DHTKey) -> Self {
Self::Relay {
relay,
target,
safety_route_spec: None,
safety: false,
}
}
pub fn private_route(private_route: PrivateRoute) -> Self {
pub fn private_route(private_route: PrivateRoute, reliable: bool) -> Self {
Self::PrivateRoute {
private_route,
safety_route_spec: None,
safety: false,
reliable,
}
}
// pub fn target_id(&self) -> DHTKey {
// match self {
// Destination::Direct {
// target,
// safety_route_spec,
// } => target.node_id(),
// Destination::Relay {
// relay,
// target,
// safety_route_spec,
// } => *target,
// Destination::PrivateRoute {
// private_route,
// safety_route_spec,
// } => {}
// }
// }
// pub fn best_routing_domain(&self) -> RoutingDomain {
// match self {
// Destination::Direct {
// target,
// safety_route_spec,
// } => {
// if safety_route_spec.is_some() {
// RoutingDomain::PublicInternet
// } else {
// target
// .best_routing_domain()
// .unwrap_or(RoutingDomain::PublicInternet)
// }
// }
// Destination::Relay {
// relay,
// target,
// safety_route_spec,
// } => {
// if safety_route_spec.is_some() {
// RoutingDomain::PublicInternet
// } else {
// relay
// .best_routing_domain()
// .unwrap_or(RoutingDomain::PublicInternet)
// }
// }
// Destination::PrivateRoute {
// private_route: _,
// safety_route_spec: _,
// } => RoutingDomain::PublicInternet,
// }
// }
pub fn safety_route_spec(&self) -> Option<Arc<SafetyRouteSpec>> {
pub fn with_safety(self) -> Self {
match self {
Destination::Direct {
target: _,
safety_route_spec,
} => safety_route_spec.clone(),
Destination::Relay {
relay: _,
target: _,
safety_route_spec,
} => safety_route_spec.clone(),
Destination::PrivateRoute {
private_route: _,
safety_route_spec,
} => safety_route_spec.clone(),
}
}
pub fn with_safety_route_spec(self, safety_route_spec: Arc<SafetyRouteSpec>) -> Self {
match self {
Destination::Direct {
Destination::Direct { target, safety: _ } => Self::Direct {
target,
safety_route_spec: _,
} => Self::Direct {
target,
safety_route_spec: Some(safety_route_spec),
safety: true,
},
Destination::Relay {
relay,
target,
safety_route_spec: _,
safety: _,
} => Self::Relay {
relay,
target,
safety_route_spec: Some(safety_route_spec),
safety: true,
},
Destination::PrivateRoute {
private_route,
safety_route_spec: _,
safety: _,
reliable,
} => Self::PrivateRoute {
private_route,
safety_route_spec: Some(safety_route_spec),
safety: true,
reliable,
},
}
}
@ -149,39 +83,29 @@ impl Destination {
impl fmt::Display for Destination {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Destination::Direct {
target,
safety_route_spec,
} => {
let sr = safety_route_spec
.as_ref()
.map(|_sr| "+SR".to_owned())
.unwrap_or_default();
Destination::Direct { target, safety } => {
let sr = if *safety { "+SR" } else { "" };
write!(f, "{:?}{}", target, sr)
write!(f, "{}{}", target, sr)
}
Destination::Relay {
relay,
target,
safety_route_spec,
safety,
} => {
let sr = safety_route_spec
.as_ref()
.map(|_sr| "+SR".to_owned())
.unwrap_or_default();
let sr = if *safety { "+SR" } else { "" };
write!(f, "{:?}@{:?}{}", target.encode(), relay, sr)
write!(f, "{}@{}{}", target.encode(), relay, sr)
}
Destination::PrivateRoute {
private_route,
safety_route_spec,
safety,
reliable,
} => {
let sr = safety_route_spec
.as_ref()
.map(|_sr| "+SR".to_owned())
.unwrap_or_default();
let sr = if *safety { "+SR" } else { "" };
let rl = if *reliable { "+RL" } else { "" };
write!(f, "{}{}", private_route, sr)
write!(f, "{}{}{}", private_route, sr, rl)
}
}
}

View File

@ -544,7 +544,8 @@ impl RPCProcessor {
}
// Don't do this if our own signed node info isn't valid yet
let routing_table = self.routing_table();
if !routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet) {
let network_manager = self.network_manager();
if !RoutingTable::has_valid_own_node_info(network_manager, RoutingDomain::PublicInternet) {
return None;
}

View File

@ -62,16 +62,34 @@ impl RPCProcessor {
// add node information for the requesting node to our routing table
let routing_table = self.routing_table();
let rt2 = routing_table.clone();
let rt3 = routing_table.clone();
let network_manager = self.network_manager();
let has_valid_own_node_info =
routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet);
let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet);
// find N nodes closest to the target node in our routing table
let closest_nodes = routing_table.find_closest_nodes(
find_node_q.node_id,
// filter
move |_k, v| rt2.filter_has_valid_signed_node_info(RoutingDomain::PublicInternet, v),
|rti, _k, v| {
RoutingTable::filter_has_valid_signed_node_info_inner(
rti,
RoutingDomain::PublicInternet,
has_valid_own_node_info,
v,
)
},
// transform
move |k, v| rt3.transform_to_peer_info(RoutingDomain::PublicInternet, k, v),
|rti, k, v| {
let own_peer_info = own_peer_info.clone();
RoutingTable::transform_to_peer_info_inner(
rti,
RoutingDomain::PublicInternet,
own_peer_info,
k,
v,
)
},
);
// Make status answer

View File

@ -87,7 +87,7 @@ impl RPCProcessor {
routing_domain,
dial_info.clone(),
);
let will_validate_dial_info_filter = |e: &BucketEntryInner| {
let will_validate_dial_info_filter = |_rti, e: &BucketEntryInner| {
if let Some(status) = &e.node_status(routing_domain) {
status.will_validate_dial_info()
} else {

View File

@ -2002,25 +2002,6 @@ impl VeilidAPI {
.map_err(|e| VeilidAPIError::internal(e))
}
////////////////////////////////////////////////////////////////
// Safety / Private Route Handling
#[instrument(level = "debug", err, skip(self))]
pub async fn new_safety_route_spec(
&self,
_hops: u8,
) -> Result<SafetyRouteSpec, VeilidAPIError> {
panic!("unimplemented");
}
#[instrument(level = "debug", err, skip(self))]
pub async fn new_private_route_spec(
&self,
_hops: u8,
) -> Result<PrivateRouteSpec, VeilidAPIError> {
panic!("unimplemented");
}
////////////////////////////////////////////////////////////////
// Routing Context

View File

@ -9,35 +9,17 @@ pub struct RouteHopSpec {
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PrivateRouteSpec {
pub struct RouteSpec {
//
pub public_key: DHTKey,
pub secret_key: DHTKeySecret,
pub hops: Vec<RouteHopSpec>,
}
impl PrivateRouteSpec {
impl RouteSpec {
pub fn new() -> Self {
let (pk, sk) = generate_secret();
PrivateRouteSpec {
public_key: pk,
secret_key: sk,
hops: Vec::new(),
}
}
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SafetyRouteSpec {
pub public_key: DHTKey,
pub secret_key: DHTKeySecret,
pub hops: Vec<RouteHopSpec>,
}
impl SafetyRouteSpec {
pub fn new() -> Self {
let (pk, sk) = generate_secret();
SafetyRouteSpec {
RouteSpec {
public_key: pk,
secret_key: sk,
hops: Vec::new(),

View File

@ -10,10 +10,8 @@ pub enum Target {
pub struct RoutingContextInner {}
pub struct RoutingContextUnlockedInner {
/// Safety route specified here is for _this_ node's anonymity as a sender, used via the 'route' operation
safety_route_spec: Option<Arc<SafetyRouteSpec>>,
/// Private route specified here is for _this_ node's anonymity as a receiver, passed out via the 'respond_to' field for replies
private_route_spec: Option<Arc<PrivateRouteSpec>>,
/// Enforce use of private routing
privacy: bool,
/// Choose reliable protocols over unreliable/faster protocols when available
reliable: bool,
}
@ -43,24 +41,18 @@ impl RoutingContext {
api,
inner: Arc::new(Mutex::new(RoutingContextInner {})),
unlocked_inner: Arc::new(RoutingContextUnlockedInner {
safety_route_spec: None,
private_route_spec: None,
privacy: false,
reliable: false,
}),
}
}
pub fn with_privacy(
self,
safety_route_spec: SafetyRouteSpec,
private_route_spec: PrivateRouteSpec,
) -> Self {
pub fn with_privacy(self) -> Self {
Self {
api: self.api.clone(),
inner: Arc::new(Mutex::new(RoutingContextInner {})),
unlocked_inner: Arc::new(RoutingContextUnlockedInner {
safety_route_spec: Some(Arc::new(safety_route_spec)),
private_route_spec: Some(Arc::new(private_route_spec)),
privacy: true,
reliable: self.unlocked_inner.reliable,
}),
}
@ -71,8 +63,7 @@ impl RoutingContext {
api: self.api.clone(),
inner: Arc::new(Mutex::new(RoutingContextInner {})),
unlocked_inner: Arc::new(RoutingContextUnlockedInner {
safety_route_spec: self.unlocked_inner.safety_route_spec.clone(),
private_route_spec: self.unlocked_inner.private_route_spec.clone(),
privacy: self.unlocked_inner.privacy,
reliable: true,
}),
}
@ -102,12 +93,13 @@ impl RoutingContext {
}
Ok(rpc_processor::Destination::Direct {
target: nr,
safety_route_spec: self.unlocked_inner.safety_route_spec.clone(),
safety: self.unlocked_inner.privacy,
})
}
Target::PrivateRoute(pr) => Ok(rpc_processor::Destination::PrivateRoute {
private_route: pr,
safety_route_spec: self.unlocked_inner.safety_route_spec.clone(),
safety: self.unlocked_inner.privacy,
reliable: self.unlocked_inner.reliable,
}),
}
}

View File

@ -113,3 +113,18 @@ pub mod opt_json_as_string {
}
}
}
pub mod arc_serialize {
use alloc::sync::Arc;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<T: Serialize, S: Serializer>(v: &Arc<T>, s: S) -> Result<S::Ok, S::Error> {
T::serialize(v.as_ref(), s)
}
pub fn deserialize<'de, T: Deserialize<'de>, D: Deserializer<'de>>(
d: D,
) -> Result<Arc<T>, D::Error> {
Ok(Arc::new(T::deserialize(d)?))
}
}