Merge branch 'startup-lock' into 'main'

Fix startup/shutdown/attach/detach

See merge request veilid/veilid!302
This commit is contained in:
Christien Rioux 2024-07-22 14:29:44 +00:00
commit e759e50983
85 changed files with 1556 additions and 459 deletions

212
Cargo.lock generated
View File

@ -243,9 +243,9 @@ dependencies = [
[[package]]
name = "arrayref"
version = "0.3.7"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545"
checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a"
[[package]]
name = "arrayvec"
@ -470,7 +470,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -500,7 +500,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -745,7 +745,7 @@ checksum = "e0b121a9fe0df916e362fb3271088d071159cdf11db0e4182d02152850756eff"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -831,6 +831,12 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "byteorder-lite"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
[[package]]
name = "bytes"
version = "1.6.1"
@ -857,9 +863,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.1.5"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052"
checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f"
[[package]]
name = "cesu8"
@ -1016,7 +1022,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1375,7 +1381,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f"
dependencies = [
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1488,7 +1494,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1544,7 +1550,7 @@ dependencies = [
"ident_case",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1566,7 +1572,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core 0.20.10",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1733,7 +1739,7 @@ dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1753,7 +1759,7 @@ checksum = "f282cfdfe92516eb26c2af8589c274c7c17681f5ecc03c18255fe741c6aa64eb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1796,7 +1802,7 @@ dependencies = [
"darling 0.20.10",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1958,6 +1964,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flate2"
version = "1.0.30"
@ -2028,7 +2040,7 @@ checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -2140,7 +2152,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -2680,12 +2692,12 @@ dependencies = [
[[package]]
name = "image"
version = "0.25.1"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd54d660e773627692c524beaad361aca785a4f9f5730ce91f42aabe5bce3d11"
checksum = "99314c8a2152b8ddb211f924cdae532d8c5e4c8bb54728e12fff1b0cd5963a10"
dependencies = [
"bytemuck",
"byteorder",
"byteorder-lite",
"num-traits",
"png",
"tiff",
@ -2969,9 +2981,9 @@ dependencies = [
[[package]]
name = "libloading"
version = "0.8.4"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e310b3a6b5907f99202fcdb4960ff45b93735d7c7d96b760fcff8db2dc0e103d"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if 1.0.0",
"windows-targets 0.52.6",
@ -3147,6 +3159,12 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "multimap"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
[[package]]
name = "nanorand"
version = "0.7.0"
@ -4005,7 +4023,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4019,6 +4037,16 @@ dependencies = [
"sha2 0.10.8",
]
[[package]]
name = "petgraph"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset",
"indexmap 2.2.6",
]
[[package]]
name = "pharos"
version = "0.5.3"
@ -4046,7 +4074,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4145,9 +4173,9 @@ dependencies = [
[[package]]
name = "portable-atomic"
version = "1.6.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0"
checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265"
[[package]]
name = "portable-atomic-util"
@ -4170,6 +4198,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e"
dependencies = [
"proc-macro2",
"syn 2.0.72",
]
[[package]]
name = "proc-macro-crate"
version = "0.1.5"
@ -4218,6 +4256,27 @@ dependencies = [
"prost-derive 0.12.6",
]
[[package]]
name = "prost-build"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck 0.5.0",
"itertools 0.12.1",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost 0.12.6",
"prost-types",
"regex",
"syn 2.0.72",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.11.9"
@ -4241,7 +4300,7 @@ dependencies = [
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4259,6 +4318,15 @@ version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "protobuf-src"
version = "2.0.1+26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ba1cfa4b9dc098926b8cce388bf434b93516db3ecf6e8b1a37eb643d733ee7"
dependencies = [
"cmake",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@ -4692,7 +4760,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals 0.29.1",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4719,9 +4787,9 @@ dependencies = [
[[package]]
name = "sdd"
version = "1.6.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eb0dde0ccd15e337a3cf738a9a38115c6d8e74795d074e73973dad3d229a897"
checksum = "85f05a494052771fc5bd0619742363b5e24e5ad72ab3111ec2e27925b8edc5f3"
[[package]]
name = "secret-service"
@ -4854,7 +4922,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4865,7 +4933,7 @@ checksum = "e578a843d40b4189a4d66bba51d7684f57da5bd7c304c64e14bd63efbef49509"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4876,7 +4944,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4898,7 +4966,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4959,7 +5027,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4970,7 +5038,7 @@ checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -5222,9 +5290,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.71"
version = "2.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462"
checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af"
dependencies = [
"proc-macro2",
"quote",
@ -5252,6 +5320,18 @@ dependencies = [
"windows 0.52.0",
]
[[package]]
name = "tempfile"
version = "3.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
dependencies = [
"cfg-if 1.0.0",
"fastrand 2.1.0",
"rustix 0.38.34",
"windows-sys 0.52.0",
]
[[package]]
name = "termcolor"
version = "1.4.1"
@ -5307,7 +5387,17 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
name = "thread-id"
version = "4.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe8f25bbdd100db7e1d34acf7fd2dc59c4bf8f7483f505eaa7d4f12f76cc0ea"
dependencies = [
"libc",
"winapi",
]
[[package]]
@ -5426,7 +5516,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -5505,7 +5595,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.6.13",
"winnow 0.6.14",
]
[[package]]
@ -5627,7 +5717,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -5744,6 +5834,24 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-perfetto"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd21777b526dfcb57f11f65aa8a2024d83e1db52841993229b6e282e511978b7"
dependencies = [
"anyhow",
"bytes",
"chrono",
"prost 0.12.6",
"prost-build",
"protobuf-src",
"rand",
"thread-id",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.18"
@ -5812,7 +5920,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals 0.28.0",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -6034,7 +6142,6 @@ version = "0.3.3"
dependencies = [
"argon2",
"async-io 1.13.0",
"async-lock 2.8.0",
"async-std",
"async-std-resolver",
"async-tls",
@ -6234,6 +6341,7 @@ dependencies = [
"tracing-flame",
"tracing-journald",
"tracing-opentelemetry 0.24.0",
"tracing-perfetto",
"tracing-subscriber",
"url",
"veilid-bugsalot",
@ -6247,7 +6355,7 @@ name = "veilid-tools"
version = "0.3.3"
dependencies = [
"android_logger 0.13.3",
"async-lock 2.8.0",
"async-lock 3.4.0",
"async-std",
"async_executors",
"backtrace",
@ -6388,7 +6496,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
"wasm-bindgen-shared",
]
@ -6422,7 +6530,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -6455,7 +6563,7 @@ checksum = "b7f89739351a2e03cb94beb799d47fb2cac01759b40ec441f7de39b00cbf7ef0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -6875,9 +6983,9 @@ dependencies = [
[[package]]
name = "winnow"
version = "0.6.13"
version = "0.6.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1"
checksum = "374ec40a2d767a3c1b4972d9475ecd557356637be906f2cb3f7fe17a6eb5e22f"
dependencies = [
"memchr",
]
@ -7022,7 +7130,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -7042,7 +7150,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]

View File

@ -16,6 +16,8 @@ VERSION 0.7
FROM ubuntu:18.04
ENV ZIG_VERSION=0.13.0-dev.46+3648d7df1
ENV CMAKE_VERSION_MINOR=3.30
ENV CMAKE_VERSION_PATCH=3.30.1
ENV RUSTUP_HOME=/usr/local/rustup
ENV RUSTUP_DIST_SERVER=https://static.rust-lang.org
ENV CARGO_HOME=/usr/local/cargo
@ -28,7 +30,11 @@ WORKDIR /veilid
# Install build prerequisites & setup required directories
deps-base:
RUN apt-get -y update
RUN apt-get install -y iproute2 curl build-essential cmake libssl-dev openssl file git pkg-config libdbus-1-dev libdbus-glib-1-dev libgirepository1.0-dev libcairo2-dev checkinstall unzip libncursesw5-dev libncurses5-dev
RUN apt-get install -y iproute2 curl build-essential libssl-dev openssl file git pkg-config libdbus-1-dev libdbus-glib-1-dev libgirepository1.0-dev libcairo2-dev checkinstall unzip libncursesw5-dev libncurses5-dev
RUN curl -O https://cmake.org/files/v$CMAKE_VERSION_MINOR/cmake-$CMAKE_VERSION_PATCH-linux-$(arch).sh
RUN mkdir /opt/cmake
RUN sh cmake-$CMAKE_VERSION_PATCH-linux-$(arch).sh --skip-license --prefix=/opt/cmake
RUN ln -s /opt/cmake/bin/cmake /usr/local/bin/cmake
# Install Rust
deps-rust:

View File

@ -22,12 +22,12 @@ rt-async-std = [
rt-tokio = ["tokio", "tokio-util", "veilid-tools/rt-tokio", "cursive/rt-tokio"]
[dependencies]
async-std = { version = "^1.12", features = [
async-std = { version = "1.12.0", features = [
"unstable",
"attributes",
], optional = true }
tokio = { version = "^1", features = ["full"], optional = true }
tokio-util = { version = "^0", features = ["compat"], optional = true }
tokio = { version = "1.38.1", features = ["full", "tracing"], optional = true }
tokio-util = { version = "0.7.11", features = ["compat"], optional = true }
async-tungstenite = { version = "^0.23" }
cursive = { git = "https://gitlab.com/veilid/cursive.git", default-features = false, features = [
"crossterm",

View File

@ -187,7 +187,7 @@ impl ClientApiConnection {
// Request initial server state
let capi = self.clone();
spawn_detached_local(async move {
spawn_detached_local("get initial server state", async move {
let mut req = json::JsonValue::new_object();
req["op"] = "GetState".into();
let Some(resp) = capi.perform_request(req).await else {

View File

@ -114,7 +114,7 @@ impl CommandProcessor {
trace!("CommandProcessor::cmd_help");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd help", async move {
let out = match capi.server_debug("help".to_owned()).await {
Err(e) => {
error!("Server command 'debug help' failed: {}", e);
@ -166,7 +166,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_shutdown");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd shutdown", async move {
if let Err(e) = capi.server_shutdown().await {
error!("Server command 'shutdown' failed to execute: {}", e);
}
@ -179,7 +179,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_disconnect");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd disconnect", async move {
capi.disconnect().await;
ui.send_callback(callback);
});
@ -190,7 +190,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_debug");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd debug", async move {
match capi.server_debug(command_line).await {
Ok(output) => {
ui.add_node_event(Level::Info, &output);
@ -213,7 +213,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_change_log_level");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd change_log_level", async move {
let (layer, rest) = Self::word_split(&rest.unwrap_or_default());
let log_level = match convert_loglevel(&rest.unwrap_or_default()) {
Ok(v) => v,
@ -252,7 +252,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_change_log_ignore");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd change_log_ignore", async move {
let (layer, rest) = Self::word_split(&rest.unwrap_or_default());
let log_ignore = rest.unwrap_or_default();
@ -284,7 +284,7 @@ Server Debug Commands:
let ui = self.ui_sender();
let this = self.clone();
spawn_detached_local(async move {
spawn_detached_local("cmd enable", async move {
let flag = rest.clone().unwrap_or_default();
match flag.as_str() {
"app_messages" => {
@ -306,7 +306,7 @@ Server Debug Commands:
let ui = self.ui_sender();
let this = self.clone();
spawn_detached_local(async move {
spawn_detached_local("cmd disable", async move {
let flag = rest.clone().unwrap_or_default();
match flag.as_str() {
"app_messages" => {
@ -664,7 +664,7 @@ Server Debug Commands:
pub fn attach(&self) {
let capi = self.capi();
spawn_detached_local(async move {
spawn_detached_local("attach", async move {
if let Err(e) = capi.server_attach().await {
error!("Server command 'attach' failed to execute: {}", e);
}
@ -674,7 +674,7 @@ Server Debug Commands:
pub fn detach(&self) {
let capi = self.capi();
spawn_detached_local(async move {
spawn_detached_local("detach", async move {
if let Err(e) = capi.server_detach().await {
error!("Server command 'detach' failed to execute: {}", e);
}

View File

@ -86,6 +86,7 @@ impl LogViewerUI {
done.await;
} else {
while let Ok(Ok(c)) = blocking_wrapper(
"LogViewerUI read",
{
let term = term.clone();
move || term.read_char()

View File

@ -51,6 +51,7 @@ crypto-test = ["enable-crypto-vld0", "enable-crypto-none"]
crypto-test-none = ["enable-crypto-none"]
veilid_core_android_tests = ["dep:paranoid-android"]
veilid_core_ios_tests = ["dep:tracing-oslog"]
debug-locks = ["veilid-tools/debug-locks"]
### DEPENDENCIES
@ -191,7 +192,6 @@ async_executors = { version = "0.7.0", default-features = false, features = [
"bindgen",
"timer",
] }
async-lock = "2.8.0"
wasm-bindgen = "0.2.92"
js-sys = "0.3.69"
wasm-bindgen-futures = "0.4.42"

View File

@ -211,7 +211,7 @@ impl AttachmentManager {
}
}
#[instrument(level = "debug", skip_all)]
#[instrument(parent = None, level = "debug", skip_all)]
async fn attachment_maintainer(self) {
log_net!(debug "attachment starting");
self.update_attaching_detaching_state(AttachmentState::Attaching);
@ -323,7 +323,10 @@ impl AttachmentManager {
return false;
}
inner.maintain_peers = true;
inner.attachment_maintainer_jh = Some(spawn(self.clone().attachment_maintainer()));
inner.attachment_maintainer_jh = Some(spawn(
"attachment maintainer",
self.clone().attachment_maintainer(),
));
true
}

View File

@ -176,7 +176,7 @@ impl Crypto {
// Schedule flushing
let this = self.clone();
let flush_future = interval(60000, move || {
let flush_future = interval("crypto flush", 60000, move || {
let this = this.clone();
async move {
if let Err(e) = this.flush().await {

View File

@ -337,6 +337,7 @@ impl AddressFilter {
.or_insert(punishment);
}
#[instrument(parent = None, level = "trace", skip_all, err)]
pub async fn address_filter_task_routine(
self,
_stop_token: StopToken,

View File

@ -53,6 +53,7 @@ struct ConnectionManagerArc {
connection_inactivity_timeout_ms: u32,
connection_table: ConnectionTable,
address_lock_table: AsyncTagLockTable<SocketAddr>,
startup_lock: StartupLock,
inner: Mutex<Option<ConnectionManagerInner>>,
}
impl core::fmt::Debug for ConnectionManagerArc {
@ -98,6 +99,7 @@ impl ConnectionManager {
connection_inactivity_timeout_ms,
connection_table: ConnectionTable::new(config, address_filter),
address_lock_table: AsyncTagLockTable::new(),
startup_lock: StartupLock::new(),
inner: Mutex::new(None),
}
}
@ -115,8 +117,11 @@ impl ConnectionManager {
self.arc.connection_inactivity_timeout_ms
}
pub async fn startup(&self) {
pub async fn startup(&self) -> EyreResult<()> {
let guard = self.arc.startup_lock.startup()?;
log_net!(debug "startup connection manager");
let mut inner = self.arc.inner.lock();
if inner.is_some() {
panic!("shouldn't start connection manager twice without shutting it down first");
@ -129,14 +134,26 @@ impl ConnectionManager {
let stop_source = StopSource::new();
// Spawn the async processor
let async_processor = spawn(self.clone().async_processor(stop_source.token(), receiver));
let async_processor = spawn(
"connection manager async processor",
self.clone().async_processor(stop_source.token(), receiver),
);
// Store in the inner object
*inner = Some(Self::new_inner(stop_source, sender, async_processor));
guard.success();
Ok(())
}
pub async fn shutdown(&self) {
log_net!(debug "starting connection manager shutdown");
let Ok(guard) = self.arc.startup_lock.shutdown().await else {
log_net!(debug "connection manager is already shut down");
return;
};
// Remove the inner from the lock
let mut inner = {
let mut inner_lock = self.arc.inner.lock();
@ -158,6 +175,8 @@ impl ConnectionManager {
// Wait for the connections to complete
log_net!(debug "waiting for connection handlers to complete");
self.arc.connection_table.join().await;
guard.success();
log_net!(debug "finished connection manager shutdown");
}
@ -263,6 +282,9 @@ impl ConnectionManager {
// Returns a network connection if one already is established
pub fn get_connection(&self, flow: Flow) -> Option<ConnectionHandle> {
let Ok(_guard) = self.arc.startup_lock.enter() else {
return None;
};
self.arc.connection_table.peek_connection_by_flow(flow)
}
@ -276,6 +298,9 @@ impl ConnectionManager {
self.arc.connection_table.ref_connection_by_id(id, kind)
}
pub fn try_connection_ref_scope(&self, id: NetworkConnectionId) -> Option<ConnectionRefScope> {
let Ok(_guard) = self.arc.startup_lock.enter() else {
return None;
};
ConnectionRefScope::try_new(self.clone(), id)
}
@ -288,6 +313,11 @@ impl ConnectionManager {
&self,
dial_info: DialInfo,
) -> EyreResult<NetworkResult<ConnectionHandle>> {
let Ok(_guard) = self.arc.startup_lock.enter() else {
return Ok(NetworkResult::service_unavailable(
"connection manager is not started",
));
};
let peer_address = dial_info.peer_address();
let remote_addr = peer_address.socket_addr();
let mut preferred_local_address = self
@ -387,6 +417,10 @@ impl ConnectionManager {
if !allow_accept {
return;
}
let Ok(_guard) = self.arc.startup_lock.enter() else {
return;
};
// Async lock on the remote address for atomicity per remote
let _lock_guard = self
.arc

View File

@ -139,6 +139,7 @@ enum SendDataToExistingFlowResult {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StartupDisposition {
Success,
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
BindRetry,
}
@ -172,6 +173,8 @@ struct NetworkManagerUnlockedInner {
address_filter_task: TickTask<EyreReport>,
// Network Key
network_key: Option<SharedSecret>,
// Startup Lock
startup_lock: StartupLock,
}
#[derive(Clone)]
@ -209,10 +212,20 @@ impl NetworkManager {
routing_table: RwLock::new(None),
components: RwLock::new(None),
update_callback: RwLock::new(None),
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS),
address_filter_task: TickTask::new(ADDRESS_FILTER_TASK_INTERVAL_SECS),
rolling_transfers_task: TickTask::new(
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
public_address_check_task: TickTask::new(
"public_address_check_task",
PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS,
),
address_filter_task: TickTask::new(
"address_filter_task",
ADDRESS_FILTER_TASK_INTERVAL_SECS,
),
network_key,
startup_lock: StartupLock::new(),
}
}
@ -428,7 +441,7 @@ impl NetworkManager {
});
// Start network components
connection_manager.startup().await;
connection_manager.startup().await?;
match net.startup().await? {
StartupDisposition::Success => {}
StartupDisposition::BindRetry => {
@ -445,27 +458,30 @@ impl NetworkManager {
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> EyreResult<StartupDisposition> {
let guard = self.unlocked_inner.startup_lock.startup()?;
match self.internal_startup().await {
Ok(StartupDisposition::Success) => {
guard.success();
// Inform api clients that things have changed
self.send_network_update();
Ok(StartupDisposition::Success)
}
Ok(StartupDisposition::BindRetry) => {
self.shutdown().await;
self.shutdown_internal().await;
Ok(StartupDisposition::BindRetry)
}
Err(e) => {
self.shutdown().await;
self.shutdown_internal().await;
Err(e)
}
}
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "starting network manager shutdown");
async fn shutdown_internal(&self) {
// Cancel all tasks
self.cancel_tasks().await;
@ -487,6 +503,20 @@ impl NetworkManager {
{
*self.inner.lock() = NetworkManager::new_inner();
}
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "starting network manager shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_net!(debug "network manager is already shut down");
return;
};
self.shutdown_internal().await;
guard.success();
// send update
log_net!(debug "sending network state update to api clients");
@ -546,9 +576,7 @@ impl NetworkManager {
}
pub fn network_is_started(&self) -> bool {
self.opt_net()
.and_then(|net| net.is_started())
.unwrap_or(false)
self.opt_net().map(|net| net.is_started()).unwrap_or(false)
}
pub fn generate_node_status(&self, _routing_domain: RoutingDomain) -> NodeStatus {
@ -556,7 +584,7 @@ impl NetworkManager {
}
/// Generates a multi-shot/normal receipt
#[instrument(level = "trace", skip(self, extra_data, callback), err)]
#[instrument(level = "trace", skip(self, extra_data, callback))]
pub fn generate_receipt<D: AsRef<[u8]>>(
&self,
expiration_us: u64,
@ -564,6 +592,9 @@ impl NetworkManager {
extra_data: D,
callback: impl ReceiptCallback,
) -> EyreResult<Vec<u8>> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
bail!("network is not started");
};
let receipt_manager = self.receipt_manager();
let routing_table = self.routing_table();
@ -593,12 +624,16 @@ impl NetworkManager {
}
/// Generates a single-shot/normal receipt
#[instrument(level = "trace", skip(self, extra_data), err)]
#[instrument(level = "trace", skip(self, extra_data))]
pub fn generate_single_shot_receipt<D: AsRef<[u8]>>(
&self,
expiration_us: u64,
extra_data: D,
) -> EyreResult<(Vec<u8>, EventualValueFuture<ReceiptEvent>)> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
bail!("network is not started");
};
let receipt_manager = self.receipt_manager();
let routing_table = self.routing_table();
@ -635,6 +670,10 @@ impl NetworkManager {
&self,
receipt_data: R,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("network is not started");
};
let receipt_manager = self.receipt_manager();
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
@ -656,6 +695,10 @@ impl NetworkManager {
receipt_data: R,
inbound_noderef: NodeRef,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("network is not started");
};
let receipt_manager = self.receipt_manager();
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
@ -676,6 +719,10 @@ impl NetworkManager {
&self,
receipt_data: R,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("network is not started");
};
let receipt_manager = self.receipt_manager();
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
@ -697,6 +744,10 @@ impl NetworkManager {
receipt_data: R,
private_route: PublicKey,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("network is not started");
};
let receipt_manager = self.receipt_manager();
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
@ -712,12 +763,16 @@ impl NetworkManager {
}
// Process a received signal
#[instrument(level = "trace", target = "receipt", skip_all)]
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn handle_signal(
&self,
signal_flow: Flow,
signal_info: SignalInfo,
) -> EyreResult<NetworkResult<()>> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return Ok(NetworkResult::service_unavailable("network is not started"));
};
match signal_info {
SignalInfo::ReverseConnect { receipt, peer_info } => {
let routing_table = self.routing_table();
@ -811,7 +866,7 @@ impl NetworkManager {
}
/// Builds an envelope for sending over the network
#[instrument(level = "trace", target = "receipt", skip_all)]
#[instrument(level = "trace", target = "net", skip_all)]
fn build_envelope<B: AsRef<[u8]>>(
&self,
dest_node_id: TypedKey,
@ -854,13 +909,17 @@ impl NetworkManager {
/// node_ref is the direct destination to which the envelope will be sent
/// If 'destination_node_ref' is specified, it can be different than the node_ref being sent to
/// which will cause the envelope to be relayed
#[instrument(level = "trace", target = "receipt", skip_all)]
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn send_envelope<B: AsRef<[u8]>>(
&self,
node_ref: NodeRef,
destination_node_ref: Option<NodeRef>,
body: B,
) -> EyreResult<NetworkResult<SendDataMethod>> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return Ok(NetworkResult::no_connection_other("network is not started"));
};
let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone();
let best_node_id = destination_node_ref.best_node_id();
@ -898,6 +957,11 @@ impl NetworkManager {
dial_info: DialInfo,
rcpt_data: Vec<u8>,
) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "not sending out-of-band receipt to {} because network is stopped", dial_info);
return Ok(());
};
// Do we need to validate the outgoing receipt? Probably not
// because it is supposed to be opaque and the
// recipient/originator does the validation
@ -918,8 +982,12 @@ impl NetworkManager {
// Called when a packet potentially containing an RPC envelope is received by a low-level
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
// and passes it to the RPC handler
#[instrument(level = "trace", target = "receipt", skip_all)]
#[instrument(level = "trace", target = "net", skip_all)]
async fn on_recv_envelope(&self, data: &mut [u8], flow: Flow) -> EyreResult<bool> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return Ok(false);
};
log_net!("envelope of {} bytes received from {:?}", data.len(), flow);
let remote_addr = flow.remote_address().ip_addr();
@ -1132,7 +1200,13 @@ impl NetworkManager {
source_noderef.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain));
// Pass message to RPC system
rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)?;
if let Err(e) =
rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)
{
// Couldn't enqueue, but not the sender's fault
log_net!(debug "failed to enqueue direct message: {}", e);
return Ok(false);
}
// Inform caller that we dealt with the envelope locally
Ok(true)

View File

@ -262,7 +262,7 @@ impl DiscoveryContext {
// Always process two at a time so we get both addresses in parallel if possible
if unord.len() == 2 {
// Process one
if let Some(Some(ei)) = unord.next().await {
if let Some(Some(ei)) = unord.next().in_current_span().await {
external_address_infos.push(ei);
if external_address_infos.len() == 2 {
break;
@ -272,7 +272,7 @@ impl DiscoveryContext {
}
// Finish whatever is left if we need to
if external_address_infos.len() < 2 {
while let Some(res) = unord.next().await {
while let Some(res) = unord.next().in_current_span().await {
if let Some(ei) = res {
external_address_infos.push(ei);
if external_address_infos.len() == 2 {
@ -644,6 +644,7 @@ impl DiscoveryContext {
}
/// Add discovery futures to an unordered set that may detect dialinfo when they complete
#[instrument(level = "trace", skip(self))]
pub async fn discover(
&self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectionResult>>>,
@ -681,7 +682,7 @@ impl DiscoveryContext {
}
};
if let Some(clear_network_callback) = some_clear_network_callback {
clear_network_callback().await;
clear_network_callback().in_current_span().await;
}
// UPNP Automatic Mapping

View File

@ -191,7 +191,7 @@ impl IGDManager {
mapped_port: u16,
) -> Option<()> {
let this = self.clone();
blocking_wrapper(move || {
blocking_wrapper("igd unmap_port", move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
@ -235,7 +235,7 @@ impl IGDManager {
expected_external_address: Option<IpAddr>,
) -> Option<SocketAddr> {
let this = self.clone();
blocking_wrapper(move || {
blocking_wrapper("igd map_any_port", move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
@ -310,7 +310,7 @@ impl IGDManager {
.await
}
#[instrument(level = "trace", target = "net", skip_all, err)]
#[instrument(level = "trace", target = "net", name = "IGDManager::tick", skip_all, err)]
pub async fn tick(&self) -> EyreResult<bool> {
// Refresh mappings if we have them
// If an error is received, then return false to restart the local network
@ -340,7 +340,7 @@ impl IGDManager {
}
let this = self.clone();
blocking_wrapper(move || {
blocking_wrapper("igd tick", move || {
let mut inner = this.inner.lock();
// Process full renewals
@ -434,6 +434,6 @@ impl IGDManager {
// Normal exit, no restart
Ok(true)
}, Err(eyre!("failed to process blocking task"))).in_current_span().await
}, Err(eyre!("failed to process blocking task"))).instrument(tracing::trace_span!("igd tick fut")).await
}
}

View File

@ -72,8 +72,6 @@ pub const MAX_CAPABILITIES: usize = 64;
/////////////////////////////////////////////////////////////////
struct NetworkInner {
/// Some(true) if the low-level network is running, Some(false) if it is not, None if it is in transit
network_started: Option<bool>,
/// set if the network needs to be restarted due to a low level configuration change
/// such as dhcp release or change of address or interfaces being added or removed
network_needs_restart: bool,
@ -114,6 +112,9 @@ struct NetworkInner {
}
struct NetworkUnlockedInner {
// Startup lock
startup_lock: StartupLock,
// Accessors
routing_table: RoutingTable,
network_manager: NetworkManager,
@ -139,7 +140,6 @@ pub(in crate::network_manager) struct Network {
impl Network {
fn new_inner() -> NetworkInner {
NetworkInner {
network_started: Some(false),
network_needs_restart: false,
needs_public_dial_info_check: false,
network_already_cleared: false,
@ -168,13 +168,14 @@ impl Network {
) -> NetworkUnlockedInner {
let config = network_manager.config();
NetworkUnlockedInner {
startup_lock: StartupLock::new(),
network_manager,
routing_table,
connection_manager,
interfaces: NetworkInterfaces::new(),
update_network_class_task: TickTask::new(1),
network_interfaces_task: TickTask::new(1),
upnp_task: TickTask::new(1),
update_network_class_task: TickTask::new("update_network_class_task", 1),
network_interfaces_task: TickTask::new("network_interfaces_task", 1),
upnp_task: TickTask::new("upnp_task", 1),
igd_manager: igd_manager::IGDManager::new(config.clone()),
}
}
@ -335,12 +336,12 @@ impl Network {
inner.preferred_local_addresses.get(&key).copied()
}
pub fn is_stable_interface_address(&self, addr: IpAddr) -> bool {
pub(crate) fn is_stable_interface_address(&self, addr: IpAddr) -> bool {
let stable_addrs = self.get_stable_interface_addresses();
stable_addrs.contains(&addr)
}
pub fn get_stable_interface_addresses(&self) -> Vec<IpAddr> {
pub(crate) fn get_stable_interface_addresses(&self) -> Vec<IpAddr> {
let addrs = self.unlocked_inner.interfaces.stable_addresses();
let mut addrs: Vec<IpAddr> = addrs
.into_iter()
@ -377,7 +378,7 @@ impl Network {
////////////////////////////////////////////////////////////
// Record DialInfo failures
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
&self,
dial_info: DialInfo,
fut: F,
@ -401,6 +402,8 @@ impl Network {
dial_info: DialInfo,
data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(
dial_info.clone(),
async move {
@ -476,6 +479,8 @@ impl Network {
data: Vec<u8>,
timeout_ms: u32,
) -> EyreResult<NetworkResult<Vec<u8>>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(
dial_info.clone(),
async move {
@ -513,7 +518,7 @@ impl Network {
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
let (recv_len, recv_addr) = network_result_try!(timeout(
timeout_ms,
h.recv_message(&mut out).instrument(Span::current())
h.recv_message(&mut out).in_current_span()
)
.await
.into_network_result())
@ -564,7 +569,7 @@ impl Network {
let out = network_result_try!(network_result_try!(timeout(
timeout_ms,
pnc.recv()
pnc.recv().in_current_span()
)
.await
.into_network_result())
@ -590,6 +595,8 @@ impl Network {
flow: Flow,
data: Vec<u8>,
) -> EyreResult<SendDataToExistingFlowResult> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
let data_len = data.len();
// Handle connectionless protocol
@ -655,6 +662,8 @@ impl Network {
dial_info: DialInfo,
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(
dial_info.clone(),
async move {
@ -922,22 +931,22 @@ impl Network {
#[instrument(level = "debug", err, skip_all)]
pub async fn startup(&self) -> EyreResult<StartupDisposition> {
self.inner.lock().network_started = None;
let guard = self.unlocked_inner.startup_lock.startup()?;
match self.startup_internal().await {
Ok(StartupDisposition::Success) => {
info!("network started");
self.inner.lock().network_started = Some(true);
guard.success();
Ok(StartupDisposition::Success)
}
Ok(StartupDisposition::BindRetry) => {
debug!("network bind retry");
self.inner.lock().network_started = Some(false);
self.shutdown_internal().await;
Ok(StartupDisposition::BindRetry)
}
Err(e) => {
debug!("network failed to start");
self.inner.lock().network_started = Some(false);
self.shutdown_internal().await;
Err(e)
}
}
@ -947,8 +956,8 @@ impl Network {
self.inner.lock().network_needs_restart
}
pub fn is_started(&self) -> Option<bool> {
self.inner.lock().network_started
pub fn is_started(&self) -> bool {
self.unlocked_inner.startup_lock.is_started()
}
#[instrument(level = "debug", skip_all)]
@ -957,11 +966,7 @@ impl Network {
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "starting low level network shutdown");
self.inner.lock().network_started = None;
async fn shutdown_internal(&self) {
let routing_table = self.routing_table();
// Stop all tasks
@ -1005,7 +1010,19 @@ impl Network {
// Reset state including network class
*self.inner.lock() = Self::new_inner();
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "starting low level network shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_net!(debug "low level network is already shut down");
return;
};
self.shutdown_internal().await;
guard.success();
log_net!(debug "finished low level network shutdown");
}
@ -1014,12 +1031,20 @@ impl Network {
&self,
punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
) {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return;
};
let mut inner = self.inner.lock();
inner.needs_public_dial_info_check = true;
inner.public_dial_info_check_punishment = punishment;
}
pub fn needs_public_dial_info_check(&self) -> bool {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return false;
};
let inner = self.inner.lock();
inner.needs_public_dial_info_check
}
@ -1027,7 +1052,7 @@ impl Network {
//////////////////////////////////////////
#[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn network_interfaces_task_routine(
async fn network_interfaces_task_routine(
self,
_stop_token: StopToken,
_l: u64,
@ -1038,13 +1063,8 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn upnp_task_routine(
self,
_stop_token: StopToken,
_l: u64,
_t: u64,
) -> EyreResult<()> {
#[instrument(parent = None, level = "trace", target = "net", skip_all, err)]
async fn upnp_task_routine(self, _stop_token: StopToken, _l: u64, _t: u64) -> EyreResult<()> {
if !self.unlocked_inner.igd_manager.tick().await? {
info!("upnp failed, restarting local network");
let mut inner = self.inner.lock();
@ -1054,8 +1074,13 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> {
#[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)]
pub(crate) async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return Ok(());
};
let (detect_address_changes, upnp) = {
let config = self.network_manager().config();
let c = config.get();

View File

@ -200,7 +200,12 @@ impl Network {
// Wait for all discovery futures to complete and apply discoverycontexts
let mut all_address_types = AddressTypeSet::new();
loop {
match unord.next().timeout_at(stop_token.clone()).await {
match unord
.next()
.timeout_at(stop_token.clone())
.in_current_span()
.await
{
Ok(Some(Some(dr))) => {
// Found some new dial info for this protocol/address combination
self.update_with_detected_dial_info(dr.ddi.clone()).await?;
@ -277,7 +282,7 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", skip(self), err)]
#[instrument(parent = None, level = "trace", skip(self), err)]
pub async fn update_network_class_task_routine(
self,
stop_token: StopToken,

View File

@ -38,6 +38,7 @@ impl Network {
Ok(acceptor)
}
#[instrument(level = "trace", skip_all)]
async fn try_tls_handlers(
&self,
tls_acceptor: &TlsAcceptor,
@ -60,7 +61,7 @@ impl Network {
// read a chunk of the stream
timeout(
tls_connection_initial_timeout_ms,
ps.peek_exact(&mut first_packet),
ps.peek_exact(&mut first_packet).in_current_span(),
)
.await
.wrap_err("tls initial timeout")?
@ -70,6 +71,7 @@ impl Network {
.await
}
#[instrument(level = "trace", skip_all)]
async fn try_handlers(
&self,
stream: AsyncPeekStream,
@ -90,6 +92,7 @@ impl Network {
Ok(None)
}
#[instrument(level = "trace", skip_all)]
async fn tcp_acceptor(
self,
tcp_stream: io::Result<TcpStream>,
@ -132,33 +135,33 @@ impl Network {
}
};
// #[cfg(all(feature = "rt-async-std", unix))]
// {
// // async-std does not directly support linger on TcpStream yet
// use std::os::fd::{AsRawFd, FromRawFd};
// if let Err(e) = unsafe { socket2::Socket::from_raw_fd(tcp_stream.as_raw_fd()) }
// .set_linger(Some(core::time::Duration::from_secs(0)))
// {
// log_net!(debug "Couldn't set TCP linger: {}", e);
// return;
// }
// }
// #[cfg(all(feature = "rt-async-std", windows))]
// {
// // async-std does not directly support linger on TcpStream yet
// use std::os::windows::io::{AsRawSocket, FromRawSocket};
// if let Err(e) = unsafe { socket2::Socket::from_raw_socket(tcp_stream.as_raw_socket()) }
// .set_linger(Some(core::time::Duration::from_secs(0)))
// {
// log_net!(debug "Couldn't set TCP linger: {}", e);
// return;
// }
// }
// #[cfg(not(feature = "rt-async-std"))]
// if let Err(e) = tcp_stream.set_linger(Some(core::time::Duration::from_secs(0))) {
// log_net!(debug "Couldn't set TCP linger: {}", e);
// return;
// }
#[cfg(all(feature = "rt-async-std", unix))]
{
// async-std does not directly support linger on TcpStream yet
use std::os::fd::{AsRawFd, FromRawFd};
if let Err(e) = unsafe { socket2::Socket::from_raw_fd(tcp_stream.as_raw_fd()) }
.set_linger(Some(core::time::Duration::from_secs(0)))
{
log_net!(debug "Couldn't set TCP linger: {}", e);
return;
}
}
#[cfg(all(feature = "rt-async-std", windows))]
{
// async-std does not directly support linger on TcpStream yet
use std::os::windows::io::{AsRawSocket, FromRawSocket};
if let Err(e) = unsafe { socket2::Socket::from_raw_socket(tcp_stream.as_raw_socket()) }
.set_linger(Some(core::time::Duration::from_secs(0)))
{
log_net!(debug "Couldn't set TCP linger: {}", e);
return;
}
}
#[cfg(not(feature = "rt-async-std"))]
if let Err(e) = tcp_stream.set_linger(Some(core::time::Duration::from_secs(0))) {
log_net!(debug "Couldn't set TCP linger: {}", e);
return;
}
if let Err(e) = tcp_stream.set_nodelay(true) {
log_net!(debug "Couldn't set TCP nodelay: {}", e);
return;
@ -180,7 +183,7 @@ impl Network {
// read a chunk of the stream
if timeout(
connection_initial_timeout_ms,
ps.peek_exact(&mut first_packet),
ps.peek_exact(&mut first_packet).in_current_span(),
)
.await
.is_err()
@ -237,6 +240,7 @@ impl Network {
}
}
#[instrument(level = "trace", skip_all)]
async fn spawn_socket_listener(&self, addr: SocketAddr) -> EyreResult<bool> {
// Get config
let (connection_initial_timeout_ms, tls_connection_initial_timeout_ms) = {
@ -297,7 +301,7 @@ impl Network {
let connection_manager = self.connection_manager();
////////////////////////////////////////////////////////////
let jh = spawn(async move {
let jh = spawn(&format!("TCP listener {}", addr), async move {
// moves listener object in and get incoming iterator
// when this task exists, the listener will close the socket
@ -344,6 +348,7 @@ impl Network {
/////////////////////////////////////////////////////////////////
// TCP listener that multiplexes ports so multiple protocols can exist on a single port
#[instrument(level = "trace", skip_all)]
pub(super) async fn start_tcp_listener(
&self,
bind_set: NetworkBindSet,

View File

@ -3,6 +3,7 @@ use sockets::*;
use stop_token::future::FutureExt;
impl Network {
#[instrument(level = "trace", skip_all)]
pub(super) async fn create_udp_listener_tasks(&self) -> EyreResult<()> {
// Spawn socket tasks
let mut task_count = {
@ -16,14 +17,14 @@ impl Network {
}
}
log_net!("task_count: {}", task_count);
for _ in 0..task_count {
for task_n in 0..task_count {
log_net!("Spawning UDP listener task");
////////////////////////////////////////////////////////////
// Run thread task to process stream of messages
let this = self.clone();
let jh = spawn(async move {
let jh = spawn(&format!("UDP listener {}", task_n), async move {
log_net!("UDP listener task spawned");
// Collect all our protocol handlers into a vector
@ -57,6 +58,7 @@ impl Network {
match ph
.recv_message(&mut data)
.timeout_at(stop_token.clone())
.in_current_span()
.await
{
Ok(Ok((size, flow))) => {
@ -89,7 +91,7 @@ impl Network {
// Now we wait for join handles to exit,
// if any error out it indicates an error needing
// us to completely restart the network
while let Some(v) = protocol_handlers_unordered.next().await {
while let Some(v) = protocol_handlers_unordered.next().in_current_span().await {
// true = stopped, false = errored
if !v {
// If any protocol handler fails, our socket died and we need to restart the network
@ -98,7 +100,7 @@ impl Network {
}
log_net!("UDP listener task stopped");
});
}.instrument(trace_span!(parent: None, "UDP Listener")));
////////////////////////////////////////////////////////////
// Add to join handle
@ -108,6 +110,7 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", skip_all)]
async fn create_udp_protocol_handler(&self, addr: SocketAddr) -> EyreResult<bool> {
log_net!("create_udp_protocol_handler on {:?}", &addr);
@ -148,6 +151,7 @@ impl Network {
Ok(true)
}
#[instrument(level = "trace", skip_all)]
pub(super) async fn create_udp_protocol_handlers(
&self,
bind_set: NetworkBindSet,

View File

@ -91,9 +91,9 @@ pub fn new_default_tcp_socket(domain: Domain) -> io::Result<Socket> {
#[instrument(level = "trace", ret)]
pub fn new_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
// if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
// log_net!(error "Couldn't set TCP linger: {}", e);
// }
if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
log_net!(error "Couldn't set TCP linger: {}", e);
}
if let Err(e) = socket.set_nodelay(true) {
log_net!(error "Couldn't set TCP nodelay: {}", e);
}
@ -162,10 +162,12 @@ pub async fn nonblocking_connect(
let async_stream = Async::new(std::net::TcpStream::from(socket))?;
// The stream becomes writable when connected
timeout_or_try!(timeout(timeout_ms, async_stream.writable())
timeout_or_try!(
timeout(timeout_ms, async_stream.writable().in_current_span())
.await
.into_timeout_or()
.into_result()?);
.into_result()?
);
// Check low level error
let async_stream = match async_stream.get_ref().take_error()? {

View File

@ -134,7 +134,7 @@ impl RawTcpProtocolHandler {
let mut peekbuf: [u8; PEEK_DETECT_LEN] = [0u8; PEEK_DETECT_LEN];
if (timeout(
self.connection_initial_timeout_ms,
ps.peek_exact(&mut peekbuf),
ps.peek_exact(&mut peekbuf).in_current_span(),
)
.await)
.is_err()

View File

@ -222,7 +222,7 @@ impl WebsocketProtocolHandler {
let mut peek_buf = [0u8; MAX_WS_BEFORE_BODY];
let peek_len = match timeout(
self.arc.connection_initial_timeout_ms,
ps.peek(&mut peek_buf),
ps.peek(&mut peek_buf).in_current_span(),
)
.await
{

View File

@ -84,6 +84,7 @@ impl Network {
// Returns a port, a set of ip addresses to bind to, and a
// bool specifying if multiple ports should be tried
#[instrument(level = "trace", skip_all)]
async fn convert_listen_address_to_bind_set(
&self,
listen_address: String,
@ -136,6 +137,7 @@ impl Network {
/////////////////////////////////////////////////////
#[instrument(level = "trace", skip_all)]
pub(super) async fn bind_udp_protocol_handlers(
&self,
editor_public_internet: &mut RoutingDomainEditor,
@ -249,6 +251,7 @@ impl Network {
Ok(StartupDisposition::Success)
}
#[instrument(level = "trace", skip_all)]
pub(super) async fn start_ws_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,
@ -364,6 +367,7 @@ impl Network {
Ok(StartupDisposition::Success)
}
#[instrument(level = "trace", skip_all)]
pub(super) async fn start_wss_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,
@ -463,6 +467,7 @@ impl Network {
Ok(StartupDisposition::Success)
}
#[instrument(level = "trace", skip_all)]
pub(super) async fn start_tcp_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,

View File

@ -136,7 +136,8 @@ impl NetworkConnection {
let flow = protocol_connection.flow();
// Create handle for sending
let (sender, receiver) = flume::bounded(get_concurrency() as usize);
//let (sender, receiver) = flume::bounded(get_concurrency() as usize);
let (sender, receiver) = flume::unbounded();
// Create stats
let stats = Arc::new(Mutex::new(NetworkConnectionStats {
@ -148,7 +149,7 @@ impl NetworkConnection {
let local_stop_token = stop_source.token();
// Spawn connection processor and pass in protocol connection
let processor = spawn(Self::process_connection(
let processor = spawn("connection processor", Self::process_connection(
connection_manager,
local_stop_token,
manager_stop_token,
@ -265,7 +266,7 @@ impl NetworkConnection {
// Connection receiver loop
#[allow(clippy::too_many_arguments)]
//#[instrument(level="trace", target="net", skip_all)]
#[instrument(parent = None, level="trace", target="net", skip_all)]
fn process_connection(
connection_manager: ConnectionManager,
local_stop_token: StopToken,
@ -309,10 +310,6 @@ impl NetworkConnection {
match res {
Ok((_span_id, message)) => {
// let span = span!(Level::TRACE, "process_connection send");
// span.follows_from(span_id);
// let _enter = span.enter();
// Touch the LRU for this connection
connection_manager.touch_connection_by_id(connection_id);
@ -337,7 +334,7 @@ impl NetworkConnection {
RecvLoopAction::Finish
}
}
});
}.in_current_span());
unord.push(system_boxed(sender_fut.in_current_span()));
}

View File

@ -158,9 +158,14 @@ struct ReceiptManagerInner {
timeout_task: MustJoinSingleFuture<()>,
}
struct ReceiptManagerUnlockedInner {
startup_lock: StartupLock,
}
#[derive(Clone)]
pub(super) struct ReceiptManager {
inner: Arc<Mutex<ReceiptManagerInner>>,
unlocked_inner: Arc<ReceiptManagerUnlockedInner>,
}
impl ReceiptManager {
@ -177,6 +182,9 @@ impl ReceiptManager {
pub fn new(network_manager: NetworkManager) -> Self {
Self {
inner: Arc::new(Mutex::new(Self::new_inner(network_manager))),
unlocked_inner: Arc::new(ReceiptManagerUnlockedInner {
startup_lock: StartupLock::new(),
}),
}
}
@ -185,6 +193,7 @@ impl ReceiptManager {
}
pub async fn startup(&self) -> EyreResult<()> {
let guard = self.unlocked_inner.startup_lock.startup()?;
log_net!(debug "startup receipt manager");
// Retrieve config
@ -195,6 +204,7 @@ impl ReceiptManager {
inner.stop_source = Some(StopSource::new());
}
guard.success();
Ok(())
}
@ -223,7 +233,7 @@ impl ReceiptManager {
}
#[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) {
async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) {
// Go through all receipts and build a list of expired nonces
let mut new_next_oldest_ts: Option<Timestamp> = None;
let mut expired_records = Vec::new();
@ -271,8 +281,18 @@ impl ReceiptManager {
}
}
#[instrument(level = "trace", target = "receipt", skip_all, err)]
#[instrument(
level = "trace",
target = "receipt",
name = "ReceiptManager::tick",
skip_all,
err
)]
pub async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return Ok(());
};
let (next_oldest_ts, timeout_task, stop_token) = {
let inner = self.inner.lock();
let stop_token = match inner.stop_source.as_ref() {
@ -291,10 +311,12 @@ impl ReceiptManager {
// Single-spawn the timeout task routine
let _ = timeout_task
.single_spawn(
"receipt timeout",
self.clone()
.timeout_task_routine(now, stop_token)
.in_current_span(),
.instrument(trace_span!(parent: None, "receipt timeout task")),
)
.in_current_span()
.await;
}
}
@ -303,6 +325,11 @@ impl ReceiptManager {
pub async fn shutdown(&self) {
log_net!(debug "starting receipt manager shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_net!(debug "receipt manager is already shut down");
return;
};
let network_manager = self.network_manager();
// Stop all tasks
@ -320,6 +347,8 @@ impl ReceiptManager {
}
*self.inner.lock() = Self::new_inner(network_manager);
guard.success();
log_net!(debug "finished receipt manager shutdown");
}
@ -332,6 +361,10 @@ impl ReceiptManager {
expected_returns: u32,
callback: impl ReceiptCallback,
) {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return;
};
let receipt_nonce = receipt.get_nonce();
event!(target: "receipt", Level::DEBUG, "== New Multiple Receipt ({}) {} ", expected_returns, receipt_nonce.encode());
let record = Arc::new(Mutex::new(ReceiptRecord::new(
@ -353,6 +386,10 @@ impl ReceiptManager {
expiration: Timestamp,
eventual: ReceiptSingleShotType,
) {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return;
};
let receipt_nonce = receipt.get_nonce();
event!(target: "receipt", Level::DEBUG, "== New SingleShot Receipt {}", receipt_nonce.encode());
@ -385,6 +422,8 @@ impl ReceiptManager {
pub async fn cancel_receipt(&self, nonce: &Nonce) -> EyreResult<()> {
event!(target: "receipt", Level::DEBUG, "== Cancel Receipt {}", nonce.encode());
let _guard = self.unlocked_inner.startup_lock.enter()?;
// Remove the record
let record = {
let mut inner = self.inner.lock();
@ -417,6 +456,10 @@ impl ReceiptManager {
receipt: Receipt,
receipt_returned: ReceiptReturned,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("receipt manager not started");
};
let receipt_nonce = receipt.get_nonce();
let extra_data = receipt.get_extra_data();

View File

@ -1,4 +1,5 @@
use super::*;
use stop_token::future::FutureExt as _;
impl NetworkManager {
/// Send raw data to a node
@ -146,7 +147,7 @@ impl NetworkManager {
Ok(NetworkResult::value(send_data_method))
}
.instrument(trace_span!("send_data")),
.in_current_span()
)
}
@ -559,6 +560,12 @@ impl NetworkManager {
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
// Detect if network is stopping so we can break out of this
let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else {
return Ok(NetworkResult::service_unavailable("network is stopping"));
};
// Build a return receipt for the signal
let receipt_timeout = ms_to_us(
self.unlocked_inner
@ -588,14 +595,20 @@ impl NetworkManager {
let rpc = self.rpc_processor();
network_result_try!(rpc
.rpc_call_signal(
Destination::relay(relay_nr, target_nr.clone()),
Destination::relay(relay_nr.clone(), target_nr.clone()),
SignalInfo::ReverseConnect { receipt, peer_info },
)
.await
.wrap_err("failed to send signal")?);
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
let inbound_nr = match eventual_value.timeout_at(stop_token).in_current_span().await {
Err(_) => {
return Ok(NetworkResult::service_unavailable("network is stopping"));
}
Ok(v) => {
let receipt_event = v.take_value().unwrap();
match receipt_event {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
@ -613,6 +626,8 @@ impl NetworkManager {
target_nr
)))
}
}
}
};
// We expect the inbound noderef to be the same as the target noderef
@ -634,7 +649,9 @@ impl NetworkManager {
)),
}
} else {
bail!("no reverse connection available")
return Ok(NetworkResult::no_connection_other(format!(
"reverse connection dropped from {}", target_nr)
));
}
}
@ -648,6 +665,11 @@ impl NetworkManager {
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
// Detect if network is stopping so we can break out of this
let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else {
return Ok(NetworkResult::service_unavailable("network is stopping"));
};
// Ensure we are filtered down to UDP (the only hole punch protocol supported today)
assert!(target_nr
.filter_ref()
@ -706,7 +728,13 @@ impl NetworkManager {
.wrap_err("failed to send signal")?);
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
let inbound_nr = match eventual_value.timeout_at(stop_token).in_current_span().await {
Err(_) => {
return Ok(NetworkResult::service_unavailable("network is stopping"));
}
Ok(v) => {
let receipt_event = v.take_value().unwrap();
match receipt_event {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
@ -724,6 +752,8 @@ impl NetworkManager {
target_nr
)))
}
}
}
};
// We expect the inbound noderef to be the same as the target noderef
@ -749,7 +779,9 @@ impl NetworkManager {
)),
}
} else {
bail!("no hole punch available")
return Ok(NetworkResult::no_connection_other(format!(
"hole punch dropped from {}", target_nr)
));
}
}
}

View File

@ -35,7 +35,7 @@ impl Default for NetworkManagerStats {
impl NetworkManager {
// Callbacks from low level network for statistics gathering
pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
pub(crate) fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats
@ -52,7 +52,7 @@ impl NetworkManager {
.add_up(bytes);
}
pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
pub(crate) fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats

View File

@ -48,6 +48,7 @@ impl NetworkManager {
}
}
#[instrument(level = "trace", name = "NetworkManager::tick", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> {
let routing_table = self.routing_table();
let net = self.net();

View File

@ -2,10 +2,10 @@ use super::*;
impl NetworkManager {
// Clean up the public address check tables, removing entries that have timed out
#[instrument(level = "trace", skip(self), err)]
#[instrument(parent = None, level = "trace", skip_all, err)]
pub(crate) async fn public_address_check_task_routine(
self,
stop_token: StopToken,
_stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {

View File

@ -52,12 +52,14 @@ pub const MAX_CAPABILITIES: usize = 64;
/////////////////////////////////////////////////////////////////
struct NetworkInner {
network_started: Option<bool>,
network_needs_restart: bool,
protocol_config: ProtocolConfig,
}
struct NetworkUnlockedInner {
// Startup lock
startup_lock: StartupLock,
// Accessors
routing_table: RoutingTable,
network_manager: NetworkManager,
@ -74,7 +76,6 @@ pub(in crate::network_manager) struct Network {
impl Network {
fn new_inner() -> NetworkInner {
NetworkInner {
network_started: Some(false),
network_needs_restart: false,
protocol_config: Default::default(),
}
@ -86,6 +87,7 @@ impl Network {
connection_manager: ConnectionManager,
) -> NetworkUnlockedInner {
NetworkUnlockedInner {
startup_lock: StartupLock::new(),
network_manager,
routing_table,
connection_manager,
@ -121,7 +123,7 @@ impl Network {
/////////////////////////////////////////////////////////////////
// Record DialInfo failures
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
&self,
dial_info: DialInfo,
fut: F,
@ -135,12 +137,18 @@ impl Network {
Ok(network_result)
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
// Send data to a dial info, unbound, using a new connection from a random port
// This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message.
// This bypasses the connection table as it is not a 'node to node' connection.
#[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len();
let timeout_ms = {
@ -187,13 +195,15 @@ impl Network {
// This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message.
// This bypasses the connection table as it is not a 'node to node' connection.
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
#[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_recv_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
data: Vec<u8>,
timeout_ms: u32,
) -> EyreResult<NetworkResult<Vec<u8>>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len();
let connect_timeout_ms = {
@ -247,12 +257,14 @@ impl Network {
.await
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
#[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_data_to_existing_flow(
&self,
flow: Flow,
data: Vec<u8>,
) -> EyreResult<SendDataToExistingFlowResult> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
let data_len = data.len();
match flow.protocol_type() {
ProtocolType::UDP => {
@ -292,12 +304,16 @@ impl Network {
Ok(SendDataToExistingFlowResult::NotSent(data))
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
// Send data directly to a dial info, possibly without knowing which node it is going to
// Returns a flow for the connection used to send the data
#[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_data_to_dial_info(
&self,
dial_info: DialInfo,
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len();
if dial_info.protocol_type() == ProtocolType::UDP {
@ -399,23 +415,22 @@ impl Network {
Ok(StartupDisposition::Success)
}
#[instrument(level = "debug", err, skip_all)]
pub async fn startup(&self) -> EyreResult<StartupDisposition> {
self.inner.lock().network_started = None;
let guard = self.unlocked_inner.startup_lock.startup()?;
match self.startup_internal().await {
Ok(StartupDisposition::Success) => {
info!("network started");
self.inner.lock().network_started = Some(true);
guard.success();
Ok(StartupDisposition::Success)
}
Ok(StartupDisposition::BindRetry) => {
debug!("network bind retry");
self.inner.lock().network_started = Some(false);
Ok(StartupDisposition::BindRetry)
}
Err(e) => {
debug!("network failed to start");
self.inner.lock().network_started = Some(false);
Err(e)
}
}
@ -425,16 +440,22 @@ impl Network {
self.inner.lock().network_needs_restart
}
pub fn is_started(&self) -> Option<bool> {
self.inner.lock().network_started
pub fn is_started(&self) -> bool {
self.unlocked_inner.startup_lock.is_started()
}
#[instrument(level = "debug", skip_all)]
pub fn restart_network(&self) {
self.inner.lock().network_needs_restart = true;
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "stopping network");
log_net!(debug "starting low level network shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_net!(debug "low level network is already shut down");
return;
};
// Reset state
let routing_table = self.routing_table();
@ -451,7 +472,8 @@ impl Network {
// Cancels all async background tasks by dropping join handles
*self.inner.lock() = Self::new_inner();
log_net!(debug "network stopped");
guard.success();
log_net!(debug "finished low level network shutdown");
}
pub fn get_preferred_local_address(&self, _dial_info: &DialInfo) -> Option<SocketAddr> {
@ -472,15 +494,29 @@ impl Network {
&self,
_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
) {
//
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return;
};
}
pub fn needs_public_dial_info_check(&self) -> bool {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return false;
};
false
}
//////////////////////////////////////////
pub async fn tick(&self) -> EyreResult<()> {
#[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)]
pub(crate) async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return Ok(());
};
Ok(())
}
}

View File

@ -221,14 +221,26 @@ impl RoutingTable {
node_id: c.network.routing_table.node_id.clone(),
node_id_secret: c.network.routing_table.node_id_secret.clone(),
kick_queue: Mutex::new(BTreeSet::default()),
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
kick_buckets_task: TickTask::new(1),
bootstrap_task: TickTask::new(1),
peer_minimum_refresh_task: TickTask::new(1),
closest_peers_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms),
ping_validator_task: TickTask::new(1),
relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS),
private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS),
rolling_transfers_task: TickTask::new(
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
kick_buckets_task: TickTask::new("kick_buckets_task", 1),
bootstrap_task: TickTask::new("bootstrap_task", 1),
peer_minimum_refresh_task: TickTask::new("peer_minimum_refresh_task", 1),
closest_peers_refresh_task: TickTask::new_ms(
"closest_peers_refresh_task",
c.network.dht.min_peer_refresh_time_ms,
),
ping_validator_task: TickTask::new("ping_validator_task", 1),
relay_management_task: TickTask::new(
"relay_management_task",
RELAY_MANAGEMENT_INTERVAL_SECS,
),
private_route_management_task: TickTask::new(
"private_route_management_task",
PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS,
),
}
}
pub fn new(network_manager: NetworkManager) -> Self {

View File

@ -112,6 +112,7 @@ impl RoutingTable {
/// Ticks about once per second
/// to run tick tasks which may run at slower tick rates as configured
#[instrument(level = "trace", name = "RoutingTable::tick", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> {
// Don't tick if paused
let opt_tick_guard = {

View File

@ -114,7 +114,6 @@ impl RoutingTable {
rpc.rpc_call_status(Destination::direct(relay_nr_filtered))
.await
}
.instrument(Span::current())
.boxed(),
);
}
@ -159,9 +158,7 @@ impl RoutingTable {
log_rtab!("--> Watch ping to {:?}", watch_nr);
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(watch_nr)).await }
.instrument(Span::current())
.boxed(),
async move { rpc.rpc_call_status(Destination::direct(watch_nr)).await }.boxed(),
);
}
Ok(())
@ -198,9 +195,7 @@ impl RoutingTable {
let rpc = rpc.clone();
log_rtab!("--> Validator ping to {:?}", nr);
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr)).await }
.instrument(Span::current())
.boxed(),
async move { rpc.rpc_call_status(Destination::direct(nr)).await }.boxed(),
);
}
@ -226,9 +221,7 @@ impl RoutingTable {
// Just do a single ping with the best protocol for all the nodes
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr)).await }
.instrument(Span::current())
.boxed(),
async move { rpc.rpc_call_status(Destination::direct(nr)).await }.boxed(),
);
}
@ -258,10 +251,19 @@ impl RoutingTable {
let mut unord = FuturesUnordered::new();
while !unord.is_empty() || !futurequeue.is_empty() {
#[cfg(feature = "verbose-tracing")]
log_rtab!(debug "Ping validation queue: {} remaining, {} in progress", futurequeue.len(), unord.len());
log_rtab!(
"Ping validation queue: {} remaining, {} in progress",
futurequeue.len(),
unord.len()
);
// Process one unordered futures if we have some
match unord.next().timeout_at(stop_token.clone()).await {
match unord
.next()
.timeout_at(stop_token.clone())
.in_current_span()
.await
{
Ok(Some(_)) => {
// Some ping completed
}

View File

@ -321,13 +321,17 @@ where
}
}
// Wait for them to complete
timeout(timeout_ms, async {
while let Some(is_done) = unord.next().await {
timeout(
timeout_ms,
async {
while let Some(is_done) = unord.next().in_current_span().await {
if is_done {
break;
}
}
})
}
.in_current_span(),
)
.await
.into_timeout_or()
.map(|_| {

View File

@ -277,7 +277,7 @@ enum RPCKind {
/////////////////////////////////////////////////////////////////////
struct RPCProcessorInner {
send_channel: Option<flume::Sender<(Option<Id>, RPCMessageEncoded)>>,
send_channel: Option<flume::Sender<(Span, RPCMessageEncoded)>>,
stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>,
}
@ -292,6 +292,7 @@ struct RPCProcessorUnlockedInner {
update_callback: UpdateCallback,
waiting_rpc_table: OperationWaiter<RPCMessage, Option<QuestionContext>>,
waiting_app_call_table: OperationWaiter<Vec<u8>, ()>,
startup_lock: StartupLock,
}
#[derive(Clone)]
@ -345,6 +346,7 @@ impl RPCProcessor {
update_callback,
waiting_rpc_table: OperationWaiter::new(),
waiting_app_call_table: OperationWaiter::new(),
startup_lock: StartupLock::new(),
}
}
pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self {
@ -377,6 +379,7 @@ impl RPCProcessor {
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> EyreResult<()> {
log_rpc!(debug "startup rpc processor");
let guard = self.unlocked_inner.startup_lock.startup()?;
{
let mut inner = self.inner.lock();
@ -389,10 +392,10 @@ impl RPCProcessor {
"Spinning up {} RPC workers",
self.unlocked_inner.concurrency
);
for _ in 0..self.unlocked_inner.concurrency {
for task_n in 0..self.unlocked_inner.concurrency {
let this = self.clone();
let receiver = channel.1.clone();
let jh = spawn(Self::rpc_worker(
let jh = spawn(&format!("rpc worker {}",task_n), Self::rpc_worker(
this,
inner.stop_source.as_ref().unwrap().token(),
receiver,
@ -406,12 +409,17 @@ impl RPCProcessor {
.set_rpc_processor(Some(self.clone()))
.await;
guard.success();
Ok(())
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_rpc!(debug "starting rpc processor shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_rpc!(debug "rpc processor already shut down");
return;
};
// Stop storage manager from using us
self.storage_manager.set_rpc_processor(None).await;
@ -437,6 +445,7 @@ impl RPCProcessor {
// Release the rpc processor
*self.inner.lock() = Self::new_inner();
guard.success();
log_rpc!(debug "finished rpc processor shutdown");
}
@ -539,6 +548,8 @@ impl RPCProcessor {
) -> SendPinBoxFuture<Result<Option<NodeRef>, RPCError>> {
let this = self.clone();
Box::pin(async move {
let _guard = this.unlocked_inner.startup_lock.enter().map_err(RPCError::map_try_again("not started up"))?;
let routing_table = this.routing_table();
// First see if we have the node in our routing table already
@ -579,7 +590,7 @@ impl RPCProcessor {
};
Ok(nr)
})
}.in_current_span())
}
#[instrument(level="trace", target="rpc", skip_all)]
@ -1651,31 +1662,31 @@ impl RPCProcessor {
RPCStatementDetail::AppMessage(_) => self.process_app_message(msg).await,
},
RPCOperationKind::Answer(_) => {
self.unlocked_inner
let op_id = msg.operation.op_id();
if let Err(e) = self.unlocked_inner
.waiting_rpc_table
.complete_op_waiter(msg.operation.op_id(), msg)
.await?;
.complete_op_waiter(op_id, msg) {
log_rpc!(debug "Operation id {} did not complete: {}", op_id, e);
// Don't throw an error here because it's okay if the original operation timed out
}
Ok(NetworkResult::value(()))
}
}
}
#[instrument(level="trace", target="rpc", skip_all)]
async fn rpc_worker(
self,
stop_token: StopToken,
receiver: flume::Receiver<(Option<Id>, RPCMessageEncoded)>,
receiver: flume::Receiver<(Span, RPCMessageEncoded)>,
) {
while let Ok(Ok((_span_id, msg))) =
while let Ok(Ok((prev_span, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await
{
//let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv");
// xxx: causes crash (Missing otel data span extensions)
// rpc_worker_span.follows_from(span_id);
let rpc_message_span = tracing::trace_span!("rpc message");
rpc_message_span.follows_from(prev_span);
network_result_value_or_log!(match self
.process_rpc_message(msg).in_current_span()
//.instrument(rpc_worker_span)
.process_rpc_message(msg).instrument(rpc_message_span)
.await
{
Err(e) => {
@ -1699,6 +1710,8 @@ impl RPCProcessor {
routing_domain: RoutingDomain,
body: Vec<u8>,
) -> EyreResult<()> {
let _guard = self.unlocked_inner.startup_lock.enter().map_err(RPCError::map_try_again("not started up"))?;
let header = RPCMessageHeader {
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
envelope,
@ -1722,9 +1735,8 @@ impl RPCProcessor {
};
send_channel
};
let span_id = Span::current().id();
send_channel
.try_send((span_id, msg))
.try_send((Span::current(), msg))
.map_err(|e| eyre!("failed to enqueue direct RPC message: {}", e))?;
Ok(())
}
@ -1758,9 +1770,8 @@ impl RPCProcessor {
};
send_channel
};
let span_id = Span::current().id();
send_channel
.try_send((span_id, msg))
.try_send((Span::current(), msg))
.map_err(|e| eyre!("failed to enqueue safety routed RPC message: {}", e))?;
Ok(())
}
@ -1797,9 +1808,8 @@ impl RPCProcessor {
};
send_channel
};
let span_id = Span::current().id();
send_channel
.try_send((span_id, msg))
.try_send((Span::current(), msg))
.map_err(|e| eyre!("failed to enqueue private routed RPC message: {}", e))?;
Ok(())
}

View File

@ -8,7 +8,7 @@ where
{
waiter: OperationWaiter<T, C>,
op_id: OperationId,
eventual_instance: Option<EventualValueFuture<(Option<Id>, T)>>,
result_receiver: Option<flume::Receiver<(Span, T)>>,
}
impl<T, C> Drop for OperationWaitHandle<T, C>
@ -17,7 +17,7 @@ where
C: Unpin + Clone,
{
fn drop(&mut self) {
if self.eventual_instance.is_some() {
if self.result_receiver.is_some() {
self.waiter.cancel_op_waiter(self.op_id);
}
}
@ -31,7 +31,7 @@ where
{
context: C,
timestamp: Timestamp,
eventual: EventualValue<(Option<Id>, T)>,
result_sender: flume::Sender<(Span, T)>,
}
#[derive(Debug)]
@ -80,11 +80,11 @@ where
/// Set up wait for operation to complete
pub fn add_op_waiter(&self, op_id: OperationId, context: C) -> OperationWaitHandle<T, C> {
let mut inner = self.inner.lock();
let e = EventualValue::new();
let (result_sender, result_receiver) = flume::bounded(1);
let waiting_op = OperationWaitingOp {
context,
timestamp: get_aligned_timestamp(),
eventual: e.clone(),
result_sender,
};
if inner.waiting_op_table.insert(op_id, waiting_op).is_some() {
error!(
@ -96,7 +96,7 @@ where
OperationWaitHandle {
waiter: self.clone(),
op_id,
eventual_instance: Some(e.instance()),
result_receiver: Some(result_receiver),
}
}
@ -122,14 +122,15 @@ where
}
/// Remove wait for op
#[instrument(level = "trace", target = "rpc", skip_all)]
fn cancel_op_waiter(&self, op_id: OperationId) {
let mut inner = self.inner.lock();
inner.waiting_op_table.remove(&op_id);
}
/// Complete the app call
/// Complete the waiting op
#[instrument(level = "trace", target = "rpc", skip_all)]
pub async fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
pub fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
let waiting_op = {
let mut inner = self.inner.lock();
inner
@ -141,10 +142,9 @@ where
)))?
};
waiting_op
.eventual
.resolve((Span::current().id(), message))
.await;
Ok(())
.result_sender
.send((Span::current(), message))
.map_err(RPCError::ignore)
}
/// Wait for operation to complete
@ -156,29 +156,30 @@ where
) -> Result<TimeoutOr<(T, TimestampDuration)>, RPCError> {
let timeout_ms = us_to_ms(timeout_us.as_u64()).map_err(RPCError::internal)?;
// Take the instance
// Take the receiver
// After this, we must manually cancel since the cancel on handle drop is disabled
let eventual_instance = handle.eventual_instance.take().unwrap();
let result_receiver = handle.result_receiver.take().unwrap();
let result_fut = result_receiver.recv_async().in_current_span();
// wait for eventualvalue
let start_ts = get_aligned_timestamp();
let res = timeout(timeout_ms, eventual_instance)
.await
.into_timeout_or();
Ok(res
.on_timeout(|| {
// log_rpc!(debug "op wait timed out: {}", handle.op_id);
// debug_print_backtrace();
let res = timeout(timeout_ms, result_fut).await.into_timeout_or();
match res {
TimeoutOr::Timeout => {
self.cancel_op_waiter(handle.op_id);
})
.map(|res| {
let (_span_id, ret) = res.take_value().unwrap();
Ok(TimeoutOr::Timeout)
}
TimeoutOr::Value(Ok((_span_id, ret))) => {
let end_ts = get_aligned_timestamp();
//xxx: causes crash (Missing otel data span extensions)
// Span::current().follows_from(span_id);
(ret, end_ts.saturating_sub(start_ts))
}))
Ok(TimeoutOr::Value((ret, end_ts.saturating_sub(start_ts))))
}
TimeoutOr::Value(Err(e)) => Err(RPCError::ignore(e)),
}
}
}

View File

@ -9,6 +9,12 @@ impl RPCProcessor {
dest: Destination,
message: Vec<u8>,
) -> RPCNetworkResult<Answer<Vec<u8>>> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
let debug_string = format!("AppCall(message(len)={}) => {}", message.len(), dest);
let app_call_q = RPCOperationAppCallQ::new(message)?;
@ -147,14 +153,15 @@ impl RPCProcessor {
/// Exposed to API for apps to return app call answers
#[instrument(level = "trace", target = "rpc", skip_all)]
pub async fn app_call_reply(
&self,
call_id: OperationId,
message: Vec<u8>,
) -> Result<(), RPCError> {
pub fn app_call_reply(&self, call_id: OperationId, message: Vec<u8>) -> Result<(), RPCError> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
self.unlocked_inner
.waiting_app_call_table
.complete_op_waiter(call_id, message)
.await
.map_err(RPCError::ignore)
}
}

View File

@ -9,6 +9,12 @@ impl RPCProcessor {
dest: Destination,
message: Vec<u8>,
) -> RPCNetworkResult<()> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
let app_message = RPCOperationAppMessage::new(message)?;
let statement = RPCStatement::new(RPCStatementDetail::AppMessage(Box::new(app_message)));

View File

@ -14,6 +14,12 @@ impl RPCProcessor {
node_id: TypedKey,
capabilities: Vec<Capability>,
) -> RPCNetworkResult<Answer<Vec<PeerInfo>>> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination never has a private route
if matches!(
dest,

View File

@ -30,6 +30,12 @@ impl RPCProcessor {
subkey: ValueSubkey,
last_descriptor: Option<SignedValueDescriptor>,
) ->RPCNetworkResult<Answer<GetValueAnswer>> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination never has a private route
// and get the target noderef so we can validate the response
let Some(target) = dest.node() else {

View File

@ -32,6 +32,12 @@ impl RPCProcessor {
subkeys: ValueSubkeyRangeSet,
last_descriptor: Option<SignedValueDescriptor>,
) -> RPCNetworkResult<Answer<InspectValueAnswer>> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination never has a private route
// and get the target noderef so we can validate the response
let Some(target) = dest.node() else {

View File

@ -9,6 +9,12 @@ impl RPCProcessor {
dest: Destination,
receipt: D,
) -> RPCNetworkResult<()> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
let receipt = receipt.as_ref().to_vec();
let return_receipt = RPCOperationReturnReceipt::new(receipt)?;

View File

@ -34,6 +34,12 @@ impl RPCProcessor {
descriptor: SignedValueDescriptor,
send_descriptor: bool,
) ->RPCNetworkResult<Answer<SetValueAnswer>> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination never has a private route
// and get the target noderef so we can validate the response
let Some(target) = dest.node() else {

View File

@ -9,6 +9,12 @@ impl RPCProcessor {
dest: Destination,
signal_info: SignalInfo,
) -> RPCNetworkResult<()> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination never has a private route
if matches!(
dest,

View File

@ -20,6 +20,12 @@ impl RPCProcessor {
self,
dest: Destination,
) -> RPCNetworkResult<Answer<Option<SenderInfo>>> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
// Determine routing domain and node status to send
let (opt_target_nr, routing_domain, node_status) = if let Some(UnsafeRoutingInfo {
opt_node,

View File

@ -10,6 +10,17 @@ impl RPCProcessor {
dial_info: DialInfo,
redirect: bool,
) -> Result<bool, RPCError> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
let stop_token = self
.unlocked_inner
.startup_lock
.stop_token()
.ok_or(RPCError::try_again("not started up"))?;
let network_manager = self.network_manager();
let receipt_time = ms_to_us(self.unlocked_inner.validate_dial_info_receipt_time_ms);
@ -32,7 +43,17 @@ impl RPCProcessor {
);
// Wait for receipt
match eventual_value.await.take_value().unwrap() {
match eventual_value
.timeout_at(stop_token)
.in_current_span()
.await
{
Err(_) => {
return Err(RPCError::try_again("not started up"));
}
Ok(v) => {
let receipt_event = v.take_value().unwrap();
match receipt_event {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedInBand { inbound_noderef: _ }
| ReceiptEvent::ReturnedSafety => {
@ -52,6 +73,8 @@ impl RPCProcessor {
}
}
}
}
}
////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -13,6 +13,12 @@ impl RPCProcessor {
watch_id: u64,
value: Option<SignedValueData>,
) -> RPCNetworkResult<()> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination is never using a safety route
if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) {
return Err(RPCError::internal(

View File

@ -32,6 +32,12 @@ impl RPCProcessor {
watcher: KeyPair,
watch_id: Option<u64>,
) -> RPCNetworkResult<Answer<WatchValueAnswer>> {
let _guard = self
.unlocked_inner
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination never has a private route
// and get the target noderef so we can validate the response
let Some(target) = dest.node() else {

View File

@ -182,7 +182,7 @@ impl StorageManager {
log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
Ok(NetworkResult::value(gva.answer.peers))
}.in_current_span()
}.instrument(tracing::trace_span!("outbound_get_value fanout routine"))
}
};
@ -225,7 +225,7 @@ impl StorageManager {
};
// Call the fanout in a spawned task
spawn(Box::pin(async move {
spawn("outbound_get_value fanout", Box::pin(async move {
let fanout_call = FanoutCall::new(
routing_table.clone(),
key,
@ -271,7 +271,7 @@ impl StorageManager {
})) {
log_dht!(debug "Sending GetValue result failed: {}", e);
}
}.in_current_span()))
}.instrument(tracing::trace_span!("outbound_get_value result"))))
.detach();
Ok(out_rx)
@ -319,7 +319,7 @@ impl StorageManager {
// Return done
false
}.in_current_span())
}.instrument(tracing::trace_span!("outbound_get_value deferred results")))
},
),
);

View File

@ -228,7 +228,7 @@ impl StorageManager {
log_network_result!(debug "InspectValue fanout call returned peers {}", answer.peers.len());
Ok(NetworkResult::value(answer.peers))
}.in_current_span()
}.instrument(tracing::trace_span!("outbound_inspect_value fanout call"))
};
// Routine to call to check if we're done at each step

View File

@ -89,11 +89,26 @@ impl StorageManager {
table_store,
#[cfg(feature = "unstable-blockstore")]
block_store,
flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS),
offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS),
send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS),
check_active_watches_task: TickTask::new(CHECK_ACTIVE_WATCHES_INTERVAL_SECS),
check_watched_records_task: TickTask::new(CHECK_WATCHED_RECORDS_INTERVAL_SECS),
flush_record_stores_task: TickTask::new(
"flush_record_stores_task",
FLUSH_RECORD_STORES_INTERVAL_SECS,
),
offline_subkey_writes_task: TickTask::new(
"offline_subkey_writes_task",
OFFLINE_SUBKEY_WRITES_INTERVAL_SECS,
),
send_value_changes_task: TickTask::new(
"send_value_changes_task",
SEND_VALUE_CHANGES_INTERVAL_SECS,
),
check_active_watches_task: TickTask::new(
"check_active_watches_task",
CHECK_ACTIVE_WATCHES_INTERVAL_SECS,
),
check_watched_records_task: TickTask::new(
"check_watched_records_task",
CHECK_WATCHED_RECORDS_INTERVAL_SECS,
),
anonymous_watch_keys,
}

View File

@ -177,7 +177,7 @@ impl StorageManager {
ctx.send_partial_update = true;
Ok(NetworkResult::value(sva.answer.peers))
}.in_current_span()
}.instrument(tracing::trace_span!("fanout call_routine"))
}
};
@ -224,7 +224,7 @@ impl StorageManager {
};
// Call the fanout in a spawned task
spawn(Box::pin(async move {
spawn("outbound_set_value fanout", Box::pin(async move {
let fanout_call = FanoutCall::new(
routing_table.clone(),
key,
@ -267,7 +267,7 @@ impl StorageManager {
})) {
log_dht!(debug "Sending SetValue result failed: {}", e);
}
}.in_current_span()))
}.instrument(tracing::trace_span!("outbound_set_value fanout routine"))))
.detach();
Ok(out_rx)
@ -329,7 +329,7 @@ impl StorageManager {
// Return done
false
}.in_current_span())
}.instrument(tracing::trace_span!("outbound_set_value deferred results")))
},
),
);

View File

@ -133,7 +133,7 @@ impl StorageManagerInner {
self.deferred_result_processor.init().await;
// Schedule tick
let tick_future = interval(1000, move || {
let tick_future = interval("storage manager tick", 1000, move || {
let this = outer_self.clone();
async move {
if let Err(e) = this.tick().await {

View File

@ -80,6 +80,7 @@ impl StorageManager {
}
}
#[instrument(parent = None, level = "trace", target = "stor", name = "StorageManager::tick", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> {
// Run the flush stores task
self.unlocked_inner.flush_record_stores_task.tick().await?;
@ -109,6 +110,7 @@ impl StorageManager {
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(crate) async fn cancel_tasks(&self) {
log_stor!(debug "stopping check watched records task");
if let Err(e) = self.unlocked_inner.check_watched_records_task.stop().await {

View File

@ -32,15 +32,24 @@ impl StorageManager {
// Add a future for each value change
for vc in value_changes {
let this = self.clone();
unord.push(async move {
unord.push(
async move {
if let Err(e) = this.send_value_change(vc).await {
log_stor!(debug "Failed to send value change: {}", e);
}
});
}
.in_current_span(),
);
}
while !unord.is_empty() {
match unord.next().timeout_at(stop_token.clone()).await {
match unord
.next()
.in_current_span()
.timeout_at(stop_token.clone())
.in_current_span()
.await
{
Ok(Some(_)) => {
// Some ValueChanged completed
}

View File

@ -294,7 +294,7 @@ impl StorageManager {
log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node);
Ok(NetworkResult::value(wva.answer.peers))
}.in_current_span()
}.instrument(tracing::trace_span!("outbound_watch_value call routine"))
};
// Routine to call to check if we're done at each step

View File

@ -15,7 +15,7 @@ impl fmt::Debug for VeilidAPIInner {
impl Drop for VeilidAPIInner {
fn drop(&mut self) {
if let Some(context) = self.context.take() {
spawn_detached(api_shutdown(context));
spawn_detached("api shutdown", api_shutdown(context));
}
}
}
@ -366,7 +366,6 @@ impl VeilidAPI {
let rpc_processor = self.rpc_processor()?;
rpc_processor
.app_call_reply(call_id, message)
.await
.map_err(|e| e.into())
}

View File

@ -867,9 +867,11 @@ impl VeilidAPI {
.purge_last_connections();
if let Some(connection_manager) = &opt_connection_manager {
connection_manager.startup().await;
connection_manager
.startup()
.await
.map_err(VeilidAPIError::internal)?;
}
Ok("Connections purged".to_owned())
} else if args[0] == "routes" {
// Purge route spec store

View File

@ -38,6 +38,7 @@ rt-tokio = [
]
tracking = ["veilid-core/tracking"]
debug-json-api = []
debug-locks = ["veilid-core/debug-locks"]
[dependencies]
veilid-core = { path = "../veilid-core", default-features = false }
@ -81,6 +82,7 @@ stop-token = { version = "^0", default-features = false }
sysinfo = { version = "^0.30.13" }
wg = { version = "^0.9.1", features = ["future"] }
tracing-flame = "0.2.0"
tracing-perfetto = "0.1.1"
[target.'cfg(windows)'.dependencies]
windows-service = "^0"

View File

@ -145,7 +145,11 @@ impl ClientApi {
let t_awg = awg.clone();
// Process the connection
spawn(self.clone().handle_ipc_connection(stream, t_awg)).detach();
spawn(
"client_api handle_ipc_connection",
self.clone().handle_ipc_connection(stream, t_awg),
)
.detach();
}
// Wait for all connections to terminate
@ -183,7 +187,11 @@ impl ClientApi {
let t_awg = awg.clone();
// Process the connection
spawn(self.clone().handle_tcp_connection(stream, t_awg)).detach();
spawn(
"client_api handle_tcp_connection",
self.clone().handle_tcp_connection(stream, t_awg),
)
.detach();
}
// Wait for all connections to terminate
@ -543,6 +551,6 @@ impl ClientApi {
}
let bind_futures_join = join_all(bind_futures);
self.inner.lock().join_handle = Some(spawn(bind_futures_join));
self.inner.lock().join_handle = Some(spawn("client_api bind_futures", bind_futures_join));
}
}

View File

@ -84,10 +84,14 @@ pub struct CmdlineArgs {
#[arg(long, value_name = "endpoint")]
otlp: Option<String>,
/// Turn on flamegraph tracing (experimental, isn't terribly useful)
/// Turn on flamegraph tracing (experimental)
#[arg(long, hide = true, value_name = "PATH", num_args=0..=1, require_equals=true, default_missing_value = "")]
flame: Option<OsString>,
/// Turn on perfetto tracing (experimental)
#[arg(long, hide = true, value_name = "PATH", num_args=0..=1, require_equals=true, default_missing_value = "")]
perfetto: Option<OsString>,
/// Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports
#[arg(long)]
subnode_index: Option<u16>,
@ -223,6 +227,18 @@ fn main() -> EyreResult<()> {
settingsrw.logging.flame.enabled = true;
settingsrw.logging.flame.path = flame;
}
if let Some(perfetto) = args.perfetto {
let perfetto = if perfetto.is_empty() {
Settings::get_default_perfetto_path(settingsrw.testing.subnode_index)
.to_string_lossy()
.to_string()
} else {
perfetto.to_string_lossy().to_string()
};
println!("Enabling perfetto output to {}", perfetto);
settingsrw.logging.perfetto.enabled = true;
settingsrw.logging.perfetto.path = perfetto;
}
if args.no_attach {
settingsrw.auto_attach = false;

View File

@ -108,7 +108,9 @@ pub async fn run_veilid_server(
let capi2 = capi.clone();
let update_receiver_shutdown = SingleShotEventual::new(Some(()));
let mut update_receiver_shutdown_instance = update_receiver_shutdown.instance().fuse();
let update_receiver_jh = spawn_local(async move {
let update_receiver_jh = spawn_local(
"update_receiver",
async move {
loop {
select! {
res = receiver.recv_async() => {
@ -126,7 +128,9 @@ pub async fn run_veilid_server(
}
};
}
});
}
.in_current_span(),
);
// Auto-attach if desired
let mut out = Ok(());

View File

@ -69,6 +69,9 @@ logging:
flame:
enabled: false
path: ''
perfetto:
enabled: false
path: ''
console:
enabled: false
testing:
@ -451,6 +454,12 @@ pub struct Flame {
pub path: String,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Perfetto {
pub enabled: bool,
pub path: String,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Console {
pub enabled: bool,
@ -503,6 +512,7 @@ pub struct Logging {
pub api: Api,
pub otlp: Otlp,
pub flame: Flame,
pub perfetto: Perfetto,
pub console: Console,
}
@ -873,6 +883,15 @@ impl Settings {
})
}
/// Determine default perfetto output path
pub fn get_default_perfetto_path(subnode_index: u16) -> PathBuf {
std::env::temp_dir().join(if subnode_index == 0 {
"veilid-server.pftrace".to_owned()
} else {
format!("veilid-server-{}.pftrace", subnode_index)
})
}
#[allow(dead_code)]
fn get_or_create_private_directory<P: AsRef<Path>>(path: P, group_read: bool) -> bool {
let path = path.as_ref();
@ -996,6 +1015,8 @@ impl Settings {
set_config_value!(inner.logging.otlp.ignore_log_targets, value);
set_config_value!(inner.logging.flame.enabled, value);
set_config_value!(inner.logging.flame.path, value);
set_config_value!(inner.logging.perfetto.enabled, value);
set_config_value!(inner.logging.perfetto.path, value);
set_config_value!(inner.logging.console.enabled, value);
set_config_value!(inner.testing.subnode_index, value);
set_config_value!(inner.core.capabilities.disable, value);
@ -1565,6 +1586,8 @@ mod tests {
);
assert!(!s.logging.flame.enabled);
assert_eq!(s.logging.flame.path, "");
assert!(!s.logging.perfetto.enabled);
assert_eq!(s.logging.perfetto.path, "");
assert!(!s.logging.console.enabled);
assert_eq!(s.testing.subnode_index, 0);

View File

@ -34,7 +34,7 @@ pub async fn run_veilid_server_with_signals(
Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT]).wrap_err("failed to init signals")?;
let handle = signals.handle();
let signals_task = spawn(handle_signals(signals));
let signals_task = spawn("signals", handle_signals(signals));
// Run veilid server
let res = run_veilid_server(settings, server_mode, veilid_logs).await;

View File

@ -18,6 +18,7 @@ use std::path::*;
use std::sync::Arc;
use tracing_appender::*;
use tracing_flame::FlameLayer;
use tracing_perfetto::PerfettoLayer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::*;
@ -48,14 +49,15 @@ impl VeilidLogs {
#[cfg(feature = "rt-tokio")]
if settingsr.logging.console.enabled {
let filter = veilid_core::VeilidLayerFilter::new_no_default(
veilid_core::VeilidConfigLogLevel::Trace,
&[],
);
let layer = ConsoleLayer::builder()
.with_default_env()
.spawn()
.with_filter(
filter::Targets::new()
.with_target("tokio", Level::TRACE)
.with_target("runtime", Level::TRACE),
);
.with_filter(filter);
layers.push(layer.boxed());
}
@ -93,6 +95,26 @@ impl VeilidLogs {
);
}
// Perfetto logger
if settingsr.logging.perfetto.enabled {
let filter = veilid_core::VeilidLayerFilter::new_no_default(
veilid_core::VeilidConfigLogLevel::Trace,
&veilid_core::FLAME_LOG_FACILITIES_IGNORE_LIST.map(|x| x.to_string()),
);
let perfetto_layer = PerfettoLayer::new(std::sync::Mutex::new(std::fs::File::create(
&settingsr.logging.perfetto.path,
)?));
// Do not include this in change_log_level changes, so we keep trace level
// filters.insert("flame", filter.clone());
layers.push(
perfetto_layer
.with_debug_annotations(true)
.with_filter(filter)
.boxed(),
);
}
// OpenTelemetry logger
#[cfg(feature = "opentelemetry-otlp")]
if settingsr.logging.otlp.enabled {

View File

@ -33,7 +33,8 @@ rt-wasm-bindgen = ["async_executors/bindgen", "async_executors/timer"]
veilid_tools_android_tests = ["dep:paranoid-android"]
veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"]
tracing = ["dep:tracing", "dep:tracing-subscriber"]
tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"]
debug-locks = []
[dependencies]
tracing = { version = "0.1.40", features = [
@ -52,6 +53,7 @@ futures-util = { version = "0.3.30", default-features = false, features = [
"alloc",
] }
parking_lot = "0.12.3"
async-lock = "3.4.0"
once_cell = "1.19.0"
stop-token = { version = "0.7.0", default-features = false }
rand = "0.8.5"
@ -87,7 +89,6 @@ wasm-bindgen-futures = "0.4.42"
async_executors = { version = "0.7.0", default-features = false }
getrandom = { version = "0.2", features = ["js"] }
async-lock = "2.8.0"
send_wrapper = { version = "0.6.0", features = ["futures"] }
# Dependencies for Linux or Android

View File

@ -32,7 +32,10 @@ impl DeferredStreamProcessor {
self.opt_stopper = Some(stopper);
let (dsc_tx, dsc_rx) = flume::unbounded::<SendPinBoxFuture<()>>();
self.opt_deferred_stream_channel = Some(dsc_tx);
self.opt_join_handle = Some(spawn(Self::processor(stop_token, dsc_rx)));
self.opt_join_handle = Some(spawn(
"deferred stream processor",
Self::processor(stop_token, dsc_rx),
));
}
/// Terminate the processor and ensure all streams are closed

View File

@ -104,7 +104,7 @@ where
match out {
None => task::Poll::<Self::Output>::Pending,
Some(wakers) => {
// Wake all EventualResolvedFutures
// Wake all other instance futures
for w in wakers {
w.wake();
}

View File

@ -81,7 +81,7 @@ impl<T: Unpin> Future for EventualValueFuture<T> {
match out {
None => task::Poll::<Self::Output>::Pending,
Some(wakers) => {
// Wake all EventualResolvedFutures
// Wake all other instance futures
for w in wakers {
w.wake();
}

View File

@ -77,7 +77,7 @@ impl<T: Unpin + Clone> Future for EventualValueCloneFuture<T> {
match out {
None => task::Poll::<Self::Output>::Pending,
Some(wakers) => {
// Wake all EventualResolvedFutures
// Wake all other instance futures
for w in wakers {
w.wake();
}

View File

@ -3,7 +3,7 @@ use super::*;
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
pub fn interval<F, FUT>(freq_ms: u32, callback: F) -> SendPinBoxFuture<()>
pub fn interval<F, FUT>(name: &str, freq_ms: u32, callback: F) -> SendPinBoxFuture<()>
where
F: Fn() -> FUT + Send + Sync + 'static,
FUT: Future<Output = ()> + Send,
@ -11,7 +11,7 @@ cfg_if! {
let e = Eventual::new();
let ie = e.clone();
let jh = spawn(Box::pin(async move {
let jh = spawn(name, Box::pin(async move {
while timeout(freq_ms, ie.instance_clone(())).await.is_err() {
callback().await;
}
@ -25,7 +25,7 @@ cfg_if! {
} else {
pub fn interval<F, FUT>(freq_ms: u32, callback: F) -> SendPinBoxFuture<()>
pub fn interval<F, FUT>(name: &str, freq_ms: u32, callback: F) -> SendPinBoxFuture<()>
where
F: Fn() -> FUT + Send + Sync + 'static,
FUT: Future<Output = ()> + Send,
@ -33,7 +33,7 @@ cfg_if! {
let e = Eventual::new();
let ie = e.clone();
let jh = spawn(async move {
let jh = spawn(name, async move {
while timeout(freq_ms, ie.instance_clone(())).await.is_err() {
callback().await;
}

View File

@ -49,6 +49,7 @@ pub mod single_shot_eventual;
pub mod sleep;
pub mod spawn;
pub mod split_url;
pub mod startup_lock;
pub mod tick_task;
pub mod timeout;
pub mod timeout_or;
@ -106,7 +107,7 @@ pub use std::str::FromStr;
#[doc(no_inline)]
pub use std::string::{String, ToString};
#[doc(no_inline)]
pub use std::sync::atomic::{AtomicBool, Ordering};
pub use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[doc(no_inline)]
pub use std::sync::{Arc, Weak};
#[doc(no_inline)]
@ -116,6 +117,13 @@ pub use std::time::Duration;
#[doc(no_inline)]
pub use std::vec::Vec;
#[doc(no_inline)]
pub use async_lock::RwLock as AsyncRwLock;
#[doc(no_inline)]
pub use async_lock::RwLockReadGuard as AsyncRwLockReadGuard;
#[doc(no_inline)]
pub use async_lock::RwLockWriteGuard as AsyncRwLockWriteGuard;
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
#[doc(no_inline)]
@ -124,8 +132,10 @@ cfg_if! {
pub use async_lock::MutexGuard as AsyncMutexGuard;
#[doc(no_inline)]
pub use async_lock::MutexGuardArc as AsyncMutexGuardArc;
#[doc(no_inline)]
pub use async_executors::JoinHandle as LowLevelJoinHandle;
} else {
cfg_if! {
if #[cfg(feature="rt-async-std")] {
@ -135,8 +145,17 @@ cfg_if! {
pub use async_std::sync::MutexGuard as AsyncMutexGuard;
#[doc(no_inline)]
pub use async_std::sync::MutexGuardArc as AsyncMutexGuardArc;
// #[doc(no_inline)]
// pub use async_std::sync::RwLock as AsyncRwLock;
// #[doc(no_inline)]
// pub use async_std::sync::RwLockReadGuard as AsyncRwLockReadGuard;
// #[doc(no_inline)]
// pub use async_std::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
#[doc(no_inline)]
pub use async_std::task::JoinHandle as LowLevelJoinHandle;
} else if #[cfg(feature="rt-tokio")] {
#[doc(no_inline)]
pub use tokio::sync::Mutex as AsyncMutex;
@ -144,6 +163,15 @@ cfg_if! {
pub use tokio::sync::MutexGuard as AsyncMutexGuard;
#[doc(no_inline)]
pub use tokio::sync::OwnedMutexGuard as AsyncMutexGuardArc;
// #[doc(no_inline)]
// pub use tokio::sync::RwLock as AsyncRwLock;
// #[doc(no_inline)]
// pub use tokio::sync::RwLockReadGuard as AsyncRwLockReadGuard;
// #[doc(no_inline)]
// pub use tokio::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
#[doc(no_inline)]
pub use tokio::task::JoinHandle as LowLevelJoinHandle;
} else {
@ -202,6 +230,8 @@ pub use spawn::*;
#[doc(inline)]
pub use split_url::*;
#[doc(inline)]
pub use startup_lock::*;
#[doc(inline)]
pub use tick_task::*;
#[doc(inline)]
pub use timeout::*;
@ -221,6 +251,7 @@ pub mod tests;
cfg_if! {
if #[cfg(feature = "tracing")] {
use tracing::*;
#[macro_export]
macro_rules! debug_target_enabled {
($target:expr) => { enabled!(target: $target, Level::DEBUG) }

View File

@ -64,6 +64,7 @@ where
}
/// Check the result and take it if there is one
#[cfg_attr(feature = "tracing", instrument(level = "trace", skip_all))]
pub async fn check(&self) -> Result<Option<T>, ()> {
let mut out: Option<T> = None;
@ -95,6 +96,7 @@ where
}
/// Wait for the result and take it
#[cfg_attr(feature = "tracing", instrument(level = "trace", skip_all))]
pub async fn join(&self) -> Result<Option<T>, ()> {
let mut out: Option<T> = None;
@ -124,6 +126,7 @@ where
// Possibly spawn the future possibly returning the value of the last execution
pub async fn single_spawn_local(
&self,
name: &str,
future: impl Future<Output = T> + 'static,
) -> Result<(Option<T>, bool), ()> {
let mut out: Option<T> = None;
@ -152,7 +155,7 @@ where
// Run if we should do that
if run {
self.unlock(Some(spawn_local(future)));
self.unlock(Some(spawn_local(name, future)));
}
// Return the prior result if we have one
@ -166,6 +169,7 @@ where
{
pub async fn single_spawn(
&self,
name: &str,
future: impl Future<Output = T> + Send + 'static,
) -> Result<(Option<T>, bool), ()> {
let mut out: Option<T> = None;
@ -191,7 +195,7 @@ where
}
// Run if we should do that
if run {
self.unlock(Some(spawn(future)));
self.unlock(Some(spawn(name, future)));
}
// Return the prior result if we have one
Ok((out, run))

View File

@ -317,7 +317,7 @@ impl PlatformSupportNetlink {
let (connection, handle, _) = new_connection_with_socket::<RTNetLinkSocket>()?;
// Spawn a connection handler
let connection_jh = spawn(connection);
let connection_jh = spawn("rtnetlink connection", connection);
// Save the connection
self.connection_jh = Some(connection_jh);

View File

@ -32,6 +32,7 @@ impl<T> IoNetworkResultExt<T> for io::Result<T> {
Err(e) => match e.kind() {
io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout),
io::ErrorKind::UnexpectedEof
| io::ErrorKind::BrokenPipe
| io::ErrorKind::ConnectionAborted
| io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionReset
@ -51,6 +52,7 @@ impl<T> IoNetworkResultExt<T> for io::Result<T> {
match e.kind() {
io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout),
io::ErrorKind::UnexpectedEof
| io::ErrorKind::BrokenPipe
| io::ErrorKind::ConnectionAborted
| io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionReset => Ok(NetworkResult::NoConnection(e)),

View File

@ -4,7 +4,7 @@ cfg_if! {
if #[cfg(target_arch = "wasm32")] {
use async_executors::{Bindgen, LocalSpawnHandleExt, SpawnHandleExt};
pub fn spawn<Out>(future: impl Future<Output = Out> + Send + 'static) -> MustJoinHandle<Out>
pub fn spawn<Out>(_name: &str, future: impl Future<Output = Out> + Send + 'static) -> MustJoinHandle<Out>
where
Out: Send + 'static,
{
@ -15,7 +15,7 @@ cfg_if! {
)
}
pub fn spawn_local<Out>(future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out>
pub fn spawn_local<Out>(_name: &str, future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out>
where
Out: 'static,
{
@ -26,7 +26,7 @@ cfg_if! {
)
}
pub fn spawn_detached<Out>(future: impl Future<Output = Out> + Send + 'static)
pub fn spawn_detached<Out>(_name: &str, future: impl Future<Output = Out> + Send + 'static)
where
Out: Send + 'static,
{
@ -35,7 +35,7 @@ cfg_if! {
.expect("wasm-bindgen-futures spawn_handle_local should never error out")
.detach()
}
pub fn spawn_detached_local<Out>(future: impl Future<Output = Out> + 'static)
pub fn spawn_detached_local<Out>(_name: &str, future: impl Future<Output = Out> + 'static)
where
Out: 'static,
{
@ -47,60 +47,60 @@ cfg_if! {
} else {
pub fn spawn<Out>(future: impl Future<Output = Out> + Send + 'static) -> MustJoinHandle<Out>
pub fn spawn<Out>(name: &str, future: impl Future<Output = Out> + Send + 'static) -> MustJoinHandle<Out>
where
Out: Send + 'static,
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
MustJoinHandle::new(async_std::task::spawn(future))
MustJoinHandle::new(async_std::task::Builder::new().name(name.to_string()).spawn(future).unwrap())
} else if #[cfg(feature="rt-tokio")] {
MustJoinHandle::new(tokio::task::spawn(future))
MustJoinHandle::new(tokio::task::Builder::new().name(name).spawn(future).unwrap())
}
}
}
pub fn spawn_local<Out>(future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out>
pub fn spawn_local<Out>(name: &str, future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out>
where
Out: 'static,
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
MustJoinHandle::new(async_std::task::spawn_local(future))
MustJoinHandle::new(async_std::task::Builder::new().name(name.to_string()).local(future).unwrap())
} else if #[cfg(feature="rt-tokio")] {
MustJoinHandle::new(tokio::task::spawn_local(future))
MustJoinHandle::new(tokio::task::Builder::new().name(name).spawn_local(future).unwrap())
}
}
}
pub fn spawn_detached<Out>(future: impl Future<Output = Out> + Send + 'static)
pub fn spawn_detached<Out>(name: &str, future: impl Future<Output = Out> + Send + 'static)
where
Out: Send + 'static,
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
drop(async_std::task::spawn(future));
drop(async_std::task::Builder::new().name(name.to_string()).spawn(future).unwrap());
} else if #[cfg(feature="rt-tokio")] {
drop(tokio::task::spawn(future));
drop(tokio::task::Builder::new().name(name).spawn(future).unwrap());
}
}
}
pub fn spawn_detached_local<Out>(future: impl Future<Output = Out> + 'static)
pub fn spawn_detached_local<Out>(name: &str,future: impl Future<Output = Out> + 'static)
where
Out: 'static,
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
drop(async_std::task::spawn_local(future));
drop(async_std::task::Builder::new().name(name.to_string()).local(future).unwrap());
} else if #[cfg(feature="rt-tokio")] {
drop(tokio::task::spawn_local(future));
drop(tokio::task::Builder::new().name(name).spawn_local(future).unwrap());
}
}
}
#[allow(unused_variables)]
pub async fn blocking_wrapper<F, R>(blocking_task: F, err_result: R) -> R
pub async fn blocking_wrapper<F, R>(name: &str, blocking_task: F, err_result: R) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
@ -108,9 +108,11 @@ cfg_if! {
// run blocking stuff in blocking thread
cfg_if! {
if #[cfg(feature="rt-async-std")] {
let _name = name;
// async_std::task::Builder blocking doesn't work like spawn_blocking()
async_std::task::spawn_blocking(blocking_task).await
} else if #[cfg(feature="rt-tokio")] {
tokio::task::spawn_blocking(blocking_task).await.unwrap_or(err_result)
tokio::task::Builder::new().name(name).spawn_blocking(blocking_task).unwrap().await.unwrap_or(err_result)
} else {
#[compile_error("must use an executor")]
}

View File

@ -0,0 +1,177 @@
use super::*;
/// Returned by [`StartupLock::startup`] when the lock is already in the started
/// state, or when a startup/shutdown transition is currently in progress.
#[derive(ThisError, Debug, Copy, Clone, PartialEq, Eq)]
#[error("Already started")]
pub struct StartupLockAlreadyStartedError;

/// Returned by [`StartupLock::shutdown`] when the lock is already shut down.
#[derive(ThisError, Debug, Copy, Clone, PartialEq, Eq)]
#[error("Already shut down")]
pub struct StartupLockAlreadyShutDownError;

/// Returned by [`StartupLock::enter`] when the lock is not in the started state
/// (never started, shut down, or mid-transition).
#[derive(ThisError, Debug, Copy, Clone, PartialEq, Eq)]
#[error("Not started")]
pub struct StartupLockNotStartedError;
/// RAII-style lock for startup and shutdown operations
/// Must call 'success()' on this lock to report a successful startup or shutdown
/// Dropping this lock without calling 'success()' first indicates a failed
/// startup or shutdown operation
#[derive(Debug)]
pub struct StartupLockGuard<'a> {
    // Exclusive write guard over the started/stopped flag; held until
    // success() commits the transition or the guard is dropped (abort).
    guard: AsyncRwLockWriteGuard<'a, bool>,
    // Value written to the flag on success(): true for a startup transition,
    // false for a shutdown transition.
    success_value: bool,
}

impl<'a> StartupLockGuard<'a> {
    /// Call this function at the end of a successful startup or shutdown
    /// operation to switch the state of the StartupLock.
    pub fn success(mut self) {
        // Commit the new state; dropping the guard here releases the write lock.
        *self.guard = self.success_value;
    }
}
/// RAII-style lock for entry operations on a started-up region of code.
#[derive(Debug)]
pub struct StartupLockEnterGuard<'a> {
    // Shared read guard over the startup state; while any of these are held,
    // shutdown() (which needs the write lock) cannot proceed.
    _guard: AsyncRwLockReadGuard<'a, bool>,
    // Unique id used to correlate this guard with its recorded backtrace.
    #[cfg(feature = "debug-locks")]
    id: usize,
    // Shared registry of live enter guards, printed on shutdown deadlock.
    #[cfg(feature = "debug-locks")]
    active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
}

#[cfg(feature = "debug-locks")]
impl<'a> Drop for StartupLockEnterGuard<'a> {
    // Deregister this guard's backtrace when the entered operation finishes.
    fn drop(&mut self) {
        self.active_guards.lock().remove(&self.id);
    }
}

// Monotonic id source for debug-locks guard tracking.
#[cfg(feature = "debug-locks")]
static GUARD_ID: AtomicUsize = AtomicUsize::new(0);
/// Synchronization mechanism that tracks the startup and shutdown of a region of code.
/// Guarantees that some code can only be started up once and shut down only if it is
/// already started.
/// Also tracks if the code is in-use and will wait for all 'entered' code to finish
/// before shutting down. Once a shutdown is requested, future calls to 'enter' will
/// fail, ensuring that nothing is 'entered' at the time of shutdown. This allows an
/// asynchronous shutdown to wait for operations to finish before proceeding.
#[derive(Debug)]
pub struct StartupLock {
    // false = shut down, true = started. Readers of this RwLock are 'entered'
    // operations; writers are startup/shutdown transitions.
    startup_state: AsyncRwLock<bool>,
    // Present while started; replaced on startup() and dropped at the start of
    // shutdown() so outstanding stop tokens fire.
    stop_source: Mutex<Option<StopSource>>,
    // Registry of live enter guards' backtraces, for deadlock diagnostics.
    #[cfg(feature = "debug-locks")]
    active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
}
impl StartupLock {
    /// Create a new StartupLock in the shut-down state.
    pub fn new() -> Self {
        Self {
            startup_state: AsyncRwLock::new(false),
            stop_source: Mutex::new(None),
            #[cfg(feature = "debug-locks")]
            active_guards: Arc::new(Mutex::new(HashMap::new())),
        }
    }
    /// Start up if things are not already started up
    /// One must call 'success()' on the returned startup lock guard if startup was successful
    /// otherwise the startup lock will not shift to the 'started' state.
    pub fn startup(&self) -> Result<StartupLockGuard, StartupLockAlreadyStartedError> {
        // A failed try_write means readers (enter guards) or another transition
        // hold the lock; either way startup cannot proceed right now.
        let guard =
            asyncrwlock_try_write!(self.startup_state).ok_or(StartupLockAlreadyStartedError)?;
        if *guard {
            return Err(StartupLockAlreadyStartedError);
        }
        // Arm a fresh stop source; its tokens are handed out by stop_token()
        // and cancelled when shutdown() drops it.
        *self.stop_source.lock() = Some(StopSource::new());
        Ok(StartupLockGuard {
            guard,
            success_value: true,
        })
    }
    /// Get a stop token for this lock
    /// One can wait on this to timeout operations when a shutdown is requested
    pub fn stop_token(&self) -> Option<StopToken> {
        // Returns None if never started, or once shutdown() has begun.
        self.stop_source.lock().as_ref().map(|ss| ss.token())
    }
    /// Check if this StartupLock is currently in a started state
    /// Returns false if the state is in transition
    pub fn is_started(&self) -> bool {
        // try_read fails while a startup/shutdown transition holds (or is
        // waiting on) the write lock, which reads as "not started" here.
        let Some(guard) = asyncrwlock_try_read!(self.startup_state) else {
            return false;
        };
        *guard
    }
    /// Check if this StartupLock is currently in a shut down state
    /// Returns false if the state is in transition
    pub fn is_shut_down(&self) -> bool {
        let Some(guard) = asyncrwlock_try_read!(self.startup_state) else {
            return false;
        };
        !*guard
    }
    /// Wait for all 'entered' operations to finish before shutting down
    /// One must call 'success()' on the returned startup lock guard if shutdown was successful
    /// otherwise the startup lock will not shift to the 'stopped' state.
    pub async fn shutdown(&self) -> Result<StartupLockGuard, StartupLockAlreadyShutDownError> {
        // Drop the stop source to ensure we can detect shutdown has been requested
        // NOTE(review): this runs before the write lock is acquired, so even a
        // shutdown attempt that fails below cancels outstanding stop tokens.
        *self.stop_source.lock() = None;
        cfg_if! {
            if #[cfg(feature = "debug-locks")] {
                // With debug-locks, cap the wait at 30s and dump the backtraces
                // of every live enter guard before declaring a deadlock.
                let guard = match timeout(30000, self.startup_state.write()).await {
                    Ok(v) => v,
                    Err(_) => {
                        eprintln!("active guards: {:#?}", self.active_guards.lock().values().collect::<Vec<_>>());
                        panic!("shutdown deadlock");
                    }
                };
            } else {
                // Acquiring the write lock waits for all enter guards to drop.
                let guard = self.startup_state.write().await;
            }
        }
        if !*guard {
            return Err(StartupLockAlreadyShutDownError);
        }
        Ok(StartupLockGuard {
            guard,
            success_value: false,
        })
    }
    /// Enter an operation in a started-up module.
    /// If this module has not yet started up or is in the process of startup or shutdown
    /// this will fail.
    pub fn enter(&self) -> Result<StartupLockEnterGuard, StartupLockNotStartedError> {
        // try_read fails while a transition holds or awaits the write lock,
        // so new entries are refused as soon as a shutdown is pending.
        let guard = asyncrwlock_try_read!(self.startup_state).ok_or(StartupLockNotStartedError)?;
        if !*guard {
            return Err(StartupLockNotStartedError);
        }
        let out = StartupLockEnterGuard {
            _guard: guard,
            #[cfg(feature = "debug-locks")]
            id: GUARD_ID.fetch_add(1, Ordering::AcqRel),
            #[cfg(feature = "debug-locks")]
            active_guards: self.active_guards.clone(),
        };
        // Record this guard's backtrace for shutdown-deadlock diagnostics.
        #[cfg(feature = "debug-locks")]
        self.active_guards
            .lock()
            .insert(out.id, backtrace::Backtrace::new());
        Ok(out)
    }
}
impl Default for StartupLock {
fn default() -> Self {
Self::new()
}
}

View File

@ -1,5 +1,6 @@
pub mod test_async_tag_lock;
pub mod test_host_interface;
pub mod test_startup_lock;
#[allow(dead_code)]
pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [

View File

@ -35,7 +35,7 @@ pub async fn test_simple_single_contention() {
let g1 = table.lock_tag(a1).await;
info!("locked");
let t1 = spawn(async move {
let t1 = spawn("t1", async move {
// move the guard into the task
let _g1_take = g1;
// hold the guard for a bit
@ -90,7 +90,7 @@ pub async fn test_simple_double_contention() {
let g2 = table.lock_tag(a2).await;
info!("locked");
let t1 = spawn(async move {
let t1 = spawn("t1", async move {
// move the guard into the task
let _g1_take = g1;
// hold the guard for a bit
@ -99,7 +99,7 @@ pub async fn test_simple_double_contention() {
// release the guard
info!("released");
});
let t2 = spawn(async move {
let t2 = spawn("t2", async move {
// move the guard into the task
let _g2_take = g2;
// hold the guard for a bit
@ -131,7 +131,7 @@ pub async fn test_parallel_single_contention() {
let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234);
let table1 = table.clone();
let t1 = spawn(async move {
let t1 = spawn("t1", async move {
// lock the tag
let _g = table1.lock_tag(a1).await;
info!("locked t1");
@ -143,7 +143,7 @@ pub async fn test_parallel_single_contention() {
});
let table2 = table.clone();
let t2 = spawn(async move {
let t2 = spawn("t2", async move {
// lock the tag
let _g = table2.lock_tag(a1).await;
info!("locked t2");
@ -155,7 +155,7 @@ pub async fn test_parallel_single_contention() {
});
let table3 = table.clone();
let t3 = spawn(async move {
let t3 = spawn("t3", async move {
// lock the tag
let _g = table3.lock_tag(a1).await;
info!("locked t3");

View File

@ -30,7 +30,7 @@ pub async fn test_eventual() {
let i4 = e1.instance_clone(4u32);
drop(i2);
let jh = spawn(async move {
let jh = spawn("task", async move {
sleep(1000).await;
e1.resolve();
});
@ -47,7 +47,7 @@ pub async fn test_eventual() {
let i3 = e1.instance_clone(3u32);
let i4 = e1.instance_clone(4u32);
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
let i5 = e1.instance_clone(5u32);
let i6 = e1.instance_clone(6u32);
assert_eq!(i1.await, 1u32);
@ -67,7 +67,7 @@ pub async fn test_eventual() {
let i1 = e1.instance_clone(1u32);
let i2 = e1.instance_clone(2u32);
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
assert_eq!(i1.await, 1u32);
assert_eq!(i2.await, 2u32);
});
@ -80,7 +80,7 @@ pub async fn test_eventual() {
//
let j1 = e1.instance_clone(1u32);
let j2 = e1.instance_clone(2u32);
let jh = spawn(async move {
let jh = spawn("task", async move {
assert_eq!(j1.await, 1u32);
assert_eq!(j2.await, 2u32);
});
@ -105,7 +105,7 @@ pub async fn test_eventual_value() {
drop(i2);
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
sleep(1000).await;
e1_c1.resolve(3u32);
});
@ -122,7 +122,7 @@ pub async fn test_eventual_value() {
let i3 = e1.instance();
let i4 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
let i5 = e1.instance();
let i6 = e1.instance();
i1.await;
@ -144,7 +144,7 @@ pub async fn test_eventual_value() {
let i1 = e1.instance();
let i2 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
i1.await;
i2.await;
});
@ -157,7 +157,7 @@ pub async fn test_eventual_value() {
//
let j1 = e1.instance();
let j2 = e1.instance();
let jh = spawn(async move {
let jh = spawn("task", async move {
j1.await;
j2.await;
});
@ -181,7 +181,7 @@ pub async fn test_eventual_value_clone() {
let i4 = e1.instance();
drop(i2);
let jh = spawn(async move {
let jh = spawn("task", async move {
sleep(1000).await;
e1.resolve(3u32);
});
@ -199,7 +199,7 @@ pub async fn test_eventual_value_clone() {
let i3 = e1.instance();
let i4 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
let i5 = e1.instance();
let i6 = e1.instance();
assert_eq!(i1.await, 4);
@ -220,7 +220,7 @@ pub async fn test_eventual_value_clone() {
let i1 = e1.instance();
let i2 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
assert_eq!(i1.await, 5);
assert_eq!(i2.await, 5);
});
@ -231,7 +231,7 @@ pub async fn test_eventual_value_clone() {
//
let j1 = e1.instance();
let j2 = e1.instance();
let jh = spawn(async move {
let jh = spawn("task", async move {
assert_eq!(j1.await, 6);
assert_eq!(j2.await, 6);
});
@ -245,7 +245,7 @@ pub async fn test_interval() {
info!("testing interval");
let tick: Arc<Mutex<u32>> = Arc::new(Mutex::new(0u32));
let stopper = interval(1000, move || {
let stopper = interval("interval", 1000, move || {
let tick = tick.clone();
async move {
let mut tick = tick.lock();
@ -493,7 +493,7 @@ pub async fn test_must_join_single_future() {
let sf = MustJoinSingleFuture::<u32>::new();
assert_eq!(sf.check().await, Ok(None));
assert_eq!(
sf.single_spawn(async {
sf.single_spawn("t1", async {
sleep(2000).await;
69
})
@ -501,10 +501,13 @@ pub async fn test_must_join_single_future() {
Ok((None, true))
);
assert_eq!(sf.check().await, Ok(None));
assert_eq!(sf.single_spawn(async { panic!() }).await, Ok((None, false)));
assert_eq!(
sf.single_spawn("t2", async { panic!() }).await,
Ok((None, false))
);
assert_eq!(sf.join().await, Ok(Some(69)));
assert_eq!(
sf.single_spawn(async {
sf.single_spawn("t3", async {
sleep(1000).await;
37
})
@ -513,7 +516,7 @@ pub async fn test_must_join_single_future() {
);
sleep(2000).await;
assert_eq!(
sf.single_spawn(async {
sf.single_spawn("t4", async {
sleep(1000).await;
27
})

View File

@ -0,0 +1,179 @@
use crate::*;
/// Exercise the plain startup/shutdown state machine, including aborted
/// transitions where the guard is dropped without calling success().
pub async fn test_startup_shutdown() {
    info!("test_startup_shutdown");

    let lock = StartupLock::new();

    // Happy path: start up, confirm state, shut down, confirm state.
    lock.startup().expect("should startup").success();
    assert!(lock.is_started());
    assert!(!lock.is_shut_down());

    lock.shutdown().await.expect("should shutdown").success();
    assert!(!lock.is_started());
    assert!(lock.is_shut_down());

    // Aborted startup: dropping the guard without success() must leave the
    // lock shut down, so a subsequent shutdown fails.
    drop(lock.startup().expect("should startup"));
    assert!(!lock.is_started());

    lock.shutdown().await.expect_err("should not shutdown");
    assert!(!lock.is_started());

    // Aborted shutdown: dropping the guard without success() must leave the
    // lock started, so a second shutdown succeeds.
    lock.startup().expect("should startup").success();
    assert!(lock.is_started());

    drop(lock.shutdown().await.expect("should shutdown"));
    assert!(lock.is_started());

    lock.shutdown().await.expect("should shutdown").success();
    assert!(!lock.is_started());
}
/// Verify that shutdown() blocks until an outstanding enter() guard is released.
pub async fn test_contention() {
    info!("test_contention");
    let lock = Arc::new(StartupLock::new());
    // Set by the entered task immediately before its enter guard drops.
    let val = Arc::new(AtomicBool::new(false));
    {
        let guard = lock.startup().expect("should startup");
        guard.success();
    }
    assert!(lock.is_started());
    let lock2 = lock.clone();
    let val2 = val.clone();
    // Task holds an enter guard for ~2s, then flags completion and releases it.
    let jh = spawn("task", async move {
        let _guard = lock2.enter().expect("should enter");
        sleep(2000).await;
        val2.store(true, Ordering::Release);
    });
    // Let the task enter before requesting shutdown.
    sleep(1000).await;
    {
        let guard = lock.shutdown().await.expect("should shutdown");
        // shutdown() must not return until the task set the flag and dropped
        // its enter guard.
        assert!(
            val.load(Ordering::Acquire),
            "should have waited for enter to exit"
        );
        guard.success();
    }
    assert!(!lock.is_started());
    jh.await;
}
/// Verify that enter() fails in every non-started state: before startup,
/// while a shutdown is pending, and after shutdown completes.
pub async fn test_bad_enter() {
    info!("test_bad_enter");
    let lock = Arc::new(StartupLock::new());
    // Not yet started: enter must fail.
    lock.enter()
        .expect_err("should not enter when not started up");
    {
        let guard = lock.startup().expect("should startup");
        guard.success();
    }
    assert!(lock.is_started());
    assert!(!lock.is_shut_down());
    let lock2 = lock.clone();
    // Task acquires the shutdown guard, then holds it ~2s before committing.
    let jh = spawn("task", async move {
        let guard = lock2.shutdown().await.expect("should shutdown");
        sleep(2000).await;
        guard.success();
    });
    sleep(1000).await;
    // Mid-shutdown: both state probes report false (state is in transition)...
    assert!(!lock.is_started());
    assert!(!lock.is_shut_down());
    // ...and enter must be refused while the shutdown is pending.
    lock.enter()
        .expect_err("should not enter when shutting down");
    jh.await;
    assert!(!lock.is_started());
    assert!(lock.is_shut_down());
    // Fully shut down: enter still fails.
    lock.enter().expect_err("should not enter when shut down");
}
/// Verify that multiple concurrent enter() guards are allowed while started,
/// and that a pending shutdown() waits for every one of them to drop.
pub async fn test_multiple_enter() {
    info!("test_multiple_enter");
    let lock = Arc::new(StartupLock::new());
    let s1 = lock.startup().expect("should startup");
    s1.success();
    // Nested enters: multiple read guards may coexist.
    {
        let _e1 = lock.enter().expect("should enter 1");
        {
            let _e2 = lock.enter().expect("should enter 2");
            {
                let _e3 = lock.enter().expect("should enter 3");
            }
        }
    }
    // Three guards held across the shutdown request below.
    let e4 = lock.enter().expect("should enter 4");
    let e5 = lock.enter().expect("should enter 5");
    let e6 = lock.enter().expect("should enter 6");
    //eprintln!("1");
    let lock2 = lock.clone();
    // Task blocks in shutdown() until e4/e5/e6 drop, then commits ~2s later.
    let jh = spawn("task", async move {
        //eprintln!("2");
        let guard = lock2.shutdown().await.expect("should shutdown");
        //eprintln!("7");
        sleep(2000).await;
        //eprintln!("8");
        guard.success();
    });
    sleep(1000).await;
    //eprintln!("3");
    // With a shutdown pending, both probes report "in transition" (false).
    assert!(!lock.is_started());
    assert!(!lock.is_shut_down());
    // Now drop the enter created before shutdown
    drop(e4);
    //eprintln!("4");
    drop(e5);
    //eprintln!("5");
    drop(e6);
    //eprintln!("6");
    // This should finally exit
    jh.await;
    //eprintln!("9");
    assert!(!lock.is_started());
    assert!(lock.is_shut_down());
    // After shutdown completes, enter is refused again.
    lock.enter().expect_err("should not enter");
}
/// Run every StartupLock test in sequence.
pub async fn test_all() {
    test_startup_shutdown().await;
    test_contention().await;
    test_bad_enter().await;
    test_multiple_enter().await;
}

View File

@ -14,6 +14,8 @@ use super::*;
pub async fn run_all_tests() {
info!("TEST: exec_test_host_interface");
test_host_interface::test_all().await;
info!("TEST: exec_test_startup_lock");
test_startup_lock::test_all().await;
info!("TEST: exec_test_network_interfaces");
test_network_interfaces::test_all().await;
info!("TEST: exec_test_async_peek_stream");
@ -87,6 +89,15 @@ cfg_if! {
});
}
#[test]
#[serial]
fn run_test_startup_lock() {
setup();
block_on(async {
test_startup_lock::test_all().await;
});
}
#[test]
#[serial]
fn run_test_network_interfaces() {

View File

@ -10,6 +10,7 @@ type TickTaskRoutine<E> =
/// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again.
/// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity.
pub struct TickTask<E: Send + 'static> {
name: String,
last_timestamp_us: AtomicU64,
tick_period_us: u64,
routine: OnceCell<Box<TickTaskRoutine<E>>>,
@ -19,8 +20,9 @@ pub struct TickTask<E: Send + 'static> {
}
impl<E: Send + 'static> TickTask<E> {
pub fn new_us(tick_period_us: u64) -> Self {
pub fn new_us(name: &str, tick_period_us: u64) -> Self {
Self {
name: name.to_string(),
last_timestamp_us: AtomicU64::new(0),
tick_period_us,
routine: OnceCell::new(),
@ -29,8 +31,9 @@ impl<E: Send + 'static> TickTask<E> {
running: Arc::new(AtomicBool::new(false)),
}
}
pub fn new_ms(tick_period_ms: u32) -> Self {
pub fn new_ms(name: &str, tick_period_ms: u32) -> Self {
Self {
name: name.to_string(),
last_timestamp_us: AtomicU64::new(0),
tick_period_us: (tick_period_ms as u64) * 1000u64,
routine: OnceCell::new(),
@ -39,8 +42,9 @@ impl<E: Send + 'static> TickTask<E> {
running: Arc::new(AtomicBool::new(false)),
}
}
pub fn new(tick_period_sec: u32) -> Self {
pub fn new(name: &str, tick_period_sec: u32) -> Self {
Self {
name: name.to_string(),
last_timestamp_us: AtomicU64::new(0),
tick_period_us: (tick_period_sec as u64) * 1000000u64,
routine: OnceCell::new(),
@ -100,19 +104,26 @@ impl<E: Send + 'static> TickTask<E> {
return Ok(());
}
self.internal_tick(now, last_timestamp_us).await.map(drop)
let itick = self.internal_tick(now, last_timestamp_us);
itick.await.map(drop)
}
pub async fn try_tick_now(&self) -> Result<bool, E> {
let now = get_timestamp();
let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
self.internal_tick(now, last_timestamp_us).await
let itick = self.internal_tick(now, last_timestamp_us);
itick.await
}
async fn internal_tick(&self, now: u64, last_timestamp_us: u64) -> Result<bool, E> {
// Lock the stop source, tells us if we have ever started this future
let opt_stop_source = &mut *self.stop_source.lock().await;
let opt_stop_source_fut = self.stop_source.lock();
let opt_stop_source = &mut *opt_stop_source_fut.await;
if opt_stop_source.is_some() {
// See if the previous execution finished with an error
match self.single_future.check().await {
@ -141,13 +152,19 @@ impl<E: Send + 'static> TickTask<E> {
let stop_token = stop_source.token();
let running = self.running.clone();
let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now);
let wrapped_routine = Box::pin(async move {
running.store(true, core::sync::atomic::Ordering::Release);
let out = routine.await;
running.store(false, core::sync::atomic::Ordering::Release);
out
});
match self.single_future.single_spawn(wrapped_routine).await {
match self
.single_future
.single_spawn(&self.name, wrapped_routine)
.await
{
// We should have already consumed the result of the last run, or there was none
// and we should definitely have run, because the prior 'check()' operation
// should have ensured the singlefuture was ready to run

View File

@ -8,7 +8,9 @@ cfg_if! {
where
F: Future<Output = T>,
{
match select(Box::pin(sleep(dur_ms)), Box::pin(f)).await {
let tout = select(Box::pin(sleep(dur_ms)), Box::pin(f));
match tout.await {
Either::Left((_x, _b)) => Err(TimeoutError()),
Either::Right((y, _a)) => Ok(y),
}
@ -22,11 +24,13 @@ cfg_if! {
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
async_std::future::timeout(Duration::from_millis(dur_ms as u64), f).await.map_err(|e| e.into())
let tout = async_std::future::timeout(Duration::from_millis(dur_ms as u64), f);
} else if #[cfg(feature="rt-tokio")] {
tokio::time::timeout(Duration::from_millis(dur_ms as u64), f).await.map_err(|e| e.into())
let tout = tokio::time::timeout(Duration::from_millis(dur_ms as u64), f);
}
}
tout.await.map_err(|e| e.into())
}
}

View File

@ -54,6 +54,20 @@ cfg_if::cfg_if! {
$x.clone().try_lock_owned().ok()
};
}
// #[macro_export]
// macro_rules! asyncrwlock_try_read {
// ($x:expr) => {
// $x.try_read().ok()
// };
// }
// #[macro_export]
// macro_rules! asyncrwlock_try_write {
// ($x:expr) => {
// $x.try_write().ok()
// };
// }
} else {
#[macro_export]
macro_rules! asyncmutex_try_lock {
@ -73,9 +87,23 @@ cfg_if::cfg_if! {
$x.try_lock_arc()
};
}
}
}
/// Attempt a non-blocking read lock on an async `RwLock`.
/// Expands to `$x.try_read()` verbatim; the guard type — and whether the
/// expansion yields an `Option`/`Result` — depends on which RwLock
/// implementation is in scope at the call site.
// NOTE(review): unlike the tokio-specific `asyncmutex_try_lock*` variants,
// no `.ok()` normalization is applied here — confirm callers handle the
// runtime-specific return type.
#[macro_export]
macro_rules! asyncrwlock_try_read {
    ($x:expr) => {
        $x.try_read()
    };
}
/// Attempt a non-blocking write lock on an async `RwLock`.
/// Expands to `$x.try_write()` verbatim; see `asyncrwlock_try_read` for the
/// caveat about runtime-dependent return types.
#[macro_export]
macro_rules! asyncrwlock_try_write {
    ($x:expr) => {
        $x.try_write()
    };
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn system_boxed<'a, Out>(
@ -474,3 +502,25 @@ pub fn type_name_of_val<T: ?Sized>(_val: &T) -> &'static str {
/// Convert any `ToString` value into an owned `String`.
///
/// Handy as a mapping function (e.g. `.map(map_to_string)`) where a bare
/// method reference on a generic receiver would be awkward.
pub fn map_to_string<X: ToString>(arg: X) -> String {
    ToString::to_string(&arg)
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// RAII debugging aid that tracks how many scopes holding a guard are
/// currently alive.
///
/// Construction increments the shared counter and prints the new count to
/// stderr; dropping the guard decrements it and prints again. The `name`
/// labels the output so multiple counters can be distinguished.
pub struct DebugGuard {
    name: &'static str,
    counter: &'static AtomicUsize,
}

impl DebugGuard {
    /// Increment `counter` and report "`name` entered: <count>" on stderr.
    ///
    /// The returned guard must be bound to a variable for the duration of
    /// the scope being tracked — dropping it immediately (e.g. by ignoring
    /// the return value) decrements the counter right away.
    #[must_use = "binding the guard is what keeps the counter incremented; an ignored guard drops immediately"]
    pub fn new(name: &'static str, counter: &'static AtomicUsize) -> Self {
        // fetch_add returns the previous value, so the current count is c + 1.
        let c = counter.fetch_add(1, Ordering::SeqCst);
        eprintln!("{} entered: {}", name, c + 1);
        Self { name, counter }
    }
}

impl Drop for DebugGuard {
    fn drop(&mut self) {
        // fetch_sub returns the previous value, so the current count is c - 1.
        // Every drop is paired with a `new`, so the counter cannot underflow.
        let c = self.counter.fetch_sub(1, Ordering::SeqCst);
        eprintln!("{} exited: {}", self.name, c - 1);
    }
}

View File

@ -44,3 +44,10 @@ async fn run_test_async_tag_lock() {
test_async_tag_lock::test_all().await;
}
// Exercise the StartupLock test suite under the wasm-bindgen test runner
// (browser/wasm builds have no native #[test] harness, so the suite is
// awaited directly instead of via block_on).
#[wasm_bindgen_test]
async fn run_test_startup_lock() {
setup();
test_startup_lock::test_all().await;
}