spawn names

This commit is contained in:
Christien Rioux 2024-07-20 19:42:25 -04:00
parent 39d8b397fa
commit 35dc7bdfd6
32 changed files with 177 additions and 115 deletions

View File

@ -187,7 +187,7 @@ impl ClientApiConnection {
// Request initial server state
let capi = self.clone();
spawn_detached_local(async move {
spawn_detached_local("get initial server state", async move {
let mut req = json::JsonValue::new_object();
req["op"] = "GetState".into();
let Some(resp) = capi.perform_request(req).await else {

View File

@ -114,7 +114,7 @@ impl CommandProcessor {
trace!("CommandProcessor::cmd_help");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd help", async move {
let out = match capi.server_debug("help".to_owned()).await {
Err(e) => {
error!("Server command 'debug help' failed: {}", e);
@ -166,7 +166,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_shutdown");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd shutdown", async move {
if let Err(e) = capi.server_shutdown().await {
error!("Server command 'shutdown' failed to execute: {}", e);
}
@ -179,7 +179,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_disconnect");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd disconnect", async move {
capi.disconnect().await;
ui.send_callback(callback);
});
@ -190,7 +190,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_debug");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd debug", async move {
match capi.server_debug(command_line).await {
Ok(output) => {
ui.add_node_event(Level::Info, &output);
@ -213,7 +213,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_change_log_level");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd change_log_level", async move {
let (layer, rest) = Self::word_split(&rest.unwrap_or_default());
let log_level = match convert_loglevel(&rest.unwrap_or_default()) {
Ok(v) => v,
@ -252,7 +252,7 @@ Server Debug Commands:
trace!("CommandProcessor::cmd_change_log_ignore");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
spawn_detached_local("cmd change_log_ignoe", async move {
let (layer, rest) = Self::word_split(&rest.unwrap_or_default());
let log_ignore = rest.unwrap_or_default();
@ -284,7 +284,7 @@ Server Debug Commands:
let ui = self.ui_sender();
let this = self.clone();
spawn_detached_local(async move {
spawn_detached_local("cmd enable", async move {
let flag = rest.clone().unwrap_or_default();
match flag.as_str() {
"app_messages" => {
@ -306,7 +306,7 @@ Server Debug Commands:
let ui = self.ui_sender();
let this = self.clone();
spawn_detached_local(async move {
spawn_detached_local("cmd disable", async move {
let flag = rest.clone().unwrap_or_default();
match flag.as_str() {
"app_messages" => {
@ -664,7 +664,7 @@ Server Debug Commands:
pub fn attach(&self) {
let capi = self.capi();
spawn_detached_local(async move {
spawn_detached_local("attach", async move {
if let Err(e) = capi.server_attach().await {
error!("Server command 'attach' failed to execute: {}", e);
}
@ -674,7 +674,7 @@ Server Debug Commands:
pub fn detach(&self) {
let capi = self.capi();
spawn_detached_local(async move {
spawn_detached_local("detach", async move {
if let Err(e) = capi.server_detach().await {
error!("Server command 'detach' failed to execute: {}", e);
}

View File

@ -86,6 +86,7 @@ impl LogViewerUI {
done.await;
} else {
while let Ok(Ok(c)) = blocking_wrapper(
"LogViewerUI read",
{
let term = term.clone();
move || term.read_char()

View File

@ -323,7 +323,10 @@ impl AttachmentManager {
return false;
}
inner.maintain_peers = true;
inner.attachment_maintainer_jh = Some(spawn(self.clone().attachment_maintainer()));
inner.attachment_maintainer_jh = Some(spawn(
"attachment maintainer",
self.clone().attachment_maintainer(),
));
true
}

View File

@ -176,7 +176,7 @@ impl Crypto {
// Schedule flushing
let this = self.clone();
let flush_future = interval(60000, move || {
let flush_future = interval("crypto flush", 60000, move || {
let this = this.clone();
async move {
if let Err(e) = this.flush().await {

View File

@ -134,7 +134,10 @@ impl ConnectionManager {
let stop_source = StopSource::new();
// Spawn the async processor
let async_processor = spawn(self.clone().async_processor(stop_source.token(), receiver));
let async_processor = spawn(
"connection manager async processor",
self.clone().async_processor(stop_source.token(), receiver),
);
// Store in the inner object
*inner = Some(Self::new_inner(stop_source, sender, async_processor));

View File

@ -141,6 +141,7 @@ enum SendDataToExistingFlowResult {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StartupDisposition {
Success,
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
BindRetry,
}
@ -213,9 +214,18 @@ impl NetworkManager {
routing_table: RwLock::new(None),
components: RwLock::new(None),
update_callback: RwLock::new(None),
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS),
address_filter_task: TickTask::new(ADDRESS_FILTER_TASK_INTERVAL_SECS),
rolling_transfers_task: TickTask::new(
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
public_address_check_task: TickTask::new(
"public_address_check_task",
PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS,
),
address_filter_task: TickTask::new(
"address_filter_task",
ADDRESS_FILTER_TASK_INTERVAL_SECS,
),
network_key,
startup_lock: StartupLock::new(),
}

View File

@ -191,7 +191,7 @@ impl IGDManager {
mapped_port: u16,
) -> Option<()> {
let this = self.clone();
blocking_wrapper(move || {
blocking_wrapper("igd unmap_port", move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
@ -235,7 +235,7 @@ impl IGDManager {
expected_external_address: Option<IpAddr>,
) -> Option<SocketAddr> {
let this = self.clone();
blocking_wrapper(move || {
blocking_wrapper("igd map_any_port", move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
@ -340,7 +340,7 @@ impl IGDManager {
}
let this = self.clone();
blocking_wrapper(move || {
blocking_wrapper("igd tick", move || {
let mut inner = this.inner.lock();
// Process full renewals

View File

@ -173,9 +173,9 @@ impl Network {
routing_table,
connection_manager,
interfaces: NetworkInterfaces::new(),
update_network_class_task: TickTask::new(1),
network_interfaces_task: TickTask::new(1),
upnp_task: TickTask::new(1),
update_network_class_task: TickTask::new("update_network_class_task", 1),
network_interfaces_task: TickTask::new("network_interfaces_task", 1),
upnp_task: TickTask::new("upnp_task", 1),
igd_manager: igd_manager::IGDManager::new(config.clone()),
}
}

View File

@ -297,7 +297,7 @@ impl Network {
let connection_manager = self.connection_manager();
////////////////////////////////////////////////////////////
let jh = spawn(async move {
let jh = spawn(&format!("TCP listener {}", addr), async move {
// moves listener object in and get incoming iterator
// when this task exists, the listener will close the socket

View File

@ -16,14 +16,14 @@ impl Network {
}
}
log_net!("task_count: {}", task_count);
for _ in 0..task_count {
for task_n in 0..task_count {
log_net!("Spawning UDP listener task");
////////////////////////////////////////////////////////////
// Run thread task to process stream of messages
let this = self.clone();
let jh = spawn(async move {
let jh = spawn(&format!("UDP listener {}", task_n), async move {
log_net!("UDP listener task spawned");
// Collect all our protocol handlers into a vector

View File

@ -148,7 +148,7 @@ impl NetworkConnection {
let local_stop_token = stop_source.token();
// Spawn connection processor and pass in protocol connection
let processor = spawn(Self::process_connection(
let processor = spawn("connection processor", Self::process_connection(
connection_manager,
local_stop_token,
manager_stop_token,

View File

@ -305,6 +305,7 @@ impl ReceiptManager {
// Single-spawn the timeout task routine
let _ = timeout_task
.single_spawn(
"receipt timeout",
self.clone()
.timeout_task_routine(now, stop_token)
.in_current_span(),

View File

@ -52,7 +52,6 @@ pub const MAX_CAPABILITIES: usize = 64;
/////////////////////////////////////////////////////////////////
struct NetworkInner {
network_started: Option<bool>,
network_needs_restart: bool,
protocol_config: ProtocolConfig,
}
@ -77,7 +76,6 @@ pub(in crate::network_manager) struct Network {
impl Network {
fn new_inner() -> NetworkInner {
NetworkInner {
network_started: Some(false),
network_needs_restart: false,
protocol_config: Default::default(),
}

View File

@ -221,14 +221,26 @@ impl RoutingTable {
node_id: c.network.routing_table.node_id.clone(),
node_id_secret: c.network.routing_table.node_id_secret.clone(),
kick_queue: Mutex::new(BTreeSet::default()),
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
kick_buckets_task: TickTask::new(1),
bootstrap_task: TickTask::new(1),
peer_minimum_refresh_task: TickTask::new(1),
closest_peers_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms),
ping_validator_task: TickTask::new(1),
relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS),
private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS),
rolling_transfers_task: TickTask::new(
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
kick_buckets_task: TickTask::new("kick_buckets_task", 1),
bootstrap_task: TickTask::new("bootstrap_task", 1),
peer_minimum_refresh_task: TickTask::new("peer_minimum_refresh_task", 1),
closest_peers_refresh_task: TickTask::new_ms(
"closest_peers_refresh_task",
c.network.dht.min_peer_refresh_time_ms,
),
ping_validator_task: TickTask::new("ping_validator_task", 1),
relay_management_task: TickTask::new(
"relay_management_task",
RELAY_MANAGEMENT_INTERVAL_SECS,
),
private_route_management_task: TickTask::new(
"private_route_management_task",
PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS,
),
}
}
pub fn new(network_manager: NetworkManager) -> Self {

View File

@ -392,10 +392,10 @@ impl RPCProcessor {
"Spinning up {} RPC workers",
self.unlocked_inner.concurrency
);
for _ in 0..self.unlocked_inner.concurrency {
for task_n in 0..self.unlocked_inner.concurrency {
let this = self.clone();
let receiver = channel.1.clone();
let jh = spawn(Self::rpc_worker(
let jh = spawn(&format!("rpc worker {}",task_n), Self::rpc_worker(
this,
inner.stop_source.as_ref().unwrap().token(),
receiver,
@ -1680,13 +1680,8 @@ impl RPCProcessor {
while let Ok(Ok((_span_id, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await
{
//let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv");
// xxx: causes crash (Missing otel data span extensions)
// rpc_worker_span.follows_from(span_id);
network_result_value_or_log!(match self
.process_rpc_message(msg).in_current_span()
//.instrument(rpc_worker_span)
.await
{
Err(e) => {

View File

@ -225,7 +225,7 @@ impl StorageManager {
};
// Call the fanout in a spawned task
spawn(Box::pin(async move {
spawn("outbound_get_value fanout", Box::pin(async move {
let fanout_call = FanoutCall::new(
routing_table.clone(),
key,

View File

@ -89,11 +89,26 @@ impl StorageManager {
table_store,
#[cfg(feature = "unstable-blockstore")]
block_store,
flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS),
offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS),
send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS),
check_active_watches_task: TickTask::new(CHECK_ACTIVE_WATCHES_INTERVAL_SECS),
check_watched_records_task: TickTask::new(CHECK_WATCHED_RECORDS_INTERVAL_SECS),
flush_record_stores_task: TickTask::new(
"flush_record_stores_task",
FLUSH_RECORD_STORES_INTERVAL_SECS,
),
offline_subkey_writes_task: TickTask::new(
"offline_subkey_writes_task",
OFFLINE_SUBKEY_WRITES_INTERVAL_SECS,
),
send_value_changes_task: TickTask::new(
"send_value_changes_task",
SEND_VALUE_CHANGES_INTERVAL_SECS,
),
check_active_watches_task: TickTask::new(
"check_active_watches_task",
CHECK_ACTIVE_WATCHES_INTERVAL_SECS,
),
check_watched_records_task: TickTask::new(
"check_watched_records_task",
CHECK_WATCHED_RECORDS_INTERVAL_SECS,
),
anonymous_watch_keys,
}

View File

@ -224,7 +224,7 @@ impl StorageManager {
};
// Call the fanout in a spawned task
spawn(Box::pin(async move {
spawn("outbound_set_value fanout", Box::pin(async move {
let fanout_call = FanoutCall::new(
routing_table.clone(),
key,

View File

@ -133,7 +133,7 @@ impl StorageManagerInner {
self.deferred_result_processor.init().await;
// Schedule tick
let tick_future = interval(1000, move || {
let tick_future = interval("storage manager tick", 1000, move || {
let this = outer_self.clone();
async move {
if let Err(e) = this.tick().await {

View File

@ -15,7 +15,7 @@ impl fmt::Debug for VeilidAPIInner {
impl Drop for VeilidAPIInner {
fn drop(&mut self) {
if let Some(context) = self.context.take() {
spawn_detached(api_shutdown(context));
spawn_detached("api shutdown", api_shutdown(context));
}
}
}

View File

@ -145,7 +145,11 @@ impl ClientApi {
let t_awg = awg.clone();
// Process the connection
spawn(self.clone().handle_ipc_connection(stream, t_awg)).detach();
spawn(
"client_api handle_ipc_connection",
self.clone().handle_ipc_connection(stream, t_awg),
)
.detach();
}
// Wait for all connections to terminate
@ -183,7 +187,11 @@ impl ClientApi {
let t_awg = awg.clone();
// Process the connection
spawn(self.clone().handle_tcp_connection(stream, t_awg)).detach();
spawn(
"client_api handle_tcp_connection",
self.clone().handle_tcp_connection(stream, t_awg),
)
.detach();
}
// Wait for all connections to terminate
@ -543,6 +551,6 @@ impl ClientApi {
}
let bind_futures_join = join_all(bind_futures);
self.inner.lock().join_handle = Some(spawn(bind_futures_join));
self.inner.lock().join_handle = Some(spawn("client_api bind_futures", bind_futures_join));
}
}

View File

@ -108,7 +108,7 @@ pub async fn run_veilid_server(
let capi2 = capi.clone();
let update_receiver_shutdown = SingleShotEventual::new(Some(()));
let mut update_receiver_shutdown_instance = update_receiver_shutdown.instance().fuse();
let update_receiver_jh = spawn_local(async move {
let update_receiver_jh = spawn_local("update_receiver", async move {
loop {
select! {
res = receiver.recv_async() => {

View File

@ -34,7 +34,7 @@ pub async fn run_veilid_server_with_signals(
Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT]).wrap_err("failed to init signals")?;
let handle = signals.handle();
let signals_task = spawn(handle_signals(signals));
let signals_task = spawn("signals", handle_signals(signals));
// Run veilid server
let res = run_veilid_server(settings, server_mode, veilid_logs).await;

View File

@ -32,7 +32,10 @@ impl DeferredStreamProcessor {
self.opt_stopper = Some(stopper);
let (dsc_tx, dsc_rx) = flume::unbounded::<SendPinBoxFuture<()>>();
self.opt_deferred_stream_channel = Some(dsc_tx);
self.opt_join_handle = Some(spawn(Self::processor(stop_token, dsc_rx)));
self.opt_join_handle = Some(spawn(
"deferred stream processor",
Self::processor(stop_token, dsc_rx),
));
}
/// Terminate the processor and ensure all streams are closed

View File

@ -3,7 +3,7 @@ use super::*;
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
pub fn interval<F, FUT>(freq_ms: u32, callback: F) -> SendPinBoxFuture<()>
pub fn interval<F, FUT>(name: &str, freq_ms: u32, callback: F) -> SendPinBoxFuture<()>
where
F: Fn() -> FUT + Send + Sync + 'static,
FUT: Future<Output = ()> + Send,
@ -11,7 +11,7 @@ cfg_if! {
let e = Eventual::new();
let ie = e.clone();
let jh = spawn(Box::pin(async move {
let jh = spawn(name, Box::pin(async move {
while timeout(freq_ms, ie.instance_clone(())).await.is_err() {
callback().await;
}
@ -25,7 +25,7 @@ cfg_if! {
} else {
pub fn interval<F, FUT>(freq_ms: u32, callback: F) -> SendPinBoxFuture<()>
pub fn interval<F, FUT>(name: &str, freq_ms: u32, callback: F) -> SendPinBoxFuture<()>
where
F: Fn() -> FUT + Send + Sync + 'static,
FUT: Future<Output = ()> + Send,
@ -33,7 +33,7 @@ cfg_if! {
let e = Eventual::new();
let ie = e.clone();
let jh = spawn(async move {
let jh = spawn(name, async move {
while timeout(freq_ms, ie.instance_clone(())).await.is_err() {
callback().await;
}

View File

@ -124,6 +124,7 @@ where
// Possibly spawn the future possibly returning the value of the last execution
pub async fn single_spawn_local(
&self,
name: &str,
future: impl Future<Output = T> + 'static,
) -> Result<(Option<T>, bool), ()> {
let mut out: Option<T> = None;
@ -152,7 +153,7 @@ where
// Run if we should do that
if run {
self.unlock(Some(spawn_local(future)));
self.unlock(Some(spawn_local(name, future)));
}
// Return the prior result if we have one
@ -166,6 +167,7 @@ where
{
pub async fn single_spawn(
&self,
name: &str,
future: impl Future<Output = T> + Send + 'static,
) -> Result<(Option<T>, bool), ()> {
let mut out: Option<T> = None;
@ -191,7 +193,7 @@ where
}
// Run if we should do that
if run {
self.unlock(Some(spawn(future)));
self.unlock(Some(spawn(name, future)));
}
// Return the prior result if we have one
Ok((out, run))

View File

@ -4,7 +4,7 @@ cfg_if! {
if #[cfg(target_arch = "wasm32")] {
use async_executors::{Bindgen, LocalSpawnHandleExt, SpawnHandleExt};
pub fn spawn<Out>(future: impl Future<Output = Out> + Send + 'static) -> MustJoinHandle<Out>
pub fn spawn<Out>(_name: &str, future: impl Future<Output = Out> + Send + 'static) -> MustJoinHandle<Out>
where
Out: Send + 'static,
{
@ -15,7 +15,7 @@ cfg_if! {
)
}
pub fn spawn_local<Out>(future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out>
pub fn spawn_local<Out>(_name: &str, future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out>
where
Out: 'static,
{
@ -26,7 +26,7 @@ cfg_if! {
)
}
pub fn spawn_detached<Out>(future: impl Future<Output = Out> + Send + 'static)
pub fn spawn_detached<Out>(_name: &str, future: impl Future<Output = Out> + Send + 'static)
where
Out: Send + 'static,
{
@ -35,7 +35,7 @@ cfg_if! {
.expect("wasm-bindgen-futures spawn_handle_local should never error out")
.detach()
}
pub fn spawn_detached_local<Out>(future: impl Future<Output = Out> + 'static)
pub fn spawn_detached_local<Out>(_name: &str, future: impl Future<Output = Out> + 'static)
where
Out: 'static,
{
@ -47,60 +47,60 @@ cfg_if! {
} else {
pub fn spawn<Out>(future: impl Future<Output = Out> + Send + 'static) -> MustJoinHandle<Out>
pub fn spawn<Out>(name: &str, future: impl Future<Output = Out> + Send + 'static) -> MustJoinHandle<Out>
where
Out: Send + 'static,
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
MustJoinHandle::new(async_std::task::spawn(future))
MustJoinHandle::new(async_std::task::Builder::new().name(name).spawn(future).unwrap())
} else if #[cfg(feature="rt-tokio")] {
MustJoinHandle::new(tokio::task::spawn(future))
MustJoinHandle::new(tokio::task::Builder::new().name(name).spawn(future).unwrap())
}
}
}
pub fn spawn_local<Out>(future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out>
pub fn spawn_local<Out>(name: &str, future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out>
where
Out: 'static,
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
MustJoinHandle::new(async_std::task::spawn_local(future))
MustJoinHandle::new(async_std::task::Builder::new().name(name).local(future).unwrap())
} else if #[cfg(feature="rt-tokio")] {
MustJoinHandle::new(tokio::task::spawn_local(future))
MustJoinHandle::new(tokio::task::Builder::new().name(name).spawn_local(future).unwrap())
}
}
}
pub fn spawn_detached<Out>(future: impl Future<Output = Out> + Send + 'static)
pub fn spawn_detached<Out>(name: &str, future: impl Future<Output = Out> + Send + 'static)
where
Out: Send + 'static,
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
drop(async_std::task::spawn(future));
drop(async_std::task::Builder::new().name(name).spawn(future).unwrap());
} else if #[cfg(feature="rt-tokio")] {
drop(tokio::task::spawn(future));
drop(tokio::task::Builder::new().name(name).spawn(future).unwrap());
}
}
}
pub fn spawn_detached_local<Out>(future: impl Future<Output = Out> + 'static)
pub fn spawn_detached_local<Out>(name: &str,future: impl Future<Output = Out> + 'static)
where
Out: 'static,
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
drop(async_std::task::spawn_local(future));
drop(async_std::task::Builder::new().name(name).local(future).unwrap());
} else if #[cfg(feature="rt-tokio")] {
drop(tokio::task::spawn_local(future));
drop(tokio::task::Builder::new().name(name).spawn_local(future).unwrap());
}
}
}
#[allow(unused_variables)]
pub async fn blocking_wrapper<F, R>(blocking_task: F, err_result: R) -> R
pub async fn blocking_wrapper<F, R>(name: &str, blocking_task: F, err_result: R) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
@ -108,9 +108,9 @@ cfg_if! {
// run blocking stuff in blocking thread
cfg_if! {
if #[cfg(feature="rt-async-std")] {
async_std::task::spawn_blocking(blocking_task).await
async_std::task::Builder::new().name(name).blocking(blocking_task)
} else if #[cfg(feature="rt-tokio")] {
tokio::task::spawn_blocking(blocking_task).await.unwrap_or(err_result)
tokio::task::Builder::new().name(name).spawn_blocking(blocking_task).unwrap().await.unwrap_or(err_result)
} else {
#[compile_error("must use an executor")]
}

View File

@ -35,7 +35,7 @@ pub async fn test_simple_single_contention() {
let g1 = table.lock_tag(a1).await;
info!("locked");
let t1 = spawn(async move {
let t1 = spawn("t1", async move {
// move the guard into the task
let _g1_take = g1;
// hold the guard for a bit
@ -90,7 +90,7 @@ pub async fn test_simple_double_contention() {
let g2 = table.lock_tag(a2).await;
info!("locked");
let t1 = spawn(async move {
let t1 = spawn("t1", async move {
// move the guard into the tas
let _g1_take = g1;
// hold the guard for a bit
@ -99,7 +99,7 @@ pub async fn test_simple_double_contention() {
// release the guard
info!("released");
});
let t2 = spawn(async move {
let t2 = spawn("t2", async move {
// move the guard into the task
let _g2_take = g2;
// hold the guard for a bit
@ -131,7 +131,7 @@ pub async fn test_parallel_single_contention() {
let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234);
let table1 = table.clone();
let t1 = spawn(async move {
let t1 = spawn("t1", async move {
// lock the tag
let _g = table1.lock_tag(a1).await;
info!("locked t1");
@ -143,7 +143,7 @@ pub async fn test_parallel_single_contention() {
});
let table2 = table.clone();
let t2 = spawn(async move {
let t2 = spawn("t2", async move {
// lock the tag
let _g = table2.lock_tag(a1).await;
info!("locked t2");
@ -155,7 +155,7 @@ pub async fn test_parallel_single_contention() {
});
let table3 = table.clone();
let t3 = spawn(async move {
let t3 = spawn("t3", async move {
// lock the tag
let _g = table3.lock_tag(a1).await;
info!("locked t3");

View File

@ -30,7 +30,7 @@ pub async fn test_eventual() {
let i4 = e1.instance_clone(4u32);
drop(i2);
let jh = spawn(async move {
let jh = spawn("task", async move {
sleep(1000).await;
e1.resolve();
});
@ -47,7 +47,7 @@ pub async fn test_eventual() {
let i3 = e1.instance_clone(3u32);
let i4 = e1.instance_clone(4u32);
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
let i5 = e1.instance_clone(5u32);
let i6 = e1.instance_clone(6u32);
assert_eq!(i1.await, 1u32);
@ -67,7 +67,7 @@ pub async fn test_eventual() {
let i1 = e1.instance_clone(1u32);
let i2 = e1.instance_clone(2u32);
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
assert_eq!(i1.await, 1u32);
assert_eq!(i2.await, 2u32);
});
@ -80,7 +80,7 @@ pub async fn test_eventual() {
//
let j1 = e1.instance_clone(1u32);
let j2 = e1.instance_clone(2u32);
let jh = spawn(async move {
let jh = spawn("task", async move {
assert_eq!(j1.await, 1u32);
assert_eq!(j2.await, 2u32);
});
@ -105,7 +105,7 @@ pub async fn test_eventual_value() {
drop(i2);
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
sleep(1000).await;
e1_c1.resolve(3u32);
});
@ -122,7 +122,7 @@ pub async fn test_eventual_value() {
let i3 = e1.instance();
let i4 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
let i5 = e1.instance();
let i6 = e1.instance();
i1.await;
@ -144,7 +144,7 @@ pub async fn test_eventual_value() {
let i1 = e1.instance();
let i2 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
i1.await;
i2.await;
});
@ -157,7 +157,7 @@ pub async fn test_eventual_value() {
//
let j1 = e1.instance();
let j2 = e1.instance();
let jh = spawn(async move {
let jh = spawn("task", async move {
j1.await;
j2.await;
});
@ -181,7 +181,7 @@ pub async fn test_eventual_value_clone() {
let i4 = e1.instance();
drop(i2);
let jh = spawn(async move {
let jh = spawn("task", async move {
sleep(1000).await;
e1.resolve(3u32);
});
@ -199,7 +199,7 @@ pub async fn test_eventual_value_clone() {
let i3 = e1.instance();
let i4 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
let i5 = e1.instance();
let i6 = e1.instance();
assert_eq!(i1.await, 4);
@ -220,7 +220,7 @@ pub async fn test_eventual_value_clone() {
let i1 = e1.instance();
let i2 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
assert_eq!(i1.await, 5);
assert_eq!(i2.await, 5);
});
@ -231,7 +231,7 @@ pub async fn test_eventual_value_clone() {
//
let j1 = e1.instance();
let j2 = e1.instance();
let jh = spawn(async move {
let jh = spawn("task", async move {
assert_eq!(j1.await, 6);
assert_eq!(j2.await, 6);
});
@ -245,7 +245,7 @@ pub async fn test_interval() {
info!("testing interval");
let tick: Arc<Mutex<u32>> = Arc::new(Mutex::new(0u32));
let stopper = interval(1000, move || {
let stopper = interval("interval", 1000, move || {
let tick = tick.clone();
async move {
let mut tick = tick.lock();
@ -493,7 +493,7 @@ pub async fn test_must_join_single_future() {
let sf = MustJoinSingleFuture::<u32>::new();
assert_eq!(sf.check().await, Ok(None));
assert_eq!(
sf.single_spawn(async {
sf.single_spawn("t1", async {
sleep(2000).await;
69
})
@ -501,10 +501,13 @@ pub async fn test_must_join_single_future() {
Ok((None, true))
);
assert_eq!(sf.check().await, Ok(None));
assert_eq!(sf.single_spawn(async { panic!() }).await, Ok((None, false)));
assert_eq!(
sf.single_spawn("t2", async { panic!() }).await,
Ok((None, false))
);
assert_eq!(sf.join().await, Ok(Some(69)));
assert_eq!(
sf.single_spawn(async {
sf.single_spawn("t3", async {
sleep(1000).await;
37
})
@ -513,7 +516,7 @@ pub async fn test_must_join_single_future() {
);
sleep(2000).await;
assert_eq!(
sf.single_spawn(async {
sf.single_spawn("t4", async {
sleep(1000).await;
27
})

View File

@ -62,7 +62,7 @@ pub async fn test_contention() {
assert!(lock.is_started());
let lock2 = lock.clone();
let val2 = val.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
let _guard = lock2.enter().expect("should enter");
sleep(2000).await;
val2.store(true, Ordering::Release);
@ -95,7 +95,7 @@ pub async fn test_bad_enter() {
assert!(!lock.is_shut_down());
let lock2 = lock.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
let guard = lock2.shutdown().await.expect("should shutdown");
sleep(2000).await;
guard.success();
@ -139,7 +139,7 @@ pub async fn test_multiple_enter() {
//eprintln!("1");
let lock2 = lock.clone();
let jh = spawn(async move {
let jh = spawn("task", async move {
//eprintln!("2");
let guard = lock2.shutdown().await.expect("should shutdown");
//eprintln!("7");

View File

@ -10,6 +10,7 @@ type TickTaskRoutine<E> =
/// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again.
/// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity.
pub struct TickTask<E: Send + 'static> {
name: String,
last_timestamp_us: AtomicU64,
tick_period_us: u64,
routine: OnceCell<Box<TickTaskRoutine<E>>>,
@ -19,8 +20,9 @@ pub struct TickTask<E: Send + 'static> {
}
impl<E: Send + 'static> TickTask<E> {
pub fn new_us(tick_period_us: u64) -> Self {
pub fn new_us(name: &str, tick_period_us: u64) -> Self {
Self {
name: name.to_string(),
last_timestamp_us: AtomicU64::new(0),
tick_period_us,
routine: OnceCell::new(),
@ -29,8 +31,9 @@ impl<E: Send + 'static> TickTask<E> {
running: Arc::new(AtomicBool::new(false)),
}
}
pub fn new_ms(tick_period_ms: u32) -> Self {
pub fn new_ms(name: &str, tick_period_ms: u32) -> Self {
Self {
name: name.to_string(),
last_timestamp_us: AtomicU64::new(0),
tick_period_us: (tick_period_ms as u64) * 1000u64,
routine: OnceCell::new(),
@ -39,8 +42,9 @@ impl<E: Send + 'static> TickTask<E> {
running: Arc::new(AtomicBool::new(false)),
}
}
pub fn new(tick_period_sec: u32) -> Self {
pub fn new(name: &str, tick_period_sec: u32) -> Self {
Self {
name: name.to_string(),
last_timestamp_us: AtomicU64::new(0),
tick_period_us: (tick_period_sec as u64) * 1000000u64,
routine: OnceCell::new(),
@ -147,7 +151,11 @@ impl<E: Send + 'static> TickTask<E> {
running.store(false, core::sync::atomic::Ordering::Release);
out
});
match self.single_future.single_spawn(wrapped_routine).await {
match self
.single_future
.single_spawn(&self.name, wrapped_routine)
.await
{
// We should have already consumed the result of the last run, or there was none
// and we should definitely have run, because the prior 'check()' operation
// should have ensured the singlefuture was ready to run