better external address sampling

This commit is contained in:
Christien Rioux 2024-09-24 21:03:21 -04:00
parent f74df2b0c6
commit 55f07d7bcc
27 changed files with 766 additions and 639 deletions

View File

@ -0,0 +1,313 @@
/// Address checker - keep track of how other nodes are seeing our node's address on a per-protocol basis
/// Used to determine if our address has changed and if we should re-publish new PeerInfo
use super::*;
/// Number of 'existing dialinfo inconsistent' results in the cache during inbound-capable to trigger detection
pub const ADDRESS_INCONSISTENCY_DETECTION_COUNT: usize = 3;
/// Number of consistent results in the cache during outbound-only to trigger detection
pub const ADDRESS_CONSISTENCY_DETECTION_COUNT: usize = 3;
/// Length of consistent/inconsistent result cache for detection
pub const ADDRESS_CHECK_CACHE_SIZE: usize = 10;
/// Length of consistent/inconsistent result cache for detection
// pub const ADDRESS_CHECK_PEER_COUNT: usize = 256;
// /// Frequency of address checks
// pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60;
// /// Duration we leave nodes in the inconsistencies table
// pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: TimestampDuration =
// TimestampDuration::new(300_000_000u64); // 5 minutes
// /// How long we punish nodes for lying about our address
// pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration =
// TimestampDuration::new(3_600_000_000_u64); // 60 minutes
/// Address checker config
pub(crate) struct AddressCheckConfig {
pub(crate) detect_address_changes: bool,
pub(crate) ip6_prefix_size: usize,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct AddressCheckCacheKey(RoutingDomain, ProtocolType, AddressType);
/// Address checker - keep track of how other nodes are seeing our node's address on a per-protocol basis
/// Used to determine if our address has changed and if we should re-publish new PeerInfo
pub(crate) struct AddressCheck {
config: AddressCheckConfig,
net: Network,
current_network_class: BTreeMap<RoutingDomain, NetworkClass>,
current_addresses: BTreeMap<AddressCheckCacheKey, HashSet<SocketAddress>>,
// Used by InboundCapable to determine if we have changed our address or re-do our network class
address_inconsistency_table: BTreeMap<AddressCheckCacheKey, usize>,
// Used by OutboundOnly to determine if we should re-do our network class
address_consistency_table: BTreeMap<AddressCheckCacheKey, LruCache<IpAddr, SocketAddress>>,
}
impl AddressCheck {
pub fn new(config: AddressCheckConfig, net: Network) -> Self {
Self {
config,
net,
current_network_class: BTreeMap::new(),
current_addresses: BTreeMap::new(),
address_inconsistency_table: BTreeMap::new(),
address_consistency_table: BTreeMap::new(),
}
}
/// Accept a report of any peerinfo that has changed
pub fn report_peer_info_change(&mut self, peer_info: Arc<PeerInfo>) {
let routing_domain = peer_info.routing_domain();
let network_class = peer_info.signed_node_info().node_info().network_class();
self.current_network_class
.insert(routing_domain, network_class);
for protocol_type in ProtocolTypeSet::all() {
for address_type in AddressTypeSet::all() {
let acck = AddressCheckCacheKey(routing_domain, protocol_type, address_type);
// Clear our current addresses so we can rebuild them for this routing domain
self.current_addresses.remove(&acck);
// Clear our history as well now so we start fresh when we get a new peer info
self.address_inconsistency_table.remove(&acck);
self.address_consistency_table.remove(&acck);
}
}
for did in peer_info
.signed_node_info()
.node_info()
.dial_info_detail_list()
{
// Strip port from direct and mapped addresses
// as the incoming dialinfo may not match the outbound
// connections' NAT mapping. In this case we only check for IP address changes.
let socket_address =
if did.class == DialInfoClass::Direct || did.class == DialInfoClass::Mapped {
did.dial_info.socket_address().with_port(0)
} else {
did.dial_info.socket_address()
};
let address_type = did.dial_info.address_type();
let protocol_type = did.dial_info.protocol_type();
let acck = AddressCheckCacheKey(routing_domain, protocol_type, address_type);
self.current_addresses
.entry(acck)
.or_default()
.insert(socket_address);
}
}
/// Accept a report of our address as seen by the other end of a flow, such
/// as the StatusA response from a StatusQ
pub fn report_socket_address_change(
&mut self,
routing_domain: RoutingDomain, // the routing domain used by this flow
socket_address: SocketAddress, // the socket address as seen by the remote peer
old_socket_address: Option<SocketAddress>, // the socket address previously for this peer
flow: Flow, // the flow used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) {
// Don't accept any reports if we're already in the middle of a public dial info check
if self.net.needs_public_dial_info_check() {
return;
}
// Ignore the LocalNetwork routing domain because we know if our local addresses change
// from our interfaces
if matches!(routing_domain, RoutingDomain::LocalNetwork) {
return;
}
// Ignore flows that do not start from our listening port (unbound connections etc),
// because a router is going to map these differently
let Some(pla) = self
.net
.get_preferred_local_address_by_key(flow.protocol_type(), flow.address_type())
else {
return;
};
let Some(local) = flow.local() else {
return;
};
if local.port() != pla.port() {
log_network_result!(debug "ignoring address report because local port did not match listener: {} != {}", local.port(), pla.port());
return;
}
// Get the ip(block) this report is coming from
let reporting_ipblock =
ip_to_ipblock(self.config.ip6_prefix_size, flow.remote_address().ip_addr());
// Reject public address reports from nodes that we know are behind symmetric nat or
// nodes that must be using a relay for everything
let Some(reporting_node_info) = reporting_peer.node_info(routing_domain) else {
return;
};
if reporting_node_info.network_class() != NetworkClass::InboundCapable {
return;
}
// If the socket address reported is the same as the reporter, then this is coming through a relay
// or it should be ignored due to local proximity (nodes on the same network block should not be trusted as
// public ip address reporters, only disinterested parties)
if reporting_ipblock == ip_to_ipblock(self.config.ip6_prefix_size, socket_address.ip_addr())
{
return;
}
// Get current network class / dial info
// If we haven't gotten our own network class yet we're done for now
let Some(network_class) = self.current_network_class.get(&routing_domain) else {
return;
};
// Process the state of the address checker and see if we need to
// perform a full address check for this routing domain
let needs_address_detection = match network_class {
NetworkClass::InboundCapable => self.detect_for_inbound_capable(
routing_domain,
socket_address,
old_socket_address,
flow,
reporting_peer,
),
NetworkClass::OutboundOnly => self.detect_for_outbound_only(
routing_domain,
socket_address,
flow,
reporting_ipblock,
),
NetworkClass::WebApp | NetworkClass::Invalid => {
return;
}
};
if needs_address_detection {
if self.config.detect_address_changes {
// Reset the address check cache now so we can start detecting fresh
info!(
"{:?} address has changed, detecting dial info",
routing_domain
);
// Re-detect the public dialinfo
self.net.set_needs_public_dial_info_check(None);
} else {
warn!(
"{:?} address may have changed. Restarting the server may be required.",
routing_domain
);
}
}
}
fn matches_current_address(
&self,
acckey: AddressCheckCacheKey,
socket_address: SocketAddress,
) -> bool {
self.current_addresses
.get(&acckey)
.map(|current_addresses| {
current_addresses.contains(&socket_address)
|| current_addresses.contains(&socket_address.with_port(0))
})
.unwrap_or(false)
}
// If we are inbound capable, but start to see places where our sender info used to match our dial info
// but no longer matches our dial info (count up the number of changes -away- from our dial info)
// then trigger a detection of dial info and network class
fn detect_for_inbound_capable(
&mut self,
routing_domain: RoutingDomain, // the routing domain used by this flow
socket_address: SocketAddress, // the socket address as seen by the remote peer
old_socket_address: Option<SocketAddress>, // the socket address previously for this peer
flow: Flow, // the flow used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) -> bool {
let acckey =
AddressCheckCacheKey(routing_domain, flow.protocol_type(), flow.address_type());
// Check the current socket address and see if it matches our current dial info
let new_matches_current = self.matches_current_address(acckey, socket_address);
// If we have something that matches our current dial info at all, consider it a validation
if new_matches_current {
self.address_inconsistency_table
.entry(acckey)
.and_modify(|ait| {
if *ait != 0 {
log_net!(debug "Resetting address inconsistency for {:?} due to match on flow {:?} from {}", acckey, flow, reporting_peer);
}
*ait = 0;
})
.or_insert(0);
return false;
}
// See if we have a case of switching away from our dial info
let old_matches_current = old_socket_address
.map(|osa| self.matches_current_address(acckey, osa))
.unwrap_or(false);
if old_matches_current {
let val = *self
.address_inconsistency_table
.entry(acckey)
.and_modify(|ait| {
*ait += 1;
})
.or_insert(1);
log_net!(debug "Adding address inconsistency ({}) for {:?} due to address {} on flow {:?} from {}", val, acckey, socket_address, flow, reporting_peer);
return val >= ADDRESS_INCONSISTENCY_DETECTION_COUNT;
}
false
}
// If we are currently outbound only, we don't have any public dial info
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
// lru the addresses we're seeing and if they all match (same ip only?) then trigger
fn detect_for_outbound_only(
&mut self,
routing_domain: RoutingDomain, // the routing domain used by this flow
socket_address: SocketAddress, // the socket address as seen by the remote peer
flow: Flow, // the flow used
reporting_ipblock: IpAddr, // the IP block this report came from
) -> bool {
let acckey =
AddressCheckCacheKey(routing_domain, flow.protocol_type(), flow.address_type());
// Add the currently seen socket address into the consistency table
let cache = self
.address_consistency_table
.entry(acckey)
.and_modify(|act| {
act.insert(reporting_ipblock, socket_address);
})
.or_insert_with(|| {
let mut lruc = LruCache::new(ADDRESS_CHECK_CACHE_SIZE);
lruc.insert(reporting_ipblock, socket_address);
lruc
});
// If we have at least N consistencies then trigger a detect
let mut consistencies = HashMap::<SocketAddress, usize>::new();
for (_k, v) in cache.iter() {
let count = *consistencies.entry(*v).and_modify(|e| *e += 1).or_insert(1);
if count >= ADDRESS_CONSISTENCY_DETECTION_COUNT {
log_net!(debug "Address consistency detected for {:?}: {}", acckey, v);
return true;
}
}
false
}
}

View File

@ -1,7 +1,6 @@
use super::*;
use alloc::collections::btree_map::Entry;
// XXX: Move to config eventually?
const PUNISHMENT_DURATION_MIN: usize = 60;
const MAX_PUNISHMENTS_BY_NODE_ID: usize = 65536;
const DIAL_INFO_FAILURE_DURATION_MIN: usize = 10;

View File

@ -4,6 +4,9 @@ use connection_table::*;
use network_connection::*;
use stop_token::future::FutureExt;
const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10);
const PROTECTED_CONNECTION_DROP_COUNT: usize = 3;
///////////////////////////////////////////////////////////
// Connection manager
@ -38,13 +41,20 @@ impl Drop for ConnectionRefScope {
}
}
#[derive(Debug)]
struct ProtectedAddress {
node_ref: NodeRef,
span_start_ts: Timestamp,
drops_in_span: usize,
}
#[derive(Debug)]
struct ConnectionManagerInner {
next_id: NetworkConnectionId,
sender: flume::Sender<ConnectionManagerEvent>,
async_processor_jh: Option<MustJoinHandle<()>>,
stop_source: Option<StopSource>,
protected_addresses: HashMap<SocketAddress, NodeRef>,
protected_addresses: HashMap<SocketAddress, ProtectedAddress>,
}
struct ConnectionManagerArc {
@ -191,7 +201,7 @@ impl ConnectionManager {
inner
.protected_addresses
.get(conn.flow().remote_address())
.cloned()
.map(|x| x.node_ref.clone())
}
// Update connection protections if things change, like a node becomes a relay
@ -205,8 +215,12 @@ impl ConnectionManager {
return;
};
// Get addresses for relays in all routing domains
inner.protected_addresses.clear();
// Protect addresses for relays in all routing domains
let mut dead_addresses = inner
.protected_addresses
.keys()
.cloned()
.collect::<HashSet<_>>();
for routing_domain in RoutingDomainSet::all() {
let Some(relay_node) = self
.network_manager()
@ -218,12 +232,28 @@ impl ConnectionManager {
for did in relay_node.dial_info_details() {
// SocketAddress are distinct per routing domain, so they should not collide
// and two nodes should never have the same SocketAddress
let protected_address = did.dial_info.socket_address();
// Update the protection, note the protected address is not dead
dead_addresses.remove(&protected_address);
inner
.protected_addresses
.insert(did.dial_info.socket_address(), relay_node.unfiltered());
.entry(protected_address)
.and_modify(|pa| pa.node_ref = relay_node.unfiltered())
.or_insert_with(|| ProtectedAddress {
node_ref: relay_node.unfiltered(),
span_start_ts: Timestamp::now(),
drops_in_span: 0usize,
});
}
}
// Remove protected addresses that were not still associated with a protected noderef
for dead_address in dead_addresses {
inner.protected_addresses.remove(&dead_address);
}
// For all connections, register the protection
self.arc
.connection_table
.with_all_connections_mut(|conn| {
@ -252,7 +282,7 @@ impl ConnectionManager {
// Get next connection id to use
let id = inner.next_id;
inner.next_id += 1u64;
log_net!(
log_net!(debug
"on_new_protocol_network_connection: id={} prot_conn={:?}",
id,
prot_conn
@ -366,7 +396,7 @@ impl ConnectionManager {
// Async lock on the remote address for atomicity per remote
let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr).await;
log_net!("== get_or_create_connection dial_info={:?}", dial_info);
log_net!(debug "== get_or_create_connection dial_info={:?}", dial_info);
// If any connection to this remote exists that has the same protocol, return it
// Any connection will do, we don't have to match the local address but if we can
@ -376,7 +406,7 @@ impl ConnectionManager {
.connection_table
.get_best_connection_by_remote(best_port, peer_address)
{
log_net!(
log_net!(debug
"== Returning best existing connection {:?}",
best_existing_conn
);
@ -561,7 +591,37 @@ impl ConnectionManager {
// If the connection closed while it was protected, report it on the node the connection was established on
// In-use connections will already get reported because they will cause a 'question_lost' stat on the remote node
if let Some(protect_nr) = conn.protected_node_ref() {
protect_nr.report_protected_connection_dropped();
// Find the protected address and increase our drop count
if let Some(inner) = self.arc.inner.lock().as_mut() {
for pa in inner.protected_addresses.values_mut() {
if pa.node_ref.same_entry(&protect_nr) {
// See if we've had more than the threshold number of drops in the last span
let cur_ts = Timestamp::now();
let duration = cur_ts.saturating_sub(pa.span_start_ts);
if duration < PROTECTED_CONNECTION_DROP_SPAN {
pa.drops_in_span += 1;
log_net!(debug "== Protected connection dropped (count={}): {} -> {} for node {}", pa.drops_in_span, conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
if pa.drops_in_span >= PROTECTED_CONNECTION_DROP_COUNT {
// Consider this as a failure to send if we've dropped the connection too many times in a single timespan
protect_nr.report_protected_connection_dropped();
// Reset the drop counter
pa.drops_in_span = 0;
pa.span_start_ts = cur_ts;
}
} else {
// Otherwise, just reset the drop detection span
pa.drops_in_span = 1;
pa.span_start_ts = cur_ts;
log_net!(debug "== Protected connection dropped (count={}): {} -> {} for node {}", pa.drops_in_span, conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
}
break;
}
}
}
}
let _ = sender.send_async(ConnectionManagerEvent::Dead(conn)).await;
}

View File

@ -5,6 +5,7 @@ mod native;
#[cfg(target_arch = "wasm32")]
mod wasm;
mod address_check;
mod address_filter;
mod connection_handle;
mod connection_manager;
@ -30,6 +31,7 @@ pub(crate) use stats::*;
pub use types::*;
////////////////////////////////////////////////////////////////////////////////////////
use address_check::*;
use address_filter::*;
use connection_handle::*;
use crypto::*;
@ -54,14 +56,6 @@ pub const IPADDR_TABLE_SIZE: usize = 1024;
pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration =
TimestampDuration::new(300_000_000u64); // 5 minutes
pub const NODE_CONTACT_METHOD_CACHE_SIZE: usize = 1024;
pub const PUBLIC_ADDRESS_CHANGE_CONSISTENCY_DETECTION_COUNT: usize = 3; // Number of consistent results in the cache during outbound-only to trigger detection
pub const PUBLIC_ADDRESS_CHANGE_INCONSISTENCY_DETECTION_COUNT: usize = 7; // Number of inconsistent results in the cache during inbound-capable to trigger detection
pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 10; // Length of consistent/inconsistent result cache for detection
pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60;
pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: TimestampDuration =
TimestampDuration::new(300_000_000u64); // 5 minutes
pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration =
TimestampDuration::new(3_600_000_000_u64); // 60 minutes
pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60;
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
@ -117,9 +111,6 @@ struct NodeContactMethodCacheKey {
target_node_ref_sequencing: Sequencing,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct PublicAddressCheckCacheKey(ProtocolType, AddressType);
enum SendDataToExistingFlowResult {
Sent(UniqueFlow),
NotSent(Vec<u8>),
@ -137,10 +128,7 @@ struct NetworkManagerInner {
stats: NetworkManagerStats,
client_allowlist: LruCache<TypedKey, ClientAllowlistEntry>,
node_contact_method_cache: LruCache<NodeContactMethodCacheKey, NodeContactMethod>,
public_internet_address_check_cache:
BTreeMap<PublicAddressCheckCacheKey, LruCache<IpAddr, SocketAddress>>,
public_internet_address_inconsistencies_table:
BTreeMap<PublicAddressCheckCacheKey, HashMap<IpAddr, Timestamp>>,
address_check: Option<AddressCheck>,
}
struct NetworkManagerUnlockedInner {
@ -158,7 +146,6 @@ struct NetworkManagerUnlockedInner {
update_callback: RwLock<Option<UpdateCallback>>,
// Background processes
rolling_transfers_task: TickTask<EyreReport>,
public_internet_address_check_task: TickTask<EyreReport>,
address_filter_task: TickTask<EyreReport>,
// Network Key
network_key: Option<SharedSecret>,
@ -178,8 +165,7 @@ impl NetworkManager {
stats: NetworkManagerStats::default(),
client_allowlist: LruCache::new_unbounded(),
node_contact_method_cache: LruCache::new(NODE_CONTACT_METHOD_CACHE_SIZE),
public_internet_address_check_cache: BTreeMap::new(),
public_internet_address_inconsistencies_table: BTreeMap::new(),
address_check: None,
}
}
fn new_unlocked_inner(
@ -205,10 +191,6 @@ impl NetworkManager {
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
public_internet_address_check_task: TickTask::new(
"public_address_check_task",
PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS,
),
address_filter_task: TickTask::new(
"address_filter_task",
ADDRESS_FILTER_TASK_INTERVAL_SECS,
@ -437,6 +419,20 @@ impl NetworkManager {
return Ok(StartupDisposition::BindRetry);
}
}
let (detect_address_changes, ip6_prefix_size) = self.with_config(|c| {
(
c.network.detect_address_changes,
c.network.max_connections_per_ip6_prefix_size as usize,
)
});
let address_check_config = AddressCheckConfig {
detect_address_changes,
ip6_prefix_size,
};
let address_check = AddressCheck::new(address_check_config, net.clone());
self.inner.lock().address_check = Some(address_check);
rpc_processor.startup().await?;
receipt_manager.startup().await?;
@ -474,6 +470,9 @@ impl NetworkManager {
// Cancel all tasks
self.cancel_tasks().await;
// Shutdown address check
self.inner.lock().address_check = Option::<AddressCheck>::None;
// Shutdown network components if they started up
log_net!(debug "shutting down network components");
@ -1196,4 +1195,48 @@ impl NetworkManager {
pub fn restart_network(&self) {
self.net().restart_network();
}
// If some other subsystem believes our dial info is no longer valid, this will trigger
// a re-check of the dial info and network class
pub fn set_needs_dial_info_check(&self, routing_domain: RoutingDomain) {
match routing_domain {
RoutingDomain::LocalNetwork => {
// nothing here yet
}
RoutingDomain::PublicInternet => {
self.net().set_needs_public_dial_info_check(None);
}
}
}
// Report peer info changes
pub fn report_peer_info_change(&self, peer_info: Arc<PeerInfo>) {
let mut inner = self.inner.lock();
if let Some(address_check) = inner.address_check.as_mut() {
address_check.report_peer_info_change(peer_info);
}
}
// Determine if our IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers
pub fn report_socket_address_change(
&self,
routing_domain: RoutingDomain, // the routing domain this flow is over
socket_address: SocketAddress, // the socket address as seen by the remote peer
old_socket_address: Option<SocketAddress>, // the socket address previously for this peer
flow: Flow, // the flow used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) {
let mut inner = self.inner.lock();
if let Some(address_check) = inner.address_check.as_mut() {
address_check.report_socket_address_change(
routing_domain,
socket_address,
old_socket_address,
flow,
reporting_peer,
);
}
}
}

View File

@ -6,6 +6,7 @@ use futures_util::stream::FuturesUnordered;
const PORT_MAP_VALIDATE_TRY_COUNT: usize = 3;
const PORT_MAP_VALIDATE_DELAY_MS: u32 = 500;
const PORT_MAP_TRY_COUNT: usize = 3;
const EXTERNAL_INFO_VALIDATIONS: usize = 5;
// Detection result of dial info detection futures
#[derive(Clone, Debug)]
@ -31,20 +32,15 @@ struct ExternalInfo {
}
struct DiscoveryContextInner {
// first node contacted
external_1: Option<ExternalInfo>,
// second node contacted
external_2: Option<ExternalInfo>,
external_info: Vec<ExternalInfo>,
}
struct DiscoveryContextUnlockedInner {
routing_table: RoutingTable,
net: Network,
clear_network_callback: ClearNetworkCallback,
// per-protocol
intf_addrs: Vec<SocketAddress>,
existing_external_address: Option<SocketAddress>,
protocol_type: ProtocolType,
address_type: AddressType,
port: u16,
@ -56,8 +52,6 @@ pub(super) struct DiscoveryContext {
inner: Arc<Mutex<DiscoveryContextInner>>,
}
pub(super) type ClearNetworkCallback = Arc<dyn Fn() -> SendPinBoxFuture<()> + Send + Sync>;
impl DiscoveryContext {
pub fn new(
routing_table: RoutingTable,
@ -65,44 +59,21 @@ impl DiscoveryContext {
protocol_type: ProtocolType,
address_type: AddressType,
port: u16,
clear_network_callback: ClearNetworkCallback,
) -> Self {
let intf_addrs =
Self::get_local_addresses(routing_table.clone(), protocol_type, address_type);
// Get the existing external address to check to see if it has changed
let existing_dial_info = routing_table.all_filtered_dial_info_details(
RoutingDomain::PublicInternet.into(),
&DialInfoFilter::default()
.with_address_type(address_type)
.with_protocol_type(protocol_type),
);
let existing_external_address = if existing_dial_info.len() == 1 {
Some(
existing_dial_info
.first()
.unwrap()
.dial_info
.socket_address(),
)
} else {
None
};
Self {
unlocked_inner: Arc::new(DiscoveryContextUnlockedInner {
routing_table,
net,
clear_network_callback,
intf_addrs,
existing_external_address,
protocol_type,
address_type,
port,
}),
inner: Arc::new(Mutex::new(DiscoveryContextInner {
external_1: None,
external_2: None,
external_info: Vec::new(),
})),
}
}
@ -153,12 +124,12 @@ impl DiscoveryContext {
}
);
log_net!(
log_network_result!(
debug "request_public_address {:?}: Value({:?})",
node_ref,
res.answer
);
res.answer.map(|si| si.socket_address)
res.answer.opt_sender_info.map(|si| si.socket_address)
}
// find fast peers with a particular address type, and ask them to tell us what our external address is
@ -260,40 +231,59 @@ impl DiscoveryContext {
unord.push(gpa_future);
// Always process two at a time so we get both addresses in parallel if possible
if unord.len() == 2 {
if unord.len() == EXTERNAL_INFO_VALIDATIONS {
// Process one
if let Some(Some(ei)) = unord.next().in_current_span().await {
external_address_infos.push(ei);
if external_address_infos.len() == 2 {
if external_address_infos.len() == EXTERNAL_INFO_VALIDATIONS {
break;
}
}
}
}
// Finish whatever is left if we need to
if external_address_infos.len() < 2 {
if external_address_infos.len() < EXTERNAL_INFO_VALIDATIONS {
while let Some(res) = unord.next().in_current_span().await {
if let Some(ei) = res {
external_address_infos.push(ei);
if external_address_infos.len() == 2 {
if external_address_infos.len() == EXTERNAL_INFO_VALIDATIONS {
break;
}
}
}
}
if external_address_infos.len() < 2 {
if external_address_infos.len() < EXTERNAL_INFO_VALIDATIONS {
log_net!(debug "not enough peers responded with an external address for type {:?}:{:?}",
protocol_type,
address_type);
return false;
}
// Try to make preferential port come first
external_address_infos.sort_by(|a, b| {
let acmp = a.address.ip_addr().cmp(&b.address.ip_addr());
if acmp != cmp::Ordering::Equal {
return acmp;
}
if a.address.port() == b.address.port() {
return cmp::Ordering::Equal;
}
if a.address.port() == self.unlocked_inner.port {
return cmp::Ordering::Less;
}
if b.address.port() == self.unlocked_inner.port {
return cmp::Ordering::Greater;
}
a.address.port().cmp(&b.address.port())
});
{
let mut inner = self.inner.lock();
inner.external_1 = Some(external_address_infos[0].clone());
log_net!(debug "external_1: {:?}", inner.external_1);
inner.external_2 = Some(external_address_infos[1].clone());
log_net!(debug "external_2: {:?}", inner.external_2);
inner.external_info = external_address_infos;
log_net!(debug "external_info ({:?}:{:?})[{}]",
self.unlocked_inner.protocol_type,
self.unlocked_inner.address_type,
inner.external_info.iter().map(|x| format!("{}",x.address)).collect::<Vec<_>>().join(", "));
}
true
@ -327,7 +317,7 @@ impl DiscoveryContext {
let low_level_protocol_type = protocol_type.low_level_protocol_type();
let address_type = self.unlocked_inner.address_type;
let local_port = self.unlocked_inner.port;
let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone();
let external_1 = self.inner.lock().external_info.first().unwrap().clone();
let igd_manager = self.unlocked_inner.net.unlocked_inner.igd_manager.clone();
let mut tries = 0;
@ -410,7 +400,7 @@ impl DiscoveryContext {
&self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectionResult>>>,
) {
let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone();
let external_1 = self.inner.lock().external_info.first().cloned().unwrap();
let this = self.clone();
let do_no_nat_fut: SendPinBoxFuture<Option<DetectionResult>> = Box::pin(async move {
@ -449,27 +439,68 @@ impl DiscoveryContext {
&self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectionResult>>>,
) {
// Get the external dial info for our use here
let (external_1, external_2) = {
let external_info = {
let inner = self.inner.lock();
(
inner.external_1.as_ref().unwrap().clone(),
inner.external_2.as_ref().unwrap().clone(),
)
inner.external_info.clone()
};
let local_port = self.unlocked_inner.port;
// If we have two different external address/port combinations, then this is a symmetric NAT
if external_2.address != external_1.address {
// Get the external dial info histogram for our use here
let mut external_info_addr_port_hist = HashMap::<SocketAddress, usize>::new();
let mut external_info_addr_hist = HashMap::<Address, usize>::new();
for ei in &external_info {
external_info_addr_port_hist
.entry(ei.address)
.and_modify(|n| *n += 1)
.or_insert(1);
external_info_addr_hist
.entry(ei.address.address())
.and_modify(|n| *n += 1)
.or_insert(1);
}
// If we have two different external addresses, then this is a symmetric NAT
// If just the port differs, and one is the preferential port we still accept
// this as an inbound capable dialinfo for holepunch
let different_addresses = external_info_addr_hist.len() > 1;
let mut best_external_info = None;
let mut local_port_matching_external_info = None;
let mut external_address_types = AddressTypeSet::new();
// Get the most popular external port from our sampling
// There will always be a best external info
let mut best_ei_address = None;
let mut best_ei_cnt = 0;
for eiph in &external_info_addr_port_hist {
if *eiph.1 > best_ei_cnt {
best_ei_address = Some(*eiph.0);
best_ei_cnt = *eiph.1;
}
}
// In preference order, pick out the best external address and if we have one the one that
// matches our local port number (may be the same)
for ei in &external_info {
if ei.address.port() == local_port && local_port_matching_external_info.is_none() {
local_port_matching_external_info = Some(ei.clone());
}
if best_ei_address.unwrap() == ei.address && best_external_info.is_none() {
best_external_info = Some(ei.clone());
}
external_address_types |= ei.address.address_type();
}
// There is a popular port on the best external info (more than one external address sample with same port)
let same_address_has_popular_port = !different_addresses && best_ei_cnt > 1;
// If we have different addresses in our samples, or no single address has a popular port
// then we consider this a symmetric NAT
if different_addresses || !same_address_has_popular_port {
let this = self.clone();
let do_symmetric_nat_fut: SendPinBoxFuture<Option<DetectionResult>> =
Box::pin(async move {
Some(DetectionResult {
ddi: DetectedDialInfo::SymmetricNAT,
external_address_types: AddressTypeSet::only(
external_1.address.address_type(),
) | AddressTypeSet::only(
external_2.address.address_type(),
),
external_address_types,
local_port: this.unlocked_inner.port,
})
});
@ -478,11 +509,12 @@ impl DiscoveryContext {
}
// Manual Mapping Detection
// If we have no external address that matches our local port, then lets try that port
// on our best external address and see if there's a port forward someone added manually
///////////
let this = self.clone();
let local_port = self.unlocked_inner.port;
if external_1.dial_info.port() != local_port {
let c_external_1 = external_1.clone();
if local_port_matching_external_info.is_none() && best_external_info.is_some() {
let c_external_1 = best_external_info.as_ref().unwrap().clone();
let c_this = this.clone();
let do_manual_map_fut: SendPinBoxFuture<Option<DetectionResult>> =
Box::pin(async move {
@ -534,7 +566,7 @@ impl DiscoveryContext {
let mut ord = FuturesOrdered::new();
let c_this = this.clone();
let c_external_1 = external_1.clone();
let c_external_1 = external_info.first().cloned().unwrap();
let do_full_cone_fut: SendPinBoxFuture<Option<DetectionResult>> =
Box::pin(async move {
// Let's see what kind of NAT we have
@ -566,8 +598,8 @@ impl DiscoveryContext {
ord.push_back(do_full_cone_fut);
let c_this = this.clone();
let c_external_1 = external_1.clone();
let c_external_2 = external_2.clone();
let c_external_1 = external_info.first().cloned().unwrap();
let c_external_2 = external_info.get(1).cloned().unwrap();
let do_restricted_cone_fut: SendPinBoxFuture<Option<DetectionResult>> =
Box::pin(async move {
// We are restricted, determine what kind of restriction
@ -656,30 +688,6 @@ impl DiscoveryContext {
return;
}
// Did external address change from the last time we made dialinfo?
// Disregard port for this because we only need to know if the ip address has changed
// If the port has changed it will change only for this protocol and will be overwritten individually by each protocol discover()
let some_clear_network_callback = {
let inner = self.inner.lock();
let ext_1 = inner.external_1.as_ref().unwrap().address.address();
let ext_2 = inner.external_2.as_ref().unwrap().address.address();
if (ext_1 != ext_2)
|| Some(ext_1)
!= self
.unlocked_inner
.existing_external_address
.map(|ea| ea.address())
{
// External address was not found, or has changed, go ahead and clear the network so we can do better
Some(self.unlocked_inner.clear_network_callback.clone())
} else {
None
}
};
if let Some(clear_network_callback) = some_clear_network_callback {
clear_network_callback().in_current_span().await;
}
// UPNP Automatic Mapping
///////////
if enable_upnp {
@ -710,9 +718,20 @@ impl DiscoveryContext {
///////////
// If our local interface list contains external_1 then there is no NAT in place
let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone();
let local_address_in_external_info = self
.inner
.lock()
.external_info
.iter()
.find_map(|ei| {
self.unlocked_inner
.intf_addrs
.contains(&ei.address)
.then_some(true)
})
.unwrap_or_default();
if self.unlocked_inner.intf_addrs.contains(&external_1.address) {
if local_address_in_external_info {
self.protocol_process_no_nat(unord).await;
} else {
self.protocol_process_nat(unord).await;

View File

@ -26,87 +26,26 @@ impl Network {
out
}
#[instrument(level = "trace", skip(self), err)]
pub async fn update_with_detected_dial_info(&self, ddi: DetectedDialInfo) -> EyreResult<()> {
let existing_network_class = self
.routing_table()
.get_network_class(RoutingDomain::PublicInternet)
.unwrap_or_default();
#[instrument(level = "trace", skip(self, editor), err)]
pub async fn update_with_detected_dial_info(
&self,
editor: &mut RoutingDomainEditorPublicInternet,
ddi: DetectedDialInfo,
) -> EyreResult<bool> {
match ddi {
DetectedDialInfo::SymmetricNAT => {
// If we get any symmetric nat dialinfo, this whole network class is outbound only,
// and all dial info should be treated as invalid
if !matches!(existing_network_class, NetworkClass::OutboundOnly) {
let mut editor = self.routing_table().edit_public_internet_routing_domain();
editor.clear_dial_info_details(None, None);
editor.set_network_class(Some(NetworkClass::OutboundOnly));
editor.commit(true).await;
}
Ok(true)
}
DetectedDialInfo::Detected(did) => {
// get existing dial info into table by protocol/address type
let mut existing_dial_info =
BTreeMap::<(ProtocolType, AddressType), DialInfoDetail>::new();
for did in self.routing_table().all_filtered_dial_info_details(
RoutingDomain::PublicInternet.into(),
&DialInfoFilter::all(),
) {
// Only need to keep one per pt/at pair, since they will all have the same dialinfoclass
existing_dial_info.insert(
(did.dial_info.protocol_type(), did.dial_info.address_type()),
did,
);
}
// We got a dial info, upgrade everything unless we are fixed to outbound only due to a symmetric nat
if !matches!(existing_network_class, NetworkClass::OutboundOnly) {
// Get existing dial info for protocol/address type combination
let pt = did.dial_info.protocol_type();
let at = did.dial_info.address_type();
// We got a dialinfo, add it and tag us as inbound capable
editor.add_dial_info(did.dial_info.clone(), did.class);
editor.set_network_class(Some(NetworkClass::InboundCapable));
// See what operations to perform with this dialinfo
let mut clear = false;
let mut add = false;
if let Some(edi) = existing_dial_info.get(&(pt, at)) {
// Is the dial info class better than our existing dial info?
// Or is the new dial info the same class, but different? Only change if things are different.
if did.class < edi.class
|| (did.class == edi.class && did.dial_info != edi.dial_info)
{
// Better or same dial info class was found, clear existing dialinfo for this pt/at pair
// Only keep one dial info per protocol/address type combination
clear = true;
add = true;
}
// Otherwise, don't upgrade, don't add, this is worse than what we have already
} else {
// No existing dial info of this type accept it, no need to upgrade, but add it
add = true;
}
if clear || add {
let mut editor = self.routing_table().edit_public_internet_routing_domain();
if clear {
editor.clear_dial_info_details(
Some(did.dial_info.address_type()),
Some(did.dial_info.protocol_type()),
);
}
if add {
editor.add_dial_info(did.dial_info.clone(), did.class);
}
editor.set_network_class(Some(NetworkClass::InboundCapable));
editor.commit(true).await;
}
}
Ok(false)
}
}
Ok(())
}
#[instrument(level = "trace", skip(self), err)]
@ -176,25 +115,9 @@ impl Network {
);
editor.commit(true).await;
// Create a callback to clear the network if we need to 'start over'
let this = self.clone();
let clear_network_callback: ClearNetworkCallback = Arc::new(move || {
let this = this.clone();
Box::pin(async move {
// Ensure we only do this once per network class discovery
{
let mut inner = this.inner.lock();
if inner.network_already_cleared {
return;
}
inner.network_already_cleared = true;
}
let mut editor = this.routing_table().edit_public_internet_routing_domain();
editor.clear_dial_info_details(None, None);
editor.set_network_class(None);
editor.commit(true).await;
})
});
// Start from scratch
editor.clear_dial_info_details(None, None);
editor.set_network_class(None);
// Process all protocol and address combinations
let mut unord = FuturesUnordered::new();
@ -208,13 +131,14 @@ impl Network {
*first_pt,
*at,
*port,
clear_network_callback.clone(),
// clear_network_callback.clone(),
);
discovery_context.discover(&mut unord).await;
}
// Wait for all discovery futures to complete and apply discoverycontexts
let mut all_address_types = AddressTypeSet::new();
let mut force_outbound_only = false;
loop {
match unord
.next()
@ -224,7 +148,9 @@ impl Network {
{
Ok(Some(Some(dr))) => {
// Found some new dial info for this protocol/address combination
self.update_with_detected_dial_info(dr.ddi.clone()).await?;
force_outbound_only |= self
.update_with_detected_dial_info(&mut editor, dr.ddi.clone())
.await?;
// Add the external address kinds to the set we've seen
all_address_types |= dr.external_address_types;
@ -247,7 +173,9 @@ impl Network {
class: did.class,
});
// Add additional dialinfo
self.update_with_detected_dial_info(additional_ddi).await?;
force_outbound_only |= self
.update_with_detected_dial_info(&mut editor, additional_ddi)
.await?;
}
}
}
@ -267,9 +195,13 @@ impl Network {
}
// All done
log_net!(debug "Network class discovery finished with address_types {:?}", all_address_types);
if force_outbound_only {
editor.clear_dial_info_details(None, None);
editor.set_network_class(Some(NetworkClass::OutboundOnly));
}
// Set the address types we've seen
editor.setup_network(
protocol_config.outbound,

View File

@ -473,7 +473,7 @@ impl NetworkConnection {
format!(" PROTECTED:{}", pnr)
} else {
"".to_owned()
}
},
)
}
}

View File

@ -1,15 +0,0 @@
use super::*;
impl NetworkManager {
// Determine if a local IP address has changed
// this means we should restart the low level network and and recreate all of our dial info
// Wait until we have received confirmation from N different peers
pub fn report_local_network_socket_address(
&self,
_socket_address: SocketAddress,
_flow: Flow,
_reporting_peer: NodeRef,
) {
// XXX: Nothing here yet.
}
}

View File

@ -1,5 +1,3 @@
pub mod local_network_address_check;
pub mod public_internet_address_check;
pub mod rolling_transfers;
use super::*;
@ -20,20 +18,6 @@ impl NetworkManager {
});
}
// Set public internet address check task
{
let this = self.clone();
self.unlocked_inner
.public_internet_address_check_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().public_internet_address_check_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
// Set address filter task
{
let this = self.clone();

View File

@ -1,287 +0,0 @@
use super::*;
impl NetworkManager {
// Clean up the public address check tables, removing entries that have timed out
#[instrument(parent = None, level = "trace", skip_all, err)]
pub(crate) async fn public_internet_address_check_task_routine(
self,
_stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
// go through public_address_inconsistencies_table and time out things that have expired
let mut inner = self.inner.lock();
for pait_v in inner
.public_internet_address_inconsistencies_table
.values_mut()
{
pait_v.retain(|_addr, exp_ts| {
// Keep it if it's in the future
*exp_ts > cur_ts
});
}
Ok(())
}
// Determine if a global IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers
pub fn report_public_internet_socket_address(
&self,
socket_address: SocketAddress, // the socket address as seen by the remote peer
flow: Flow, // the flow used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) {
log_network_result!("report_public_internet_socket_address:\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer);
// Ignore these reports if we are currently detecting public dial info
let net = self.net();
if net.needs_public_dial_info_check() {
return;
}
// Ignore flows that do not start from our listening port (unbound connections etc),
// because a router is going to map these differently
let Some(pla) =
net.get_preferred_local_address_by_key(flow.protocol_type(), flow.address_type())
else {
return;
};
let Some(local) = flow.local() else {
return;
};
if local.port() != pla.port() {
log_network_result!(debug "ignoring public internet address report because local port did not match listener: {} != {}", local.port(), pla.port());
return;
}
// Get our current published peer info
let routing_table = self.routing_table();
let Some(published_peer_info) =
routing_table.get_published_peer_info(RoutingDomain::PublicInternet)
else {
return;
};
// If we are a webapp we should skip this completely
// because we will never get inbound dialinfo directly on our public ip address
// If we have an invalid network class, this is not necessary yet
let public_internet_network_class = published_peer_info
.signed_node_info()
.node_info()
.network_class();
if matches!(public_internet_network_class, NetworkClass::WebApp) {
return;
}
let (detect_address_changes, ip6_prefix_size) = self.with_config(|c| {
(
c.network.detect_address_changes,
c.network.max_connections_per_ip6_prefix_size as usize,
)
});
// Get the ip(block) this report is coming from
let reporting_ipblock = ip_to_ipblock(ip6_prefix_size, flow.remote_address().ip_addr());
// Reject public address reports from nodes that we know are behind symmetric nat or
// nodes that must be using a relay for everything
let Some(node_info) = reporting_peer.node_info(RoutingDomain::PublicInternet) else {
return;
};
if node_info.network_class() != NetworkClass::InboundCapable {
return;
}
// If the socket address reported is the same as the reporter, then this is coming through a relay
// or it should be ignored due to local proximity (nodes on the same network block should not be trusted as
// public ip address reporters, only disinterested parties)
if reporting_ipblock == ip_to_ipblock(ip6_prefix_size, socket_address.ip_addr()) {
return;
}
// Check if the public address report is coming from a node/block that gives an 'inconsistent' location
// meaning that the node may be not useful for public address detection
// This is done on a per address/protocol basis
let mut inner = self.inner.lock();
let inner = &mut *inner;
let addr_proto_type_key =
PublicAddressCheckCacheKey(flow.protocol_type(), flow.address_type());
if inner
.public_internet_address_inconsistencies_table
.get(&addr_proto_type_key)
.map(|pait| pait.contains_key(&reporting_ipblock))
.unwrap_or(false)
{
return;
}
// Insert this new public address into the lru cache for the address check
// if we've seen this address before, it brings it to the front
let pacc = inner
.public_internet_address_check_cache
.entry(addr_proto_type_key)
.or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE));
pacc.insert(reporting_ipblock, socket_address);
// Determine if our external address has likely changed
let mut bad_public_internet_address_detection_punishment: Option<
Box<dyn FnOnce() + Send + 'static>,
> = None;
let needs_public_internet_address_detection = if matches!(
public_internet_network_class,
NetworkClass::InboundCapable
) {
// Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed
let dial_info_filter = flow.make_dial_info_filter();
// Get current external ip/port from registered global dialinfo
let current_addresses: BTreeSet<SocketAddress> = published_peer_info
.signed_node_info()
.node_info()
.filtered_dial_info_details(DialInfoDetail::NO_SORT, |did| {
did.matches_filter(&dial_info_filter)
})
.iter()
.map(|did| {
// Strip port from direct and mapped addresses
// as the incoming dialinfo may not match the outbound
// connections' NAT mapping. In this case we only check for IP address changes.
if did.class == DialInfoClass::Direct || did.class == DialInfoClass::Mapped {
did.dial_info.socket_address().with_port(0)
} else {
did.dial_info.socket_address()
}
})
.collect();
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
// then we zap the network class and re-detect it
// Keep list of the origin ip blocks of inconsistent public address reports
let mut inconsistencies = Vec::new();
// Iteration goes from most recent to least recent node/address pair
for (reporting_ip_block, a) in pacc {
// If this address is not one of our current addresses (inconsistent)
// and we haven't already denylisted the reporting source,
// Also check address with port zero in the event we are only checking changes to ip addresses
if !current_addresses.contains(a)
&& !current_addresses.contains(&a.with_port(0))
&& !inner
.public_internet_address_inconsistencies_table
.get(&addr_proto_type_key)
.map(|pait| pait.contains_key(reporting_ip_block))
.unwrap_or(false)
{
// Record the origin of the inconsistency
log_network_result!(debug "inconsistency added from {:?}: reported {:?} with current_addresses = {:?}", reporting_ip_block, a, current_addresses);
inconsistencies.push(*reporting_ip_block);
}
}
// If we have enough inconsistencies to consider changing our public dial info,
// add them to our denylist (throttling) and go ahead and check for new
// public dialinfo
let inconsistent =
if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_INCONSISTENCY_DETECTION_COUNT {
let exp_ts = Timestamp::now() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
let pait = inner
.public_internet_address_inconsistencies_table
.entry(addr_proto_type_key)
.or_default();
for i in &inconsistencies {
pait.insert(*i, exp_ts);
}
// Run this routine if the inconsistent nodes turn out to be lying
let this = self.clone();
bad_public_internet_address_detection_punishment = Some(Box::new(move || {
// xxx does this even work??
let mut inner = this.inner.lock();
let pait = inner
.public_internet_address_inconsistencies_table
.entry(addr_proto_type_key)
.or_default();
let exp_ts =
Timestamp::now() + PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
for i in inconsistencies {
pait.insert(i, exp_ts);
}
}));
true
} else {
false
};
// // debug code
// if inconsistent {
// log_net!("report_public_internet_socket_address: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner
// .public_address_check_cache, current_addresses, inconsistencies);
// }
inconsistent
} else if matches!(public_internet_network_class, NetworkClass::OutboundOnly) {
// If we are currently outbound only, we don't have any public dial info
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
let mut consistencies = 0;
let mut consistent = false;
let mut current_address = Option::<SocketAddress>::None;
// Iteration goes from most recent to least recent node/address pair
for (_, a) in pacc {
if let Some(current_address) = current_address {
if current_address == *a {
consistencies += 1;
if consistencies >= PUBLIC_ADDRESS_CHANGE_CONSISTENCY_DETECTION_COUNT {
consistent = true;
break;
}
}
} else {
current_address = Some(*a);
}
}
consistent
} else {
// If we are a webapp we never do this.
// If we have invalid network class, then public address detection is already going to happen via the network_class_discovery task
// we should have checked for this condition earlier at the top of this function
unreachable!();
};
if needs_public_internet_address_detection {
if detect_address_changes {
// Reset the address check cache now so we can start detecting fresh
info!("PublicInternet address has changed, detecting public dial info");
log_net!(debug "report_public_internet_socket_address:\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer);
log_net!(debug
"public_internet_address_check_cache: {:#?}",
inner.public_internet_address_check_cache
);
inner.public_internet_address_check_cache.clear();
// Re-detect the public dialinfo
net.set_needs_public_dial_info_check(
bad_public_internet_address_detection_punishment,
);
} else {
warn!("PublicInternet address may have changed. Restarting the server may be required.");
warn!("report_public_internet_socket_address:\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer);
warn!(
"public_internet_address_check_cache: {:#?}",
inner.public_internet_address_check_cache
);
}
}
}
}

View File

@ -259,7 +259,7 @@ impl DialInfo {
Self::WSS(di) => di.socket_address.ip_addr(),
}
}
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
#[expect(dead_code)]
pub fn port(&self) -> u16 {
match self {
Self::UDP(di) => di.socket_address.port(),

View File

@ -47,11 +47,6 @@ impl Flow {
pub fn address_type(&self) -> AddressType {
self.remote.address_type()
}
pub fn make_dial_info_filter(&self) -> DialInfoFilter {
DialInfoFilter::all()
.with_protocol_type(self.protocol_type())
.with_address_type(self.address_type())
}
}
impl MatchesDialInfoFilter for Flow {

View File

@ -87,13 +87,16 @@ impl From<BucketEntryStateReason> for BucketEntryState {
}
}
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub(crate) struct LastFlowKey(ProtocolType, AddressType);
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub(crate) struct LastFlowKey(pub ProtocolType, pub AddressType);
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub(crate) struct LastSenderInfoKey(pub RoutingDomain, pub ProtocolType, pub AddressType);
/// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct BucketEntryPublicInternet {
/// The PublicInternet node info
/// The PublicInternet node infoe
signed_node_info: Option<Box<SignedNodeInfo>>,
/// The last node info timestamp of ours that this entry has seen
last_seen_our_node_info_ts: Timestamp,
@ -130,6 +133,9 @@ pub(crate) struct BucketEntryInner {
/// The last flows used to contact this node, per protocol type
#[serde(skip)]
last_flows: BTreeMap<LastFlowKey, (Flow, Timestamp)>,
/// Last seen senderinfo per protocol/address type
#[serde(skip)]
last_sender_info: HashMap<LastSenderInfoKey, SenderInfo>,
/// The node info for this entry on the publicinternet routing domain
public_internet: BucketEntryPublicInternet,
/// The node info for this entry on the localnetwork routing domain
@ -910,6 +916,19 @@ impl BucketEntryInner {
self.peer_stats.rpc_stats.failed_to_send += 1;
self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
}
pub(super) fn report_sender_info(
&mut self,
key: LastSenderInfoKey,
sender_info: SenderInfo,
) -> Option<SenderInfo> {
let last_sender_info = self.last_sender_info.insert(key, sender_info);
if last_sender_info != Some(sender_info) {
// Return last senderinfo if this new one is different
last_sender_info
} else {
None
}
}
}
#[derive(Debug)]
@ -930,6 +949,7 @@ impl BucketEntry {
envelope_support: Vec::new(),
updated_since_last_network_change: false,
last_flows: BTreeMap::new(),
last_sender_info: HashMap::new(),
local_network: BucketEntryLocalNetwork {
last_seen_our_node_info_ts: Timestamp::new(0u64),
signed_node_info: None,

View File

@ -297,4 +297,18 @@ pub trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait {
e.failed_to_send(ts, expects_answer);
})
}
fn report_sender_info(
&self,
routing_domain: RoutingDomain,
protocol_type: ProtocolType,
address_type: AddressType,
sender_info: SenderInfo,
) -> Option<SenderInfo> {
self.operate_mut(|_rti, e| {
e.report_sender_info(
LastSenderInfoKey(routing_domain, protocol_type, address_type),
sender_info,
)
})
}
}

View File

@ -1726,15 +1726,15 @@ impl RouteSpecStore {
/// Clear caches when local our local node info changes
#[instrument(level = "trace", target = "route", skip(self))]
pub fn reset(&self) {
log_rtab!(debug "flushing route spec store");
pub fn reset_cache(&self) {
log_rtab!(debug "resetting route cache");
let inner = &mut *self.inner.lock();
// Clean up local allocated routes
// Clean up local allocated routes (does not delete allocated routes, set republication flag)
inner.content.reset_details();
// Reset private route cache
// Reset private route cache (does not delete imported routes)
inner.cache.reset_remote_private_routes();
}

View File

@ -113,33 +113,41 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
}
fn publish_peer_info(&self, rti: &RoutingTableInner) -> bool {
let pi = self.get_peer_info(rti);
let peer_info = {
let pi = self.get_peer_info(rti);
// If the network class is not yet determined, don't publish
if pi.signed_node_info().node_info().network_class() == NetworkClass::Invalid {
log_rtab!(debug "[LocalNetwork] Not publishing peer info with invalid network class");
return false;
}
// If we need a relay and we don't have one, don't publish yet
if let Some(_relay_kind) = pi.signed_node_info().node_info().requires_relay() {
if pi.signed_node_info().relay_ids().is_empty() {
log_rtab!(debug "[LocalNetwork] Not publishing peer info that wants relay until we have a relay");
// If the network class is not yet determined, don't publish
if pi.signed_node_info().node_info().network_class() == NetworkClass::Invalid {
log_rtab!(debug "[LocalNetwork] Not publishing peer info with invalid network class");
return false;
}
}
// Don't publish if the peer info hasnt changed from our previous publication
let mut ppi_lock = self.published_peer_info.lock();
if let Some(old_peer_info) = &*ppi_lock {
if pi.equivalent(old_peer_info) {
log_rtab!(debug "[LocalNetwork] Not publishing peer info because it is equivalent");
return false;
// If we need a relay and we don't have one, don't publish yet
if let Some(_relay_kind) = pi.signed_node_info().node_info().requires_relay() {
if pi.signed_node_info().relay_ids().is_empty() {
log_rtab!(debug "[LocalNetwork] Not publishing peer info that wants relay until we have a relay");
return false;
}
}
}
log_rtab!(debug "[LocalNetwork] Published new peer info: {:#?}", pi);
*ppi_lock = Some(pi);
// Don't publish if the peer info hasnt changed from our previous publication
let mut ppi_lock = self.published_peer_info.lock();
if let Some(old_peer_info) = &*ppi_lock {
if pi.equivalent(old_peer_info) {
log_rtab!(debug "[LocalNetwork] Not publishing peer info because it is equivalent");
return false;
}
}
log_rtab!(debug "[LocalNetwork] Published new peer info: {:#?}", pi);
*ppi_lock = Some(pi.clone());
pi
};
rti.unlocked_inner
.network_manager()
.report_peer_info_change(peer_info);
true
}

View File

@ -242,10 +242,10 @@ impl RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet {
.write()
.publish_peer_info(RoutingDomain::PublicInternet);
// Clear the routespecstore cache if our PublicInternet dial info has changed
if changed {
// Clear the routespecstore cache if our PublicInternet dial info has changed
let rss = self.routing_table.route_spec_store();
rss.reset();
rss.reset_cache();
}
}

View File

@ -90,33 +90,41 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
}
fn publish_peer_info(&self, rti: &RoutingTableInner) -> bool {
let pi = self.get_peer_info(rti);
let peer_info = {
let pi = self.get_peer_info(rti);
// If the network class is not yet determined, don't publish
if pi.signed_node_info().node_info().network_class() == NetworkClass::Invalid {
log_rtab!(debug "[PublicInternet] Not publishing peer info with invalid network class");
return false;
}
// If we need a relay and we don't have one, don't publish yet
if let Some(_relay_kind) = pi.signed_node_info().node_info().requires_relay() {
if pi.signed_node_info().relay_ids().is_empty() {
log_rtab!(debug "[PublicInternet] Not publishing peer info that wants relay until we have a relay");
// If the network class is not yet determined, don't publish
if pi.signed_node_info().node_info().network_class() == NetworkClass::Invalid {
log_rtab!(debug "[PublicInternet] Not publishing peer info with invalid network class");
return false;
}
}
// Don't publish if the peer info hasnt changed from our previous publication
let mut ppi_lock = self.published_peer_info.lock();
if let Some(old_peer_info) = &*ppi_lock {
if pi.equivalent(old_peer_info) {
log_rtab!(debug "[PublicInternet] Not publishing peer info because it is equivalent");
return false;
// If we need a relay and we don't have one, don't publish yet
if let Some(_relay_kind) = pi.signed_node_info().node_info().requires_relay() {
if pi.signed_node_info().relay_ids().is_empty() {
log_rtab!(debug "[PublicInternet] Not publishing peer info that wants relay until we have a relay");
return false;
}
}
}
log_rtab!(debug "[PublicInternet] Published new peer info: {:#?}", pi);
*ppi_lock = Some(pi);
// Don't publish if the peer info hasnt changed from our previous publication
let mut ppi_lock = self.published_peer_info.lock();
if let Some(old_peer_info) = &*ppi_lock {
if pi.equivalent(old_peer_info) {
log_rtab!(debug "[PublicInternet] Not publishing peer info because it is equivalent");
return false;
}
}
log_rtab!(debug "[PublicInternet] Published new peer info: {:#?}", pi);
*ppi_lock = Some(pi.clone());
pi
};
rti.unlocked_inner
.network_manager()
.report_peer_info_change(peer_info);
true
}

View File

@ -14,8 +14,7 @@ use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::FutureExt;
use stop_token::future::FutureExt as StopFutureExt;
type PingValidatorFuture =
SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>;
type PingValidatorFuture = SendPinBoxFuture<Result<(), RPCError>>;
impl RoutingTable {
// Ping the relay to keep it alive, over every protocol it is relaying for us
@ -112,8 +111,10 @@ impl RoutingTable {
futurequeue.push_back(
async move {
rpc.rpc_call_status(Destination::direct(relay_nr_filtered))
.await
let _ = rpc
.rpc_call_status(Destination::direct(relay_nr_filtered))
.await?;
Ok(())
}
.boxed(),
);
@ -160,8 +161,10 @@ impl RoutingTable {
futurequeue.push_back(
async move {
rpc.rpc_call_status(Destination::direct(watch_nr.default_filtered()))
.await
let _ = rpc
.rpc_call_status(Destination::direct(watch_nr.default_filtered()))
.await?;
Ok(())
}
.boxed(),
);
@ -182,7 +185,7 @@ impl RoutingTable {
// Get all nodes needing pings in the PublicInternet routing domain
let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts);
// If we have a relay, let's ping for NAT keepalives
// If we have a relay, let's ping for NAT keepalives and check for address changes
self.relay_keepalive_public_internet(cur_ts, futurequeue)
.await?;
@ -195,7 +198,11 @@ impl RoutingTable {
let rpc = rpc.clone();
log_rtab!("--> Validator ping to {:?}", nr);
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr)).await }.boxed(),
async move {
let _ = rpc.rpc_call_status(Destination::direct(nr)).await?;
Ok(())
}
.boxed(),
);
}
@ -221,7 +228,11 @@ impl RoutingTable {
// Just do a single ping with the best protocol for all the nodes
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr)).await }.boxed(),
async move {
let _ = rpc.rpc_call_status(Destination::direct(nr)).await?;
Ok(())
}
.boxed(),
);
}
@ -264,8 +275,14 @@ impl RoutingTable {
.in_current_span()
.await
{
Ok(Some(_)) => {
Ok(Some(res)) => {
// Some ping completed
match res {
Ok(()) => {}
Err(e) => {
log_rtab!(error "Error performing status ping: {}", e);
}
}
}
Ok(None) => {
// We're empty

View File

@ -149,7 +149,7 @@ where
inner
.waiting_op_table
.remove(&op_id)
.ok_or_else(RPCError::else_internal(format!(
.ok_or_else(RPCError::else_ignore(format!(
"Unmatched operation id: {}",
op_id
)))?

View File

@ -62,6 +62,9 @@ impl RPCError {
pub fn map_ignore<M: ToString, X: ToString>(message: M) -> impl FnOnce(X) -> Self {
move |x| Self::Ignore(format!("{}: {}", message.to_string(), x.to_string()))
}
pub fn else_ignore<M: ToString>(message: M) -> impl FnOnce() -> Self {
move || Self::Ignore(message.to_string())
}
}
impl From<RPCError> for VeilidAPIError {

View File

@ -1,10 +1,16 @@
use super::*;
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)]
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)]
pub struct SenderInfo {
pub socket_address: SocketAddress,
}
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)]
pub struct StatusResult {
pub opt_sender_info: Option<SenderInfo>,
pub opt_previous_sender_info: Option<SenderInfo>,
}
impl RPCProcessor {
// Send StatusQ RPC request, receive StatusA answer
// Can be sent via relays or routes, but will have less information via routes
@ -19,7 +25,7 @@ impl RPCProcessor {
pub async fn rpc_call_status(
self,
dest: Destination,
) -> RPCNetworkResult<Answer<Option<SenderInfo>>> {
) -> RPCNetworkResult<Answer<StatusResult>> {
let _guard = self
.unlocked_inner
.startup_lock
@ -105,6 +111,7 @@ impl RPCProcessor {
// Don't need to validate these addresses for the current routing domain
// the address itself is irrelevant, and the remote node can lie anyway
let mut opt_sender_info = None;
let mut opt_previous_sender_info = None;
match dest {
Destination::Direct {
node: target,
@ -120,24 +127,23 @@ impl RPCProcessor {
{
// Directly requested status that actually gets sent directly and not over a relay will tell us what our IP address appears as
// If this changes, we'd want to know about that to reset the networking stack
match routing_domain {
RoutingDomain::PublicInternet => self
.network_manager()
.report_public_internet_socket_address(
sender_info.socket_address,
send_data_method.unique_flow.flow,
target.unfiltered(),
),
RoutingDomain::LocalNetwork => {
self.network_manager().report_local_network_socket_address(
sender_info.socket_address,
send_data_method.unique_flow.flow,
target.unfiltered(),
)
}
}
opt_previous_sender_info = target.report_sender_info(
routing_domain,
send_data_method.unique_flow.flow.protocol_type(),
send_data_method.unique_flow.flow.address_type(),
sender_info,
);
};
opt_sender_info = Some(sender_info.clone());
opt_sender_info = Some(sender_info);
// Report ping status results to network manager
self.network_manager().report_socket_address_change(
routing_domain,
sender_info.socket_address,
opt_previous_sender_info.map(|s| s.socket_address),
send_data_method.unique_flow.flow,
target.unfiltered(),
);
}
}
}
@ -156,7 +162,10 @@ impl RPCProcessor {
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
opt_sender_info,
StatusResult {
opt_sender_info,
opt_previous_sender_info,
},
)))
}

View File

@ -272,7 +272,7 @@ impl StorageManager {
kind,
value_nodes: ctx.value_nodes.clone(),
};
log_network_result!(debug "GetValue Fanout: {:?}", fanout_result);
log_dht!(debug "GetValue Fanout: {:?}", fanout_result);
if let Err(e) = out_tx.send(Ok(OutboundGetValueResult {
fanout_result,

View File

@ -300,7 +300,7 @@ impl StorageManager {
fanout_results.push(fanout_result);
}
log_network_result!(debug "InspectValue Fanout ({:?}):\n{}", kind, debug_fanout_results(&fanout_results));
log_dht!(debug "InspectValue Fanout ({:?}):\n{}", kind, debug_fanout_results(&fanout_results));
Ok(OutboundInspectValueResult {
fanout_results,

View File

@ -88,7 +88,7 @@ impl StorageManager {
let context = context.clone();
let descriptor = descriptor.clone();
async move {
let send_descriptor = true; // xxx check if next_node needs the descriptor or not
let send_descriptor = true; // xxx check if next_node needs the descriptor or not, see issue #203
// get most recent value to send
let value = {
@ -274,7 +274,7 @@ impl StorageManager {
kind,
value_nodes: ctx.value_nodes.clone(),
};
log_network_result!(debug "SetValue Fanout: {:?}", fanout_result);
log_dht!(debug "SetValue Fanout: {:?}", fanout_result);
if let Err(e) = out_tx.send(Ok(OutboundSetValueResult {
fanout_result,

View File

@ -12,10 +12,10 @@ impl fmt::Debug for TimestampDuration {
}
impl TimestampDuration {
pub fn new_secs<N: num_traits::Unsigned + num_traits::ToPrimitive>(secs: N) -> Self {
TimestampDuration::new(secs.to_u64().unwrap() * 1_000_000u64)
pub const fn new_secs(secs: u32) -> Self {
TimestampDuration::new(secs as u64 * 1_000_000u64)
}
pub fn new_ms<N: num_traits::Unsigned + num_traits::ToPrimitive>(ms: N) -> Self {
TimestampDuration::new(ms.to_u64().unwrap() * 1_000u64)
pub const fn new_ms(ms: u64) -> Self {
TimestampDuration::new(ms * 1_000u64)
}
}

View File

@ -558,13 +558,6 @@ fn get_default_store_path(store_type: &str) -> String {
"".to_owned()
} else {
use std::path::PathBuf;
#[cfg(unix)]
{
let globalpath = PathBuf::from(format!("/var/db/veilid-server/{}", store_type));
if globalpath.exists() {
return globalpath.to_string_lossy().into();
}
}
ProjectDirs::from("org", "Veilid", "Veilid")
.map(|dirs| dirs.data_local_dir().to_path_buf())
.unwrap_or_else(|| PathBuf::from("./"))
@ -744,6 +737,18 @@ pub struct VeilidConfigInner {
pub network: VeilidConfigNetwork,
}
impl VeilidConfigInner {
/// Create a new 'VeilidConfigInner' for use with `setup_from_config`
/// Pick a program name and do not change it from release to release,
/// see `VeilidConfigInner::program_name` for details.
pub fn new(program_name: String) -> Self {
Self {
program_name,
..Default::default()
}
}
}
/// The configuration built for each Veilid node during API startup
#[derive(Clone)]
pub struct VeilidConfig {