shutdown work and perfetto logger

This commit is contained in:
Christien Rioux 2024-07-20 18:05:24 -04:00
parent fdead16fc8
commit e568c39efb
16 changed files with 439 additions and 88 deletions

105
Cargo.lock generated
View File

@ -1958,6 +1958,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flate2"
version = "1.0.30"
@ -3147,6 +3153,12 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "multimap"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
[[package]]
name = "nanorand"
version = "0.7.0"
@ -4019,6 +4031,16 @@ dependencies = [
"sha2 0.10.8",
]
[[package]]
name = "petgraph"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset",
"indexmap 2.2.6",
]
[[package]]
name = "pharos"
version = "0.5.3"
@ -4170,6 +4192,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e"
dependencies = [
"proc-macro2",
"syn 2.0.71",
]
[[package]]
name = "proc-macro-crate"
version = "0.1.5"
@ -4218,6 +4250,27 @@ dependencies = [
"prost-derive 0.12.6",
]
[[package]]
name = "prost-build"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck 0.5.0",
"itertools 0.12.1",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost 0.12.6",
"prost-types",
"regex",
"syn 2.0.71",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.11.9"
@ -4259,6 +4312,15 @@ version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "protobuf-src"
version = "2.0.1+26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ba1cfa4b9dc098926b8cce388bf434b93516db3ecf6e8b1a37eb643d733ee7"
dependencies = [
"cmake",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@ -5252,6 +5314,18 @@ dependencies = [
"windows 0.52.0",
]
[[package]]
name = "tempfile"
version = "3.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
dependencies = [
"cfg-if 1.0.0",
"fastrand 2.1.0",
"rustix 0.38.34",
"windows-sys 0.52.0",
]
[[package]]
name = "termcolor"
version = "1.4.1"
@ -5310,6 +5384,16 @@ dependencies = [
"syn 2.0.71",
]
[[package]]
name = "thread-id"
version = "4.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe8f25bbdd100db7e1d34acf7fd2dc59c4bf8f7483f505eaa7d4f12f76cc0ea"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "thread_local"
version = "1.1.8"
@ -5744,6 +5828,24 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-perfetto"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd21777b526dfcb57f11f65aa8a2024d83e1db52841993229b6e282e511978b7"
dependencies = [
"anyhow",
"bytes",
"chrono",
"prost 0.12.6",
"prost-build",
"protobuf-src",
"rand",
"thread-id",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.18"
@ -6233,6 +6335,7 @@ dependencies = [
"tracing-flame",
"tracing-journald",
"tracing-opentelemetry 0.24.0",
"tracing-perfetto",
"tracing-subscriber",
"url",
"veilid-bugsalot",
@ -6246,7 +6349,7 @@ name = "veilid-tools"
version = "0.3.3"
dependencies = [
"android_logger 0.13.3",
"async-lock 2.8.0",
"async-lock 3.4.0",
"async-std",
"async_executors",
"backtrace",

View File

@ -414,6 +414,10 @@ impl ConnectionManager {
if !allow_accept {
return;
}
let Ok(_guard) = self.arc.startup_lock.enter() else {
return;
};
// Async lock on the remote address for atomicity per remote
let _lock_guard = self
.arc

View File

@ -64,6 +64,8 @@ pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration
pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60;
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
static FUCK: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone, Debug, Default)]
pub struct ProtocolConfig {
pub outbound: ProtocolTypeSet,
@ -172,6 +174,8 @@ struct NetworkManagerUnlockedInner {
address_filter_task: TickTask<EyreReport>,
// Network Key
network_key: Option<SharedSecret>,
// Startup Lock
startup_lock: StartupLock,
}
#[derive(Clone)]
@ -213,6 +217,7 @@ impl NetworkManager {
public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS),
address_filter_task: TickTask::new(ADDRESS_FILTER_TASK_INTERVAL_SECS),
network_key,
startup_lock: StartupLock::new(),
}
}
@ -445,27 +450,30 @@ impl NetworkManager {
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> EyreResult<StartupDisposition> {
let guard = self.unlocked_inner.startup_lock.startup()?;
match self.internal_startup().await {
Ok(StartupDisposition::Success) => {
guard.success();
// Inform api clients that things have changed
self.send_network_update();
Ok(StartupDisposition::Success)
}
Ok(StartupDisposition::BindRetry) => {
self.shutdown().await;
self.shutdown_internal().await;
Ok(StartupDisposition::BindRetry)
}
Err(e) => {
self.shutdown().await;
self.shutdown_internal().await;
Err(e)
}
}
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "starting network manager shutdown");
async fn shutdown_internal(&self) {
// Cancel all tasks
self.cancel_tasks().await;
@ -487,6 +495,20 @@ impl NetworkManager {
{
*self.inner.lock() = NetworkManager::new_inner();
}
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "starting network manager shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_net!(debug "network manager is already shut down");
return;
};
self.shutdown_internal().await;
guard.success();
// send update
log_net!(debug "sending network state update to api clients");
@ -562,6 +584,9 @@ impl NetworkManager {
extra_data: D,
callback: impl ReceiptCallback,
) -> EyreResult<Vec<u8>> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
bail!("network is not started");
};
let receipt_manager = self.receipt_manager();
let routing_table = self.routing_table();
@ -597,6 +622,10 @@ impl NetworkManager {
expiration_us: u64,
extra_data: D,
) -> EyreResult<(Vec<u8>, EventualValueFuture<ReceiptEvent>)> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
bail!("network is not started");
};
let receipt_manager = self.receipt_manager();
let routing_table = self.routing_table();
@ -633,6 +662,10 @@ impl NetworkManager {
&self,
receipt_data: R,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("network is not started");
};
let receipt_manager = self.receipt_manager();
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
@ -654,6 +687,10 @@ impl NetworkManager {
receipt_data: R,
inbound_noderef: NodeRef,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("network is not started");
};
let receipt_manager = self.receipt_manager();
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
@ -674,6 +711,10 @@ impl NetworkManager {
&self,
receipt_data: R,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("network is not started");
};
let receipt_manager = self.receipt_manager();
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
@ -695,6 +736,10 @@ impl NetworkManager {
receipt_data: R,
private_route: PublicKey,
) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("network is not started");
};
let receipt_manager = self.receipt_manager();
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
@ -710,12 +755,16 @@ impl NetworkManager {
}
// Process a received signal
#[instrument(level = "trace", target = "receipt", skip_all)]
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn handle_signal(
&self,
signal_flow: Flow,
signal_info: SignalInfo,
) -> EyreResult<NetworkResult<()>> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return Ok(NetworkResult::service_unavailable("network is not started"));
};
match signal_info {
SignalInfo::ReverseConnect { receipt, peer_info } => {
let routing_table = self.routing_table();
@ -809,7 +858,7 @@ impl NetworkManager {
}
/// Builds an envelope for sending over the network
#[instrument(level = "trace", target = "receipt", skip_all)]
#[instrument(level = "trace", target = "net", skip_all)]
fn build_envelope<B: AsRef<[u8]>>(
&self,
dest_node_id: TypedKey,
@ -852,13 +901,19 @@ impl NetworkManager {
/// node_ref is the direct destination to which the envelope will be sent
/// If 'destination_node_ref' is specified, it can be different than the node_ref being sent to
/// which will cause the envelope to be relayed
#[instrument(level = "trace", target = "receipt", skip_all)]
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn send_envelope<B: AsRef<[u8]>>(
&self,
node_ref: NodeRef,
destination_node_ref: Option<NodeRef>,
body: B,
) -> EyreResult<NetworkResult<SendDataMethod>> {
let _dg = DebugGuard::new(&FUCK);
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
bail!("network is not started");
};
let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone();
let best_node_id = destination_node_ref.best_node_id();
@ -896,6 +951,10 @@ impl NetworkManager {
dial_info: DialInfo,
rcpt_data: Vec<u8>,
) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
bail!("network is not started");
};
// Do we need to validate the outgoing receipt? Probably not
// because it is supposed to be opaque and the
// recipient/originator does the validation
@ -916,8 +975,12 @@ impl NetworkManager {
// Called when a packet potentially containing an RPC envelope is received by a low-level
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
// and passes it to the RPC handler
#[instrument(level = "trace", target = "receipt", skip_all)]
#[instrument(level = "trace", target = "net", skip_all)]
async fn on_recv_envelope(&self, data: &mut [u8], flow: Flow) -> EyreResult<bool> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return Ok(false);
};
log_net!("envelope of {} bytes received from {:?}", data.len(), flow);
let remote_addr = flow.remote_address().ip_addr();

View File

@ -941,10 +941,12 @@ impl Network {
}
Ok(StartupDisposition::BindRetry) => {
debug!("network bind retry");
self.shutdown_internal().await;
Ok(StartupDisposition::BindRetry)
}
Err(e) => {
debug!("network failed to start");
self.shutdown_internal().await;
Err(e)
}
}
@ -964,13 +966,7 @@ impl Network {
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "starting low level network shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_net!(debug "low level network is already shut down");
return;
};
async fn shutdown_internal(&self) {
let routing_table = self.routing_table();
// Stop all tasks
@ -1014,6 +1010,17 @@ impl Network {
// Reset state including network class
*self.inner.lock() = Self::new_inner();
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "starting low level network shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_net!(debug "low level network is already shut down");
return;
};
self.shutdown_internal().await;
guard.success();
log_net!(debug "finished low level network shutdown");

View File

@ -309,10 +309,6 @@ impl NetworkConnection {
match res {
Ok((_span_id, message)) => {
// let span = span!(Level::TRACE, "process_connection send");
// span.follows_from(span_id);
// let _enter = span.enter();
// Touch the LRU for this connection
connection_manager.touch_connection_by_id(connection_id);

View File

@ -35,7 +35,7 @@ impl Default for NetworkManagerStats {
impl NetworkManager {
// Callbacks from low level network for statistics gathering
pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
pub(crate) fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats
@ -52,7 +52,7 @@ impl NetworkManager {
.add_up(bytes);
}
pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
pub(crate) fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats

View File

@ -114,7 +114,6 @@ impl RoutingTable {
rpc.rpc_call_status(Destination::direct(relay_nr_filtered))
.await
}
.instrument(Span::current())
.boxed(),
);
}
@ -159,9 +158,7 @@ impl RoutingTable {
log_rtab!("--> Watch ping to {:?}", watch_nr);
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(watch_nr)).await }
.instrument(Span::current())
.boxed(),
async move { rpc.rpc_call_status(Destination::direct(watch_nr)).await }.boxed(),
);
}
Ok(())
@ -198,9 +195,7 @@ impl RoutingTable {
let rpc = rpc.clone();
log_rtab!("--> Validator ping to {:?}", nr);
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr)).await }
.instrument(Span::current())
.boxed(),
async move { rpc.rpc_call_status(Destination::direct(nr)).await }.boxed(),
);
}
@ -226,9 +221,7 @@ impl RoutingTable {
// Just do a single ping with the best protocol for all the nodes
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr)).await }
.instrument(Span::current())
.boxed(),
async move { rpc.rpc_call_status(Destination::direct(nr)).await }.boxed(),
);
}
@ -244,6 +237,8 @@ impl RoutingTable {
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
eprintln!("pv tick");
let mut futurequeue: VecDeque<PingValidatorFuture> = VecDeque::new();
// PublicInternet
@ -258,7 +253,6 @@ impl RoutingTable {
let mut unord = FuturesUnordered::new();
while !unord.is_empty() || !futurequeue.is_empty() {
#[cfg(feature = "verbose-tracing")]
log_rtab!(debug "Ping validation queue: {} remaining, {} in progress", futurequeue.len(), unord.len());
// Process one unordered futures if we have some
match unord.next().timeout_at(stop_token.clone()).await {
@ -279,7 +273,7 @@ impl RoutingTable {
let Some(fq) = futurequeue.pop_front() else {
break;
};
unord.push(fq);
unord.push(fq.in_current_span());
}
}

View File

@ -81,6 +81,7 @@ stop-token = { version = "^0", default-features = false }
sysinfo = { version = "^0.30.13" }
wg = { version = "^0.9.1", features = ["future"] }
tracing-flame = "0.2.0"
tracing-perfetto = "0.1.1"
[target.'cfg(windows)'.dependencies]
windows-service = "^0"

View File

@ -84,10 +84,14 @@ pub struct CmdlineArgs {
#[arg(long, value_name = "endpoint")]
otlp: Option<String>,
/// Turn on flamegraph tracing (experimental, isn't terribly useful)
/// Turn on flamegraph tracing (experimental)
#[arg(long, hide = true, value_name = "PATH", num_args=0..=1, require_equals=true, default_missing_value = "")]
flame: Option<OsString>,
/// Turn on perfetto tracing (experimental)
#[arg(long, hide = true, value_name = "PATH", num_args=0..=1, require_equals=true, default_missing_value = "")]
perfetto: Option<OsString>,
/// Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports
#[arg(long)]
subnode_index: Option<u16>,
@ -223,6 +227,18 @@ fn main() -> EyreResult<()> {
settingsrw.logging.flame.enabled = true;
settingsrw.logging.flame.path = flame;
}
if let Some(perfetto) = args.perfetto {
let perfetto = if perfetto.is_empty() {
Settings::get_default_perfetto_path(settingsrw.testing.subnode_index)
.to_string_lossy()
.to_string()
} else {
perfetto.to_string_lossy().to_string()
};
println!("Enabling perfetto output to {}", perfetto);
settingsrw.logging.perfetto.enabled = true;
settingsrw.logging.perfetto.path = perfetto;
}
if args.no_attach {
settingsrw.auto_attach = false;

View File

@ -69,6 +69,9 @@ logging:
flame:
enabled: false
path: ''
perfetto:
enabled: false
path: ''
console:
enabled: false
testing:
@ -451,6 +454,12 @@ pub struct Flame {
pub path: String,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Perfetto {
pub enabled: bool,
pub path: String,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Console {
pub enabled: bool,
@ -503,6 +512,7 @@ pub struct Logging {
pub api: Api,
pub otlp: Otlp,
pub flame: Flame,
pub perfetto: Perfetto,
pub console: Console,
}
@ -873,6 +883,15 @@ impl Settings {
})
}
/// Determine default perfetto output path
pub fn get_default_perfetto_path(subnode_index: u16) -> PathBuf {
std::env::temp_dir().join(if subnode_index == 0 {
"veilid-server.pftrace".to_owned()
} else {
format!("veilid-server-{}.pftrace", subnode_index)
})
}
#[allow(dead_code)]
fn get_or_create_private_directory<P: AsRef<Path>>(path: P, group_read: bool) -> bool {
let path = path.as_ref();
@ -996,6 +1015,8 @@ impl Settings {
set_config_value!(inner.logging.otlp.ignore_log_targets, value);
set_config_value!(inner.logging.flame.enabled, value);
set_config_value!(inner.logging.flame.path, value);
set_config_value!(inner.logging.perfetto.enabled, value);
set_config_value!(inner.logging.perfetto.path, value);
set_config_value!(inner.logging.console.enabled, value);
set_config_value!(inner.testing.subnode_index, value);
set_config_value!(inner.core.capabilities.disable, value);
@ -1565,6 +1586,8 @@ mod tests {
);
assert!(!s.logging.flame.enabled);
assert_eq!(s.logging.flame.path, "");
assert!(!s.logging.perfetto.enabled);
assert_eq!(s.logging.perfetto.path, "");
assert!(!s.logging.console.enabled);
assert_eq!(s.testing.subnode_index, 0);

View File

@ -18,6 +18,7 @@ use std::path::*;
use std::sync::Arc;
use tracing_appender::*;
use tracing_flame::FlameLayer;
use tracing_perfetto::PerfettoLayer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::*;
@ -48,14 +49,15 @@ impl VeilidLogs {
#[cfg(feature = "rt-tokio")]
if settingsr.logging.console.enabled {
let filter = veilid_core::VeilidLayerFilter::new_no_default(
veilid_core::VeilidConfigLogLevel::Trace,
&[],
);
let layer = ConsoleLayer::builder()
.with_default_env()
.spawn()
.with_filter(
filter::Targets::new()
.with_target("tokio", Level::TRACE)
.with_target("runtime", Level::TRACE),
);
.with_filter(filter);
layers.push(layer.boxed());
}
@ -93,6 +95,26 @@ impl VeilidLogs {
);
}
// Perfetto logger
if settingsr.logging.perfetto.enabled {
let filter = veilid_core::VeilidLayerFilter::new_no_default(
veilid_core::VeilidConfigLogLevel::Trace,
&veilid_core::FLAME_LOG_FACILITIES_IGNORE_LIST.map(|x| x.to_string()),
);
let perfetto_layer = PerfettoLayer::new(std::sync::Mutex::new(std::fs::File::create(
&settingsr.logging.perfetto.path,
)?));
// Do not include this in change_log_level changes, so we keep trace level
// filters.insert("flame", filter.clone());
layers.push(
perfetto_layer
.with_debug_annotations(true)
.with_filter(filter)
.boxed(),
);
}
// OpenTelemetry logger
#[cfg(feature = "opentelemetry-otlp")]
if settingsr.logging.otlp.enabled {

View File

@ -34,6 +34,7 @@ rt-wasm-bindgen = ["async_executors/bindgen", "async_executors/timer"]
veilid_tools_android_tests = ["dep:paranoid-android"]
veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"]
tracing = ["dep:tracing", "dep:tracing-subscriber"]
debug-locks = []
[dependencies]
tracing = { version = "0.1.40", features = [
@ -52,6 +53,7 @@ futures-util = { version = "0.3.30", default-features = false, features = [
"alloc",
] }
parking_lot = "0.12.3"
async-lock = "3.4.0"
once_cell = "1.19.0"
stop-token = { version = "0.7.0", default-features = false }
rand = "0.8.5"
@ -87,7 +89,6 @@ wasm-bindgen-futures = "0.4.42"
async_executors = { version = "0.7.0", default-features = false }
getrandom = { version = "0.2", features = ["js"] }
async-lock = "2.8.0"
send_wrapper = { version = "0.6.0", features = ["futures"] }
# Dependencies for Linux or Android

View File

@ -107,7 +107,7 @@ pub use std::str::FromStr;
#[doc(no_inline)]
pub use std::string::{String, ToString};
#[doc(no_inline)]
pub use std::sync::atomic::{AtomicBool, Ordering};
pub use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[doc(no_inline)]
pub use std::sync::{Arc, Weak};
#[doc(no_inline)]
@ -117,6 +117,13 @@ pub use std::time::Duration;
#[doc(no_inline)]
pub use std::vec::Vec;
#[doc(no_inline)]
pub use async_lock::RwLock as AsyncRwLock;
#[doc(no_inline)]
pub use async_lock::RwLockReadGuard as AsyncRwLockReadGuard;
#[doc(no_inline)]
pub use async_lock::RwLockWriteGuard as AsyncRwLockWriteGuard;
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
#[doc(no_inline)]
@ -126,13 +133,6 @@ cfg_if! {
#[doc(no_inline)]
pub use async_lock::MutexGuardArc as AsyncMutexGuardArc;
#[doc(no_inline)]
pub use async_lock::RwLock as AsyncRwLock;
#[doc(no_inline)]
pub use async_lock::RwLockReadGuard as AsyncRwLockReadGuard;
#[doc(no_inline)]
pub use async_lock::RwLockWriteGuard as AsyncRwLockWriteGuard;
#[doc(no_inline)]
pub use async_executors::JoinHandle as LowLevelJoinHandle;
@ -146,12 +146,12 @@ cfg_if! {
#[doc(no_inline)]
pub use async_std::sync::MutexGuardArc as AsyncMutexGuardArc;
#[doc(no_inline)]
pub use async_std::sync::RwLock as AsyncRwLock;
#[doc(no_inline)]
pub use async_std::sync::RwLockReadGuard as AsyncRwLockReadGuard;
#[doc(no_inline)]
pub use async_std::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
// #[doc(no_inline)]
// pub use async_std::sync::RwLock as AsyncRwLock;
// #[doc(no_inline)]
// pub use async_std::sync::RwLockReadGuard as AsyncRwLockReadGuard;
// #[doc(no_inline)]
// pub use async_std::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
#[doc(no_inline)]
pub use async_std::task::JoinHandle as LowLevelJoinHandle;
@ -164,12 +164,12 @@ cfg_if! {
#[doc(no_inline)]
pub use tokio::sync::OwnedMutexGuard as AsyncMutexGuardArc;
#[doc(no_inline)]
pub use tokio::sync::RwLock as AsyncRwLock;
#[doc(no_inline)]
pub use tokio::sync::RwLockReadGuard as AsyncRwLockReadGuard;
#[doc(no_inline)]
pub use tokio::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
// #[doc(no_inline)]
// pub use tokio::sync::RwLock as AsyncRwLock;
// #[doc(no_inline)]
// pub use tokio::sync::RwLockReadGuard as AsyncRwLockReadGuard;
// #[doc(no_inline)]
// pub use tokio::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
#[doc(no_inline)]

View File

@ -34,8 +34,22 @@ impl<'a> StartupLockGuard<'a> {
#[derive(Debug)]
pub struct StartupLockEnterGuard<'a> {
_guard: AsyncRwLockReadGuard<'a, bool>,
// #[cfg(feature = "debug-locks")]
id: usize,
// #[cfg(feature = "debug-locks")]
active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
}
//#[cfg(feature = "debug-locks")]
impl<'a> Drop for StartupLockEnterGuard<'a> {
fn drop(&mut self) {
self.active_guards.lock().remove(&self.id);
}
}
//#[cfg(feature = "debug-locks")]
static GUARD_ID: AtomicUsize = AtomicUsize::new(0);
/// Synchronization mechanism that tracks the startup and shutdown of a region of code.
/// Guarantees that some code can only be started up once and shut down only if it is
/// already started.
@ -46,12 +60,16 @@ pub struct StartupLockEnterGuard<'a> {
#[derive(Debug)]
pub struct StartupLock {
rwlock: AsyncRwLock<bool>,
// #[cfg(feature = "debug-locks")]
active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
}
impl StartupLock {
pub fn new() -> Self {
Self {
rwlock: AsyncRwLock::new(false),
// #[cfg(feature = "debug-locks")]
active_guards: Arc::new(Mutex::new(HashMap::new())),
}
}
@ -91,7 +109,20 @@ impl StartupLock {
/// One must call 'success()' on the returned startup lock guard if shutdown was successful
/// otherwise the startup lock will not shift to the 'stopped' state.
pub async fn shutdown(&self) -> Result<StartupLockGuard, StartupLockAlreadyShutDownError> {
let guard = self.rwlock.write().await;
cfg_if! {
if #[cfg(feature = "debug-locks")] {
//let guard = self.rwlock.write().await;
} else {
let guard = self.rwlock.write().await;
// let guard = match timeout(30000, self.rwlock.write()).await {
// Ok(v) => v,
// Err(_) => {
// eprintln!("active guards: {:#?}", self.active_guards.lock().values().collect::<Vec<_>>());
// panic!("shutdown deadlock");
// }
// };
}
}
if !*guard {
return Err(StartupLockAlreadyShutDownError);
}
@ -109,7 +140,16 @@ impl StartupLock {
if !*guard {
return Err(StartupLockNotStartedError);
}
Ok(StartupLockEnterGuard { _guard: guard })
let out = StartupLockEnterGuard {
_guard: guard,
//#[cfg(feature = "debug-locks")]
id: GUARD_ID.fetch_add(1, Ordering::AcqRel),
active_guards: self.active_guards.clone(),
};
self.active_guards
.lock()
.insert(out.id, backtrace::Backtrace::new());
Ok(out)
}
}

View File

@ -114,8 +114,66 @@ pub async fn test_bad_enter() {
lock.enter().expect_err("should not enter when shut down");
}
pub async fn test_multiple_enter() {
info!("test_multiple_enter");
let lock = Arc::new(StartupLock::new());
let s1 = lock.startup().expect("should startup");
s1.success();
{
let _e1 = lock.enter().expect("should enter 1");
{
let _e2 = lock.enter().expect("should enter 2");
{
let _e3 = lock.enter().expect("should enter 3");
}
}
}
let e4 = lock.enter().expect("should enter 4");
let e5 = lock.enter().expect("should enter 5");
let e6 = lock.enter().expect("should enter 6");
//eprintln!("1");
let lock2 = lock.clone();
let jh = spawn(async move {
//eprintln!("2");
let guard = lock2.shutdown().await.expect("should shutdown");
//eprintln!("7");
sleep(2000).await;
//eprintln!("8");
guard.success();
});
sleep(1000).await;
//eprintln!("3");
assert!(!lock.is_started());
assert!(!lock.is_shut_down());
// Now drop the enter created before shutdown
drop(e4);
//eprintln!("4");
drop(e5);
//eprintln!("5");
drop(e6);
//eprintln!("6");
// This should finally exit
jh.await;
//eprintln!("9");
assert!(!lock.is_started());
assert!(lock.is_shut_down());
lock.enter().expect_err("should not enter");
}
pub async fn test_all() {
test_startup_shutdown().await;
test_contention().await;
test_bad_enter().await;
test_multiple_enter().await;
}

View File

@ -55,19 +55,19 @@ cfg_if::cfg_if! {
};
}
#[macro_export]
macro_rules! asyncrwlock_try_read {
($x:expr) => {
$x.try_read().ok()
};
}
// #[macro_export]
// macro_rules! asyncrwlock_try_read {
// ($x:expr) => {
// $x.try_read().ok()
// };
// }
#[macro_export]
macro_rules! asyncrwlock_try_write {
($x:expr) => {
$x.try_write().ok()
};
}
// #[macro_export]
// macro_rules! asyncrwlock_try_write {
// ($x:expr) => {
// $x.try_write().ok()
// };
// }
} else {
#[macro_export]
macro_rules! asyncmutex_try_lock {
@ -87,21 +87,23 @@ cfg_if::cfg_if! {
$x.try_lock_arc()
};
}
#[macro_export]
macro_rules! asyncrwlock_try_read {
($x:expr) => {
$x.try_read()
};
}
#[macro_export]
macro_rules! asyncrwlock_try_write {
($x:expr) => {
$x.try_write()
};
}
}
}
#[macro_export]
macro_rules! asyncrwlock_try_read {
($x:expr) => {
$x.try_read()
};
}
#[macro_export]
macro_rules! asyncrwlock_try_write {
($x:expr) => {
$x.try_write()
};
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn system_boxed<'a, Out>(
@ -500,3 +502,24 @@ pub fn type_name_of_val<T: ?Sized>(_val: &T) -> &'static str {
pub fn map_to_string<X: ToString>(arg: X) -> String {
arg.to_string()
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub struct DebugGuard {
counter: &'static AtomicUsize,
}
impl DebugGuard {
pub fn new(counter: &'static AtomicUsize) -> Self {
let c = counter.fetch_add(1, Ordering::SeqCst);
eprintln!("DebugGuard Entered: {}", c + 1);
Self { counter }
}
}
impl Drop for DebugGuard {
fn drop(&mut self) {
let c = self.counter.fetch_sub(1, Ordering::SeqCst);
eprintln!("DebugGuard Exited: {}", c - 1);
}
}