Merge branch 'dialinfo-work' into 'main'

Support for DialInfo failover

See merge request veilid/veilid!143
This commit is contained in:
Christien Rioux 2023-08-22 19:55:14 +00:00
commit 41af6d4c5b
18 changed files with 668 additions and 443 deletions

30
Cargo.lock generated
View File

@ -854,9 +854,9 @@ dependencies = [
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.82" version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@ -1533,9 +1533,9 @@ dependencies = [
[[package]] [[package]]
name = "dashmap" name = "dashmap"
version = "5.5.0" version = "5.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d" checksum = "edd72493923899c6f10c641bdbdeddc7183d6396641d99c1a0d1597f37f92e28"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"hashbrown 0.14.0", "hashbrown 0.14.0",
@ -2249,9 +2249,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.20" version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049" checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833"
dependencies = [ dependencies = [
"bytes 1.4.0", "bytes 1.4.0",
"fnv", "fnv",
@ -3757,12 +3757,12 @@ dependencies = [
[[package]] [[package]]
name = "petgraph" name = "petgraph"
version = "0.6.3" version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9"
dependencies = [ dependencies = [
"fixedbitset", "fixedbitset",
"indexmap 1.9.3", "indexmap 2.0.0",
] ]
[[package]] [[package]]
@ -4463,9 +4463,9 @@ dependencies = [
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.183" version = "1.0.185"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" checksum = "be9b6f69f1dfd54c3b568ffa45c310d6973a5e5148fd40cf515acaf38cf5bc31"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
@ -4491,9 +4491,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.183" version = "1.0.185"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" checksum = "dc59dfdcbad1437773485e0367fea4b090a2e0a16d9ffc46af47764536a298ec"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -4750,9 +4750,9 @@ dependencies = [
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.8" version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [ dependencies = [
"autocfg", "autocfg",
] ]

View File

@ -13,8 +13,24 @@ crate-type = ["cdylib", "staticlib", "rlib"]
# Common features # Common features
default = ["enable-crypto-vld0"] default = ["enable-crypto-vld0"]
rt-async-std = ["async-std", "async-std-resolver", "async_executors/async_std", "rtnetlink/smol_socket", "veilid-tools/rt-async-std"] rt-async-std = [
rt-tokio = ["tokio", "tokio-util", "tokio-stream", "trust-dns-resolver/tokio-runtime", "async_executors/tokio_tp", "async_executors/tokio_io", "async_executors/tokio_timer", "rtnetlink/tokio_socket", "veilid-tools/rt-tokio"] "async-std",
"async-std-resolver",
"async_executors/async_std",
"rtnetlink/smol_socket",
"veilid-tools/rt-async-std",
]
rt-tokio = [
"tokio",
"tokio-util",
"tokio-stream",
"trust-dns-resolver/tokio-runtime",
"async_executors/tokio_tp",
"async_executors/tokio_io",
"async_executors/tokio_timer",
"rtnetlink/tokio_socket",
"veilid-tools/rt-tokio",
]
rt-wasm-bindgen = ["veilid-tools/rt-wasm-bindgen", "async_executors/bindgen"] rt-wasm-bindgen = ["veilid-tools/rt-wasm-bindgen", "async_executors/bindgen"]
# Crypto support features # Crypto support features
@ -36,7 +52,7 @@ network-result-extra = ["veilid-tools/network-result-extra"]
[dependencies] [dependencies]
# Tools # Tools
veilid-tools = { path = "../veilid-tools", features = [ "tracing" ] } veilid-tools = { path = "../veilid-tools", features = ["tracing"] }
paste = "1.0.14" paste = "1.0.14"
once_cell = "1.18.0" once_cell = "1.18.0"
owning_ref = "0.4.1" owning_ref = "0.4.1"
@ -57,7 +73,7 @@ eyre = "0.6.8"
thiserror = "1.0.47" thiserror = "1.0.47"
# Data structures # Data structures
enumset = { version= "1.1.2", features = ["serde"] } enumset = { version = "1.1.2", features = ["serde"] }
keyvaluedb = { path = "../external/keyvaluedb/keyvaluedb" } keyvaluedb = { path = "../external/keyvaluedb/keyvaluedb" }
range-set-blaze = "0.1.9" range-set-blaze = "0.1.9"
weak-table = "0.3.2" weak-table = "0.3.2"
@ -65,15 +81,31 @@ generic-array = "0.14.7"
hashlink = { path = "../external/hashlink", features = ["serde_impl"] } hashlink = { path = "../external/hashlink", features = ["serde_impl"] }
# System # System
futures-util = { version = "0.3.28", default_features = false, features = ["alloc"] } futures-util = { version = "0.3.28", default_features = false, features = [
"alloc",
] }
flume = { version = "0.11.0", features = ["async"] } flume = { version = "0.11.0", features = ["async"] }
parking_lot = "0.12.1" parking_lot = "0.12.1"
stop-token = { version = "0.7.0", default-features = false } stop-token = { version = "0.7.0", default-features = false }
# Crypto # Crypto
ed25519-dalek = { version = "2.0.0", default_features = false, features = ["alloc", "rand_core", "digest"] } ed25519-dalek = { version = "2.0.0", default_features = false, features = [
x25519-dalek = { version = "2.0.0", default_features = false, features = ["alloc", "static_secrets"] } "alloc",
curve25519-dalek = { version = "4.0.0", default_features = false, features = ["alloc"] } "rand_core",
"digest",
"zeroize",
] }
x25519-dalek = { version = "2.0.0", default_features = false, features = [
"alloc",
"static_secrets",
"zeroize",
"precomputed-tables",
] }
curve25519-dalek = { version = "4.0.0", default_features = false, features = [
"alloc",
"zeroize",
"precomputed-tables",
] }
blake3 = { version = "1.4.1" } blake3 = { version = "1.4.1" }
chacha20poly1305 = "0.10.1" chacha20poly1305 = "0.10.1"
chacha20 = "0.9.1" chacha20 = "0.9.1"
@ -82,17 +114,20 @@ argon2 = "0.5.1"
# Network # Network
async-std-resolver = { version = "0.22.0", optional = true } async-std-resolver = { version = "0.22.0", optional = true }
trust-dns-resolver = { version = "0.22.0", optional = true } trust-dns-resolver = { version = "0.22.0", optional = true }
enum-as-inner = "=0.5.1" # temporary fix for trust-dns-resolver v0.22.0 enum-as-inner = "=0.5.1" # temporary fix for trust-dns-resolver v0.22.0
# Serialization # Serialization
capnp = { version = "0.17.2", default_features = false } capnp = { version = "0.17.2", default_features = false }
serde = { version = "1.0.183", features = ["derive" ] } serde = { version = "1.0.183", features = ["derive"] }
serde_json = { version = "1.0.105" } serde_json = { version = "1.0.105" }
serde-big-array = "0.5.1" serde-big-array = "0.5.1"
json = "0.12.4" json = "0.12.4"
data-encoding = { version = "2.4.0" } data-encoding = { version = "2.4.0" }
schemars = "0.8.12" schemars = "0.8.12"
lz4_flex = { version = "0.11.1", default-features = false, features = ["safe-encode", "safe-decode"] } lz4_flex = { version = "0.11.1", default-features = false, features = [
"safe-encode",
"safe-decode",
] }
# Dependencies for native builds only # Dependencies for native builds only
# Linux, Windows, Mac, iOS, Android # Linux, Windows, Mac, iOS, Android
@ -106,12 +141,17 @@ libc = "0.2.147"
nix = "0.26.2" nix = "0.26.2"
# System # System
async-std = { version = "1.12.0", features = ["unstable"], optional = true} async-std = { version = "1.12.0", features = ["unstable"], optional = true }
tokio = { version = "1.32.0", features = ["full"], optional = true} tokio = { version = "1.32.0", features = ["full"], optional = true }
tokio-util = { version = "0.7.8", features = ["compat"], optional = true} tokio-util = { version = "0.7.8", features = ["compat"], optional = true }
tokio-stream = { version = "0.1.14", features = ["net"], optional = true} tokio-stream = { version = "0.1.14", features = ["net"], optional = true }
async-io = { version = "1.13.0" } async-io = { version = "1.13.0" }
futures-util = { version = "0.3.28", default-features = false, features = ["async-await", "sink", "std", "io"] } futures-util = { version = "0.3.28", default-features = false, features = [
"async-await",
"sink",
"std",
"io",
] }
# Data structures # Data structures
keyring-manager = { path = "../external/keyring-manager" } keyring-manager = { path = "../external/keyring-manager" }
@ -119,7 +159,7 @@ keyvaluedb-sqlite = { path = "../external/keyvaluedb/keyvaluedb-sqlite" }
# Network # Network
async-tungstenite = { version = "0.23.0", features = ["async-tls"] } async-tungstenite = { version = "0.23.0", features = ["async-tls"] }
igd = { path = "../external/rust-igd" } igd = { path = "../external/rust-igd" }
async-tls = "0.12.0" async-tls = "0.12.0"
webpki = "0.22.0" webpki = "0.22.0"
webpki-roots = "0.25.2" webpki-roots = "0.25.2"
@ -134,7 +174,10 @@ socket2 = { version = "0.5.3", features = ["all"] }
getrandom = { version = "0.2.4", features = ["js"] } getrandom = { version = "0.2.4", features = ["js"] }
# System # System
async_executors = { version = "0.7.0", default-features = false, features = [ "bindgen", "timer" ]} async_executors = { version = "0.7.0", default-features = false, features = [
"bindgen",
"timer",
] }
async-lock = "2.8.0" async-lock = "2.8.0"
wasm-bindgen = "0.2.87" wasm-bindgen = "0.2.87"
js-sys = "0.3.64" js-sys = "0.3.64"
@ -181,14 +224,17 @@ ifstructs = "0.1.1"
# Dependencies for Linux or Android # Dependencies for Linux or Android
[target.'cfg(any(target_os = "android", target_os = "linux"))'.dependencies] [target.'cfg(any(target_os = "android", target_os = "linux"))'.dependencies]
rtnetlink = { version = "=0.13.0", default-features = false} rtnetlink = { version = "=0.13.0", default-features = false }
netlink-sys = { version = "=0.8.5" } netlink-sys = { version = "=0.8.5" }
netlink-packet-route = { version = "=0.17.0" } netlink-packet-route = { version = "=0.17.0" }
# Dependencies for Windows # Dependencies for Windows
[target.'cfg(target_os = "windows")'.dependencies] [target.'cfg(target_os = "windows")'.dependencies]
winapi = { version = "0.3.9", features = [ "iptypes", "iphlpapi" ] } winapi = { version = "0.3.9", features = ["iptypes", "iphlpapi"] }
windows = { version = "0.51.1", features = [ "Win32_NetworkManagement_Dns", "Win32_Foundation" ]} windows = { version = "0.51.1", features = [
"Win32_NetworkManagement_Dns",
"Win32_Foundation",
] }
windows-permissions = "0.2.4" windows-permissions = "0.2.4"
# Dependencies for iOS # Dependencies for iOS
@ -207,7 +253,7 @@ features = ["bundled"]
serial_test = "2.0.0" serial_test = "2.0.0"
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
simplelog = { version = "0.12.1", features=["test"] } simplelog = { version = "0.12.1", features = ["test"] }
[target.'cfg(target_arch = "wasm32")'.dev-dependencies] [target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3.37" wasm-bindgen-test = "0.3.37"

View File

@ -61,7 +61,7 @@ pub fn veilid_version() -> (u32, u32, u32) {
#[cfg(target_os = "android")] #[cfg(target_os = "android")]
pub use intf::android::veilid_core_setup_android; pub use intf::android::veilid_core_setup_android;
pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 23] = [
"mio", "mio",
"h2", "h2",
"hyper", "hyper",
@ -83,6 +83,8 @@ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [
"trust_dns_resolver", "trust_dns_resolver",
"trust_dns_proto", "trust_dns_proto",
"attohttpc", "attohttpc",
"ws_stream_wasm",
"keyvaluedb_web",
]; ];
use cfg_if::*; use cfg_if::*;

View File

@ -4,6 +4,8 @@ use alloc::collections::btree_map::Entry;
// XXX: Move to config eventually? // XXX: Move to config eventually?
const PUNISHMENT_DURATION_MIN: usize = 60; const PUNISHMENT_DURATION_MIN: usize = 60;
const MAX_PUNISHMENTS_BY_NODE_ID: usize = 65536; const MAX_PUNISHMENTS_BY_NODE_ID: usize = 65536;
const DIAL_INFO_FAILURE_DURATION_MIN: usize = 10;
const MAX_DIAL_INFO_FAILURES: usize = 65536;
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)] #[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddressFilterError { pub enum AddressFilterError {
@ -28,6 +30,7 @@ struct AddressFilterInner {
punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>, punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>,
punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>, punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>,
punishments_by_node_id: BTreeMap<TypedKey, Timestamp>, punishments_by_node_id: BTreeMap<TypedKey, Timestamp>,
dial_info_failures: BTreeMap<DialInfo, Timestamp>,
} }
struct AddressFilterUnlockedInner { struct AddressFilterUnlockedInner {
@ -36,6 +39,7 @@ struct AddressFilterUnlockedInner {
max_connections_per_ip6_prefix_size: usize, max_connections_per_ip6_prefix_size: usize,
max_connection_frequency_per_min: usize, max_connection_frequency_per_min: usize,
punishment_duration_min: usize, punishment_duration_min: usize,
dial_info_failure_duration_min: usize,
routing_table: RoutingTable, routing_table: RoutingTable,
} }
@ -56,6 +60,10 @@ impl fmt::Debug for AddressFilterUnlockedInner {
&self.max_connection_frequency_per_min, &self.max_connection_frequency_per_min,
) )
.field("punishment_duration_min", &self.punishment_duration_min) .field("punishment_duration_min", &self.punishment_duration_min)
.field(
"dial_info_failure_duration_min",
&self.dial_info_failure_duration_min,
)
.finish() .finish()
} }
} }
@ -78,6 +86,7 @@ impl AddressFilter {
max_connection_frequency_per_min: c.network.max_connection_frequency_per_min max_connection_frequency_per_min: c.network.max_connection_frequency_per_min
as usize, as usize,
punishment_duration_min: PUNISHMENT_DURATION_MIN, punishment_duration_min: PUNISHMENT_DURATION_MIN,
dial_info_failure_duration_min: DIAL_INFO_FAILURE_DURATION_MIN,
routing_table, routing_table,
}), }),
inner: Arc::new(Mutex::new(AddressFilterInner { inner: Arc::new(Mutex::new(AddressFilterInner {
@ -88,10 +97,17 @@ impl AddressFilter {
punishments_by_ip4: BTreeMap::new(), punishments_by_ip4: BTreeMap::new(),
punishments_by_ip6_prefix: BTreeMap::new(), punishments_by_ip6_prefix: BTreeMap::new(),
punishments_by_node_id: BTreeMap::new(), punishments_by_node_id: BTreeMap::new(),
dial_info_failures: BTreeMap::new(),
})), })),
} }
} }
// When the network restarts, some of the address filter can be cleared
pub fn restart(&self) {
let mut inner = self.inner.lock();
inner.dial_info_failures.clear();
}
fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) { fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) {
// v4 // v4
{ {
@ -180,6 +196,22 @@ impl AddressFilter {
} }
} }
} }
// dial info
{
let mut dead_keys = Vec::<DialInfo>::new();
for (key, value) in &mut inner.dial_info_failures {
// Drop failures older than the failure duration
if cur_ts.as_u64().saturating_sub(value.as_u64())
> self.unlocked_inner.dial_info_failure_duration_min as u64 * 60_000_000u64
{
dead_keys.push(key.clone());
}
}
for key in dead_keys {
log_net!(debug ">>> DIALINFO PERMIT: {}", key);
inner.dial_info_failures.remove(&key);
}
}
} }
fn is_ip_addr_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool { fn is_ip_addr_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool {
@ -198,6 +230,14 @@ impl AddressFilter {
false false
} }
fn get_dial_info_failed_ts_inner(
&self,
inner: &AddressFilterInner,
dial_info: &DialInfo,
) -> Option<Timestamp> {
inner.dial_info_failures.get(dial_info).copied()
}
pub fn is_ip_addr_punished(&self, addr: IpAddr) -> bool { pub fn is_ip_addr_punished(&self, addr: IpAddr) -> bool {
let inner = self.inner.lock(); let inner = self.inner.lock();
let ipblock = ip_to_ipblock( let ipblock = ip_to_ipblock(
@ -207,6 +247,27 @@ impl AddressFilter {
self.is_ip_addr_punished_inner(&*inner, ipblock) self.is_ip_addr_punished_inner(&*inner, ipblock)
} }
pub fn get_dial_info_failed_ts(&self, dial_info: &DialInfo) -> Option<Timestamp> {
let inner = self.inner.lock();
self.get_dial_info_failed_ts_inner(&*inner, dial_info)
}
pub fn set_dial_info_failed(&self, dial_info: DialInfo) {
let ts = get_aligned_timestamp();
let mut inner = self.inner.lock();
if inner.dial_info_failures.len() >= MAX_DIAL_INFO_FAILURES {
log_net!(debug ">>> DIALINFO FAILURE TABLE FULL: {}", dial_info);
return;
}
log_net!(debug ">>> DIALINFO FAILURE: {:?}", dial_info);
inner
.dial_info_failures
.entry(dial_info)
.and_modify(|v| *v = ts)
.or_insert(ts);
}
pub fn punish_ip_addr(&self, addr: IpAddr) { pub fn punish_ip_addr(&self, addr: IpAddr) {
log_net!(debug ">>> PUNISHED: {}", addr); log_net!(debug ">>> PUNISHED: {}", addr);
let ts = get_aligned_timestamp(); let ts = get_aligned_timestamp();

View File

@ -211,20 +211,7 @@ impl NetworkManager {
// Make the network key // Make the network key
let network_key = { let network_key = {
let c = config.get(); let c = config.get();
let network_key_password = if let Some(nkp) = c.network.network_key_password.clone() { let network_key_password = c.network.network_key_password.clone();
Some(nkp)
} else {
if c.network
.routing_table
.bootstrap
.contains(&"bootstrap.veilid.net".to_owned())
{
None
} else {
Some(c.network.routing_table.bootstrap.join(","))
}
};
let network_key = if let Some(network_key_password) = network_key_password { let network_key = if let Some(network_key_password) = network_key_password {
if !network_key_password.is_empty() { if !network_key_password.is_empty() {
info!("Using network key"); info!("Using network key");
@ -380,6 +367,9 @@ impl NetworkManager {
return Ok(()); return Ok(());
} }
// Clean address filter for things that should not be persistent
self.address_filter().restart();
// Create network components // Create network components
let connection_manager = ConnectionManager::new(self.clone()); let connection_manager = ConnectionManager::new(self.clone());
let net = Network::new( let net = Network::new(

View File

@ -384,6 +384,21 @@ impl Network {
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
// Record DialInfo failures
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
&self,
dial_info: DialInfo,
fut: F,
) -> EyreResult<NetworkResult<T>> {
let network_result = fut.await?;
if matches!(network_result, NetworkResult::NoConnection(_)) {
self.network_manager()
.address_filter()
.set_dial_info_failed(dial_info);
}
Ok(network_result)
}
// Send data to a dial info, unbound, using a new connection from a random port // Send data to a dial info, unbound, using a new connection from a random port
// This creates a short-lived connection in the case of connection-oriented protocols // This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message. // for the purpose of sending this one message.
@ -394,59 +409,62 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> { ) -> EyreResult<NetworkResult<()>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let connect_timeout_ms = { let data_len = data.len();
let c = self.config.get(); let connect_timeout_ms = {
c.network.connection_initial_timeout_ms let c = self.config.get();
}; c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().to_ip_addr()) .is_ip_addr_punished(dial_info.address().to_ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
} }
match dial_info.protocol_type() { match dial_info.protocol_type() {
ProtocolType::UDP => { ProtocolType::UDP => {
let peer_socket_addr = dial_info.to_socket_addr(); let peer_socket_addr = dial_info.to_socket_addr();
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
.await
.wrap_err("create socket failure")?;
let _ = network_result_try!(h
.send_message(data, peer_socket_addr)
.await
.map(NetworkResult::Value)
.wrap_err("send message failure")?);
}
ProtocolType::TCP => {
let peer_socket_addr = dial_info.to_socket_addr();
let pnc = network_result_try!(RawTcpProtocolHandler::connect(
None,
peer_socket_addr,
connect_timeout_ms
)
.await .await
.wrap_err("create socket failure")?; .wrap_err("connect failure")?);
let _ = network_result_try!(h network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
.send_message(data, peer_socket_addr) }
ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
None,
&dial_info,
connect_timeout_ms
)
.await .await
.map(NetworkResult::Value) .wrap_err("connect failure")?);
.wrap_err("send message failure")?); network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
} }
ProtocolType::TCP => { // Network accounting
let peer_socket_addr = dial_info.to_socket_addr(); self.network_manager()
let pnc = network_result_try!(RawTcpProtocolHandler::connect( .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
None,
peer_socket_addr,
connect_timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
None,
&dial_info,
connect_timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
}
// Network accounting
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::Value(())) Ok(NetworkResult::Value(()))
})
.await
} }
// Send data to a dial info, unbound, using a new connection from a random port // Send data to a dial info, unbound, using a new connection from a random port
@ -461,85 +479,95 @@ impl Network {
data: Vec<u8>, data: Vec<u8>,
timeout_ms: u32, timeout_ms: u32,
) -> EyreResult<NetworkResult<Vec<u8>>> { ) -> EyreResult<NetworkResult<Vec<u8>>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let connect_timeout_ms = { let data_len = data.len();
let c = self.config.get(); let connect_timeout_ms = {
c.network.connection_initial_timeout_ms let c = self.config.get();
}; c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().to_ip_addr()) .is_ip_addr_punished(dial_info.address().to_ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
let peer_socket_addr = dial_info.to_socket_addr();
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
.await
.wrap_err("create socket failure")?;
network_result_try!(h
.send_message(data, peer_socket_addr)
.await
.wrap_err("send message failure")?);
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
// receive single response
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
let (recv_len, recv_addr) = network_result_try!(timeout(
timeout_ms,
h.recv_message(&mut out).instrument(Span::current())
)
.await
.into_network_result())
.wrap_err("recv_message failure")?;
let recv_socket_addr = recv_addr.remote_address().to_socket_addr();
self.network_manager()
.stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64));
// if the from address is not the same as the one we sent to, then drop this
if recv_socket_addr != peer_socket_addr {
bail!("wrong address");
}
out.resize(recv_len, 0u8);
Ok(NetworkResult::Value(out))
} }
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(match dial_info.protocol_type() {
ProtocolType::UDP => unreachable!(),
ProtocolType::TCP => {
let peer_socket_addr = dial_info.to_socket_addr();
RawTcpProtocolHandler::connect(None, peer_socket_addr, connect_timeout_ms)
.await
.wrap_err("connect failure")?
}
ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms)
.await
.wrap_err("connect failure")?
}
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?); match dial_info.protocol_type() {
self.network_manager() ProtocolType::UDP => {
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); let peer_socket_addr = dial_info.to_socket_addr();
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
.await
.wrap_err("create socket failure")?;
network_result_try!(h
.send_message(data, peer_socket_addr)
.await
.wrap_err("send message failure")?);
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) // receive single response
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
let (recv_len, recv_addr) = network_result_try!(timeout(
timeout_ms,
h.recv_message(&mut out).instrument(Span::current())
)
.await .await
.into_network_result()) .into_network_result())
.wrap_err("recv failure")?); .wrap_err("recv_message failure")?;
self.network_manager() let recv_socket_addr = recv_addr.remote_address().to_socket_addr();
.stats_packet_rcvd(dial_info.to_ip_addr(), ByteCount::new(out.len() as u64)); self.network_manager()
.stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64));
Ok(NetworkResult::Value(out)) // if the from address is not the same as the one we sent to, then drop this
if recv_socket_addr != peer_socket_addr {
bail!("wrong address");
}
out.resize(recv_len, 0u8);
Ok(NetworkResult::Value(out))
}
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(match dial_info.protocol_type() {
ProtocolType::UDP => unreachable!(),
ProtocolType::TCP => {
let peer_socket_addr = dial_info.to_socket_addr();
RawTcpProtocolHandler::connect(
None,
peer_socket_addr,
connect_timeout_ms,
)
.await
.wrap_err("connect failure")?
}
ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms)
.await
.wrap_err("connect failure")?
}
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
let out =
network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
.await
.into_network_result())
.wrap_err("recv failure")?);
self.network_manager().stats_packet_rcvd(
dial_info.to_ip_addr(),
ByteCount::new(out.len() as u64),
);
Ok(NetworkResult::Value(out))
}
} }
} })
.await
} }
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
@ -609,41 +637,44 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> { ) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let connection_descriptor; let data_len = data.len();
if dial_info.protocol_type() == ProtocolType::UDP { let connection_descriptor;
// Handle connectionless protocol if dial_info.protocol_type() == ProtocolType::UDP {
let peer_socket_addr = dial_info.to_socket_addr(); // Handle connectionless protocol
let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { let peer_socket_addr = dial_info.to_socket_addr();
Some(ph) => ph, let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) {
None => bail!("no appropriate UDP protocol handler for dial_info"), Some(ph) => ph,
}; None => bail!("no appropriate UDP protocol handler for dial_info"),
connection_descriptor = network_result_try!(ph };
.send_message(data, peer_socket_addr) connection_descriptor = network_result_try!(ph
.await .send_message(data, peer_socket_addr)
.wrap_err("failed to send data to dial info")?); .await
} else { .wrap_err("failed to send data to dial info")?);
// Handle connection-oriented protocols } else {
let conn = network_result_try!( // Handle connection-oriented protocols
self.connection_manager() let conn = network_result_try!(
.get_or_create_connection(dial_info.clone()) self.connection_manager()
.await? .get_or_create_connection(dial_info.clone())
); .await?
);
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
return Ok(NetworkResult::NoConnection(io::Error::new( return Ok(NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset, io::ErrorKind::ConnectionReset,
"failed to send", "failed to send",
))); )));
}
connection_descriptor = conn.connection_descriptor();
} }
connection_descriptor = conn.connection_descriptor();
}
// Network accounting // Network accounting
self.network_manager() self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::value(connection_descriptor)) Ok(NetworkResult::value(connection_descriptor))
})
.await
} }
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////

View File

@ -358,6 +358,24 @@ impl NetworkManager {
// sequencing = Sequencing::PreferOrdered; // sequencing = Sequencing::PreferOrdered;
// } // }
// Deprioritize dial info that have recently failed
let address_filter = self.address_filter();
let mut dial_info_failures_map = BTreeMap::<DialInfo, Timestamp>::new();
for did in peer_b.signed_node_info().node_info().all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |_| true) {
if let Some(ts) = address_filter.get_dial_info_failed_ts(&did.dial_info) {
dial_info_failures_map.insert(did.dial_info, ts);
}
}
let dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>> = if dial_info_failures_map.is_empty() {
None
} else {
Some(Arc::new(move |a: &DialInfoDetail, b: &DialInfoDetail| {
let ats = dial_info_failures_map.get(&a.dial_info).copied().unwrap_or_default();
let bts = dial_info_failures_map.get(&b.dial_info).copied().unwrap_or_default();
ats.cmp(&bts)
}))
};
// Get the best contact method with these parameters from the routing domain // Get the best contact method with these parameters from the routing domain
let cm = routing_table.get_contact_method( let cm = routing_table.get_contact_method(
routing_domain, routing_domain,
@ -365,6 +383,7 @@ impl NetworkManager {
&peer_b, &peer_b,
dial_info_filter, dial_info_filter,
sequencing, sequencing,
dif_sort,
); );
// Translate the raw contact method to a referenced contact method // Translate the raw contact method to a referenced contact method

View File

@ -118,47 +118,66 @@ impl Network {
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
// Record DialInfo failures
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
&self,
dial_info: DialInfo,
fut: F,
) -> EyreResult<NetworkResult<T>> {
let network_result = fut.await?;
if matches!(network_result, NetworkResult::NoConnection(_)) {
self.network_manager()
.address_filter()
.set_dial_info_failed(dial_info);
}
Ok(network_result)
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_unbound_to_dial_info( pub async fn send_data_unbound_to_dial_info(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> { ) -> EyreResult<NetworkResult<()>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let timeout_ms = { let data_len = data.len();
let c = self.config.get(); let timeout_ms = {
c.network.connection_initial_timeout_ms let c = self.config.get();
}; c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().to_ip_addr()) .is_ip_addr_punished(dial_info.address().to_ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
bail!("no support for UDP protocol")
} }
ProtocolType::TCP => {
bail!("no support for TCP protocol")
}
ProtocolType::WS | ProtocolType::WSS => {
let pnc =
network_result_try!(WebsocketProtocolHandler::connect(&dial_info, timeout_ms)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
};
// Network accounting match dial_info.protocol_type() {
self.network_manager() ProtocolType::UDP => {
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); bail!("no support for UDP protocol")
}
ProtocolType::TCP => {
bail!("no support for TCP protocol")
}
ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
&dial_info, timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
};
Ok(NetworkResult::Value(())) // Network accounting
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::Value(()))
})
.await
} }
// Send data to a dial info, unbound, using a new connection from a random port // Send data to a dial info, unbound, using a new connection from a random port
@ -173,53 +192,59 @@ impl Network {
data: Vec<u8>, data: Vec<u8>,
timeout_ms: u32, timeout_ms: u32,
) -> EyreResult<NetworkResult<Vec<u8>>> { ) -> EyreResult<NetworkResult<Vec<u8>>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let connect_timeout_ms = { let data_len = data.len();
let c = self.config.get(); let connect_timeout_ms = {
c.network.connection_initial_timeout_ms let c = self.config.get();
}; c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().to_ip_addr()) .is_ip_addr_punished(dial_info.address().to_ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
} }
match dial_info.protocol_type() { match dial_info.protocol_type() {
ProtocolType::UDP => { ProtocolType::UDP => {
bail!("no support for UDP protocol") bail!("no support for UDP protocol")
} }
ProtocolType::TCP => { ProtocolType::TCP => {
bail!("no support for TCP protocol") bail!("no support for TCP protocol")
} }
ProtocolType::WS | ProtocolType::WSS => { ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(match dial_info.protocol_type() { let pnc = network_result_try!(match dial_info.protocol_type() {
ProtocolType::UDP => unreachable!(), ProtocolType::UDP => unreachable!(),
ProtocolType::TCP => unreachable!(), ProtocolType::TCP => unreachable!(),
ProtocolType::WS | ProtocolType::WSS => { ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::connect(&dial_info, connect_timeout_ms) WebsocketProtocolHandler::connect(&dial_info, connect_timeout_ms)
.await
.wrap_err("connect failure")?
}
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
let out =
network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
.await .await
.wrap_err("connect failure")? .into_network_result())
} .wrap_err("recv failure")?);
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?); self.network_manager().stats_packet_rcvd(
self.network_manager() dial_info.to_ip_addr(),
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); ByteCount::new(out.len() as u64),
);
let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) Ok(NetworkResult::Value(out))
.await }
.into_network_result())
.wrap_err("recv failure")?);
self.network_manager()
.stats_packet_rcvd(dial_info.to_ip_addr(), ByteCount::new(out.len() as u64));
Ok(NetworkResult::Value(out))
} }
} })
.await
} }
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
@ -273,34 +298,37 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> { ) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
if dial_info.protocol_type() == ProtocolType::UDP { let data_len = data.len();
bail!("no support for UDP protocol"); if dial_info.protocol_type() == ProtocolType::UDP {
} bail!("no support for UDP protocol");
if dial_info.protocol_type() == ProtocolType::TCP { }
bail!("no support for TCP protocol"); if dial_info.protocol_type() == ProtocolType::TCP {
} bail!("no support for TCP protocol");
}
// Handle connection-oriented protocols // Handle connection-oriented protocols
let conn = network_result_try!( let conn = network_result_try!(
self.connection_manager() self.connection_manager()
.get_or_create_connection(dial_info.clone()) .get_or_create_connection(dial_info.clone())
.await? .await?
); );
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
return Ok(NetworkResult::NoConnection(io::Error::new( return Ok(NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset, io::ErrorKind::ConnectionReset,
"failed to send", "failed to send",
))); )));
} }
let connection_descriptor = conn.connection_descriptor(); let connection_descriptor = conn.connection_descriptor();
// Network accounting // Network accounting
self.network_manager() self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::value(connection_descriptor)) Ok(NetworkResult::value(connection_descriptor))
})
.await
} }
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
@ -320,8 +348,8 @@ impl Network {
} }
// XXX: See issue #92 // XXX: See issue #92
let family_global = AddressTypeSet::all(); let family_global = AddressTypeSet::from(AddressType::IPV4);
let family_local = AddressTypeSet::all(); let family_local = AddressTypeSet::from(AddressType::IPV4);
ProtocolConfig { ProtocolConfig {
outbound, outbound,

View File

@ -538,6 +538,7 @@ impl RoutingTable {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod { ) -> ContactMethod {
self.inner.read().get_contact_method( self.inner.read().get_contact_method(
routing_domain, routing_domain,
@ -545,6 +546,7 @@ impl RoutingTable {
peer_b, peer_b,
dial_info_filter, dial_info_filter,
sequencing, sequencing,
dif_sort,
) )
} }

View File

@ -401,6 +401,7 @@ impl RouteSpecStore {
current_node, current_node,
DialInfoFilter::all(), DialInfoFilter::all(),
sequencing, sequencing,
None,
); );
if matches!(cm, ContactMethod::Unreachable) { if matches!(cm, ContactMethod::Unreachable) {
reachable = false; reachable = false;
@ -415,6 +416,7 @@ impl RouteSpecStore {
current_node, current_node,
DialInfoFilter::all(), DialInfoFilter::all(),
Sequencing::EnsureOrdered, Sequencing::EnsureOrdered,
None,
); );
if matches!(cm, ContactMethod::Unreachable) { if matches!(cm, ContactMethod::Unreachable) {
can_do_sequenced = false; can_do_sequenced = false;
@ -438,6 +440,7 @@ impl RouteSpecStore {
current_node, current_node,
DialInfoFilter::all(), DialInfoFilter::all(),
sequencing, sequencing,
None,
); );
if matches!(cm, ContactMethod::Unreachable) { if matches!(cm, ContactMethod::Unreachable) {
reachable = false; reachable = false;
@ -452,6 +455,7 @@ impl RouteSpecStore {
current_node, current_node,
DialInfoFilter::all(), DialInfoFilter::all(),
Sequencing::EnsureOrdered, Sequencing::EnsureOrdered,
None,
); );
if matches!(cm, ContactMethod::Unreachable) { if matches!(cm, ContactMethod::Unreachable) {
can_do_sequenced = false; can_do_sequenced = false;

View File

@ -220,6 +220,7 @@ pub trait RoutingDomainDetail {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod; ) -> ContactMethod;
} }
@ -245,6 +246,7 @@ fn first_filtered_dial_info_detail_between_nodes(
to_node: &NodeInfo, to_node: &NodeInfo,
dial_info_filter: &DialInfoFilter, dial_info_filter: &DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>
) -> Option<DialInfoDetail> { ) -> Option<DialInfoDetail> {
let dial_info_filter = dial_info_filter.clone().filtered( let dial_info_filter = dial_info_filter.clone().filtered(
&DialInfoFilter::all() &DialInfoFilter::all()
@ -253,11 +255,28 @@ fn first_filtered_dial_info_detail_between_nodes(
); );
// Apply sequencing and get sort // Apply sequencing and get sort
// Include sorting by external dial info sort for rotating through dialinfo
// based on an external preference table, for example the one kept by
// AddressFilter to deprioritize dialinfo that have recently failed to connect
let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing); let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
let sort = if ordered { let sort: Option<Box<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>> = if ordered {
Some(DialInfoDetail::ordered_sequencing_sort) if let Some(dif_sort) = dif_sort {
Some(Box::new(move |a, b| {
let mut ord = dif_sort(a,b);
if ord == core::cmp::Ordering::Equal {
ord = DialInfoDetail::ordered_sequencing_sort(a,b);
}
ord
}))
} else {
Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) }))
}
} else { } else {
None if let Some(dif_sort) = dif_sort {
Some(Box::new(move |a,b| { dif_sort(a,b) }))
} else {
None
}
}; };
// If the filter is dead then we won't be able to connect // If the filter is dead then we won't be able to connect
@ -287,6 +306,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod { ) -> ContactMethod {
// Get the nodeinfos for convenience // Get the nodeinfos for convenience
let node_a = peer_a.signed_node_info().node_info(); let node_a = peer_a.signed_node_info().node_info();
@ -304,7 +324,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
// Get the best match dial info for node B if we have it // Get the best match dial info for node B if we have it
if let Some(target_did) = if let Some(target_did) =
first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing) first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing, dif_sort.clone())
{ {
// Do we need to signal before going inbound? // Do we need to signal before going inbound?
if !target_did.class.requires_signal() { if !target_did.class.requires_signal() {
@ -334,6 +354,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_b_relay, node_b_relay,
&dial_info_filter, &dial_info_filter,
sequencing, sequencing,
dif_sort.clone(),
) )
.is_some() .is_some()
{ {
@ -347,6 +368,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_a, node_a,
&dial_info_filter, &dial_info_filter,
sequencing, sequencing,
dif_sort.clone()
) { ) {
// Ensure we aren't on the same public IP address (no hairpin nat) // Ensure we aren't on the same public IP address (no hairpin nat)
if reverse_did.dial_info.to_ip_addr() if reverse_did.dial_info.to_ip_addr()
@ -373,6 +395,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_b, node_b,
&udp_dial_info_filter, &udp_dial_info_filter,
sequencing, sequencing,
dif_sort.clone()
) { ) {
// Does node A have a direct udp dialinfo that node B can reach? // Does node A have a direct udp dialinfo that node B can reach?
if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes( if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes(
@ -380,6 +403,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_a, node_a,
&udp_dial_info_filter, &udp_dial_info_filter,
sequencing, sequencing,
dif_sort.clone(),
) { ) {
// Ensure we aren't on the same public IP address (no hairpin nat) // Ensure we aren't on the same public IP address (no hairpin nat)
if reverse_udp_did.dial_info.to_ip_addr() if reverse_udp_did.dial_info.to_ip_addr()
@ -422,6 +446,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
&node_b_relay, &node_b_relay,
&dial_info_filter, &dial_info_filter,
sequencing, sequencing,
dif_sort.clone()
) )
.is_some() .is_some()
{ {
@ -496,6 +521,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod { ) -> ContactMethod {
// Scope the filter down to protocols node A can do outbound // Scope the filter down to protocols node A can do outbound
let dial_info_filter = dial_info_filter.filtered( let dial_info_filter = dial_info_filter.filtered(
@ -504,20 +530,31 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
.with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()), .with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()),
); );
// Get first filtered dialinfo // Apply sequencing and get sort
let (sort, dial_info_filter) = match sequencing { // Include sorting by external dial info sort for rotating through dialinfo
Sequencing::NoPreference => (None, dial_info_filter), // based on an external preference table, for example the one kept by
Sequencing::PreferOrdered => ( // AddressFilter to deprioritize dialinfo that have recently failed to connect
Some(DialInfoDetail::ordered_sequencing_sort), let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
dial_info_filter, let sort: Option<Box<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>> = if ordered {
), if let Some(dif_sort) = dif_sort {
Sequencing::EnsureOrdered => ( Some(Box::new(move |a, b| {
Some(DialInfoDetail::ordered_sequencing_sort), let mut ord = dif_sort(a,b);
dial_info_filter.filtered( if ord == core::cmp::Ordering::Equal {
&DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()), ord = DialInfoDetail::ordered_sequencing_sort(a,b);
), }
), ord
}))
} else {
Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) }))
}
} else {
if let Some(dif_sort) = dif_sort {
Some(Box::new(move |a,b| { dif_sort(a,b) }))
} else {
None
}
}; };
// If the filter is dead then we won't be able to connect // If the filter is dead then we won't be able to connect
if dial_info_filter.is_dead() { if dial_info_filter.is_dead() {
return ContactMethod::Unreachable; return ContactMethod::Unreachable;

View File

@ -226,9 +226,10 @@ impl RoutingTableInner {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod { ) -> ContactMethod {
self.with_routing_domain(routing_domain, |rdd| { self.with_routing_domain(routing_domain, |rdd| {
rdd.get_contact_method(self, peer_a, peer_b, dial_info_filter, sequencing) rdd.get_contact_method(self, peer_a, peer_b, dial_info_filter, sequencing, dif_sort)
}) })
} }

View File

@ -403,7 +403,7 @@ packages:
path: ".." path: ".."
relative: true relative: true
source: path source: path
version: "0.1.9" version: "0.1.10"
web: web:
dependency: transitive dependency: transitive
description: description:

View File

@ -1,2 +1,2 @@
@echo off @echo off
flutter run --dart-define=DELETE_TABLE_STORE=1 --dart-define=DELETE_PROTECTED_STORE=1 --dart-define=DELETE_BLOCK_STORE=1 flutter run --dart-define=DELETE_TABLE_STORE=1 --dart-define=DELETE_PROTECTED_STORE=1 --dart-define=DELETE_BLOCK_STORE=1 %*

View File

@ -1,2 +1,2 @@
#!/bin/bash #!/bin/bash
flutter run --dart-define=DELETE_TABLE_STORE=1 --dart-define=DELETE_PROTECTED_STORE=1 --dart-define=DELETE_BLOCK_STORE=1 flutter run --dart-define=DELETE_TABLE_STORE=1 --dart-define=DELETE_PROTECTED_STORE=1 --dart-define=DELETE_BLOCK_STORE=1 $@

View File

@ -58,129 +58,133 @@ int getRemoteMaxStorageSpaceMb() {
return 256; return 256;
} }
Future<VeilidConfig> getDefaultVeilidConfig(String programName) async => Future<VeilidConfig> getDefaultVeilidConfig(String programName) async {
VeilidConfig( // ignore: do_not_use_environment
programName: programName, const bootstrap = String.fromEnvironment('BOOTSTRAP');
namespace: '', return VeilidConfig(
capabilities: const VeilidConfigCapabilities(disable: []), programName: programName,
protectedStore: const VeilidConfigProtectedStore( namespace: '',
allowInsecureFallback: false, capabilities: const VeilidConfigCapabilities(disable: []),
alwaysUseInsecureStorage: false, protectedStore: const VeilidConfigProtectedStore(
directory: '', allowInsecureFallback: false,
delete: false, alwaysUseInsecureStorage: false,
deviceEncryptionKeyPassword: '', directory: '',
delete: false,
deviceEncryptionKeyPassword: '',
),
tableStore: VeilidConfigTableStore(
directory: kIsWeb
? ''
: p.join((await getApplicationSupportDirectory()).absolute.path,
'table_store'),
delete: false,
),
blockStore: VeilidConfigBlockStore(
directory: kIsWeb
? ''
: p.join((await getApplicationSupportDirectory()).absolute.path,
'block_store'),
delete: false,
),
network: VeilidConfigNetwork(
connectionInitialTimeoutMs: 2000,
connectionInactivityTimeoutMs: 60000,
maxConnectionsPerIp4: 32,
maxConnectionsPerIp6Prefix: 32,
maxConnectionsPerIp6PrefixSize: 56,
maxConnectionFrequencyPerMin: 128,
clientWhitelistTimeoutMs: 300000,
reverseConnectionReceiptTimeMs: 5000,
holePunchReceiptTimeMs: 5000,
routingTable: VeilidConfigRoutingTable(
nodeId: [],
nodeIdSecret: [],
bootstrap: bootstrap.isNotEmpty
? bootstrap.split(',')
: (kIsWeb
? ['ws://bootstrap.veilid.net:5150/ws']
: ['bootstrap.veilid.net']),
limitOverAttached: 64,
limitFullyAttached: 32,
limitAttachedStrong: 16,
limitAttachedGood: 8,
limitAttachedWeak: 4,
), ),
tableStore: VeilidConfigTableStore( rpc: const VeilidConfigRPC(
directory: kIsWeb concurrency: 0,
? '' queueSize: 1024,
: p.join((await getApplicationSupportDirectory()).absolute.path, maxTimestampBehindMs: 10000,
'table_store'), maxTimestampAheadMs: 10000,
delete: false, timeoutMs: 5000,
maxRouteHopCount: 4,
defaultRouteHopCount: 1,
), ),
blockStore: VeilidConfigBlockStore( dht: VeilidConfigDHT(
directory: kIsWeb resolveNodeTimeoutMs: 10000,
? '' resolveNodeCount: 1,
: p.join((await getApplicationSupportDirectory()).absolute.path, resolveNodeFanout: 4,
'block_store'), maxFindNodeCount: 20,
delete: false, getValueTimeoutMs: 10000,
), getValueCount: 3,
network: VeilidConfigNetwork( getValueFanout: 4,
setValueTimeoutMs: 10000,
setValueCount: 4,
setValueFanout: 6,
minPeerCount: 20,
minPeerRefreshTimeMs: 60000,
validateDialInfoReceiptTimeMs: 2000,
localSubkeyCacheSize: getLocalSubkeyCacheSize(),
localMaxSubkeyCacheMemoryMb: await getLocalMaxSubkeyCacheMemoryMb(),
remoteSubkeyCacheSize: getRemoteSubkeyCacheSize(),
remoteMaxRecords: getRemoteMaxRecords(),
remoteMaxSubkeyCacheMemoryMb: await getRemoteMaxSubkeyCacheMemoryMb(),
remoteMaxStorageSpaceMb: getRemoteMaxStorageSpaceMb()),
upnp: true,
detectAddressChanges: true,
restrictedNatRetries: 0,
tls: const VeilidConfigTLS(
certificatePath: '',
privateKeyPath: '',
connectionInitialTimeoutMs: 2000, connectionInitialTimeoutMs: 2000,
connectionInactivityTimeoutMs: 60000, ),
maxConnectionsPerIp4: 32, application: const VeilidConfigApplication(
maxConnectionsPerIp6Prefix: 32, https: VeilidConfigHTTPS(
maxConnectionsPerIp6PrefixSize: 56, enabled: false,
maxConnectionFrequencyPerMin: 128, listenAddress: '',
clientWhitelistTimeoutMs: 300000, path: '',
reverseConnectionReceiptTimeMs: 5000, ),
holePunchReceiptTimeMs: 5000, http: VeilidConfigHTTP(
routingTable: const VeilidConfigRoutingTable( enabled: false,
nodeId: [], listenAddress: '',
nodeIdSecret: [], path: '',
bootstrap: kIsWeb )),
? ['ws://bootstrap.veilid.net:5150/ws'] protocol: const VeilidConfigProtocol(
: ['bootstrap.veilid.net'], udp: VeilidConfigUDP(
limitOverAttached: 64, enabled: !kIsWeb,
limitFullyAttached: 32, socketPoolSize: 0,
limitAttachedStrong: 16, listenAddress: '',
limitAttachedGood: 8,
limitAttachedWeak: 4,
), ),
rpc: const VeilidConfigRPC( tcp: VeilidConfigTCP(
concurrency: 0, connect: !kIsWeb,
queueSize: 1024, listen: !kIsWeb,
maxTimestampBehindMs: 10000, maxConnections: 32,
maxTimestampAheadMs: 10000, listenAddress: '',
timeoutMs: 5000,
maxRouteHopCount: 4,
defaultRouteHopCount: 1,
), ),
dht: VeilidConfigDHT( ws: VeilidConfigWS(
resolveNodeTimeoutMs: 10000, connect: true,
resolveNodeCount: 1, listen: !kIsWeb,
resolveNodeFanout: 4, maxConnections: 32,
maxFindNodeCount: 20, listenAddress: '',
getValueTimeoutMs: 10000, path: 'ws',
getValueCount: 3,
getValueFanout: 4,
setValueTimeoutMs: 10000,
setValueCount: 4,
setValueFanout: 6,
minPeerCount: 20,
minPeerRefreshTimeMs: 60000,
validateDialInfoReceiptTimeMs: 2000,
localSubkeyCacheSize: getLocalSubkeyCacheSize(),
localMaxSubkeyCacheMemoryMb: await getLocalMaxSubkeyCacheMemoryMb(),
remoteSubkeyCacheSize: getRemoteSubkeyCacheSize(),
remoteMaxRecords: getRemoteMaxRecords(),
remoteMaxSubkeyCacheMemoryMb:
await getRemoteMaxSubkeyCacheMemoryMb(),
remoteMaxStorageSpaceMb: getRemoteMaxStorageSpaceMb()),
upnp: true,
detectAddressChanges: true,
restrictedNatRetries: 0,
tls: const VeilidConfigTLS(
certificatePath: '',
privateKeyPath: '',
connectionInitialTimeoutMs: 2000,
), ),
application: const VeilidConfigApplication( wss: VeilidConfigWSS(
https: VeilidConfigHTTPS( connect: true,
enabled: false, listen: false,
listenAddress: '', maxConnections: 32,
path: '', listenAddress: '',
), path: 'ws',
http: VeilidConfigHTTP(
enabled: false,
listenAddress: '',
path: '',
)),
protocol: const VeilidConfigProtocol(
udp: VeilidConfigUDP(
enabled: !kIsWeb,
socketPoolSize: 0,
listenAddress: '',
),
tcp: VeilidConfigTCP(
connect: !kIsWeb,
listen: !kIsWeb,
maxConnections: 32,
listenAddress: '',
),
ws: VeilidConfigWS(
connect: true,
listen: !kIsWeb,
maxConnections: 16,
listenAddress: '',
path: 'ws',
),
wss: VeilidConfigWSS(
connect: true,
listen: false,
maxConnections: 16,
listenAddress: '',
path: 'ws',
),
), ),
), ),
); ),
);
}

View File

@ -143,14 +143,14 @@ core:
ws: ws:
connect: true connect: true
listen: true listen: true
max_connections: 16 max_connections: 32
listen_address: '' listen_address: ''
path: 'ws' path: 'ws'
# url: 'ws://localhost:5150/ws' # url: 'ws://localhost:5150/ws'
wss: wss:
connect: true connect: true
listen: false listen: false
max_connections: 16 max_connections: 32
listen_address: '' listen_address: ''
path: 'ws' path: 'ws'
# url: '' # url: ''
@ -1686,7 +1686,7 @@ mod tests {
// //
assert_eq!(s.core.network.protocol.ws.connect, true); assert_eq!(s.core.network.protocol.ws.connect, true);
assert_eq!(s.core.network.protocol.ws.listen, true); assert_eq!(s.core.network.protocol.ws.listen, true);
assert_eq!(s.core.network.protocol.ws.max_connections, 16); assert_eq!(s.core.network.protocol.ws.max_connections, 32);
assert_eq!(s.core.network.protocol.ws.listen_address.name, ""); assert_eq!(s.core.network.protocol.ws.listen_address.name, "");
assert_eq!(s.core.network.protocol.ws.listen_address.addrs, vec![]); assert_eq!(s.core.network.protocol.ws.listen_address.addrs, vec![]);
assert_eq!( assert_eq!(
@ -1697,7 +1697,7 @@ mod tests {
// //
assert_eq!(s.core.network.protocol.wss.connect, true); assert_eq!(s.core.network.protocol.wss.connect, true);
assert_eq!(s.core.network.protocol.wss.listen, false); assert_eq!(s.core.network.protocol.wss.listen, false);
assert_eq!(s.core.network.protocol.wss.max_connections, 16); assert_eq!(s.core.network.protocol.wss.max_connections, 32);
assert_eq!(s.core.network.protocol.wss.listen_address.name, ""); assert_eq!(s.core.network.protocol.wss.listen_address.name, "");
assert_eq!(s.core.network.protocol.wss.listen_address.addrs, vec![]); assert_eq!(s.core.network.protocol.wss.listen_address.addrs, vec![]);
assert_eq!( assert_eq!(

View File

@ -34,7 +34,7 @@ else
OUTPUTDIR=../target/wasm32-unknown-unknown/debug/pkg OUTPUTDIR=../target/wasm32-unknown-unknown/debug/pkg
INPUTDIR=../target/wasm32-unknown-unknown/debug INPUTDIR=../target/wasm32-unknown-unknown/debug
RUSTFLAGS="-O -g" cargo build --target wasm32-unknown-unknown RUSTFLAGS="-O -g $RUSTFLAGS" cargo build --target wasm32-unknown-unknown
mkdir -p $OUTPUTDIR mkdir -p $OUTPUTDIR
wasm-bindgen --out-dir $OUTPUTDIR --target web --keep-debug --debug $INPUTDIR/veilid_wasm.wasm wasm-bindgen --out-dir $OUTPUTDIR --target web --keep-debug --debug $INPUTDIR/veilid_wasm.wasm
./wasm-sourcemap.py $OUTPUTDIR/veilid_wasm_bg.wasm -o $OUTPUTDIR/veilid_wasm_bg.wasm.map --dwarfdump $DWARFDUMP ./wasm-sourcemap.py $OUTPUTDIR/veilid_wasm_bg.wasm -o $OUTPUTDIR/veilid_wasm_bg.wasm.map --dwarfdump $DWARFDUMP