clean up spans and correct shutdown behavior. clean up reverse connection and hole punch errors

This commit is contained in:
Christien Rioux 2024-07-21 21:15:54 -04:00
parent 35dc7bdfd6
commit 2ec843cd17
46 changed files with 354 additions and 212 deletions

View File

@ -22,12 +22,12 @@ rt-async-std = [
rt-tokio = ["tokio", "tokio-util", "veilid-tools/rt-tokio", "cursive/rt-tokio"]
[dependencies]
async-std = { version = "^1.12", features = [
async-std = { version = "1.12.0", features = [
"unstable",
"attributes",
], optional = true }
tokio = { version = "^1", features = ["full"], optional = true }
tokio-util = { version = "^0", features = ["compat"], optional = true }
tokio = { version = "1.38.1", features = ["full", "tracing"], optional = true }
tokio-util = { version = "0.7.11", features = ["compat"], optional = true }
async-tungstenite = { version = "^0.23" }
cursive = { git = "https://gitlab.com/veilid/cursive.git", default-features = false, features = [
"crossterm",

View File

@ -51,6 +51,7 @@ crypto-test = ["enable-crypto-vld0", "enable-crypto-none"]
crypto-test-none = ["enable-crypto-none"]
veilid_core_android_tests = ["dep:paranoid-android"]
veilid_core_ios_tests = ["dep:tracing-oslog"]
debug-locks = ["veilid-tools/debug-locks"]
### DEPENDENCIES

View File

@ -211,7 +211,7 @@ impl AttachmentManager {
}
}
#[instrument(level = "debug", skip_all)]
#[instrument(parent = None, level = "debug", skip_all)]
async fn attachment_maintainer(self) {
log_net!(debug "attachment starting");
self.update_attaching_detaching_state(AttachmentState::Attaching);

View File

@ -337,6 +337,7 @@ impl AddressFilter {
.or_insert(punishment);
}
#[instrument(parent = None, level = "trace", skip_all, err)]
pub async fn address_filter_task_routine(
self,
_stop_token: StopToken,

View File

@ -64,8 +64,6 @@ pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration
pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60;
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
static FUCK: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone, Debug, Default)]
pub struct ProtocolConfig {
pub outbound: ProtocolTypeSet,
@ -586,7 +584,7 @@ impl NetworkManager {
}
/// Generates a multi-shot/normal receipt
#[instrument(level = "trace", skip(self, extra_data, callback), err)]
#[instrument(level = "trace", skip(self, extra_data, callback))]
pub fn generate_receipt<D: AsRef<[u8]>>(
&self,
expiration_us: u64,
@ -626,7 +624,7 @@ impl NetworkManager {
}
/// Generates a single-shot/normal receipt
#[instrument(level = "trace", skip(self, extra_data), err)]
#[instrument(level = "trace", skip(self, extra_data))]
pub fn generate_single_shot_receipt<D: AsRef<[u8]>>(
&self,
expiration_us: u64,
@ -918,10 +916,8 @@ impl NetworkManager {
destination_node_ref: Option<NodeRef>,
body: B,
) -> EyreResult<NetworkResult<SendDataMethod>> {
let _dg = DebugGuard::new(&FUCK);
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
bail!("network is not started");
return Ok(NetworkResult::no_connection_other("network is not started"));
};
let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone();
@ -962,7 +958,8 @@ impl NetworkManager {
rcpt_data: Vec<u8>,
) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
bail!("network is not started");
log_net!(debug "not sending out-of-band receipt to {} because network is stopped", dial_info);
return Ok(());
};
// Do we need to validate the outgoing receipt? Probably not

View File

@ -262,7 +262,7 @@ impl DiscoveryContext {
// Always process two at a time so we get both addresses in parallel if possible
if unord.len() == 2 {
// Process one
if let Some(Some(ei)) = unord.next().await {
if let Some(Some(ei)) = unord.next().in_current_span().await {
external_address_infos.push(ei);
if external_address_infos.len() == 2 {
break;
@ -272,7 +272,7 @@ impl DiscoveryContext {
}
// Finish whatever is left if we need to
if external_address_infos.len() < 2 {
while let Some(res) = unord.next().await {
while let Some(res) = unord.next().in_current_span().await {
if let Some(ei) = res {
external_address_infos.push(ei);
if external_address_infos.len() == 2 {
@ -644,6 +644,7 @@ impl DiscoveryContext {
}
/// Add discovery futures to an unordered set that may detect dialinfo when they complete
#[instrument(level = "trace", skip(self))]
pub async fn discover(
&self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectionResult>>>,
@ -681,7 +682,7 @@ impl DiscoveryContext {
}
};
if let Some(clear_network_callback) = some_clear_network_callback {
clear_network_callback().await;
clear_network_callback().in_current_span().await;
}
// UPNP Automatic Mapping

View File

@ -310,7 +310,7 @@ impl IGDManager {
.await
}
#[instrument(level = "trace", target = "net", skip_all, err)]
#[instrument(level = "trace", target = "net", name = "IGDManager::tick", skip_all, err)]
pub async fn tick(&self) -> EyreResult<bool> {
// Refresh mappings if we have them
// If an error is received, then return false to restart the local network
@ -434,6 +434,6 @@ impl IGDManager {
// Normal exit, no restart
Ok(true)
}, Err(eyre!("failed to process blocking task"))).in_current_span().await
}, Err(eyre!("failed to process blocking task"))).instrument(tracing::trace_span!("igd tick fut")).await
}
}

View File

@ -518,7 +518,7 @@ impl Network {
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
let (recv_len, recv_addr) = network_result_try!(timeout(
timeout_ms,
h.recv_message(&mut out).instrument(Span::current())
h.recv_message(&mut out).in_current_span()
)
.await
.into_network_result())
@ -569,7 +569,7 @@ impl Network {
let out = network_result_try!(network_result_try!(timeout(
timeout_ms,
pnc.recv()
pnc.recv().in_current_span()
)
.await
.into_network_result())
@ -1063,7 +1063,7 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", target = "net", skip_all, err)]
#[instrument(parent = None, level = "trace", target = "net", skip_all, err)]
async fn upnp_task_routine(self, _stop_token: StopToken, _l: u64, _t: u64) -> EyreResult<()> {
if !self.unlocked_inner.igd_manager.tick().await? {
info!("upnp failed, restarting local network");
@ -1074,7 +1074,7 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", target = "net", skip_all, err)]
#[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)]
pub(crate) async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");

View File

@ -200,7 +200,12 @@ impl Network {
// Wait for all discovery futures to complete and apply discoverycontexts
let mut all_address_types = AddressTypeSet::new();
loop {
match unord.next().timeout_at(stop_token.clone()).await {
match unord
.next()
.timeout_at(stop_token.clone())
.in_current_span()
.await
{
Ok(Some(Some(dr))) => {
// Found some new dial info for this protocol/address combination
self.update_with_detected_dial_info(dr.ddi.clone()).await?;
@ -277,7 +282,7 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", skip(self), err)]
#[instrument(parent = None, level = "trace", skip(self), err)]
pub async fn update_network_class_task_routine(
self,
stop_token: StopToken,

View File

@ -38,6 +38,7 @@ impl Network {
Ok(acceptor)
}
#[instrument(level = "trace", skip_all)]
async fn try_tls_handlers(
&self,
tls_acceptor: &TlsAcceptor,
@ -60,7 +61,7 @@ impl Network {
// read a chunk of the stream
timeout(
tls_connection_initial_timeout_ms,
ps.peek_exact(&mut first_packet),
ps.peek_exact(&mut first_packet).in_current_span(),
)
.await
.wrap_err("tls initial timeout")?
@ -70,6 +71,7 @@ impl Network {
.await
}
#[instrument(level = "trace", skip_all)]
async fn try_handlers(
&self,
stream: AsyncPeekStream,
@ -90,6 +92,7 @@ impl Network {
Ok(None)
}
#[instrument(level = "trace", skip_all)]
async fn tcp_acceptor(
self,
tcp_stream: io::Result<TcpStream>,
@ -180,7 +183,7 @@ impl Network {
// read a chunk of the stream
if timeout(
connection_initial_timeout_ms,
ps.peek_exact(&mut first_packet),
ps.peek_exact(&mut first_packet).in_current_span(),
)
.await
.is_err()
@ -237,6 +240,7 @@ impl Network {
}
}
#[instrument(level = "trace", skip_all)]
async fn spawn_socket_listener(&self, addr: SocketAddr) -> EyreResult<bool> {
// Get config
let (connection_initial_timeout_ms, tls_connection_initial_timeout_ms) = {
@ -344,6 +348,7 @@ impl Network {
/////////////////////////////////////////////////////////////////
// TCP listener that multiplexes ports so multiple protocols can exist on a single port
#[instrument(level = "trace", skip_all)]
pub(super) async fn start_tcp_listener(
&self,
bind_set: NetworkBindSet,

View File

@ -3,6 +3,7 @@ use sockets::*;
use stop_token::future::FutureExt;
impl Network {
#[instrument(level = "trace", skip_all)]
pub(super) async fn create_udp_listener_tasks(&self) -> EyreResult<()> {
// Spawn socket tasks
let mut task_count = {
@ -108,6 +109,7 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", skip_all)]
async fn create_udp_protocol_handler(&self, addr: SocketAddr) -> EyreResult<bool> {
log_net!("create_udp_protocol_handler on {:?}", &addr);
@ -148,6 +150,7 @@ impl Network {
Ok(true)
}
#[instrument(level = "trace", skip_all)]
pub(super) async fn create_udp_protocol_handlers(
&self,
bind_set: NetworkBindSet,

View File

@ -162,10 +162,12 @@ pub async fn nonblocking_connect(
let async_stream = Async::new(std::net::TcpStream::from(socket))?;
// The stream becomes writable when connected
timeout_or_try!(timeout(timeout_ms, async_stream.writable())
.await
.into_timeout_or()
.into_result()?);
timeout_or_try!(
timeout(timeout_ms, async_stream.writable().in_current_span())
.await
.into_timeout_or()
.into_result()?
);
// Check low level error
let async_stream = match async_stream.get_ref().take_error()? {

View File

@ -134,7 +134,7 @@ impl RawTcpProtocolHandler {
let mut peekbuf: [u8; PEEK_DETECT_LEN] = [0u8; PEEK_DETECT_LEN];
if (timeout(
self.connection_initial_timeout_ms,
ps.peek_exact(&mut peekbuf),
ps.peek_exact(&mut peekbuf).in_current_span(),
)
.await)
.is_err()

View File

@ -222,7 +222,7 @@ impl WebsocketProtocolHandler {
let mut peek_buf = [0u8; MAX_WS_BEFORE_BODY];
let peek_len = match timeout(
self.arc.connection_initial_timeout_ms,
ps.peek(&mut peek_buf),
ps.peek(&mut peek_buf).in_current_span(),
)
.await
{

View File

@ -84,6 +84,7 @@ impl Network {
// Returns a port, a set of ip addresses to bind to, and a
// bool specifying if multiple ports should be tried
#[instrument(level = "trace", skip_all)]
async fn convert_listen_address_to_bind_set(
&self,
listen_address: String,
@ -136,6 +137,7 @@ impl Network {
/////////////////////////////////////////////////////
#[instrument(level = "trace", skip_all)]
pub(super) async fn bind_udp_protocol_handlers(
&self,
editor_public_internet: &mut RoutingDomainEditor,
@ -249,6 +251,7 @@ impl Network {
Ok(StartupDisposition::Success)
}
#[instrument(level = "trace", skip_all)]
pub(super) async fn start_ws_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,
@ -364,6 +367,7 @@ impl Network {
Ok(StartupDisposition::Success)
}
#[instrument(level = "trace", skip_all)]
pub(super) async fn start_wss_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,
@ -463,6 +467,7 @@ impl Network {
Ok(StartupDisposition::Success)
}
#[instrument(level = "trace", skip_all)]
pub(super) async fn start_tcp_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,

View File

@ -136,7 +136,8 @@ impl NetworkConnection {
let flow = protocol_connection.flow();
// Create handle for sending
let (sender, receiver) = flume::bounded(get_concurrency() as usize);
//let (sender, receiver) = flume::bounded(get_concurrency() as usize);
let (sender, receiver) = flume::unbounded();
// Create stats
let stats = Arc::new(Mutex::new(NetworkConnectionStats {
@ -265,7 +266,7 @@ impl NetworkConnection {
// Connection receiver loop
#[allow(clippy::too_many_arguments)]
#[instrument(level="trace", target="net", skip_all)]
#[instrument(parent = None, level="trace", target="net", skip_all)]
fn process_connection(
connection_manager: ConnectionManager,
local_stop_token: StopToken,
@ -299,7 +300,7 @@ impl NetworkConnection {
};
let timer = MutableFuture::new(new_timer());
unord.push(system_boxed(timer.clone()));
unord.push(system_boxed(timer.clone().in_current_span()));
loop {
// Add another message sender future if necessary
@ -333,7 +334,7 @@ impl NetworkConnection {
RecvLoopAction::Finish
}
}
});
}.in_current_span());
unord.push(system_boxed(sender_fut.in_current_span()));
}

View File

@ -281,7 +281,13 @@ impl ReceiptManager {
}
}
#[instrument(level = "trace", target = "receipt", skip_all, err)]
#[instrument(
level = "trace",
target = "receipt",
name = "ReceiptManager::tick",
skip_all,
err
)]
pub async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return Ok(());
@ -308,8 +314,9 @@ impl ReceiptManager {
"receipt timeout",
self.clone()
.timeout_task_routine(now, stop_token)
.in_current_span(),
.instrument(trace_span!(parent: None, "receipt timeout task")),
)
.in_current_span()
.await;
}
}

View File

@ -1,4 +1,5 @@
use super::*;
use stop_token::future::FutureExt as _;
impl NetworkManager {
/// Send raw data to a node
@ -146,7 +147,7 @@ impl NetworkManager {
Ok(NetworkResult::value(send_data_method))
}
.instrument(trace_span!("send_data")),
.in_current_span()
)
}
@ -559,6 +560,12 @@ impl NetworkManager {
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
// Detect if network is stopping so we can break out of this
let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else {
return Ok(NetworkResult::service_unavailable("network is stopping"));
};
// Build a return receipt for the signal
let receipt_timeout = ms_to_us(
self.unlocked_inner
@ -588,30 +595,38 @@ impl NetworkManager {
let rpc = self.rpc_processor();
network_result_try!(rpc
.rpc_call_signal(
Destination::relay(relay_nr, target_nr.clone()),
Destination::relay(relay_nr.clone(), target_nr.clone()),
SignalInfo::ReverseConnect { receipt, peer_info },
)
.await
.wrap_err("failed to send signal")?);
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"reverse connect receipt should be returned in-band",
));
let inbound_nr = match eventual_value.timeout_at(stop_token).in_current_span().await {
Err(_) => {
return Ok(NetworkResult::service_unavailable("network is stopping"));
}
ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef,
ReceiptEvent::Expired => {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
return Ok(NetworkResult::no_connection_other(format!(
"reverse connect receipt cancelled from {}",
target_nr
)))
Ok(v) => {
let receipt_event = v.take_value().unwrap();
match receipt_event {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"reverse connect receipt should be returned in-band",
));
}
ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef,
ReceiptEvent::Expired => {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
return Ok(NetworkResult::no_connection_other(format!(
"reverse connect receipt cancelled from {}",
target_nr
)))
}
}
}
};
@ -634,7 +649,9 @@ impl NetworkManager {
)),
}
} else {
bail!("no reverse connection available")
return Ok(NetworkResult::no_connection_other(format!(
"reverse connection dropped from {}", target_nr)
));
}
}
@ -648,6 +665,11 @@ impl NetworkManager {
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
// Detect if network is stopping so we can break out of this
let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else {
return Ok(NetworkResult::service_unavailable("network is stopping"));
};
// Ensure we are filtered down to UDP (the only hole punch protocol supported today)
assert!(target_nr
.filter_ref()
@ -706,23 +728,31 @@ impl NetworkManager {
.wrap_err("failed to send signal")?);
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"hole punch receipt should be returned in-band",
));
let inbound_nr = match eventual_value.timeout_at(stop_token).in_current_span().await {
Err(_) => {
return Ok(NetworkResult::service_unavailable("network is stopping"));
}
ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef,
ReceiptEvent::Expired => {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
return Ok(NetworkResult::no_connection_other(format!(
"hole punch receipt cancelled from {}",
target_nr
)))
Ok(v) => {
let receipt_event = v.take_value().unwrap();
match receipt_event {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"hole punch receipt should be returned in-band",
));
}
ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef,
ReceiptEvent::Expired => {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
return Ok(NetworkResult::no_connection_other(format!(
"hole punch receipt cancelled from {}",
target_nr
)))
}
}
}
};
@ -749,7 +779,9 @@ impl NetworkManager {
)),
}
} else {
bail!("no hole punch available")
return Ok(NetworkResult::no_connection_other(format!(
"hole punch dropped from {}", target_nr)
));
}
}
}

View File

@ -48,6 +48,7 @@ impl NetworkManager {
}
}
#[instrument(level = "trace", name = "NetworkManager::tick", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> {
let routing_table = self.routing_table();
let net = self.net();

View File

@ -2,10 +2,10 @@ use super::*;
impl NetworkManager {
// Clean up the public address check tables, removing entries that have timed out
#[instrument(level = "trace", skip(self), err)]
#[instrument(parent = None, level = "trace", skip_all, err)]
pub(crate) async fn public_address_check_task_routine(
self,
stop_token: StopToken,
_stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {

View File

@ -510,6 +510,7 @@ impl Network {
}
//////////////////////////////////////////
#[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)]
pub(crate) async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");

View File

@ -112,6 +112,7 @@ impl RoutingTable {
/// Ticks about once per second
/// to run tick tasks which may run at slower tick rates as configured
#[instrument(level = "trace", name = "RoutingTable::tick", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> {
// Don't tick if paused
let opt_tick_guard = {

View File

@ -237,8 +237,6 @@ impl RoutingTable {
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
eprintln!("pv tick");
let mut futurequeue: VecDeque<PingValidatorFuture> = VecDeque::new();
// PublicInternet
@ -253,9 +251,19 @@ impl RoutingTable {
let mut unord = FuturesUnordered::new();
while !unord.is_empty() || !futurequeue.is_empty() {
log_rtab!(debug "Ping validation queue: {} remaining, {} in progress", futurequeue.len(), unord.len());
log_rtab!(
"Ping validation queue: {} remaining, {} in progress",
futurequeue.len(),
unord.len()
);
// Process one unordered futures if we have some
match unord.next().timeout_at(stop_token.clone()).await {
match unord
.next()
.timeout_at(stop_token.clone())
.in_current_span()
.await
{
Ok(Some(_)) => {
// Some ping completed
}
@ -273,7 +281,7 @@ impl RoutingTable {
let Some(fq) = futurequeue.pop_front() else {
break;
};
unord.push(fq.in_current_span());
unord.push(fq);
}
}

View File

@ -321,13 +321,17 @@ where
}
}
// Wait for them to complete
timeout(timeout_ms, async {
while let Some(is_done) = unord.next().await {
if is_done {
break;
timeout(
timeout_ms,
async {
while let Some(is_done) = unord.next().in_current_span().await {
if is_done {
break;
}
}
}
})
.in_current_span(),
)
.await
.into_timeout_or()
.map(|_| {

View File

@ -277,7 +277,7 @@ enum RPCKind {
/////////////////////////////////////////////////////////////////////
struct RPCProcessorInner {
send_channel: Option<flume::Sender<(Option<Id>, RPCMessageEncoded)>>,
send_channel: Option<flume::Sender<(Span, RPCMessageEncoded)>>,
stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>,
}
@ -590,7 +590,7 @@ impl RPCProcessor {
};
Ok(nr)
})
}.in_current_span())
}
#[instrument(level="trace", target="rpc", skip_all)]
@ -1662,10 +1662,13 @@ impl RPCProcessor {
RPCStatementDetail::AppMessage(_) => self.process_app_message(msg).await,
},
RPCOperationKind::Answer(_) => {
self.unlocked_inner
let op_id = msg.operation.op_id();
if let Err(e) = self.unlocked_inner
.waiting_rpc_table
.complete_op_waiter(msg.operation.op_id(), msg)
.await?;
.complete_op_waiter(op_id, msg) {
log_rpc!(debug "Operation id {} did not complete: {}", op_id, e);
// Don't throw an error here because it's okay if the original operation timed out
}
Ok(NetworkResult::value(()))
}
}
@ -1675,13 +1678,16 @@ impl RPCProcessor {
async fn rpc_worker(
self,
stop_token: StopToken,
receiver: flume::Receiver<(Option<Id>, RPCMessageEncoded)>,
receiver: flume::Receiver<(Span, RPCMessageEncoded)>,
) {
while let Ok(Ok((_span_id, msg))) =
while let Ok(Ok((prev_span, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await
{
let rpc_message_span = tracing::trace_span!("rpc message");
rpc_message_span.follows_from(prev_span);
network_result_value_or_log!(match self
.process_rpc_message(msg).in_current_span()
.process_rpc_message(msg).instrument(rpc_message_span)
.await
{
Err(e) => {
@ -1730,9 +1736,8 @@ impl RPCProcessor {
};
send_channel
};
let span_id = Span::current().id();
send_channel
.try_send((span_id, msg))
.try_send((Span::current(), msg))
.map_err(|e| eyre!("failed to enqueue direct RPC message: {}", e))?;
Ok(())
}
@ -1766,9 +1771,8 @@ impl RPCProcessor {
};
send_channel
};
let span_id = Span::current().id();
send_channel
.try_send((span_id, msg))
.try_send((Span::current(), msg))
.map_err(|e| eyre!("failed to enqueue safety routed RPC message: {}", e))?;
Ok(())
}
@ -1805,9 +1809,8 @@ impl RPCProcessor {
};
send_channel
};
let span_id = Span::current().id();
send_channel
.try_send((span_id, msg))
.try_send((Span::current(), msg))
.map_err(|e| eyre!("failed to enqueue private routed RPC message: {}", e))?;
Ok(())
}

View File

@ -8,7 +8,7 @@ where
{
waiter: OperationWaiter<T, C>,
op_id: OperationId,
eventual_instance: Option<EventualValueFuture<(Option<Id>, T)>>,
result_receiver: Option<flume::Receiver<(Span, T)>>,
}
impl<T, C> Drop for OperationWaitHandle<T, C>
@ -17,7 +17,7 @@ where
C: Unpin + Clone,
{
fn drop(&mut self) {
if self.eventual_instance.is_some() {
if self.result_receiver.is_some() {
self.waiter.cancel_op_waiter(self.op_id);
}
}
@ -31,7 +31,7 @@ where
{
context: C,
timestamp: Timestamp,
eventual: EventualValue<(Option<Id>, T)>,
result_sender: flume::Sender<(Span, T)>,
}
#[derive(Debug)]
@ -80,11 +80,11 @@ where
/// Set up wait for operation to complete
pub fn add_op_waiter(&self, op_id: OperationId, context: C) -> OperationWaitHandle<T, C> {
let mut inner = self.inner.lock();
let e = EventualValue::new();
let (result_sender, result_receiver) = flume::bounded(1);
let waiting_op = OperationWaitingOp {
context,
timestamp: get_aligned_timestamp(),
eventual: e.clone(),
result_sender,
};
if inner.waiting_op_table.insert(op_id, waiting_op).is_some() {
error!(
@ -96,7 +96,7 @@ where
OperationWaitHandle {
waiter: self.clone(),
op_id,
eventual_instance: Some(e.instance()),
result_receiver: Some(result_receiver),
}
}
@ -122,14 +122,15 @@ where
}
/// Remove wait for op
#[instrument(level = "trace", target = "rpc", skip_all)]
fn cancel_op_waiter(&self, op_id: OperationId) {
let mut inner = self.inner.lock();
inner.waiting_op_table.remove(&op_id);
}
/// Complete the app call
/// Complete the waiting op
#[instrument(level = "trace", target = "rpc", skip_all)]
pub async fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
pub fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
let waiting_op = {
let mut inner = self.inner.lock();
inner
@ -141,10 +142,9 @@ where
)))?
};
waiting_op
.eventual
.resolve((Span::current().id(), message))
.await;
Ok(())
.result_sender
.send((Span::current(), message))
.map_err(RPCError::ignore)
}
/// Wait for operation to complete
@ -156,29 +156,30 @@ where
) -> Result<TimeoutOr<(T, TimestampDuration)>, RPCError> {
let timeout_ms = us_to_ms(timeout_us.as_u64()).map_err(RPCError::internal)?;
// Take the instance
// Take the receiver
// After this, we must manually cancel since the cancel on handle drop is disabled
let eventual_instance = handle.eventual_instance.take().unwrap();
let result_receiver = handle.result_receiver.take().unwrap();
let result_fut = result_receiver.recv_async().in_current_span();
// wait for eventualvalue
let start_ts = get_aligned_timestamp();
let res = timeout(timeout_ms, eventual_instance)
.await
.into_timeout_or();
Ok(res
.on_timeout(|| {
// log_rpc!(debug "op wait timed out: {}", handle.op_id);
// debug_print_backtrace();
let res = timeout(timeout_ms, result_fut).await.into_timeout_or();
match res {
TimeoutOr::Timeout => {
self.cancel_op_waiter(handle.op_id);
})
.map(|res| {
let (_span_id, ret) = res.take_value().unwrap();
Ok(TimeoutOr::Timeout)
}
TimeoutOr::Value(Ok((_span_id, ret))) => {
let end_ts = get_aligned_timestamp();
//xxx: causes crash (Missing otel data span extensions)
// Span::current().follows_from(span_id);
(ret, end_ts.saturating_sub(start_ts))
}))
Ok(TimeoutOr::Value((ret, end_ts.saturating_sub(start_ts))))
}
TimeoutOr::Value(Err(e)) => Err(RPCError::ignore(e)),
}
}
}

View File

@ -153,11 +153,7 @@ impl RPCProcessor {
/// Exposed to API for apps to return app call answers
#[instrument(level = "trace", target = "rpc", skip_all)]
pub async fn app_call_reply(
&self,
call_id: OperationId,
message: Vec<u8>,
) -> Result<(), RPCError> {
pub fn app_call_reply(&self, call_id: OperationId, message: Vec<u8>) -> Result<(), RPCError> {
let _guard = self
.unlocked_inner
.startup_lock
@ -166,6 +162,6 @@ impl RPCProcessor {
self.unlocked_inner
.waiting_app_call_table
.complete_op_waiter(call_id, message)
.await
.map_err(RPCError::ignore)
}
}

View File

@ -15,6 +15,11 @@ impl RPCProcessor {
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
let stop_token = self
.unlocked_inner
.startup_lock
.stop_token()
.ok_or(RPCError::try_again("not started up"))?;
let network_manager = self.network_manager();
let receipt_time = ms_to_us(self.unlocked_inner.validate_dial_info_receipt_time_ms);
@ -38,23 +43,35 @@ impl RPCProcessor {
);
// Wait for receipt
match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedInBand { inbound_noderef: _ }
| ReceiptEvent::ReturnedSafety => {
log_net!(debug "validate_dial_info receipt should be returned out-of-band");
Ok(false)
match eventual_value
.timeout_at(stop_token)
.in_current_span()
.await
{
Err(_) => {
return Err(RPCError::try_again("not started up"));
}
ReceiptEvent::ReturnedOutOfBand => {
log_net!(debug "validate_dial_info receipt returned");
Ok(true)
}
ReceiptEvent::Expired => {
log_net!(debug "validate_dial_info receipt expired");
Ok(false)
}
ReceiptEvent::Cancelled => {
Err(RPCError::internal("receipt was dropped before expiration"))
Ok(v) => {
let receipt_event = v.take_value().unwrap();
match receipt_event {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedInBand { inbound_noderef: _ }
| ReceiptEvent::ReturnedSafety => {
log_net!(debug "validate_dial_info receipt should be returned out-of-band");
Ok(false)
}
ReceiptEvent::ReturnedOutOfBand => {
log_net!(debug "validate_dial_info receipt returned");
Ok(true)
}
ReceiptEvent::Expired => {
log_net!(debug "validate_dial_info receipt expired");
Ok(false)
}
ReceiptEvent::Cancelled => {
Err(RPCError::internal("receipt was dropped before expiration"))
}
}
}
}
}

View File

@ -182,7 +182,7 @@ impl StorageManager {
log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
Ok(NetworkResult::value(gva.answer.peers))
}.in_current_span()
}.instrument(tracing::trace_span!("outbound_get_value fanout routine"))
}
};
@ -271,7 +271,7 @@ impl StorageManager {
})) {
log_dht!(debug "Sending GetValue result failed: {}", e);
}
}.in_current_span()))
}.instrument(tracing::trace_span!("outbound_get_value result"))))
.detach();
Ok(out_rx)
@ -319,7 +319,7 @@ impl StorageManager {
// Return done
false
}.in_current_span())
}.instrument(tracing::trace_span!("outbound_get_value deferred results")))
},
),
);

View File

@ -228,7 +228,7 @@ impl StorageManager {
log_network_result!(debug "InspectValue fanout call returned peers {}", answer.peers.len());
Ok(NetworkResult::value(answer.peers))
}.in_current_span()
}.instrument(tracing::trace_span!("outbound_inspect_value fanout call"))
};
// Routine to call to check if we're done at each step

View File

@ -177,7 +177,7 @@ impl StorageManager {
ctx.send_partial_update = true;
Ok(NetworkResult::value(sva.answer.peers))
}.in_current_span()
}.instrument(tracing::trace_span!("fanout call_routine"))
}
};
@ -267,7 +267,7 @@ impl StorageManager {
})) {
log_dht!(debug "Sending SetValue result failed: {}", e);
}
}.in_current_span()))
}.instrument(tracing::trace_span!("outbound_set_value fanout routine"))))
.detach();
Ok(out_rx)
@ -329,7 +329,7 @@ impl StorageManager {
// Return done
false
}.in_current_span())
}.instrument(tracing::trace_span!("outbound_set_value deferred results")))
},
),
);

View File

@ -80,6 +80,7 @@ impl StorageManager {
}
}
#[instrument(parent = None, level = "trace", target = "stor", name = "StorageManager::tick", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> {
// Run the flush stores task
self.unlocked_inner.flush_record_stores_task.tick().await?;
@ -109,6 +110,7 @@ impl StorageManager {
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(crate) async fn cancel_tasks(&self) {
log_stor!(debug "stopping check watched records task");
if let Err(e) = self.unlocked_inner.check_watched_records_task.stop().await {

View File

@ -32,15 +32,24 @@ impl StorageManager {
// Add a future for each value change
for vc in value_changes {
let this = self.clone();
unord.push(async move {
if let Err(e) = this.send_value_change(vc).await {
log_stor!(debug "Failed to send value change: {}", e);
unord.push(
async move {
if let Err(e) = this.send_value_change(vc).await {
log_stor!(debug "Failed to send value change: {}", e);
}
}
});
.in_current_span(),
);
}
while !unord.is_empty() {
match unord.next().timeout_at(stop_token.clone()).await {
match unord
.next()
.in_current_span()
.timeout_at(stop_token.clone())
.in_current_span()
.await
{
Ok(Some(_)) => {
// Some ValueChanged completed
}

View File

@ -294,7 +294,7 @@ impl StorageManager {
log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node);
Ok(NetworkResult::value(wva.answer.peers))
}.in_current_span()
}.instrument(tracing::trace_span!("outbound_watch_value call routine"))
};
// Routine to call to check if we're done at each step

View File

@ -366,7 +366,6 @@ impl VeilidAPI {
let rpc_processor = self.rpc_processor()?;
rpc_processor
.app_call_reply(call_id, message)
.await
.map_err(|e| e.into())
}

View File

@ -38,6 +38,7 @@ rt-tokio = [
]
tracking = ["veilid-core/tracking"]
debug-json-api = []
debug-locks = ["veilid-core/debug-locks"]
[dependencies]
veilid-core = { path = "../veilid-core", default-features = false }

View File

@ -108,25 +108,29 @@ pub async fn run_veilid_server(
let capi2 = capi.clone();
let update_receiver_shutdown = SingleShotEventual::new(Some(()));
let mut update_receiver_shutdown_instance = update_receiver_shutdown.instance().fuse();
let update_receiver_jh = spawn_local("update_receiver", async move {
loop {
select! {
res = receiver.recv_async() => {
if let Ok(change) = res {
if let Some(capi) = &capi2 {
// Handle state changes on main thread for capnproto rpc
capi.clone().handle_update(change);
let update_receiver_jh = spawn_local(
"update_receiver",
async move {
loop {
select! {
res = receiver.recv_async() => {
if let Ok(change) = res {
if let Some(capi) = &capi2 {
// Handle state changes on main thread for capnproto rpc
capi.clone().handle_update(change);
}
} else {
break;
}
} else {
}
_ = update_receiver_shutdown_instance => {
break;
}
}
_ = update_receiver_shutdown_instance => {
break;
}
};
};
}
}
});
.in_current_span(),
);
// Auto-attach if desired
let mut out = Ok(());

View File

@ -104,7 +104,7 @@ where
match out {
None => task::Poll::<Self::Output>::Pending,
Some(wakers) => {
// Wake all EventualResolvedFutures
// Wake all other instance futures
for w in wakers {
w.wake();
}

View File

@ -81,7 +81,7 @@ impl<T: Unpin> Future for EventualValueFuture<T> {
match out {
None => task::Poll::<Self::Output>::Pending,
Some(wakers) => {
// Wake all EventualResolvedFutures
// Wake all other instance futures
for w in wakers {
w.wake();
}

View File

@ -77,7 +77,7 @@ impl<T: Unpin + Clone> Future for EventualValueCloneFuture<T> {
match out {
None => task::Poll::<Self::Output>::Pending,
Some(wakers) => {
// Wake all EventualResolvedFutures
// Wake all other instance futures
for w in wakers {
w.wake();
}

View File

@ -251,6 +251,7 @@ pub mod tests;
cfg_if! {
if #[cfg(feature = "tracing")] {
use tracing::*;
#[macro_export]
macro_rules! debug_target_enabled {
($target:expr) => { enabled!(target: $target, Level::DEBUG) }

View File

@ -64,6 +64,7 @@ where
}
/// Check the result and take it if there is one
#[cfg_attr(feature = "tracing", instrument(level = "trace", skip_all))]
pub async fn check(&self) -> Result<Option<T>, ()> {
let mut out: Option<T> = None;
@ -95,6 +96,7 @@ where
}
/// Wait for the result and take it
#[cfg_attr(feature = "tracing", instrument(level = "trace", skip_all))]
pub async fn join(&self) -> Result<Option<T>, ()> {
let mut out: Option<T> = None;

View File

@ -34,20 +34,20 @@ impl<'a> StartupLockGuard<'a> {
#[derive(Debug)]
pub struct StartupLockEnterGuard<'a> {
_guard: AsyncRwLockReadGuard<'a, bool>,
// #[cfg(feature = "debug-locks")]
#[cfg(feature = "debug-locks")]
id: usize,
// #[cfg(feature = "debug-locks")]
#[cfg(feature = "debug-locks")]
active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
}
//#[cfg(feature = "debug-locks")]
#[cfg(feature = "debug-locks")]
impl<'a> Drop for StartupLockEnterGuard<'a> {
fn drop(&mut self) {
self.active_guards.lock().remove(&self.id);
}
}
//#[cfg(feature = "debug-locks")]
#[cfg(feature = "debug-locks")]
static GUARD_ID: AtomicUsize = AtomicUsize::new(0);
/// Synchronization mechanism that tracks the startup and shutdown of a region of code.
@ -59,16 +59,18 @@ static GUARD_ID: AtomicUsize = AtomicUsize::new(0);
/// asynchronous shutdown to wait for operations to finish before proceeding.
#[derive(Debug)]
pub struct StartupLock {
rwlock: AsyncRwLock<bool>,
// #[cfg(feature = "debug-locks")]
startup_state: AsyncRwLock<bool>,
stop_source: Mutex<Option<StopSource>>,
#[cfg(feature = "debug-locks")]
active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
}
impl StartupLock {
pub fn new() -> Self {
Self {
rwlock: AsyncRwLock::new(false),
// #[cfg(feature = "debug-locks")]
startup_state: AsyncRwLock::new(false),
stop_source: Mutex::new(None),
#[cfg(feature = "debug-locks")]
active_guards: Arc::new(Mutex::new(HashMap::new())),
}
}
@ -77,20 +79,29 @@ impl StartupLock {
/// One must call 'success()' on the returned startup lock guard if startup was successful
/// otherwise the startup lock will not shift to the 'started' state.
pub fn startup(&self) -> Result<StartupLockGuard, StartupLockAlreadyStartedError> {
let guard = asyncrwlock_try_write!(self.rwlock).ok_or(StartupLockAlreadyStartedError)?;
let guard =
asyncrwlock_try_write!(self.startup_state).ok_or(StartupLockAlreadyStartedError)?;
if *guard {
return Err(StartupLockAlreadyStartedError);
}
*self.stop_source.lock() = Some(StopSource::new());
Ok(StartupLockGuard {
guard,
success_value: true,
})
}
/// Get a stop token for this lock
/// One can wait on this to timeout operations when a shutdown is requested
pub fn stop_token(&self) -> Option<StopToken> {
self.stop_source.lock().as_ref().map(|ss| ss.token())
}
/// Check if this StartupLock is currently in a started state
/// Returns false is the state is in transition
pub fn is_started(&self) -> bool {
let Some(guard) = asyncrwlock_try_read!(self.rwlock) else {
let Some(guard) = asyncrwlock_try_read!(self.startup_state) else {
return false;
};
*guard
@ -99,7 +110,7 @@ impl StartupLock {
/// Check if this StartupLock is currently in a shut down state
/// Returns false is the state is in transition
pub fn is_shut_down(&self) -> bool {
let Some(guard) = asyncrwlock_try_read!(self.rwlock) else {
let Some(guard) = asyncrwlock_try_read!(self.startup_state) else {
return false;
};
!*guard
@ -109,18 +120,20 @@ impl StartupLock {
/// One must call 'success()' on the returned startup lock guard if shutdown was successful
/// otherwise the startup lock will not shift to the 'stopped' state.
pub async fn shutdown(&self) -> Result<StartupLockGuard, StartupLockAlreadyShutDownError> {
// Drop the stop source to ensure we can detect shutdown has been requested
*self.stop_source.lock() = None;
cfg_if! {
if #[cfg(feature = "debug-locks")] {
//let guard = self.rwlock.write().await;
let guard = match timeout(30000, self.startup_state.write()).await {
Ok(v) => v,
Err(_) => {
eprintln!("active guards: {:#?}", self.active_guards.lock().values().collect::<Vec<_>>());
panic!("shutdown deadlock");
}
};
} else {
let guard = self.rwlock.write().await;
// let guard = match timeout(30000, self.rwlock.write()).await {
// Ok(v) => v,
// Err(_) => {
// eprintln!("active guards: {:#?}", self.active_guards.lock().values().collect::<Vec<_>>());
// panic!("shutdown deadlock");
// }
// };
let guard = self.startup_state.write().await;
}
}
if !*guard {
@ -136,19 +149,23 @@ impl StartupLock {
/// If this module has not yet started up or is in the process of startup or shutdown
/// this will fail.
pub fn enter(&self) -> Result<StartupLockEnterGuard, StartupLockNotStartedError> {
let guard = asyncrwlock_try_read!(self.rwlock).ok_or(StartupLockNotStartedError)?;
let guard = asyncrwlock_try_read!(self.startup_state).ok_or(StartupLockNotStartedError)?;
if !*guard {
return Err(StartupLockNotStartedError);
}
let out = StartupLockEnterGuard {
_guard: guard,
//#[cfg(feature = "debug-locks")]
#[cfg(feature = "debug-locks")]
id: GUARD_ID.fetch_add(1, Ordering::AcqRel),
#[cfg(feature = "debug-locks")]
active_guards: self.active_guards.clone(),
};
#[cfg(feature = "debug-locks")]
self.active_guards
.lock()
.insert(out.id, backtrace::Backtrace::new());
Ok(out)
}
}

View File

@ -104,22 +104,29 @@ impl<E: Send + 'static> TickTask<E> {
return Ok(());
}
self.internal_tick(now, last_timestamp_us).await.map(drop)
let itick = self.internal_tick(now, last_timestamp_us);
itick.await.map(drop)
}
pub async fn try_tick_now(&self) -> Result<bool, E> {
let now = get_timestamp();
let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
self.internal_tick(now, last_timestamp_us).await
let itick = self.internal_tick(now, last_timestamp_us);
itick.await
}
async fn internal_tick(&self, now: u64, last_timestamp_us: u64) -> Result<bool, E> {
// Lock the stop source, tells us if we have ever started this future
let opt_stop_source = &mut *self.stop_source.lock().await;
let opt_stop_source_fut = self.stop_source.lock();
let opt_stop_source = &mut *opt_stop_source_fut.await;
if opt_stop_source.is_some() {
// See if the previous execution finished with an error
match self.single_future.check().await {
match self.single_future.check().in_current_span().await {
Ok(Some(Err(e))) => {
// We have an error result, which means the singlefuture ran but we need to propagate the error
return Err(e);
@ -145,15 +152,18 @@ impl<E: Send + 'static> TickTask<E> {
let stop_token = stop_source.token();
let running = self.running.clone();
let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now);
let wrapped_routine = Box::pin(async move {
running.store(true, core::sync::atomic::Ordering::Release);
let out = routine.await;
running.store(false, core::sync::atomic::Ordering::Release);
out
});
match self
.single_future
.single_spawn(&self.name, wrapped_routine)
.in_current_span()
.await
{
// We should have already consumed the result of the last run, or there was none

View File

@ -8,7 +8,9 @@ cfg_if! {
where
F: Future<Output = T>,
{
match select(Box::pin(sleep(dur_ms)), Box::pin(f)).await {
let tout = select(Box::pin(sleep(dur_ms)), Box::pin(f));
match tout.await {
Either::Left((_x, _b)) => Err(TimeoutError()),
Either::Right((y, _a)) => Ok(y),
}
@ -22,11 +24,13 @@ cfg_if! {
{
cfg_if! {
if #[cfg(feature="rt-async-std")] {
async_std::future::timeout(Duration::from_millis(dur_ms as u64), f).await.map_err(|e| e.into())
let tout = async_std::future::timeout(Duration::from_millis(dur_ms as u64), f);
} else if #[cfg(feature="rt-tokio")] {
tokio::time::timeout(Duration::from_millis(dur_ms as u64), f).await.map_err(|e| e.into())
let tout = tokio::time::timeout(Duration::from_millis(dur_ms as u64), f);
}
}
tout.await.map_err(|e| e.into())
}
}

View File

@ -506,20 +506,21 @@ pub fn map_to_string<X: ToString>(arg: X) -> String {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub struct DebugGuard {
name: &'static str,
counter: &'static AtomicUsize,
}
impl DebugGuard {
pub fn new(counter: &'static AtomicUsize) -> Self {
pub fn new(name: &'static str, counter: &'static AtomicUsize) -> Self {
let c = counter.fetch_add(1, Ordering::SeqCst);
eprintln!("DebugGuard Entered: {}", c + 1);
Self { counter }
eprintln!("{} entered: {}", name, c + 1);
Self { name, counter }
}
}
impl Drop for DebugGuard {
fn drop(&mut self) {
let c = self.counter.fetch_sub(1, Ordering::SeqCst);
eprintln!("DebugGuard Exited: {}", c - 1);
eprintln!("{} exited: {}", self.name, c - 1);
}
}