mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-08-23 14:05:11 -04:00
Merge branch 'dht-performance' into 'main'
RPC improvements See merge request veilid/veilid!361
This commit is contained in:
commit
d0e751ea25
24 changed files with 417 additions and 214 deletions
|
@ -289,3 +289,91 @@ macro_rules! veilid_log {
|
||||||
$($k).+ = $($fields)*
|
$($k).+ = $($fields)*
|
||||||
)};
|
)};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! network_result_value_or_log {
|
||||||
|
($self:ident $r:expr => $f:expr) => {
|
||||||
|
network_result_value_or_log!($self target: self::__VEILID_LOG_FACILITY, $r => [ "" ] $f )
|
||||||
|
};
|
||||||
|
($self:ident $r:expr => [ $d:expr ] $f:expr) => {
|
||||||
|
network_result_value_or_log!($self target: self::__VEILID_LOG_FACILITY, $r => [ $d ] $f )
|
||||||
|
};
|
||||||
|
($self:ident target: $target:expr, $r:expr => $f:expr) => {
|
||||||
|
network_result_value_or_log!($self target: $target, $r => [ "" ] $f )
|
||||||
|
};
|
||||||
|
($self:ident target: $target:expr, $r:expr => [ $d:expr ] $f:expr) => { {
|
||||||
|
let __extra_message = if debug_target_enabled!("network_result") {
|
||||||
|
$d.to_string()
|
||||||
|
} else {
|
||||||
|
"".to_string()
|
||||||
|
};
|
||||||
|
match $r {
|
||||||
|
NetworkResult::Timeout => {
|
||||||
|
veilid_log!($self debug target: $target,
|
||||||
|
"{} at {}@{}:{} in {}{}",
|
||||||
|
"Timeout",
|
||||||
|
file!(),
|
||||||
|
line!(),
|
||||||
|
column!(),
|
||||||
|
fn_name::uninstantiated!(),
|
||||||
|
__extra_message
|
||||||
|
);
|
||||||
|
$f
|
||||||
|
}
|
||||||
|
NetworkResult::ServiceUnavailable(ref s) => {
|
||||||
|
veilid_log!($self debug target: $target,
|
||||||
|
"{}({}) at {}@{}:{} in {}{}",
|
||||||
|
"ServiceUnavailable",
|
||||||
|
s,
|
||||||
|
file!(),
|
||||||
|
line!(),
|
||||||
|
column!(),
|
||||||
|
fn_name::uninstantiated!(),
|
||||||
|
__extra_message
|
||||||
|
);
|
||||||
|
$f
|
||||||
|
}
|
||||||
|
NetworkResult::NoConnection(ref e) => {
|
||||||
|
veilid_log!($self debug target: $target,
|
||||||
|
"{}({}) at {}@{}:{} in {}{}",
|
||||||
|
"No connection",
|
||||||
|
e.to_string(),
|
||||||
|
file!(),
|
||||||
|
line!(),
|
||||||
|
column!(),
|
||||||
|
fn_name::uninstantiated!(),
|
||||||
|
__extra_message
|
||||||
|
);
|
||||||
|
$f
|
||||||
|
}
|
||||||
|
NetworkResult::AlreadyExists(ref e) => {
|
||||||
|
veilid_log!($self debug target: $target,
|
||||||
|
"{}({}) at {}@{}:{} in {}{}",
|
||||||
|
"Already exists",
|
||||||
|
e.to_string(),
|
||||||
|
file!(),
|
||||||
|
line!(),
|
||||||
|
column!(),
|
||||||
|
fn_name::uninstantiated!(),
|
||||||
|
__extra_message
|
||||||
|
);
|
||||||
|
$f
|
||||||
|
}
|
||||||
|
NetworkResult::InvalidMessage(ref s) => {
|
||||||
|
veilid_log!($self debug target: $target,
|
||||||
|
"{}({}) at {}@{}:{} in {}{}",
|
||||||
|
"Invalid message",
|
||||||
|
s,
|
||||||
|
file!(),
|
||||||
|
line!(),
|
||||||
|
column!(),
|
||||||
|
fn_name::uninstantiated!(),
|
||||||
|
__extra_message
|
||||||
|
);
|
||||||
|
$f
|
||||||
|
}
|
||||||
|
NetworkResult::Value(v) => v,
|
||||||
|
}
|
||||||
|
} };
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ impl_veilid_log_facility!("net");
|
||||||
|
|
||||||
const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10);
|
const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10);
|
||||||
const PROTECTED_CONNECTION_DROP_COUNT: usize = 3;
|
const PROTECTED_CONNECTION_DROP_COUNT: usize = 3;
|
||||||
const NEW_CONNECTION_RETRY_COUNT: usize = 1;
|
const NEW_CONNECTION_RETRY_COUNT: usize = 0;
|
||||||
const NEW_CONNECTION_RETRY_DELAY_MS: u32 = 500;
|
const NEW_CONNECTION_RETRY_DELAY_MS: u32 = 500;
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////
|
||||||
|
@ -415,7 +415,19 @@ impl ConnectionManager {
|
||||||
let best_port = preferred_local_address.map(|pla| pla.port());
|
let best_port = preferred_local_address.map(|pla| pla.port());
|
||||||
|
|
||||||
// Async lock on the remote address for atomicity per remote
|
// Async lock on the remote address for atomicity per remote
|
||||||
let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr).await;
|
// Use the initial connection timeout here because multiple calls to get_or_create_connection
|
||||||
|
// can be performed simultaneously and we want to wait for the first one to succeed or not
|
||||||
|
let Ok(_lock_guard) = timeout(
|
||||||
|
self.arc.connection_initial_timeout_ms,
|
||||||
|
self.arc.address_lock_table.lock_tag(remote_addr),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
else {
|
||||||
|
veilid_log!(self debug "== get_or_create_connection: connection busy, not connecting to dial_info={:?}", dial_info);
|
||||||
|
return Ok(NetworkResult::no_connection_other(
|
||||||
|
"connection endpoint busy",
|
||||||
|
));
|
||||||
|
};
|
||||||
|
|
||||||
veilid_log!(self trace "== get_or_create_connection dial_info={:?}", dial_info);
|
veilid_log!(self trace "== get_or_create_connection dial_info={:?}", dial_info);
|
||||||
|
|
||||||
|
@ -449,7 +461,8 @@ impl ConnectionManager {
|
||||||
let mut retry_count = NEW_CONNECTION_RETRY_COUNT;
|
let mut retry_count = NEW_CONNECTION_RETRY_COUNT;
|
||||||
let network_manager = self.network_manager();
|
let network_manager = self.network_manager();
|
||||||
|
|
||||||
let prot_conn = network_result_try!(loop {
|
let nres = loop {
|
||||||
|
veilid_log!(self trace "== get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info);
|
||||||
let result_net_res = ProtocolNetworkConnection::connect(
|
let result_net_res = ProtocolNetworkConnection::connect(
|
||||||
self.registry(),
|
self.registry(),
|
||||||
preferred_local_address,
|
preferred_local_address,
|
||||||
|
@ -474,12 +487,16 @@ impl ConnectionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
veilid_log!(self debug "get_or_create_connection retries left: {}", retry_count);
|
|
||||||
retry_count -= 1;
|
retry_count -= 1;
|
||||||
|
|
||||||
// Release the preferred local address if things can't connect due to a low-level collision we dont have a record of
|
// // XXX: This should not be necessary
|
||||||
preferred_local_address = None;
|
// // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of
|
||||||
|
// preferred_local_address = None;
|
||||||
sleep(NEW_CONNECTION_RETRY_DELAY_MS).await;
|
sleep(NEW_CONNECTION_RETRY_DELAY_MS).await;
|
||||||
|
};
|
||||||
|
|
||||||
|
let prot_conn = network_result_value_or_log!(self target:"network_result", nres => [ format!("== get_or_create_connection failed {:?} -> {}", preferred_local_address, dial_info) ] {
|
||||||
|
network_result_raise!(nres);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Add to the connection table
|
// Add to the connection table
|
||||||
|
@ -598,7 +615,7 @@ impl ConnectionManager {
|
||||||
|
|
||||||
// Callback from network connection receive loop when it exits
|
// Callback from network connection receive loop when it exits
|
||||||
// cleans up the entry in the connection table
|
// cleans up the entry in the connection table
|
||||||
pub(super) async fn report_connection_finished(&self, connection_id: NetworkConnectionId) {
|
pub(super) fn report_connection_finished(&self, connection_id: NetworkConnectionId) {
|
||||||
// Get channel sender
|
// Get channel sender
|
||||||
let sender = {
|
let sender = {
|
||||||
let mut inner = self.arc.inner.lock();
|
let mut inner = self.arc.inner.lock();
|
||||||
|
@ -668,7 +685,7 @@ impl ConnectionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = sender.send_async(ConnectionManagerEvent::Dead(conn)).await;
|
let _ = sender.send(ConnectionManagerEvent::Dead(conn));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -662,11 +662,9 @@ impl Network {
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
// Handle connection-oriented protocols
|
// Handle connection-oriented protocols
|
||||||
|
let connmgr = self.network_manager().connection_manager();
|
||||||
let conn = network_result_try!(
|
let conn = network_result_try!(
|
||||||
self.network_manager()
|
connmgr.get_or_create_connection(dial_info.clone()).await?
|
||||||
.connection_manager()
|
|
||||||
.get_or_create_connection(dial_info.clone())
|
|
||||||
.await?
|
|
||||||
);
|
);
|
||||||
|
|
||||||
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
|
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
|
||||||
|
|
|
@ -6,7 +6,7 @@ use async_tungstenite::tungstenite::handshake::server::{
|
||||||
Callback, ErrorResponse, Request, Response,
|
Callback, ErrorResponse, Request, Response,
|
||||||
};
|
};
|
||||||
use async_tungstenite::tungstenite::http::StatusCode;
|
use async_tungstenite::tungstenite::http::StatusCode;
|
||||||
use async_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message};
|
use async_tungstenite::tungstenite::protocol::Message;
|
||||||
use async_tungstenite::tungstenite::Error;
|
use async_tungstenite::tungstenite::Error;
|
||||||
use async_tungstenite::{accept_hdr_async, client_async, WebSocketStream};
|
use async_tungstenite::{accept_hdr_async, client_async, WebSocketStream};
|
||||||
use futures_util::{AsyncRead, AsyncWrite, SinkExt};
|
use futures_util::{AsyncRead, AsyncWrite, SinkExt};
|
||||||
|
@ -98,45 +98,27 @@ where
|
||||||
|
|
||||||
#[instrument(level = "trace", target = "protocol", err, skip_all)]
|
#[instrument(level = "trace", target = "protocol", err, skip_all)]
|
||||||
pub async fn close(&self) -> io::Result<NetworkResult<()>> {
|
pub async fn close(&self) -> io::Result<NetworkResult<()>> {
|
||||||
|
let timeout_ms = self
|
||||||
|
.registry
|
||||||
|
.config()
|
||||||
|
.with(|c| c.network.connection_initial_timeout_ms);
|
||||||
|
|
||||||
// Make an attempt to close the stream normally
|
// Make an attempt to close the stream normally
|
||||||
let mut stream = self.stream.clone();
|
let mut stream = self.stream.clone();
|
||||||
let out = match stream
|
|
||||||
.send(Message::Close(Some(CloseFrame {
|
|
||||||
code: CloseCode::Normal,
|
|
||||||
reason: "".into(),
|
|
||||||
})))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(v) => NetworkResult::value(v),
|
|
||||||
Err(e) => err_to_network_result(e),
|
|
||||||
};
|
|
||||||
|
|
||||||
// This close does not do a TCP shutdown so it is safe and will not cause TIME_WAIT
|
// This close does not do a TCP shutdown so it is safe and will not cause TIME_WAIT
|
||||||
let _ = stream.close().await;
|
match timeout(timeout_ms, stream.close()).await {
|
||||||
|
Ok(Ok(())) => {}
|
||||||
Ok(out)
|
Ok(Err(e)) => {
|
||||||
|
return Ok(err_to_network_result(e));
|
||||||
// Drive connection to close
|
|
||||||
/*
|
|
||||||
let cur_ts = get_timestamp();
|
|
||||||
loop {
|
|
||||||
match stream.flush().await {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(Error::Io(ioerr)) => {
|
|
||||||
break Err(ioerr).into_network_result();
|
|
||||||
}
|
|
||||||
Err(Error::ConnectionClosed) => {
|
|
||||||
break Ok(NetworkResult::value(()));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
break Err(to_io_error_other(e));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if get_timestamp().saturating_sub(cur_ts) >= MAX_CONNECTION_CLOSE_WAIT_US {
|
Err(_) => {
|
||||||
return Ok(NetworkResult::Timeout);
|
// Timed out
|
||||||
|
return Ok(NetworkResult::timeout());
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
*/
|
|
||||||
|
Ok(NetworkResult::value(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))]
|
#[instrument(level = "trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))]
|
||||||
|
|
|
@ -480,20 +480,21 @@ impl NetworkConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
veilid_log!(registry trace
|
|
||||||
"Connection loop finished flow={:?}",
|
|
||||||
flow
|
|
||||||
);
|
|
||||||
|
|
||||||
// Let the connection manager know the receive loop exited
|
// Let the connection manager know the receive loop exited
|
||||||
connection_manager
|
connection_manager
|
||||||
.report_connection_finished(connection_id)
|
.report_connection_finished(connection_id);
|
||||||
.await;
|
|
||||||
|
|
||||||
// Close the low level socket
|
// Close the low level socket
|
||||||
if let Err(e) = protocol_connection.close().await {
|
if let Err(e) = protocol_connection.close().await {
|
||||||
veilid_log!(registry debug "Protocol connection close error: {}", e);
|
veilid_log!(registry debug "Protocol connection close error: {}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
veilid_log!(registry trace
|
||||||
|
"Connection loop exited flow={:?}",
|
||||||
|
flow
|
||||||
|
);
|
||||||
|
|
||||||
}.in_current_span())
|
}.in_current_span())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -152,7 +152,7 @@ impl NetworkManager {
|
||||||
// If a node is unreachable it may still have an existing inbound connection
|
// If a node is unreachable it may still have an existing inbound connection
|
||||||
// Try that, but don't cache anything
|
// Try that, but don't cache anything
|
||||||
network_result_try!(
|
network_result_try!(
|
||||||
pin_future_closure!(self.send_data_ncm_existing(target_node_ref, data)).await?
|
pin_future_closure!(self.send_data_unreachable(target_node_ref, data)).await?
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
Some(NodeContactMethod {
|
Some(NodeContactMethod {
|
||||||
|
@ -239,6 +239,42 @@ impl NetworkManager {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send data to unreachable node
|
||||||
|
#[instrument(level = "trace", target = "net", skip_all, err)]
|
||||||
|
async fn send_data_unreachable(
|
||||||
|
&self,
|
||||||
|
target_node_ref: FilteredNodeRef,
|
||||||
|
data: Vec<u8>,
|
||||||
|
) -> EyreResult<NetworkResult<UniqueFlow>> {
|
||||||
|
// First try to send data to the last connection we've seen this peer on
|
||||||
|
let Some(flow) = target_node_ref.last_flow() else {
|
||||||
|
return Ok(NetworkResult::no_connection_other(format!(
|
||||||
|
"node was unreachable: {}",
|
||||||
|
target_node_ref
|
||||||
|
)));
|
||||||
|
};
|
||||||
|
|
||||||
|
let net = self.net();
|
||||||
|
let unique_flow = match pin_future!(debug_duration(
|
||||||
|
|| { net.send_data_to_existing_flow(flow, data) },
|
||||||
|
Some(1_000_000)
|
||||||
|
))
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow,
|
||||||
|
SendDataToExistingFlowResult::NotSent(_) => {
|
||||||
|
return Ok(NetworkResult::no_connection_other(
|
||||||
|
"failed to send to existing flow",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Update timestamp for this last connection since we just sent to it
|
||||||
|
self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
|
||||||
|
|
||||||
|
Ok(NetworkResult::value(unique_flow))
|
||||||
|
}
|
||||||
|
|
||||||
/// Send data using NodeContactMethod::Existing
|
/// Send data using NodeContactMethod::Existing
|
||||||
#[instrument(level = "trace", target = "net", skip_all, err)]
|
#[instrument(level = "trace", target = "net", skip_all, err)]
|
||||||
async fn send_data_ncm_existing(
|
async fn send_data_ncm_existing(
|
||||||
|
@ -255,7 +291,12 @@ impl NetworkManager {
|
||||||
};
|
};
|
||||||
|
|
||||||
let net = self.net();
|
let net = self.net();
|
||||||
let unique_flow = match pin_future!(net.send_data_to_existing_flow(flow, data)).await? {
|
let unique_flow = match pin_future!(debug_duration(
|
||||||
|
|| { net.send_data_to_existing_flow(flow, data) },
|
||||||
|
Some(1_000_000)
|
||||||
|
))
|
||||||
|
.await?
|
||||||
|
{
|
||||||
SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow,
|
SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow,
|
||||||
SendDataToExistingFlowResult::NotSent(_) => {
|
SendDataToExistingFlowResult::NotSent(_) => {
|
||||||
return Ok(NetworkResult::no_connection_other(
|
return Ok(NetworkResult::no_connection_other(
|
||||||
|
@ -297,7 +338,12 @@ impl NetworkManager {
|
||||||
// First try to send data to the last flow we've seen this peer on
|
// First try to send data to the last flow we've seen this peer on
|
||||||
let data = if let Some(flow) = seq_target_node_ref.last_flow() {
|
let data = if let Some(flow) = seq_target_node_ref.last_flow() {
|
||||||
let net = self.net();
|
let net = self.net();
|
||||||
match pin_future!(net.send_data_to_existing_flow(flow, data)).await? {
|
match pin_future!(debug_duration(
|
||||||
|
|| { net.send_data_to_existing_flow(flow, data) },
|
||||||
|
Some(1_000_000)
|
||||||
|
))
|
||||||
|
.await?
|
||||||
|
{
|
||||||
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
||||||
// Update timestamp for this last connection since we just sent to it
|
// Update timestamp for this last connection since we just sent to it
|
||||||
self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
|
self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
|
||||||
|
@ -321,9 +367,16 @@ impl NetworkManager {
|
||||||
data
|
data
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let connection_initial_timeout_us = self
|
||||||
|
.config()
|
||||||
|
.with(|c| c.network.connection_initial_timeout_ms as u64 * 1000);
|
||||||
|
|
||||||
let unique_flow = network_result_try!(
|
let unique_flow = network_result_try!(
|
||||||
pin_future!(self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data))
|
pin_future!(debug_duration(
|
||||||
.await?
|
|| { self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data) },
|
||||||
|
Some(connection_initial_timeout_us * 2)
|
||||||
|
))
|
||||||
|
.await?
|
||||||
);
|
);
|
||||||
Ok(NetworkResult::value(unique_flow))
|
Ok(NetworkResult::value(unique_flow))
|
||||||
}
|
}
|
||||||
|
@ -339,7 +392,12 @@ impl NetworkManager {
|
||||||
// First try to send data to the last flow we've seen this peer on
|
// First try to send data to the last flow we've seen this peer on
|
||||||
let data = if let Some(flow) = target_node_ref.last_flow() {
|
let data = if let Some(flow) = target_node_ref.last_flow() {
|
||||||
let net = self.net();
|
let net = self.net();
|
||||||
match pin_future!(net.send_data_to_existing_flow(flow, data)).await? {
|
match pin_future!(debug_duration(
|
||||||
|
|| { net.send_data_to_existing_flow(flow, data) },
|
||||||
|
Some(1_000_000)
|
||||||
|
))
|
||||||
|
.await?
|
||||||
|
{
|
||||||
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
||||||
// Update timestamp for this last connection since we just sent to it
|
// Update timestamp for this last connection since we just sent to it
|
||||||
self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
|
self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
|
||||||
|
@ -363,9 +421,16 @@ impl NetworkManager {
|
||||||
data
|
data
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let hole_punch_receipt_time_us = self
|
||||||
|
.config()
|
||||||
|
.with(|c| c.network.hole_punch_receipt_time_ms as u64 * 1000);
|
||||||
|
|
||||||
let unique_flow = network_result_try!(
|
let unique_flow = network_result_try!(
|
||||||
pin_future!(self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data))
|
pin_future!(debug_duration(
|
||||||
.await?
|
|| { self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data) },
|
||||||
|
Some(hole_punch_receipt_time_us * 2)
|
||||||
|
))
|
||||||
|
.await?
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(NetworkResult::value(unique_flow))
|
Ok(NetworkResult::value(unique_flow))
|
||||||
|
@ -391,7 +456,12 @@ impl NetworkManager {
|
||||||
);
|
);
|
||||||
|
|
||||||
let net = self.net();
|
let net = self.net();
|
||||||
match pin_future!(net.send_data_to_existing_flow(flow, data)).await? {
|
match pin_future!(debug_duration(
|
||||||
|
|| { net.send_data_to_existing_flow(flow, data) },
|
||||||
|
Some(1_000_000)
|
||||||
|
))
|
||||||
|
.await?
|
||||||
|
{
|
||||||
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
SendDataToExistingFlowResult::Sent(unique_flow) => {
|
||||||
// Update timestamp for this last connection since we just sent to it
|
// Update timestamp for this last connection since we just sent to it
|
||||||
self.set_last_flow(node_ref.unfiltered(), flow, Timestamp::now());
|
self.set_last_flow(node_ref.unfiltered(), flow, Timestamp::now());
|
||||||
|
|
|
@ -52,8 +52,16 @@ impl WebsocketNetworkConnection {
|
||||||
instrument(level = "trace", err, skip(self))
|
instrument(level = "trace", err, skip(self))
|
||||||
)]
|
)]
|
||||||
pub async fn close(&self) -> io::Result<NetworkResult<()>> {
|
pub async fn close(&self) -> io::Result<NetworkResult<()>> {
|
||||||
|
let timeout_ms = self
|
||||||
|
.registry
|
||||||
|
.config()
|
||||||
|
.with(|c| c.network.connection_initial_timeout_ms);
|
||||||
|
|
||||||
#[allow(unused_variables)]
|
#[allow(unused_variables)]
|
||||||
let x = self.inner.ws_meta.close().await.map_err(ws_err_to_io_error);
|
let x = match timeout(timeout_ms, self.inner.ws_meta.close()).await {
|
||||||
|
Ok(v) => v.map_err(ws_err_to_io_error),
|
||||||
|
Err(_) => return Ok(NetworkResult::timeout()),
|
||||||
|
};
|
||||||
#[cfg(feature = "verbose-tracing")]
|
#[cfg(feature = "verbose-tracing")]
|
||||||
veilid_log!(self debug "close result: {:?}", x);
|
veilid_log!(self debug "close result: {:?}", x);
|
||||||
Ok(NetworkResult::value(()))
|
Ok(NetworkResult::value(()))
|
||||||
|
|
|
@ -1011,7 +1011,12 @@ impl BucketEntryInner {
|
||||||
|
|
||||||
match latest_contact_time {
|
match latest_contact_time {
|
||||||
None => {
|
None => {
|
||||||
error!("Peer is reliable, but not seen!");
|
// Peer may be appear reliable from a previous attach/detach
|
||||||
|
// But reliability uses last_seen_ts not the last_outbound_contact_time
|
||||||
|
// Regardless, if we haven't pinged it, we need to ping it.
|
||||||
|
// But it it was reliable before, and pings successfully then it can
|
||||||
|
// stay reliable, so we don't make it unreliable just because we haven't
|
||||||
|
// contacted it yet during this attachment.
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
Some(latest_contact_time) => {
|
Some(latest_contact_time) => {
|
||||||
|
|
|
@ -1041,7 +1041,7 @@ impl RoutingTable {
|
||||||
#[instrument(level = "trace", skip(self), err)]
|
#[instrument(level = "trace", skip(self), err)]
|
||||||
pub async fn find_nodes_close_to_node_id(
|
pub async fn find_nodes_close_to_node_id(
|
||||||
&self,
|
&self,
|
||||||
node_ref: NodeRef,
|
node_ref: FilteredNodeRef,
|
||||||
node_id: TypedKey,
|
node_id: TypedKey,
|
||||||
capabilities: Vec<Capability>,
|
capabilities: Vec<Capability>,
|
||||||
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
||||||
|
@ -1049,11 +1049,7 @@ impl RoutingTable {
|
||||||
|
|
||||||
let res = network_result_try!(
|
let res = network_result_try!(
|
||||||
rpc_processor
|
rpc_processor
|
||||||
.rpc_call_find_node(
|
.rpc_call_find_node(Destination::direct(node_ref), node_id, capabilities)
|
||||||
Destination::direct(node_ref.default_filtered()),
|
|
||||||
node_id,
|
|
||||||
capabilities
|
|
||||||
)
|
|
||||||
.await?
|
.await?
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -1069,7 +1065,7 @@ impl RoutingTable {
|
||||||
pub async fn find_nodes_close_to_self(
|
pub async fn find_nodes_close_to_self(
|
||||||
&self,
|
&self,
|
||||||
crypto_kind: CryptoKind,
|
crypto_kind: CryptoKind,
|
||||||
node_ref: NodeRef,
|
node_ref: FilteredNodeRef,
|
||||||
capabilities: Vec<Capability>,
|
capabilities: Vec<Capability>,
|
||||||
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
||||||
let self_node_id = self.node_id(crypto_kind);
|
let self_node_id = self.node_id(crypto_kind);
|
||||||
|
@ -1083,7 +1079,7 @@ impl RoutingTable {
|
||||||
pub async fn find_nodes_close_to_node_ref(
|
pub async fn find_nodes_close_to_node_ref(
|
||||||
&self,
|
&self,
|
||||||
crypto_kind: CryptoKind,
|
crypto_kind: CryptoKind,
|
||||||
node_ref: NodeRef,
|
node_ref: FilteredNodeRef,
|
||||||
capabilities: Vec<Capability>,
|
capabilities: Vec<Capability>,
|
||||||
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
||||||
let Some(target_node_id) = node_ref.node_ids().get(crypto_kind) else {
|
let Some(target_node_id) = node_ref.node_ids().get(crypto_kind) else {
|
||||||
|
@ -1104,7 +1100,7 @@ impl RoutingTable {
|
||||||
capabilities: Vec<Capability>,
|
capabilities: Vec<Capability>,
|
||||||
) {
|
) {
|
||||||
// Ask node for nodes closest to our own node
|
// Ask node for nodes closest to our own node
|
||||||
let closest_nodes = network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, node_ref.clone(), capabilities.clone())).await {
|
let closest_nodes = network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, node_ref.sequencing_filtered(Sequencing::PreferOrdered), capabilities.clone())).await {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
veilid_log!(self error
|
veilid_log!(self error
|
||||||
"find_self failed for {:?}: {:?}",
|
"find_self failed for {:?}: {:?}",
|
||||||
|
@ -1120,7 +1116,7 @@ impl RoutingTable {
|
||||||
// Ask each node near us to find us as well
|
// Ask each node near us to find us as well
|
||||||
if wide {
|
if wide {
|
||||||
for closest_nr in closest_nodes {
|
for closest_nr in closest_nodes {
|
||||||
network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, closest_nr.clone(), capabilities.clone())).await {
|
network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, closest_nr.sequencing_filtered(Sequencing::PreferOrdered), capabilities.clone())).await {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
veilid_log!(self error
|
veilid_log!(self error
|
||||||
"find_self failed for {:?}: {:?}",
|
"find_self failed for {:?}: {:?}",
|
||||||
|
|
|
@ -119,6 +119,24 @@ impl NodeRefOperateTrait for FilteredNodeRef {
|
||||||
let inner = &mut *routing_table.inner.write();
|
let inner = &mut *routing_table.inner.write();
|
||||||
self.entry.with_mut(inner, f)
|
self.entry.with_mut(inner, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn with_inner<T, F>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&RoutingTableInner) -> T,
|
||||||
|
{
|
||||||
|
let routing_table = self.registry.routing_table();
|
||||||
|
let inner = &*routing_table.inner.read();
|
||||||
|
f(inner)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_inner_mut<T, F>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut RoutingTableInner) -> T,
|
||||||
|
{
|
||||||
|
let routing_table = self.registry.routing_table();
|
||||||
|
let inner = &mut *routing_table.inner.write();
|
||||||
|
f(inner)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NodeRefCommonTrait for FilteredNodeRef {}
|
impl NodeRefCommonTrait for FilteredNodeRef {}
|
||||||
|
|
|
@ -139,6 +139,24 @@ impl NodeRefOperateTrait for NodeRef {
|
||||||
let inner = &mut *routing_table.inner.write();
|
let inner = &mut *routing_table.inner.write();
|
||||||
self.entry.with_mut(inner, f)
|
self.entry.with_mut(inner, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn with_inner<T, F>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&RoutingTableInner) -> T,
|
||||||
|
{
|
||||||
|
let routing_table = self.routing_table();
|
||||||
|
let inner = &*routing_table.inner.read();
|
||||||
|
f(inner)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_inner_mut<T, F>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut RoutingTableInner) -> T,
|
||||||
|
{
|
||||||
|
let routing_table = self.routing_table();
|
||||||
|
let inner = &mut *routing_table.inner.write();
|
||||||
|
f(inner)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NodeRefCommonTrait for NodeRef {}
|
impl NodeRefCommonTrait for NodeRef {}
|
||||||
|
|
|
@ -90,6 +90,21 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp
|
||||||
{
|
{
|
||||||
panic!("need to locked_mut() for this operation")
|
panic!("need to locked_mut() for this operation")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn with_inner<T, F>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&RoutingTableInner) -> T,
|
||||||
|
{
|
||||||
|
let inner = &*self.inner.lock();
|
||||||
|
f(inner)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_inner_mut<T, F>(&self, _f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut RoutingTableInner) -> T,
|
||||||
|
{
|
||||||
|
panic!("need to locked_mut() for this operation")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone>
|
impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone>
|
||||||
|
|
|
@ -92,6 +92,22 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp
|
||||||
let inner = &mut *self.inner.lock();
|
let inner = &mut *self.inner.lock();
|
||||||
self.nr.entry().with_mut(inner, f)
|
self.nr.entry().with_mut(inner, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn with_inner<T, F>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&RoutingTableInner) -> T,
|
||||||
|
{
|
||||||
|
let inner = &*self.inner.lock();
|
||||||
|
f(inner)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_inner_mut<T, F>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut RoutingTableInner) -> T,
|
||||||
|
{
|
||||||
|
let inner = &mut *self.inner.lock();
|
||||||
|
f(inner)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone>
|
impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone>
|
||||||
|
|
|
@ -20,6 +20,13 @@ pub(crate) trait NodeRefOperateTrait {
|
||||||
fn operate_mut<T, F>(&self, f: F) -> T
|
fn operate_mut<T, F>(&self, f: F) -> T
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T;
|
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T;
|
||||||
|
#[expect(dead_code)]
|
||||||
|
fn with_inner<T, F>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&RoutingTableInner) -> T;
|
||||||
|
fn with_inner_mut<T, F>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut RoutingTableInner) -> T;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Common Operations
|
// Common Operations
|
||||||
|
@ -115,7 +122,7 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait
|
||||||
// }
|
// }
|
||||||
|
|
||||||
fn relay(&self, routing_domain: RoutingDomain) -> EyreResult<Option<FilteredNodeRef>> {
|
fn relay(&self, routing_domain: RoutingDomain) -> EyreResult<Option<FilteredNodeRef>> {
|
||||||
self.operate_mut(|rti, e| {
|
let Some(rpi) = self.operate(|rti, e| {
|
||||||
let Some(sni) = e.signed_node_info(routing_domain) else {
|
let Some(sni) = e.signed_node_info(routing_domain) else {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
@ -127,8 +134,14 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait
|
||||||
if rti.routing_table().matches_own_node_id(rpi.node_ids()) {
|
if rti.routing_table().matches_own_node_id(rpi.node_ids()) {
|
||||||
bail!("Can't relay though ourselves");
|
bail!("Can't relay though ourselves");
|
||||||
}
|
}
|
||||||
|
Ok(Some(rpi))
|
||||||
|
})?
|
||||||
|
else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
|
||||||
// Register relay node and return noderef
|
// Register relay node and return noderef
|
||||||
|
self.with_inner_mut(|rti| {
|
||||||
let nr = rti.register_node_with_peer_info(rpi, false)?;
|
let nr = rti.register_node_with_peer_info(rpi, false)?;
|
||||||
Ok(Some(nr))
|
Ok(Some(nr))
|
||||||
})
|
})
|
||||||
|
|
|
@ -716,7 +716,7 @@ impl RouteSpecStore {
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some(rsid) = inner.content.get_id_by_key(&public_key.value) else {
|
let Some(rsid) = inner.content.get_id_by_key(&public_key.value) else {
|
||||||
veilid_log!(self debug "route id does not exist: {:?}", public_key.value);
|
veilid_log!(self debug target: "network_result", "route id does not exist: {:?}", public_key.value);
|
||||||
return None;
|
return None;
|
||||||
};
|
};
|
||||||
let Some(rssd) = inner.content.get_detail(&rsid) else {
|
let Some(rssd) = inner.content.get_detail(&rsid) else {
|
||||||
|
@ -753,7 +753,7 @@ impl RouteSpecStore {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
veilid_log!(self debug "errir verifying signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e);
|
veilid_log!(self debug "error verifying signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -289,7 +289,7 @@ impl RoutingTable {
|
||||||
|
|
||||||
// Get what contact method would be used for contacting the bootstrap
|
// Get what contact method would be used for contacting the bootstrap
|
||||||
let bsdi = match network_manager
|
let bsdi = match network_manager
|
||||||
.get_node_contact_method(nr.default_filtered())
|
.get_node_contact_method(nr.sequencing_filtered(Sequencing::PreferOrdered))
|
||||||
{
|
{
|
||||||
Ok(Some(ncm)) if ncm.is_direct() => ncm.direct_dial_info().unwrap(),
|
Ok(Some(ncm)) if ncm.is_direct() => ncm.direct_dial_info().unwrap(),
|
||||||
Ok(v) => {
|
Ok(v) => {
|
||||||
|
@ -307,7 +307,7 @@ impl RoutingTable {
|
||||||
|
|
||||||
// Need VALID signed peer info, so ask bootstrap to find_node of itself
|
// Need VALID signed peer info, so ask bootstrap to find_node of itself
|
||||||
// which will ensure it has the bootstrap's signed peer info as part of the response
|
// which will ensure it has the bootstrap's signed peer info as part of the response
|
||||||
let _ = routing_table.find_nodes_close_to_node_ref(crypto_kind, nr.clone(), vec![]).await;
|
let _ = routing_table.find_nodes_close_to_node_ref(crypto_kind, nr.sequencing_filtered(Sequencing::PreferOrdered), vec![]).await;
|
||||||
|
|
||||||
// Ensure we got the signed peer info
|
// Ensure we got the signed peer info
|
||||||
if !nr.signed_node_info_has_valid_signature(routing_domain) {
|
if !nr.signed_node_info_has_valid_signature(routing_domain) {
|
||||||
|
|
|
@ -1471,11 +1471,24 @@ impl RPCProcessor {
|
||||||
let operation = match self.decode_rpc_operation(&encoded_msg) {
|
let operation = match self.decode_rpc_operation(&encoded_msg) {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Debug on error
|
match e {
|
||||||
veilid_log!(self debug "Dropping routed RPC: {}", e);
|
// Invalid messages that should be punished
|
||||||
|
RPCError::Protocol(_) | RPCError::InvalidFormat(_) => {
|
||||||
|
veilid_log!(self debug "Invalid routed RPC Operation: {}", e);
|
||||||
|
|
||||||
|
// XXX: Punish routes that send routed undecodable crap
|
||||||
|
// self.network_manager().address_filter().punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage);
|
||||||
|
}
|
||||||
|
// Ignored messages that should be dropped
|
||||||
|
RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => {
|
||||||
|
veilid_log!(self trace "Dropping routed RPC Operation: {}", e);
|
||||||
|
}
|
||||||
|
// Internal errors that deserve louder logging
|
||||||
|
RPCError::Unimplemented(_) | RPCError::Internal(_) => {
|
||||||
|
veilid_log!(self error "Error decoding routed RPC operation: {}", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// XXX: Punish routes that send routed undecodable crap
|
|
||||||
// self.network_manager().address_filter().punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage);
|
|
||||||
return Ok(NetworkResult::invalid_message(e));
|
return Ok(NetworkResult::invalid_message(e));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -1593,16 +1606,16 @@ impl RPCProcessor {
|
||||||
if let Err(e) = self.waiting_rpc_table.complete_op_waiter(op_id, msg) {
|
if let Err(e) = self.waiting_rpc_table.complete_op_waiter(op_id, msg) {
|
||||||
match e {
|
match e {
|
||||||
RPCError::Unimplemented(_) | RPCError::Internal(_) => {
|
RPCError::Unimplemented(_) | RPCError::Internal(_) => {
|
||||||
veilid_log!(self error "Could not complete rpc operation: id = {}: {}", op_id, e);
|
veilid_log!(self error "Error in RPC operation: id = {}: {}", op_id, e);
|
||||||
}
|
}
|
||||||
RPCError::InvalidFormat(_)
|
RPCError::InvalidFormat(_)
|
||||||
| RPCError::Protocol(_)
|
| RPCError::Protocol(_)
|
||||||
| RPCError::Network(_)
|
| RPCError::Network(_)
|
||||||
| RPCError::TryAgain(_) => {
|
| RPCError::TryAgain(_) => {
|
||||||
veilid_log!(self debug "Could not complete rpc operation: id = {}: {}", op_id, e);
|
veilid_log!(self debug "Could not complete RPC operation: id = {}: {}", op_id, e);
|
||||||
}
|
}
|
||||||
RPCError::Ignore(_) => {
|
RPCError::Ignore(e) => {
|
||||||
veilid_log!(self debug "Answer late: id = {}", op_id);
|
veilid_log!(self debug "RPC operation ignored: id = {}: {}", op_id, e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// Don't throw an error here because it's okay if the original operation timed out
|
// Don't throw an error here because it's okay if the original operation timed out
|
||||||
|
|
|
@ -8,7 +8,7 @@ where
|
||||||
{
|
{
|
||||||
waiter: OperationWaiter<T, C>,
|
waiter: OperationWaiter<T, C>,
|
||||||
op_id: OperationId,
|
op_id: OperationId,
|
||||||
result_receiver: Option<flume::Receiver<(Span, T)>>,
|
result_receiver: flume::Receiver<(Span, T)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, C> OperationWaitHandle<T, C>
|
impl<T, C> OperationWaitHandle<T, C>
|
||||||
|
@ -27,9 +27,7 @@ where
|
||||||
C: Unpin + Clone,
|
C: Unpin + Clone,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.result_receiver.is_some() {
|
self.waiter.cancel_op_waiter(self.op_id);
|
||||||
self.waiter.cancel_op_waiter(self.op_id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +104,7 @@ where
|
||||||
OperationWaitHandle {
|
OperationWaitHandle {
|
||||||
waiter: self.clone(),
|
waiter: self.clone(),
|
||||||
op_id,
|
op_id,
|
||||||
result_receiver: Some(result_receiver),
|
result_receiver,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,65 +123,69 @@ where
|
||||||
/// Get operation context
|
/// Get operation context
|
||||||
pub fn get_op_context(&self, op_id: OperationId) -> Result<C, RPCError> {
|
pub fn get_op_context(&self, op_id: OperationId) -> Result<C, RPCError> {
|
||||||
let inner = self.inner.lock();
|
let inner = self.inner.lock();
|
||||||
let Some(waiting_op) = inner.waiting_op_table.get(&op_id) else {
|
let res = {
|
||||||
return Err(RPCError::ignore(format!(
|
let Some(waiting_op) = inner.waiting_op_table.get(&op_id) else {
|
||||||
"Missing operation id getting op context: id={}",
|
return Err(RPCError::ignore(format!(
|
||||||
op_id
|
"Missing operation id getting op context: id={}",
|
||||||
)));
|
op_id
|
||||||
|
)));
|
||||||
|
};
|
||||||
|
Ok(waiting_op.context.clone())
|
||||||
};
|
};
|
||||||
Ok(waiting_op.context.clone())
|
drop(inner);
|
||||||
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove wait for op
|
/// Remove wait for op
|
||||||
#[instrument(level = "trace", target = "rpc", skip_all)]
|
#[instrument(level = "trace", target = "rpc", skip_all)]
|
||||||
fn cancel_op_waiter(&self, op_id: OperationId) {
|
fn cancel_op_waiter(&self, op_id: OperationId) {
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
inner.waiting_op_table.remove(&op_id);
|
{
|
||||||
|
let waiting_op = inner.waiting_op_table.remove(&op_id);
|
||||||
|
drop(waiting_op);
|
||||||
|
}
|
||||||
|
drop(inner);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Complete the waiting op
|
/// Complete the waiting op
|
||||||
#[instrument(level = "trace", target = "rpc", skip_all)]
|
#[instrument(level = "trace", target = "rpc", skip_all)]
|
||||||
pub fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
|
pub fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
|
||||||
let waiting_op = {
|
let mut inner = self.inner.lock();
|
||||||
let mut inner = self.inner.lock();
|
let res = {
|
||||||
inner
|
let waiting_op =
|
||||||
.waiting_op_table
|
inner
|
||||||
.remove(&op_id)
|
.waiting_op_table
|
||||||
.ok_or_else(RPCError::else_ignore(format!(
|
.remove(&op_id)
|
||||||
"Unmatched operation id: {}",
|
.ok_or_else(RPCError::else_ignore(format!(
|
||||||
op_id
|
"Unmatched operation id: {}",
|
||||||
)))?
|
op_id
|
||||||
|
)))?;
|
||||||
|
waiting_op
|
||||||
|
.result_sender
|
||||||
|
.send((Span::current(), message))
|
||||||
|
.map_err(RPCError::ignore)
|
||||||
};
|
};
|
||||||
waiting_op
|
drop(inner);
|
||||||
.result_sender
|
res
|
||||||
.send((Span::current(), message))
|
|
||||||
.map_err(RPCError::ignore)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Wait for operation to complete
|
/// Wait for operation to complete
|
||||||
#[instrument(level = "trace", target = "rpc", skip_all)]
|
#[instrument(level = "trace", target = "rpc", skip_all)]
|
||||||
pub async fn wait_for_op(
|
pub async fn wait_for_op(
|
||||||
&self,
|
&self,
|
||||||
mut handle: OperationWaitHandle<T, C>,
|
handle: OperationWaitHandle<T, C>,
|
||||||
timeout_us: TimestampDuration,
|
timeout_us: TimestampDuration,
|
||||||
) -> Result<TimeoutOr<(T, TimestampDuration)>, RPCError> {
|
) -> Result<TimeoutOr<(T, TimestampDuration)>, RPCError> {
|
||||||
let timeout_ms = us_to_ms(timeout_us.as_u64()).map_err(RPCError::internal)?;
|
let timeout_ms = us_to_ms(timeout_us.as_u64()).map_err(RPCError::internal)?;
|
||||||
|
|
||||||
// Take the receiver
|
let result_fut = handle.result_receiver.recv_async().in_current_span();
|
||||||
// After this, we must manually cancel since the cancel on handle drop is disabled
|
|
||||||
let result_receiver = handle.result_receiver.take().unwrap();
|
|
||||||
|
|
||||||
let result_fut = result_receiver.recv_async().in_current_span();
|
|
||||||
|
|
||||||
// wait for eventualvalue
|
// wait for eventualvalue
|
||||||
let start_ts = Timestamp::now();
|
let start_ts = Timestamp::now();
|
||||||
let res = timeout(timeout_ms, result_fut).await.into_timeout_or();
|
let res = timeout(timeout_ms, result_fut).await.into_timeout_or();
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
TimeoutOr::Timeout => {
|
TimeoutOr::Timeout => Ok(TimeoutOr::Timeout),
|
||||||
self.cancel_op_waiter(handle.op_id);
|
|
||||||
Ok(TimeoutOr::Timeout)
|
|
||||||
}
|
|
||||||
TimeoutOr::Value(Ok((_span_id, ret))) => {
|
TimeoutOr::Value(Ok((_span_id, ret))) => {
|
||||||
let end_ts = Timestamp::now();
|
let end_ts = Timestamp::now();
|
||||||
|
|
||||||
|
@ -192,7 +194,10 @@ where
|
||||||
|
|
||||||
Ok(TimeoutOr::Value((ret, end_ts.saturating_sub(start_ts))))
|
Ok(TimeoutOr::Value((ret, end_ts.saturating_sub(start_ts))))
|
||||||
}
|
}
|
||||||
TimeoutOr::Value(Err(e)) => Err(RPCError::ignore(e)),
|
TimeoutOr::Value(Err(e)) => {
|
||||||
|
//
|
||||||
|
Err(RPCError::ignore(e))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ impl RPCProcessor {
|
||||||
match request.kind {
|
match request.kind {
|
||||||
// Process RPC Message
|
// Process RPC Message
|
||||||
RPCWorkerRequestKind::Message { message_encoded } => {
|
RPCWorkerRequestKind::Message { message_encoded } => {
|
||||||
network_result_value_or_log!(self match self
|
network_result_value_or_log!(self target:"network_result", match self
|
||||||
.process_rpc_message(message_encoded).instrument(rpc_request_span)
|
.process_rpc_message(message_encoded).instrument(rpc_request_span)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|
|
@ -1120,7 +1120,7 @@ impl StorageManager {
|
||||||
let dest = rpc_processor
|
let dest = rpc_processor
|
||||||
.resolve_target_to_destination(
|
.resolve_target_to_destination(
|
||||||
vc.target,
|
vc.target,
|
||||||
SafetySelection::Unsafe(Sequencing::NoPreference),
|
SafetySelection::Unsafe(Sequencing::PreferOrdered),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(VeilidAPIError::from)?;
|
.map_err(VeilidAPIError::from)?;
|
||||||
|
|
|
@ -47,6 +47,7 @@ veilid_tools_android_tests = ["dep:paranoid-android"]
|
||||||
veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"]
|
veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"]
|
||||||
tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"]
|
tracing = ["dep:tracing", "dep:tracing-subscriber", "tokio/tracing"]
|
||||||
debug-locks = []
|
debug-locks = []
|
||||||
|
debug-duration-timeout = []
|
||||||
|
|
||||||
virtual-network = []
|
virtual-network = []
|
||||||
virtual-network-server = [
|
virtual-network-server = [
|
||||||
|
|
|
@ -10,7 +10,7 @@ where
|
||||||
{
|
{
|
||||||
table: AsyncTagLockTable<T>,
|
table: AsyncTagLockTable<T>,
|
||||||
tag: T,
|
tag: T,
|
||||||
_guard: AsyncMutexGuardArc<()>,
|
guard: Option<AsyncMutexGuardArc<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> AsyncTagLockGuard<T>
|
impl<T> AsyncTagLockGuard<T>
|
||||||
|
@ -21,7 +21,7 @@ where
|
||||||
Self {
|
Self {
|
||||||
table,
|
table,
|
||||||
tag,
|
tag,
|
||||||
_guard: guard,
|
guard: Some(guard),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,8 @@ where
|
||||||
if guards == 0 {
|
if guards == 0 {
|
||||||
inner.table.remove(&self.tag).unwrap();
|
inner.table.remove(&self.tag).unwrap();
|
||||||
}
|
}
|
||||||
// Proceed with releasing _guard, which may cause some concurrent tag lock to acquire
|
// Proceed with releasing guard, which may cause some concurrent tag lock to acquire
|
||||||
|
drop(self.guard.take());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,7 +154,7 @@ where
|
||||||
}
|
}
|
||||||
std::collections::hash_map::Entry::Vacant(v) => {
|
std::collections::hash_map::Entry::Vacant(v) => {
|
||||||
let mutex = Arc::new(AsyncMutex::new(()));
|
let mutex = Arc::new(AsyncMutex::new(()));
|
||||||
let guard = asyncmutex_try_lock_arc!(mutex)?;
|
let guard = asyncmutex_try_lock_arc!(mutex).unwrap();
|
||||||
v.insert(AsyncTagLockTableEntry { mutex, guards: 1 });
|
v.insert(AsyncTagLockTableEntry { mutex, guards: 1 });
|
||||||
guard
|
guard
|
||||||
}
|
}
|
||||||
|
|
|
@ -278,85 +278,3 @@ macro_rules! network_result_try {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
#[macro_export]
|
|
||||||
macro_rules! network_result_value_or_log {
|
|
||||||
($self:ident $r:expr => $f:expr) => {
|
|
||||||
network_result_value_or_log!($self $r => [ "" ] $f )
|
|
||||||
};
|
|
||||||
($self:ident $r:expr => [ $d:expr ] $f:expr) => { {
|
|
||||||
let __extra_message = if debug_target_enabled!("network_result") {
|
|
||||||
$d.to_string()
|
|
||||||
} else {
|
|
||||||
"".to_string()
|
|
||||||
};
|
|
||||||
match $r {
|
|
||||||
NetworkResult::Timeout => {
|
|
||||||
veilid_log!($self debug
|
|
||||||
"{} at {}@{}:{} in {}{}",
|
|
||||||
"Timeout",
|
|
||||||
file!(),
|
|
||||||
line!(),
|
|
||||||
column!(),
|
|
||||||
fn_name::uninstantiated!(),
|
|
||||||
__extra_message
|
|
||||||
);
|
|
||||||
$f
|
|
||||||
}
|
|
||||||
NetworkResult::ServiceUnavailable(ref s) => {
|
|
||||||
veilid_log!($self debug
|
|
||||||
"{}({}) at {}@{}:{} in {}{}",
|
|
||||||
"ServiceUnavailable",
|
|
||||||
s,
|
|
||||||
file!(),
|
|
||||||
line!(),
|
|
||||||
column!(),
|
|
||||||
fn_name::uninstantiated!(),
|
|
||||||
__extra_message
|
|
||||||
);
|
|
||||||
$f
|
|
||||||
}
|
|
||||||
NetworkResult::NoConnection(ref e) => {
|
|
||||||
veilid_log!($self debug
|
|
||||||
"{}({}) at {}@{}:{} in {}{}",
|
|
||||||
"No connection",
|
|
||||||
e.to_string(),
|
|
||||||
file!(),
|
|
||||||
line!(),
|
|
||||||
column!(),
|
|
||||||
fn_name::uninstantiated!(),
|
|
||||||
__extra_message
|
|
||||||
);
|
|
||||||
$f
|
|
||||||
}
|
|
||||||
NetworkResult::AlreadyExists(ref e) => {
|
|
||||||
veilid_log!($self debug
|
|
||||||
"{}({}) at {}@{}:{} in {}{}",
|
|
||||||
"Already exists",
|
|
||||||
e.to_string(),
|
|
||||||
file!(),
|
|
||||||
line!(),
|
|
||||||
column!(),
|
|
||||||
fn_name::uninstantiated!(),
|
|
||||||
__extra_message
|
|
||||||
);
|
|
||||||
$f
|
|
||||||
}
|
|
||||||
NetworkResult::InvalidMessage(ref s) => {
|
|
||||||
veilid_log!($self debug
|
|
||||||
"{}({}) at {}@{}:{} in {}{}",
|
|
||||||
"Invalid message",
|
|
||||||
s,
|
|
||||||
file!(),
|
|
||||||
line!(),
|
|
||||||
column!(),
|
|
||||||
fn_name::uninstantiated!(),
|
|
||||||
__extra_message
|
|
||||||
);
|
|
||||||
$f
|
|
||||||
}
|
|
||||||
NetworkResult::Value(v) => v,
|
|
||||||
}
|
|
||||||
} };
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
@ -522,13 +522,33 @@ pub fn is_debug_backtrace_enabled() -> bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[track_caller]
|
#[track_caller]
|
||||||
pub fn debug_duration<R, F: Future<Output = R>, T: FnOnce() -> F>(f: T) -> impl Future<Output = R> {
|
pub fn debug_duration<R, F: Future<Output = R>, T: FnOnce() -> F>(
|
||||||
let location = std::panic::Location::caller();
|
f: T,
|
||||||
|
opt_timeout_us: Option<u64>,
|
||||||
|
) -> impl Future<Output = R> {
|
||||||
|
let location = core::panic::Location::caller();
|
||||||
async move {
|
async move {
|
||||||
let t1 = get_timestamp();
|
let t1 = get_timestamp();
|
||||||
let out = f().await;
|
let out = f().await;
|
||||||
let t2 = get_timestamp();
|
let t2 = get_timestamp();
|
||||||
debug!("duration@{}: {}", location, display_duration(t2 - t1));
|
let duration_us = t2 - t1;
|
||||||
|
if let Some(timeout_us) = opt_timeout_us {
|
||||||
|
if duration_us > timeout_us {
|
||||||
|
#[cfg(not(feature = "debug-duration-timeout"))]
|
||||||
|
debug!(
|
||||||
|
"Excessive duration: {}\n{:?}",
|
||||||
|
display_duration(duration_us),
|
||||||
|
backtrace::Backtrace::new()
|
||||||
|
);
|
||||||
|
#[cfg(feature = "debug-duration-timeout")]
|
||||||
|
panic!(format!(
|
||||||
|
"Duration panic timeout exceeded: {}",
|
||||||
|
display_duration(duration_us)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!("Duration: {} = {}", location, display_duration(duration_us),);
|
||||||
|
}
|
||||||
out
|
out
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue