refactor wasm tests

This commit is contained in:
Christien Rioux 2023-10-22 21:48:25 -04:00
parent 8fa4ab10a4
commit 126bb0035d
8 changed files with 180 additions and 185 deletions

View File

@ -382,7 +382,6 @@ impl NetworkManager {
.with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()));
let sequencing = target_node_ref.sequencing();
// If the node has had lost questions or failures to send, prefer sequencing
// to improve reliability. The node may be experiencing UDP fragmentation drops
// or other firewalling issues and may perform better with TCP.

View File

@ -166,7 +166,7 @@ impl RouteSpecStore {
/// Returns Err(VeilidAPIError::TryAgain) if no route could be allocated at this time
/// Returns other errors on failure
/// Returns Ok(route id string) on success
#[instrument(level = "trace", skip(self), ret, err)]
#[instrument(level = "trace", skip(self), ret, err(level=Level::TRACE))]
pub fn allocate_route(
&self,
crypto_kinds: &[CryptoKind],
@ -192,7 +192,7 @@ impl RouteSpecStore {
)
}
#[instrument(level = "trace", skip(self, inner, rti), ret, err(level=Level::DEBUG))]
#[instrument(level = "trace", skip(self, inner, rti), ret, err(level=Level::TRACE))]
#[allow(clippy::too_many_arguments)]
fn allocate_route_inner(
&self,

View File

@ -252,137 +252,31 @@ impl RoutingTable {
Ok(merged_bootstrap_records)
}
// 'direct' bootstrap task routine for systems incapable of resolving TXT records, such as browser WASM
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn direct_bootstrap_task_routine(
self,
stop_token: StopToken,
bootstrap_dialinfos: Vec<DialInfo>,
) -> EyreResult<()> {
let mut unord = FuturesUnordered::new();
let network_manager = self.network_manager();
//#[instrument(level = "trace", skip(self), err)]
pub(crate) fn bootstrap_with_peer(self, crypto_kinds: Vec<CryptoKind>, pi: PeerInfo, unord: &FuturesUnordered<SendPinBoxFuture<()>>) {
for bootstrap_di in bootstrap_dialinfos {
log_rtab!(debug "direct bootstrap with: {}", bootstrap_di);
let peer_info = network_manager.boot_request(bootstrap_di).await?;
log_rtab!(debug " direct bootstrap peerinfo: {:?}", peer_info);
// Got peer info, let's add it to the routing table
for pi in peer_info {
// Register the node
let nr = match self.register_node_with_peer_info(
RoutingDomain::PublicInternet,
pi,
false,
) {
Ok(nr) => nr,
Err(e) => {
log_rtab!(error "failed to register direct bootstrap peer info: {}", e);
continue;
}
};
// Add this our futures to process in parallel
for crypto_kind in VALID_CRYPTO_KINDS {
let routing_table = self.clone();
let nr = nr.clone();
unord.push(
// lets ask bootstrap to find ourselves now
async move { routing_table.reverse_find_node(crypto_kind, nr, true).await }
.instrument(Span::current()),
);
}
}
}
// Wait for all bootstrap operations to complete before we complete the singlefuture
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
Ok(())
}
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
let bootstrap = self
.unlocked_inner
.with_config(|c| c.network.routing_table.bootstrap.clone());
// Don't bother if bootstraps aren't configured
if bootstrap.is_empty() {
return Ok(());
}
log_rtab!(debug "--- bootstrap_task");
// Get counts by crypto kind
let entry_count = self.inner.read().cached_entry_counts();
// See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism
let mut bootstrap_dialinfos = Vec::<DialInfo>::new();
for b in &bootstrap {
if let Ok(bootstrap_di_vec) = DialInfo::try_vec_from_url(b) {
for bootstrap_di in bootstrap_di_vec {
bootstrap_dialinfos.push(bootstrap_di);
}
}
}
if !bootstrap_dialinfos.is_empty() {
return self
.direct_bootstrap_task_routine(stop_token, bootstrap_dialinfos)
.await;
}
// If not direct, resolve bootstrap servers and recurse their TXT entries
let bsrecs = self.resolve_bootstrap(bootstrap).await?;
// Run all bootstrap operations concurrently
let mut unord = FuturesUnordered::new();
for bsrec in bsrecs {
log_rtab!(
"--- bootstrapping {} with {:?}",
&bsrec.node_ids,
&bsrec.dial_info_details
pi.node_ids(),
pi.signed_node_info().node_info().dial_info_detail_list()
);
// Get crypto support from list of node ids
let crypto_support = bsrec.node_ids.kinds();
// Make unsigned SignedNodeInfo
let sni =
SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo::new(
NetworkClass::InboundCapable, // Bootstraps are always inbound capable
ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled
AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable
bsrec.envelope_support, // Envelope support is as specified in the bootstrap list
crypto_support, // Crypto support is derived from list of node ids
vec![], // Bootstrap needs no capabilities
bsrec.dial_info_details, // Dial info is as specified in the bootstrap list
)));
let pi = PeerInfo::new(bsrec.node_ids, sni);
let nr =
match self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, true) {
Ok(nr) => nr,
Err(e) => {
log_rtab!(error "failed to register bootstrap peer info: {}", e);
continue;
return;
}
};
// Add this our futures to process in parallel
for crypto_kind in VALID_CRYPTO_KINDS {
// Do we need to bootstrap this crypto kind?
let eckey = (RoutingDomain::PublicInternet, crypto_kind);
let cnt = entry_count.get(&eckey).copied().unwrap_or_default();
if cnt != 0 {
continue;
}
for crypto_kind in crypto_kinds {
// Bootstrap this crypto kind
let nr = nr.clone();
let routing_table = self.clone();
unord.push(
unord.push(Box::pin(
async move {
// Get what contact method would be used for contacting the bootstrap
let bsdi = match routing_table
@ -417,12 +311,111 @@ impl RoutingTable {
}
}
.instrument(Span::current()),
);
));
}
}
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn bootstrap_with_peer_list(self, peers: Vec<PeerInfo>, stop_token: StopToken) -> EyreResult<()> {
log_rtab!(debug " bootstrapped peers: {:?}", &peers);
// Get crypto kinds to bootstrap
let crypto_kinds = self.get_bootstrap_crypto_kinds();
log_rtab!(debug " bootstrapped crypto kinds: {:?}", &crypto_kinds);
// Run all bootstrap operations concurrently
let mut unord = FuturesUnordered::<SendPinBoxFuture<()>>::new();
for peer in peers {
self.clone().bootstrap_with_peer(crypto_kinds.clone(), peer, &unord);
}
// Wait for all bootstrap operations to complete before we complete the singlefuture
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
Ok(())
}
// Get counts by crypto kind and figure out which crypto kinds need bootstrapping
fn get_bootstrap_crypto_kinds(&self) -> Vec<CryptoKind> {
let entry_count = self.inner.read().cached_entry_counts();
let mut crypto_kinds = Vec::new();
for crypto_kind in VALID_CRYPTO_KINDS {
// Do we need to bootstrap this crypto kind?
let eckey = (RoutingDomain::PublicInternet, crypto_kind);
let cnt = entry_count.get(&eckey).copied().unwrap_or_default();
if cnt == 0 {
crypto_kinds.push(crypto_kind);
}
}
crypto_kinds
}
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
let bootstrap = self
.unlocked_inner
.with_config(|c| c.network.routing_table.bootstrap.clone());
// Don't bother if bootstraps aren't configured
if bootstrap.is_empty() {
return Ok(());
}
log_rtab!(debug "--- bootstrap_task");
// See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism
let mut bootstrap_dialinfos = Vec::<DialInfo>::new();
for b in &bootstrap {
if let Ok(bootstrap_di_vec) = DialInfo::try_vec_from_url(b) {
for bootstrap_di in bootstrap_di_vec {
bootstrap_dialinfos.push(bootstrap_di);
}
}
}
// Get a peer list from bootstrap to process
let peers = if !bootstrap_dialinfos.is_empty() {
// Direct bootstrap
let network_manager = self.network_manager();
let mut peer_map = HashMap::<TypedKeyGroup, PeerInfo>::new();
for bootstrap_di in bootstrap_dialinfos {
log_rtab!(debug "direct bootstrap with: {}", bootstrap_di);
let peers = network_manager.boot_request(bootstrap_di).await?;
for peer in peers {
if !peer_map.contains_key(peer.node_ids()) {
peer_map.insert(peer.node_ids().clone(), peer);
}
}
}
peer_map.into_values().collect()
} else {
// If not direct, resolve bootstrap servers and recurse their TXT entries
let bsrecs = self.resolve_bootstrap(bootstrap).await?;
let peers : Vec<PeerInfo> = bsrecs.into_iter().map(|bsrec| {
// Get crypto support from list of node ids
let crypto_support = bsrec.node_ids.kinds();
// Make unsigned SignedNodeInfo
let sni =
SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo::new(
NetworkClass::InboundCapable, // Bootstraps are always inbound capable
ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled
AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable
bsrec.envelope_support, // Envelope support is as specified in the bootstrap list
crypto_support, // Crypto support is derived from list of node ids
vec![], // Bootstrap needs no capabilities
bsrec.dial_info_details, // Dial info is as specified in the bootstrap list
)));
PeerInfo::new(bsrec.node_ids, sni)
}).collect();
peers
};
self.clone().bootstrap_with_peer_list(peers, stop_token).await
}
}

View File

@ -181,10 +181,10 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
}
"network.connection_initial_timeout_ms" => Ok(Box::new(2_000u32)),
"network.connection_inactivity_timeout_ms" => Ok(Box::new(60_000u32)),
"network.max_connections_per_ip4" => Ok(Box::new(8u32)),
"network.max_connections_per_ip6_prefix" => Ok(Box::new(8u32)),
"network.max_connections_per_ip4" => Ok(Box::new(32u32)),
"network.max_connections_per_ip6_prefix" => Ok(Box::new(32u32)),
"network.max_connections_per_ip6_prefix_size" => Ok(Box::new(56u32)),
"network.max_connection_frequency_per_min" => Ok(Box::new(8u32)),
"network.max_connection_frequency_per_min" => Ok(Box::new(128u32)),
"network.client_whitelist_timeout_ms" => Ok(Box::new(300_000u32)),
"network.reverse_connection_receipt_time_ms" => Ok(Box::new(5_000u32)),
"network.hole_punch_receipt_time_ms" => Ok(Box::new(5_000u32)),
@ -203,7 +203,7 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
"network.routing_table.limit_attached_strong" => Ok(Box::new(16u32)),
"network.routing_table.limit_attached_good" => Ok(Box::new(8u32)),
"network.routing_table.limit_attached_weak" => Ok(Box::new(4u32)),
"network.rpc.concurrency" => Ok(Box::new(2u32)),
"network.rpc.concurrency" => Ok(Box::new(0u32)),
"network.rpc.queue_size" => Ok(Box::new(1024u32)),
"network.rpc.max_timestamp_behind_ms" => Ok(Box::new(Some(10_000u32))),
"network.rpc.max_timestamp_ahead_ms" => Ok(Box::new(Some(10_000u32))),
@ -222,7 +222,7 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
"network.dht.set_value_fanout" => Ok(Box::new(4u32)),
"network.dht.min_peer_count" => Ok(Box::new(20u32)),
"network.dht.min_peer_refresh_time_ms" => Ok(Box::new(60_000u32)),
"network.dht.validate_dial_info_receipt_time_ms" => Ok(Box::new(5_000u32)),
"network.dht.validate_dial_info_receipt_time_ms" => Ok(Box::new(2_000u32)),
"network.dht.local_subkey_cache_size" => Ok(Box::new(128u32)),
"network.dht.local_max_subkey_cache_memory_mb" => Ok(Box::new(256u32)),
"network.dht.remote_subkey_cache_size" => Ok(Box::new(1024u32)),
@ -231,7 +231,7 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
"network.dht.remote_max_storage_space_mb" => Ok(Box::new(64u32)),
"network.upnp" => Ok(Box::new(false)),
"network.detect_address_changes" => Ok(Box::new(true)),
"network.restricted_nat_retries" => Ok(Box::new(3u32)),
"network.restricted_nat_retries" => Ok(Box::new(0u32)),
"network.tls.certificate_path" => Ok(Box::new(get_certfile_path())),
"network.tls.private_key_path" => Ok(Box::new(get_keyfile_path())),
"network.tls.connection_initial_timeout_ms" => Ok(Box::new(2_000u32)),
@ -244,7 +244,7 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
"network.application.http.path" => Ok(Box::new(String::from("app"))),
"network.application.http.url" => Ok(Box::new(Option::<String>::None)),
"network.protocol.udp.enabled" => Ok(Box::new(true)),
"network.protocol.udp.socket_pool_size" => Ok(Box::new(16u32)),
"network.protocol.udp.socket_pool_size" => Ok(Box::new(0u32)),
"network.protocol.udp.listen_address" => Ok(Box::new("".to_owned())),
"network.protocol.udp.public_address" => Ok(Box::new(Option::<String>::None)),
"network.protocol.tcp.connect" => Ok(Box::new(true)),
@ -252,15 +252,15 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
"network.protocol.tcp.max_connections" => Ok(Box::new(32u32)),
"network.protocol.tcp.listen_address" => Ok(Box::new("".to_owned())),
"network.protocol.tcp.public_address" => Ok(Box::new(Option::<String>::None)),
"network.protocol.ws.connect" => Ok(Box::new(false)),
"network.protocol.ws.listen" => Ok(Box::new(false)),
"network.protocol.ws.max_connections" => Ok(Box::new(16u32)),
"network.protocol.ws.connect" => Ok(Box::new(true)),
"network.protocol.ws.listen" => Ok(Box::new(true)),
"network.protocol.ws.max_connections" => Ok(Box::new(32u32)),
"network.protocol.ws.listen_address" => Ok(Box::new("".to_owned())),
"network.protocol.ws.path" => Ok(Box::new(String::from("ws"))),
"network.protocol.ws.url" => Ok(Box::new(Option::<String>::None)),
"network.protocol.wss.connect" => Ok(Box::new(false)),
"network.protocol.wss.connect" => Ok(Box::new(true)),
"network.protocol.wss.listen" => Ok(Box::new(false)),
"network.protocol.wss.max_connections" => Ok(Box::new(16u32)),
"network.protocol.wss.max_connections" => Ok(Box::new(32u32)),
"network.protocol.wss.listen_address" => Ok(Box::new("".to_owned())),
"network.protocol.wss.path" => Ok(Box::new(String::from("ws"))),
"network.protocol.wss.url" => Ok(Box::new(Option::<String>::None)),
@ -316,15 +316,15 @@ pub async fn test_config() {
);
assert_eq!(inner.network.connection_initial_timeout_ms, 2_000u32);
assert_eq!(inner.network.connection_inactivity_timeout_ms, 60_000u32);
assert_eq!(inner.network.max_connections_per_ip4, 8u32);
assert_eq!(inner.network.max_connections_per_ip6_prefix, 8u32);
assert_eq!(inner.network.max_connections_per_ip4, 32u32);
assert_eq!(inner.network.max_connections_per_ip6_prefix, 32u32);
assert_eq!(inner.network.max_connections_per_ip6_prefix_size, 56u32);
assert_eq!(inner.network.max_connection_frequency_per_min, 8u32);
assert_eq!(inner.network.max_connection_frequency_per_min, 128u32);
assert_eq!(inner.network.client_whitelist_timeout_ms, 300_000u32);
assert_eq!(inner.network.reverse_connection_receipt_time_ms, 5_000u32);
assert_eq!(inner.network.hole_punch_receipt_time_ms, 5_000u32);
assert_eq!(inner.network.network_key_password, Option::<String>::None);
assert_eq!(inner.network.rpc.concurrency, 2u32);
assert_eq!(inner.network.rpc.concurrency, 0u32);
assert_eq!(inner.network.rpc.queue_size, 1024u32);
assert_eq!(inner.network.rpc.timeout_ms, 5_000u32);
assert_eq!(inner.network.rpc.max_route_hop_count, 4u8);
@ -366,7 +366,7 @@ pub async fn test_config() {
assert!(!inner.network.upnp);
assert!(inner.network.detect_address_changes);
assert_eq!(inner.network.restricted_nat_retries, 3u32);
assert_eq!(inner.network.restricted_nat_retries, 0u32);
assert_eq!(inner.network.tls.certificate_path, get_certfile_path());
assert_eq!(inner.network.tls.private_key_path, get_keyfile_path());
assert_eq!(inner.network.tls.connection_initial_timeout_ms, 2_000u32);
@ -379,6 +379,7 @@ pub async fn test_config() {
assert_eq!(inner.network.application.http.listen_address, "");
assert_eq!(inner.network.application.http.path, "app");
assert_eq!(inner.network.application.http.url, None);
assert!(inner.network.protocol.udp.enabled);
assert_eq!(inner.network.protocol.udp.socket_pool_size, 16u32);
assert_eq!(inner.network.protocol.udp.listen_address, "");
@ -388,15 +389,15 @@ pub async fn test_config() {
assert_eq!(inner.network.protocol.tcp.max_connections, 32u32);
assert_eq!(inner.network.protocol.tcp.listen_address, "");
assert_eq!(inner.network.protocol.tcp.public_address, None);
assert!(!inner.network.protocol.ws.connect);
assert!(!inner.network.protocol.ws.listen);
assert_eq!(inner.network.protocol.ws.max_connections, 16u32);
assert!(inner.network.protocol.ws.connect);
assert!(inner.network.protocol.ws.listen);
assert_eq!(inner.network.protocol.ws.max_connections, 32u32);
assert_eq!(inner.network.protocol.ws.listen_address, "");
assert_eq!(inner.network.protocol.ws.path, "ws");
assert_eq!(inner.network.protocol.ws.url, None);
assert!(!inner.network.protocol.wss.connect);
assert!(inner.network.protocol.wss.connect);
assert!(!inner.network.protocol.wss.listen);
assert_eq!(inner.network.protocol.wss.max_connections, 16u32);
assert_eq!(inner.network.protocol.wss.max_connections, 32u32);
assert_eq!(inner.network.protocol.wss.listen_address, "");
assert_eq!(inner.network.protocol.wss.path, "ws");
assert_eq!(inner.network.protocol.wss.url, None);

View File

@ -2,9 +2,9 @@
#![cfg(target_arch = "wasm32")]
#![recursion_limit = "256"]
use cfg_if::*;
use parking_lot::Once;
use serial_test::serial;
use tracing::*;
use veilid_core::tests::*;
use wasm_bindgen_test::*;
@ -18,17 +18,12 @@ static SETUP_ONCE: Once = Once::new();
pub fn setup() -> () {
SETUP_ONCE.call_once(|| {
console_error_panic_hook::set_once();
cfg_if! {
if #[cfg(feature = "tracing")] {
let mut builder = tracing_wasm::WASMLayerConfigBuilder::new();
builder.set_report_logs_in_timings(false);
builder.set_max_level(Level::TRACE);
builder.set_console_config(tracing_wasm::ConsoleConfig::ReportWithConsoleColor);
builder.set_max_level(Level::DEBUG);
builder.set_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
tracing_wasm::set_as_global_default_with_config(builder.build());
} else {
wasm_logger::init(wasm_logger::Config::default());
}
}
});
}

View File

@ -87,15 +87,7 @@ pub fn system_boxed<'a, Out>(
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
// xxx: for now until wasm threads are more stable, and/or we bother with web workers
pub fn get_concurrency() -> u32 {
1
}
} else {
if #[cfg(not(target_arch = "wasm32"))] {
pub fn get_concurrency() -> u32 {
std::thread::available_parallelism()
.map(|x| x.get())
@ -104,7 +96,6 @@ cfg_if! {
1
}) as u32
}
}
}

View File

@ -1,7 +1,7 @@
#![cfg(target_arch = "wasm32")]
use super::*;
use core::sync::atomic::{AtomicI8, Ordering};
use core::sync::atomic::{AtomicI8, AtomicU32, Ordering};
use js_sys::{global, Reflect};
use wasm_bindgen::prelude::*;
@ -82,3 +82,19 @@ pub fn is_ipv6_supported() -> bool {
*opt_supp = Some(supp);
supp
}
pub fn get_concurrency() -> u32 {
static CACHE: AtomicU32 = AtomicU32::new(0);
let cache = CACHE.load(Ordering::Acquire);
if cache != 0 {
return cache;
}
let res = js_sys::eval("navigator.hardwareConcurrency")
.map(|res| res.as_f64().unwrap_or(1.0f64) as u32)
.unwrap_or(1);
CACHE.store(res, Ordering::Release);
res
}

View File

@ -22,7 +22,7 @@ pub fn setup() -> () {
let mut builder = tracing_wasm::WASMLayerConfigBuilder::new();
builder.set_report_logs_in_timings(false);
builder.set_max_level(Level::TRACE);
builder.set_console_config(tracing_wasm::ConsoleConfig::ReportWithConsoleColor);
builder.set_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
tracing_wasm::set_as_global_default_with_config(builder.build());
} else {
wasm_logger::init(wasm_logger::Config::default());