Merge branch 'dht-consensus' into 'main'

Improve DHT consensus checking and low-level networking

See merge request veilid/veilid!318
Christien Rioux 2024-09-21 15:13:29 +00:00
commit f74df2b0c6
153 changed files with 5451 additions and 3562 deletions


@ -1,4 +1,4 @@
**Changed in Veilid 0.3.3**
**Changed in Veilid 0.3.4**
- Crates updates
- Update crates to newer versions
- Remove veilid-async-tungstenite and veilid-async-tls crates as they are no longer needed

Cargo.lock (generated)

@ -5886,6 +5886,7 @@ dependencies = [
"sharded-slab",
"smallvec",
"thread_local",
"time",
"tracing",
"tracing-core",
"tracing-log 0.2.0",
@ -6328,6 +6329,7 @@ dependencies = [
"async-tungstenite 0.27.0",
"backtrace",
"cfg-if 1.0.0",
"chrono",
"clap 4.5.15",
"color-eyre",
"config 0.14.0",
@ -6355,6 +6357,7 @@ dependencies = [
"signal-hook-async-std",
"stop-token",
"sysinfo",
"time",
"tokio",
"tokio-stream",
"tokio-util",


@ -15,10 +15,11 @@ VERSION 0.7
# Ensure we are using an amd64 platform because some of these targets use cross-platform tooling
FROM ubuntu:18.04
ENV ZIG_VERSION=0.13.0-dev.46+3648d7df1
ENV ZIG_VERSION=0.13.0
ENV CMAKE_VERSION_MINOR=3.30
ENV CMAKE_VERSION_PATCH=3.30.1
ENV WASM_BINDGEN_CLI_VERSION=0.2.93
ENV RUST_VERSION=1.81.0
ENV RUSTUP_HOME=/usr/local/rustup
ENV RUSTUP_DIST_SERVER=https://static.rust-lang.org
ENV CARGO_HOME=/usr/local/cargo
@ -40,7 +41,7 @@ deps-base:
# Install Rust
deps-rust:
FROM +deps-base
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y -c clippy --no-modify-path --profile minimal
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain=$RUST_VERSION -y -c clippy --no-modify-path --profile minimal
RUN chmod -R a+w $RUSTUP_HOME $CARGO_HOME; \
rustup --version; \
cargo --version; \
@ -60,7 +61,7 @@ deps-rust:
# Caching tool
RUN cargo install cargo-chef
# Install Linux cross-platform tooling
RUN curl -O https://ziglang.org/builds/zig-linux-$(arch)-$ZIG_VERSION.tar.xz
RUN curl -O https://ziglang.org/download/$ZIG_VERSION/zig-linux-$(arch)-$ZIG_VERSION.tar.xz
RUN tar -C /usr/local -xJf zig-linux-$(arch)-$ZIG_VERSION.tar.xz
RUN mv /usr/local/zig-linux-$(arch)-$ZIG_VERSION /usr/local/zig
RUN cargo install cargo-zigbuild


@ -9,6 +9,7 @@ authors = ["Veilid Team <contact@veilid.com>"]
edition = "2021"
license = "MPL-2.0"
resolver = "2"
rust-version = "1.81.0"
[[bin]]
name = "veilid-cli"


@ -62,8 +62,6 @@ pub struct TextContent {
content: Arc<Mutex<TextContentInner>>,
}
#[allow(dead_code)]
impl TextContent {
/// Creates a new text content around the given value.
///
@ -129,6 +127,7 @@ impl TextContent {
}
/// Remove lines from the end until we have no more than 'count' from the beginning
#[expect(dead_code)]
pub fn resize_front(&self, count: usize) {
if self.get_content().len() <= count {
return;
@ -238,7 +237,6 @@ pub struct CachedTextView {
width: Option<usize>,
}
#[allow(dead_code)]
impl CachedTextView {
/// Creates a new TextView with the given content.
pub fn new<S>(content: S, cache_size: usize, max_lines: Option<usize>) -> Self
@ -281,6 +279,7 @@ impl CachedTextView {
}
/// Creates a new empty `TextView`.
#[expect(dead_code)]
pub fn empty(cache_size: usize, max_lines: Option<usize>) -> Self {
CachedTextView::new(ContentType::default(), cache_size, max_lines)
}
@ -295,6 +294,7 @@ impl CachedTextView {
///
/// Chainable variant.
#[must_use]
#[expect(dead_code)]
pub fn style<S: Into<StyleType>>(self, style: S) -> Self {
self.with(|s| s.set_style(style))
}
@ -303,6 +303,7 @@ impl CachedTextView {
///
/// This may be useful if you want horizontal scrolling.
#[must_use]
#[expect(dead_code)]
pub fn no_wrap(self) -> Self {
self.with(|s| s.set_content_wrap(false))
}
@ -317,6 +318,7 @@ impl CachedTextView {
/// Sets the horizontal alignment for this view.
#[must_use]
#[expect(dead_code)]
pub fn h_align(mut self, h: HAlign) -> Self {
self.align.h = h;
@ -325,6 +327,7 @@ impl CachedTextView {
/// Sets the vertical alignment for this view.
#[must_use]
#[expect(dead_code)]
pub fn v_align(mut self, v: VAlign) -> Self {
self.align.v = v;
@ -333,6 +336,7 @@ impl CachedTextView {
/// Sets the alignment for this view.
#[must_use]
#[expect(dead_code)]
pub fn align(mut self, a: Align) -> Self {
self.align = a;
@ -341,6 +345,7 @@ impl CachedTextView {
/// Center the text horizontally and vertically inside the view.
#[must_use]
#[expect(dead_code)]
pub fn center(mut self) -> Self {
self.align = Align::center();
self
@ -350,6 +355,7 @@ impl CachedTextView {
///
/// Chainable variant.
#[must_use]
#[expect(dead_code)]
pub fn content<S>(self, content: S) -> Self
where
S: Into<ContentType>,
@ -392,11 +398,13 @@ impl CachedTextView {
}
/// Returns the current text in this view.
#[cfg_attr(not(test), expect(dead_code))]
pub fn get_content(&self) -> TextContentRef {
TextContentInner::get_content(&self.content.content)
}
/// Returns a shared reference to the content, allowing content mutation.
#[expect(dead_code)]
pub fn get_shared_content(&mut self) -> TextContent {
// We take &mut here without really needing it,
// because it sort of "makes sense".


@ -103,9 +103,15 @@ impl InteractiveUI {
println!("Error: {:?}", e);
break;
}
match readline.readline().timeout_at(done.clone()).await {
Ok(Ok(ReadlineEvent::Line(line))) => {
let line = line.trim();
if !line.is_empty() {
readline.add_history_entry(line.to_string());
}
if line == "clear" {
if let Err(e) = readline.clear() {
println!("Error: {:?}", e);
@ -183,7 +189,6 @@ impl InteractiveUI {
self.inner.lock().log_enabled = false;
}
} else if !line.is_empty() {
readline.add_history_entry(line.to_string());
let cmdproc = self.inner.lock().cmdproc.clone();
if let Some(cmdproc) = &cmdproc {
if let Err(e) = cmdproc.run_command(


@ -36,7 +36,7 @@ struct CmdlineArgs {
#[arg(long, short = 'p')]
ipc_path: Option<PathBuf>,
/// Subnode index to use when connecting
#[arg(long, default_value = "0")]
#[arg(short('n'), long, default_value = "0")]
subnode_index: usize,
/// Address to connect to
#[arg(long, short = 'a')]


@ -229,7 +229,7 @@ pub struct Settings {
}
impl Settings {
#[allow(dead_code)]
#[cfg_attr(windows, expect(dead_code))]
fn get_server_default_directory(subpath: &str) -> PathBuf {
#[cfg(unix)]
{


@ -10,6 +10,7 @@ edition = "2021"
build = "build.rs"
license = "MPL-2.0"
resolver = "2"
rust-version = "1.81.0"
[lib]
crate-type = ["cdylib", "staticlib", "rlib"]


@ -1,4 +1,3 @@
#![allow(dead_code)]
#![allow(clippy::absurd_extreme_comparisons)]
use super::*;
use crate::*;


@ -1,4 +1,3 @@
#![allow(dead_code)]
#![allow(clippy::absurd_extreme_comparisons)]
use super::*;
use crate::*;


@ -1,8 +1,8 @@
#![allow(dead_code)]
use super::*;
use crate::*;
pub async fn get_outbound_relay_peer() -> Option<crate::routing_table::PeerInfo> {
pub async fn get_outbound_relay_peer(
_routing_domain: routing_table::RoutingDomain,
) -> Option<Arc<routing_table::PeerInfo>> {
panic!("Native Veilid should never require an outbound relay");
}


@ -1,8 +1,10 @@
use crate::*;
use super::*;
//use js_sys::*;
pub async fn get_outbound_relay_peer() -> Option<crate::routing_table::PeerInfo> {
pub async fn get_outbound_relay_peer(
_routing_domain: routing_table::RoutingDomain,
) -> Option<Arc<routing_table::PeerInfo>> {
// unimplemented!
None
}


@ -1,4 +1,4 @@
pub static DEFAULT_LOG_FACILITIES_IGNORE_LIST: [&str; 28] = [
pub static DEFAULT_LOG_FACILITIES_IGNORE_LIST: [&str; 29] = [
"mio",
"h2",
"hyper",
@ -27,6 +27,7 @@ pub static DEFAULT_LOG_FACILITIES_IGNORE_LIST: [&str; 28] = [
"dht",
"fanout",
"receipt",
"rpc_message",
];
pub static FLAME_LOG_FACILITIES_IGNORE_LIST: [&str; 22] = [
@ -425,3 +426,31 @@ macro_rules! log_crypto {
trace!(target:"crypto", $fmt, $($arg),+);
}
}
#[macro_export]
macro_rules! log_rpc_message {
(error $text:expr) => { error!(
target: "rpc_message",
"{}",
$text,
)};
(error $fmt:literal, $($arg:expr),+) => {
error!(target:"crypto", $fmt, $($arg),+);
};
(warn $text:expr) => { warn!(
target: "rpc_message",
"{}",
$text,
)};
(warn $fmt:literal, $($arg:expr),+) => {
warn!(target:"crypto", $fmt, $($arg),+);
};
($text:expr) => {trace!(
target: "rpc_message",
"{}",
$text,
)};
($fmt:literal, $($arg:expr),+) => {
trace!(target:"rpc_message", $fmt, $($arg),+);
}
}
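A minimal usage sketch of this macro, for context; the call sites and messages below are illustrative assumptions and are not part of this merge request:
// Illustrative call sites only; all expand to tracing events with target "rpc_message".
log_rpc_message!("dropping rpc message: {}", "unknown envelope version");
log_rpc_message!(warn "rpc message exceeds size limit: {} bytes", 70_000);
log_rpc_message!(error "failed to decode rpc operation: {}", "truncated header");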


@ -158,7 +158,7 @@ impl AddressFilter {
}
}
for key in dead_keys {
log_net!(debug ">>> FORGIVING: {}", key);
warn!("Forgiving: {}", key);
inner.punishments_by_ip4.remove(&key);
}
}
@ -174,7 +174,7 @@ impl AddressFilter {
}
}
for key in dead_keys {
log_net!(debug ">>> FORGIVING: {}", key);
warn!("Forgiving: {}", key);
inner.punishments_by_ip6_prefix.remove(&key);
}
}
@ -190,7 +190,7 @@ impl AddressFilter {
}
}
for key in dead_keys {
log_net!(debug ">>> FORGIVING: {}", key);
warn!("Forgiving: {}", key);
inner.punishments_by_node_id.remove(&key);
// make the entry alive again if it's still here
if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(key) {
@ -210,7 +210,7 @@ impl AddressFilter {
}
}
for key in dead_keys {
log_net!(debug ">>> DIALINFO PERMIT: {}", key);
log_net!(debug "DialInfo Permit: {}", key);
inner.dial_info_failures.remove(&key);
}
}
@ -259,10 +259,10 @@ impl AddressFilter {
let mut inner = self.inner.lock();
if inner.dial_info_failures.len() >= MAX_DIAL_INFO_FAILURES {
log_net!(debug ">>> DIALINFO FAILURE TABLE FULL: {}", dial_info);
warn!("DialInfo failure table full: {}", dial_info);
return;
}
log_net!(debug ">>> DIALINFO FAILURE: {:?}", dial_info);
log_net!(debug "DialInfo failure: {:?}", dial_info);
inner
.dial_info_failures
.entry(dial_info)
@ -279,7 +279,7 @@ impl AddressFilter {
}
pub fn punish_ip_addr(&self, addr: IpAddr, reason: PunishmentReason) {
log_net!(debug ">>> PUNISHED: {} for {:?}", addr, reason);
warn!("Punished: {} for {:?}", addr, reason);
let timestamp = Timestamp::now();
let punishment = Punishment { reason, timestamp };
@ -326,10 +326,10 @@ impl AddressFilter {
let mut inner = self.inner.lock();
if inner.punishments_by_node_id.len() >= MAX_PUNISHMENTS_BY_NODE_ID {
log_net!(debug ">>> PUNISHMENT TABLE FULL: {}", node_id);
warn!("Punishment table full: {}", node_id);
return;
}
log_net!(debug ">>> PUNISHED: {} for {:?}", node_id, reason);
warn!("Punished: {} for {:?}", node_id, reason);
inner
.punishments_by_node_id
.entry(node_id)
@ -372,7 +372,7 @@ impl AddressFilter {
let cnt = inner.conn_count_by_ip4.entry(v4).or_default();
assert!(*cnt <= self.unlocked_inner.max_connections_per_ip4);
if *cnt == self.unlocked_inner.max_connections_per_ip4 {
warn!("address filter count exceeded: {:?}", v4);
warn!("Address filter count exceeded: {:?}", v4);
return Err(AddressFilterError::CountExceeded);
}
// See if this ip block has connected too frequently
@ -383,7 +383,7 @@ impl AddressFilter {
});
assert!(tstamps.len() <= self.unlocked_inner.max_connection_frequency_per_min);
if tstamps.len() == self.unlocked_inner.max_connection_frequency_per_min {
warn!("address filter rate exceeded: {:?}", v4);
warn!("Address filter rate exceeded: {:?}", v4);
return Err(AddressFilterError::RateExceeded);
}
@ -396,14 +396,14 @@ impl AddressFilter {
let cnt = inner.conn_count_by_ip6_prefix.entry(v6).or_default();
assert!(*cnt <= self.unlocked_inner.max_connections_per_ip6_prefix);
if *cnt == self.unlocked_inner.max_connections_per_ip6_prefix {
warn!("address filter count exceeded: {:?}", v6);
warn!("Address filter count exceeded: {:?}", v6);
return Err(AddressFilterError::CountExceeded);
}
// See if this ip block has connected too frequently
let tstamps = inner.conn_timestamps_by_ip6_prefix.entry(v6).or_default();
assert!(tstamps.len() <= self.unlocked_inner.max_connection_frequency_per_min);
if tstamps.len() == self.unlocked_inner.max_connection_frequency_per_min {
warn!("address filter rate exceeded: {:?}", v6);
warn!("Address filter rate exceeded: {:?}", v6);
return Err(AddressFilterError::RateExceeded);
}


@ -26,12 +26,12 @@ impl ConnectionHandle {
}
}
#[allow(dead_code)]
#[expect(dead_code)]
pub fn connection_id(&self) -> NetworkConnectionId {
self.connection_id
}
#[allow(dead_code)]
#[expect(dead_code)]
pub fn flow(&self) -> Flow {
self.flow
}


@ -9,7 +9,6 @@ use stop_token::future::FutureExt;
#[derive(Debug)]
enum ConnectionManagerEvent {
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
Accepted(ProtocolNetworkConnection),
Dead(NetworkConnection),
}
@ -45,6 +44,7 @@ struct ConnectionManagerInner {
sender: flume::Sender<ConnectionManagerEvent>,
async_processor_jh: Option<MustJoinHandle<()>>,
stop_source: Option<StopSource>,
protected_addresses: HashMap<SocketAddress, NodeRef>,
}
struct ConnectionManagerArc {
@ -80,6 +80,7 @@ impl ConnectionManager {
stop_source: Some(stop_source),
sender,
async_processor_jh: Some(async_processor_jh),
protected_addresses: HashMap::new(),
}
}
fn new_arc(network_manager: NetworkManager) -> ConnectionManagerArc {
@ -182,25 +183,61 @@ impl ConnectionManager {
// Internal routine to see if we should keep this connection
// from being LRU removed. Used on our initiated relay connections.
fn should_protect_connection(&self, conn: &NetworkConnection) -> Option<NodeRef> {
let netman = self.network_manager();
let routing_table = netman.routing_table();
let remote_address = conn.flow().remote_address().address();
let routing_domain = routing_table.routing_domain_for_address(remote_address)?;
let relay_node = routing_table.relay_node(routing_domain)?;
let relay_nr = relay_node.filtered_clone(
NodeRefFilter::new()
.with_routing_domain(routing_domain)
.with_address_type(conn.flow().address_type())
.with_protocol_type(conn.flow().protocol_type()),
);
let dids = relay_nr.all_filtered_dial_info_details();
for did in dids {
if did.dial_info.address() == remote_address {
return Some(relay_nr);
fn should_protect_connection(
&self,
inner: &mut ConnectionManagerInner,
conn: &NetworkConnection,
) -> Option<NodeRef> {
inner
.protected_addresses
.get(conn.flow().remote_address())
.cloned()
}
// Update connection protections if things change, like a node becomes a relay
pub fn update_protections(&self) {
let Ok(_guard) = self.arc.startup_lock.enter() else {
return;
};
let mut lock = self.arc.inner.lock();
let Some(inner) = lock.as_mut() else {
return;
};
// Get addresses for relays in all routing domains
inner.protected_addresses.clear();
for routing_domain in RoutingDomainSet::all() {
let Some(relay_node) = self
.network_manager()
.routing_table()
.relay_node(routing_domain)
else {
continue;
};
for did in relay_node.dial_info_details() {
// SocketAddresses are distinct per routing domain, so they should not collide
// and two nodes should never have the same SocketAddress
inner
.protected_addresses
.insert(did.dial_info.socket_address(), relay_node.unfiltered());
}
}
None
self.arc
.connection_table
.with_all_connections_mut(|conn| {
if let Some(protect_nr) = conn.protected_node_ref() {
if self.should_protect_connection(inner, conn).is_none() {
log_net!(debug "== Unprotecting connection: {} -> {} for node {}", conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
conn.unprotect();
}
} else if let Some(protect_nr) = self.should_protect_connection(inner, conn) {
log_net!(debug "== Protecting existing connection: {} -> {} for node {}", conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
conn.protect(protect_nr);
}
Option::<()>::None
});
}
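A hedged sketch of a possible call site: refreshing protections when relay selection changes, so relay connections are kept out of LRU eviction. The on_relay_changed helper and the connection_manager() accessor are assumptions for illustration, not part of this diff:
// Assumed helper, illustrative only: refresh connection protections after relay state changes.
fn on_relay_changed(network_manager: &NetworkManager) {
    network_manager.connection_manager().update_protections();
}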
// Internal routine to register new connection atomically.
@ -231,8 +268,8 @@ impl ConnectionManager {
let handle = conn.get_handle();
// See if this should be a protected connection
if let Some(protect_nr) = self.should_protect_connection(&conn) {
log_net!(debug "== PROTECTING connection: {} -> {} for node {}", id, conn.debug_print(Timestamp::now()), protect_nr);
if let Some(protect_nr) = self.should_protect_connection(inner, &conn) {
log_net!(debug "== Protecting new connection: {} -> {} for node {}", id, conn.debug_print(Timestamp::now()), protect_nr);
conn.protect(protect_nr);
}
@ -472,7 +509,7 @@ impl ConnectionManager {
// Called by low-level network when any connection-oriented protocol connection appears
// from incoming connections.
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
pub(super) async fn on_accepted_protocol_network_connection(
&self,
protocol_connection: ProtocolNetworkConnection,


@ -246,6 +246,58 @@ impl ConnectionTable {
let _ = inner.conn_by_id[protocol_index].get(&id).unwrap();
}
#[expect(dead_code)]
pub fn with_connection_by_flow<R, F: FnOnce(&NetworkConnection) -> R>(
&self,
flow: Flow,
closure: F,
) -> Option<R> {
if flow.protocol_type() == ProtocolType::UDP {
return None;
}
let inner = self.inner.lock();
let id = *inner.id_by_flow.get(&flow)?;
let protocol_index = Self::protocol_to_index(flow.protocol_type());
let out = inner.conn_by_id[protocol_index].peek(&id).unwrap();
Some(closure(out))
}
#[expect(dead_code)]
pub fn with_connection_by_flow_mut<R, F: FnOnce(&mut NetworkConnection) -> R>(
&self,
flow: Flow,
closure: F,
) -> Option<R> {
if flow.protocol_type() == ProtocolType::UDP {
return None;
}
let mut inner = self.inner.lock();
let id = *inner.id_by_flow.get(&flow)?;
let protocol_index = Self::protocol_to_index(flow.protocol_type());
let out = inner.conn_by_id[protocol_index].peek_mut(&id).unwrap();
Some(closure(out))
}
pub fn with_all_connections_mut<R, F: FnMut(&mut NetworkConnection) -> Option<R>>(
&self,
mut closure: F,
) -> Option<R> {
let mut inner_lock = self.inner.lock();
let inner = &mut *inner_lock;
for (id, idx) in inner.protocol_index_by_id.iter() {
if let Some(conn) = inner.conn_by_id[*idx].peek_mut(id) {
if let Some(out) = closure(conn) {
return Some(out);
}
}
}
None
}
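As a usage note, the closure returns Option<R>: returning None moves on to the next connection, while returning Some(value) stops the iteration and yields that value. A small sketch, where the connection_table binding is assumed:
// Illustrative only: log every tracked connection; returning None keeps iterating.
let _: Option<()> = connection_table.with_all_connections_mut(|conn| {
    log_net!(debug "open connection: {}", conn.connection_id());
    None
});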
//#[instrument(level = "trace", skip(self), ret)]
pub fn ref_connection_by_id(
&self,
@ -304,7 +356,7 @@ impl ConnectionTable {
}
//#[instrument(level = "trace", skip(self), ret)]
#[allow(dead_code)]
#[expect(dead_code)]
pub fn get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec<NetworkConnectionId> {
let inner = self.inner.lock();
inner


@ -34,7 +34,7 @@ impl NetworkManager {
// Direct bootstrap request
#[instrument(level = "trace", target = "net", err, skip(self))]
pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult<Vec<PeerInfo>> {
pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult<Vec<Arc<PeerInfo>>> {
let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms);
// Send boot magic to requested peer address
let data = BOOT_MAGIC.to_vec();
@ -51,6 +51,6 @@ impl NetworkManager {
deserialize_json(std::str::from_utf8(&out_data).wrap_err("bad utf8 in boot peerinfo")?)
.wrap_err("failed to deserialize boot peerinfo")?;
Ok(bootstrap_peerinfo)
Ok(bootstrap_peerinfo.into_iter().map(Arc::new).collect())
}
}


@ -54,8 +54,9 @@ pub const IPADDR_TABLE_SIZE: usize = 1024;
pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration =
TimestampDuration::new(300_000_000u64); // 5 minutes
pub const NODE_CONTACT_METHOD_CACHE_SIZE: usize = 1024;
pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 5;
pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 10;
pub const PUBLIC_ADDRESS_CHANGE_CONSISTENCY_DETECTION_COUNT: usize = 3; // Number of consistent results in the cache during outbound-only to trigger detection
pub const PUBLIC_ADDRESS_CHANGE_INCONSISTENCY_DETECTION_COUNT: usize = 7; // Number of inconsistent results in the cache during inbound-capable to trigger detection
pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 10; // Length of consistent/inconsistent result cache for detection
pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60;
pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: TimestampDuration =
TimestampDuration::new(300_000_000u64); // 5 minutes
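A brief sketch of the detection rule the two new constants describe, following the comments above; this is illustrative pseudologic, not the actual check implementation:
// Illustrative: outbound-only nodes trigger on enough consistent observations of a
// changed address; inbound-capable nodes trigger on enough inconsistent observations.
fn public_address_change_detected(inbound_capable: bool, consistent: usize, inconsistent: usize) -> bool {
    if inbound_capable {
        inconsistent >= PUBLIC_ADDRESS_CHANGE_INCONSISTENCY_DETECTION_COUNT
    } else {
        consistent >= PUBLIC_ADDRESS_CHANGE_CONSISTENCY_DETECTION_COUNT
    }
}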
@ -64,18 +65,6 @@ pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration
pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60;
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
#[derive(Clone, Debug, Default)]
pub struct ProtocolConfig {
pub outbound: ProtocolTypeSet,
pub inbound: ProtocolTypeSet,
pub family_global: AddressTypeSet,
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
pub family_local: AddressTypeSet,
pub public_internet_capabilities: Vec<FourCC>,
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
pub local_network_capabilities: Vec<FourCC>,
}
// Things we get when we start up and go away when we shut down
// Routing table is not in here because we want it to survive a network shutdown/startup restart
#[derive(Clone)]
@ -111,20 +100,20 @@ pub(crate) enum NodeContactMethod {
/// Contact the node directly
Direct(DialInfo),
/// Request via signal the node connect back directly (relay, target)
SignalReverse(NodeRef, NodeRef),
SignalReverse(FilteredNodeRef, FilteredNodeRef),
/// Request via signal the node negotiate a hole punch (relay, target)
SignalHolePunch(NodeRef, NodeRef),
SignalHolePunch(FilteredNodeRef, FilteredNodeRef),
/// Must use an inbound relay to reach the node
InboundRelay(NodeRef),
InboundRelay(FilteredNodeRef),
/// Must use outbound relay to reach the node
OutboundRelay(NodeRef),
OutboundRelay(FilteredNodeRef),
}
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct NodeContactMethodCacheKey {
node_ids: TypedKeyGroup,
own_node_info_ts: Timestamp,
target_node_info_ts: Timestamp,
target_node_ref_filter: Option<NodeRefFilter>,
target_node_ref_filter: NodeRefFilter,
target_node_ref_sequencing: Sequencing,
}
@ -139,7 +128,7 @@ enum SendDataToExistingFlowResult {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StartupDisposition {
Success,
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
BindRetry,
}
@ -148,9 +137,9 @@ struct NetworkManagerInner {
stats: NetworkManagerStats,
client_allowlist: LruCache<TypedKey, ClientAllowlistEntry>,
node_contact_method_cache: LruCache<NodeContactMethodCacheKey, NodeContactMethod>,
public_address_check_cache:
public_internet_address_check_cache:
BTreeMap<PublicAddressCheckCacheKey, LruCache<IpAddr, SocketAddress>>,
public_address_inconsistencies_table:
public_internet_address_inconsistencies_table:
BTreeMap<PublicAddressCheckCacheKey, HashMap<IpAddr, Timestamp>>,
}
@ -169,7 +158,7 @@ struct NetworkManagerUnlockedInner {
update_callback: RwLock<Option<UpdateCallback>>,
// Background processes
rolling_transfers_task: TickTask<EyreReport>,
public_address_check_task: TickTask<EyreReport>,
public_internet_address_check_task: TickTask<EyreReport>,
address_filter_task: TickTask<EyreReport>,
// Network Key
network_key: Option<SharedSecret>,
@ -189,8 +178,8 @@ impl NetworkManager {
stats: NetworkManagerStats::default(),
client_allowlist: LruCache::new_unbounded(),
node_contact_method_cache: LruCache::new(NODE_CONTACT_METHOD_CACHE_SIZE),
public_address_check_cache: BTreeMap::new(),
public_address_inconsistencies_table: BTreeMap::new(),
public_internet_address_check_cache: BTreeMap::new(),
public_internet_address_inconsistencies_table: BTreeMap::new(),
}
}
fn new_unlocked_inner(
@ -216,7 +205,7 @@ impl NetworkManager {
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
public_address_check_task: TickTask::new(
public_internet_address_check_task: TickTask::new(
"public_address_check_task",
PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS,
),
@ -525,6 +514,7 @@ impl NetworkManager {
log_net!(debug "finished network manager shutdown");
}
#[expect(dead_code)]
pub fn update_client_allowlist(&self, client: TypedKey) {
let mut inner = self.inner.lock();
match inner.client_allowlist.entry(client) {
@ -693,7 +683,7 @@ impl NetworkManager {
pub async fn handle_in_band_receipt<R: AsRef<[u8]>>(
&self,
receipt_data: R,
inbound_noderef: NodeRef,
inbound_noderef: FilteredNodeRef,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("network is not started");
@ -779,11 +769,8 @@ impl NetworkManager {
let rpc = self.rpc_processor();
// Add the peer info to our routing table
let mut peer_nr = match routing_table.register_node_with_peer_info(
RoutingDomain::PublicInternet,
peer_info,
false,
) {
let mut peer_nr = match routing_table.register_node_with_peer_info(peer_info, false)
{
Ok(nr) => nr,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!(
@ -808,11 +795,8 @@ impl NetworkManager {
let rpc = self.rpc_processor();
// Add the peer info to our routing table
let mut peer_nr = match routing_table.register_node_with_peer_info(
RoutingDomain::PublicInternet,
peer_info,
false,
) {
let mut peer_nr = match routing_table.register_node_with_peer_info(peer_info, false)
{
Ok(nr) => nr,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!(
@ -826,9 +810,8 @@ impl NetworkManager {
let outbound_nrf = routing_table
.get_outbound_node_ref_filter(RoutingDomain::PublicInternet)
.with_protocol_type(ProtocolType::UDP);
peer_nr.set_filter(Some(outbound_nrf));
let Some(hole_punch_dial_info_detail) = peer_nr.first_filtered_dial_info_detail()
else {
peer_nr.set_filter(outbound_nrf);
let Some(hole_punch_dial_info_detail) = peer_nr.first_dial_info_detail() else {
return Ok(NetworkResult::no_connection_other(format!(
"No hole punch capable dialinfo found for node: {}",
peer_nr
@ -836,10 +819,10 @@ impl NetworkManager {
};
// Now that we picked a specific dialinfo, further restrict the noderef to the specific address type
let filter = peer_nr.take_filter().unwrap();
let filter = peer_nr.take_filter();
let filter =
filter.with_address_type(hole_punch_dial_info_detail.dial_info.address_type());
peer_nr.set_filter(Some(filter));
peer_nr.set_filter(filter);
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
@ -912,7 +895,7 @@ impl NetworkManager {
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn send_envelope<B: AsRef<[u8]>>(
&self,
node_ref: NodeRef,
node_ref: FilteredNodeRef,
destination_node_ref: Option<NodeRef>,
body: B,
) -> EyreResult<NetworkResult<SendDataMethod>> {
@ -920,7 +903,7 @@ impl NetworkManager {
return Ok(NetworkResult::no_connection_other("network is not started"));
};
let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone();
let destination_node_ref = destination_node_ref.unwrap_or_else(|| node_ref.unfiltered());
let best_node_id = destination_node_ref.best_node_id();
// Get node's envelope versions and see if we can send to it
@ -1110,7 +1093,7 @@ impl NetworkManager {
.resolve_node(recipient_id, SafetySelection::Unsafe(Sequencing::default()))
.await
{
Ok(v) => v,
Ok(v) => v.map(|nr| nr.default_filtered()),
Err(e) => {
log_net!(debug "failed to resolve recipient node for relay, dropping outbound relayed packet: {}" ,e);
return Ok(false);
@ -1125,7 +1108,7 @@ impl NetworkManager {
// should be mutually in each other's routing tables. The node needing the relay will be
// pinging this node regularly to keep itself in the routing table
match routing_table.lookup_node_ref(recipient_id) {
Ok(v) => v,
Ok(v) => v.map(|nr| nr.default_filtered()),
Err(e) => {
log_net!(debug "failed to look up recipient node for relay, dropping outbound relayed packet: {}" ,e);
return Ok(false);
@ -1182,7 +1165,8 @@ impl NetworkManager {
};
// Cache the envelope information in the routing table
let mut source_noderef = match routing_table.register_node_with_existing_connection(
let source_noderef = match routing_table.register_node_with_existing_connection(
routing_domain,
envelope.get_sender_typed_id(),
flow,
ts,
@ -1196,9 +1180,6 @@ impl NetworkManager {
};
source_noderef.add_envelope_version(envelope.get_version());
// Enforce routing domain
source_noderef.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain));
// Pass message to RPC system
if let Err(e) =
rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)


@ -136,7 +136,7 @@ impl DiscoveryContext {
// Ask for a public address check from a particular noderef
// This is done over the normal port using RPC
#[instrument(level = "trace", skip(self), ret)]
async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddress> {
async fn request_public_address(&self, node_ref: FilteredNodeRef) -> Option<SocketAddress> {
let rpc = self.unlocked_inner.routing_table.rpc_processor();
let res = network_result_value_or_log!(match rpc.rpc_call_status(Destination::direct(node_ref.clone())).await {
@ -217,7 +217,7 @@ impl DiscoveryContext {
let nodes = self
.unlocked_inner
.routing_table
.find_fast_public_nodes_filtered(node_count, filters);
.find_fast_non_local_nodes_filtered(routing_domain, node_count, filters);
if nodes.is_empty() {
log_net!(debug
"no external address detection peers of type {:?}:{:?}",
@ -232,7 +232,7 @@ impl DiscoveryContext {
let get_public_address_func = |node: NodeRef| {
let this = self.clone();
let node = node.filtered_clone(
let node = node.custom_filtered(
NodeRefFilter::new()
.with_routing_domain(routing_domain)
.with_dial_info_filter(dial_info_filter),
@ -246,7 +246,7 @@ impl DiscoveryContext {
return Some(ExternalInfo {
dial_info,
address,
node,
node: node.unfiltered(),
});
}
None
@ -308,12 +308,7 @@ impl DiscoveryContext {
) -> bool {
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
// asking for node validation doesn't have to use the dial info filter of the dial info we are validating
let mut node_ref = node_ref.clone();
node_ref.set_filter(None);
// ask the node to send us a dial info validation receipt
match rpc_processor
.rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect)
.await


@ -1,15 +1,17 @@
mod discovery_context;
mod igd_manager;
mod network_class_discovery;
mod network_state;
mod network_tcp;
mod network_udp;
mod protocol;
mod start_protocols;
mod tasks;
use super::*;
use crate::routing_table::*;
use connection_manager::*;
use discovery_context::*;
use network_state::*;
use network_tcp::*;
use protocol::tcp::RawTcpProtocolHandler;
use protocol::udp::RawUdpProtocolHandler;
@ -75,26 +77,19 @@ struct NetworkInner {
/// set if the network needs to be restarted due to a low level configuration change
/// such as dhcp release or change of address or interfaces being added or removed
network_needs_restart: bool,
/// the calculated protocol configuration for inbound/outbound protocols
protocol_config: ProtocolConfig,
/// set of statically configured protocols with public dialinfo
static_public_dialinfo: ProtocolTypeSet,
/// join handles for all the low level network background tasks
join_handles: Vec<MustJoinHandle<()>>,
/// stop source for shutting down the low level network background tasks
stop_source: Option<StopSource>,
/// does our network have ipv4 on any network?
enable_ipv4: bool,
/// does our network have ipv6 on the global internet?
enable_ipv6_global: bool,
/// does our network have ipv6 on the local network?
enable_ipv6_local: bool,
/// set if we need to calculate our public dial info again
needs_public_dial_info_check: bool,
/// set if we have yet to clear the network during public dial info checking
network_already_cleared: bool,
/// the punishment closure to enact
public_dial_info_check_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
/// Actual bound addresses per protocol
bound_address_per_protocol: BTreeMap<ProtocolType, Vec<SocketAddr>>,
/// mapping of protocol handlers to accept messages from a set of bound socket addresses
udp_protocol_handlers: BTreeMap<SocketAddr, RawUdpProtocolHandler>,
/// outbound udp protocol handler for udpv4
@ -107,8 +102,10 @@ struct NetworkInner {
listener_states: BTreeMap<SocketAddr, Arc<RwLock<ListenerState>>>,
/// Preferred local addresses for protocols/address combinations for outgoing connections
preferred_local_addresses: BTreeMap<(ProtocolType, AddressType), SocketAddr>,
/// The list of stable interface addresses we have last seen
stable_interface_addresses_at_startup: Vec<IpAddr>,
/// set of statically configured protocols with public dialinfo
static_public_dial_info: ProtocolTypeSet,
/// Network state
network_state: Option<NetworkState>,
}
struct NetworkUnlockedInner {
@ -125,6 +122,7 @@ struct NetworkUnlockedInner {
update_network_class_task: TickTask<EyreReport>,
network_interfaces_task: TickTask<EyreReport>,
upnp_task: TickTask<EyreReport>,
network_task_lock: AsyncMutex<()>,
// Managers
igd_manager: igd_manager::IGDManager,
@ -144,20 +142,17 @@ impl Network {
needs_public_dial_info_check: false,
network_already_cleared: false,
public_dial_info_check_punishment: None,
protocol_config: Default::default(),
static_public_dialinfo: ProtocolTypeSet::empty(),
join_handles: Vec::new(),
stop_source: None,
enable_ipv4: false,
enable_ipv6_global: false,
enable_ipv6_local: false,
bound_address_per_protocol: BTreeMap::new(),
udp_protocol_handlers: BTreeMap::new(),
default_udpv4_protocol_handler: None,
default_udpv6_protocol_handler: None,
tls_acceptor: None,
listener_states: BTreeMap::new(),
preferred_local_addresses: BTreeMap::new(),
stable_interface_addresses_at_startup: Vec::new(),
static_public_dial_info: ProtocolTypeSet::new(),
network_state: None,
}
}
@ -176,6 +171,7 @@ impl Network {
update_network_class_task: TickTask::new("update_network_class_task", 1),
network_interfaces_task: TickTask::new("network_interfaces_task", 1),
upnp_task: TickTask::new("upnp_task", 1),
network_task_lock: AsyncMutex::new(()),
igd_manager: igd_manager::IGDManager::new(config.clone()),
}
}
@ -195,31 +191,7 @@ impl Network {
)),
};
// Set update network class tick task
{
let this2 = this.clone();
this.unlocked_inner
.update_network_class_task
.set_routine(move |s, l, t| {
Box::pin(this2.clone().update_network_class_task_routine(s, l, t))
});
}
// Set network interfaces tick task
{
let this2 = this.clone();
this.unlocked_inner
.network_interfaces_task
.set_routine(move |s, l, t| {
Box::pin(this2.clone().network_interfaces_task_routine(s, l, t))
});
}
// Set upnp tick task
{
let this2 = this.clone();
this.unlocked_inner
.upnp_task
.set_routine(move |s, l, t| Box::pin(this2.clone().upnp_task_routine(s, l, t)));
}
this.setup_tasks();
this
}
@ -301,11 +273,11 @@ impl Network {
inner.join_handles.push(jh);
}
fn translate_unspecified_address(&self, from: &SocketAddr) -> Vec<SocketAddr> {
fn translate_unspecified_address(&self, from: SocketAddr) -> Vec<SocketAddr> {
if !from.ip().is_unspecified() {
vec![*from]
vec![from]
} else {
let addrs = self.get_stable_interface_addresses();
let addrs = self.last_network_state().stable_interface_addresses;
addrs
.iter()
.filter_map(|a| {
@ -336,46 +308,6 @@ impl Network {
inner.preferred_local_addresses.get(&key).copied()
}
pub(crate) fn is_stable_interface_address(&self, addr: IpAddr) -> bool {
let stable_addrs = self.get_stable_interface_addresses();
stable_addrs.contains(&addr)
}
pub(crate) fn get_stable_interface_addresses(&self) -> Vec<IpAddr> {
let addrs = self.unlocked_inner.interfaces.stable_addresses();
let mut addrs: Vec<IpAddr> = addrs
.into_iter()
.filter(|addr| {
let address = Address::from_ip_addr(*addr);
address.is_local() || address.is_global()
})
.collect();
addrs.sort();
addrs.dedup();
addrs
}
// See if our interface addresses have changed, if so redo public dial info if necessary
async fn check_interface_addresses(&self) -> EyreResult<bool> {
if !self
.unlocked_inner
.interfaces
.refresh()
.await
.wrap_err("failed to check network interfaces")?
{
return Ok(false);
}
let mut inner = self.inner.lock();
let new_stable_interface_addresses = self.get_stable_interface_addresses();