mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-04-19 15:25:54 -04:00
rpc cleanup
This commit is contained in:
parent
909fea721a
commit
8f521099bd
@ -289,3 +289,91 @@ macro_rules! veilid_log {
|
||||
$($k).+ = $($fields)*
|
||||
)};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! network_result_value_or_log {
|
||||
($self:ident $r:expr => $f:expr) => {
|
||||
network_result_value_or_log!($self target: self::__VEILID_LOG_FACILITY, $r => [ "" ] $f )
|
||||
};
|
||||
($self:ident $r:expr => [ $d:expr ] $f:expr) => {
|
||||
network_result_value_or_log!($self target: self::__VEILID_LOG_FACILITY, $r => [ $d ] $f )
|
||||
};
|
||||
($self:ident target: $target:expr, $r:expr => $f:expr) => {
|
||||
network_result_value_or_log!($self target: $target, $r => [ "" ] $f )
|
||||
};
|
||||
($self:ident target: $target:expr, $r:expr => [ $d:expr ] $f:expr) => { {
|
||||
let __extra_message = if debug_target_enabled!("network_result") {
|
||||
$d.to_string()
|
||||
} else {
|
||||
"".to_string()
|
||||
};
|
||||
match $r {
|
||||
NetworkResult::Timeout => {
|
||||
veilid_log!($self debug target: $target,
|
||||
"{} at {}@{}:{} in {}{}",
|
||||
"Timeout",
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::ServiceUnavailable(ref s) => {
|
||||
veilid_log!($self debug target: $target,
|
||||
"{}({}) at {}@{}:{} in {}{}",
|
||||
"ServiceUnavailable",
|
||||
s,
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::NoConnection(ref e) => {
|
||||
veilid_log!($self debug target: $target,
|
||||
"{}({}) at {}@{}:{} in {}{}",
|
||||
"No connection",
|
||||
e.to_string(),
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::AlreadyExists(ref e) => {
|
||||
veilid_log!($self debug target: $target,
|
||||
"{}({}) at {}@{}:{} in {}{}",
|
||||
"Already exists",
|
||||
e.to_string(),
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::InvalidMessage(ref s) => {
|
||||
veilid_log!($self debug target: $target,
|
||||
"{}({}) at {}@{}:{} in {}{}",
|
||||
"Invalid message",
|
||||
s,
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::Value(v) => v,
|
||||
}
|
||||
} };
|
||||
|
||||
}
|
||||
|
@ -461,8 +461,8 @@ impl ConnectionManager {
|
||||
let mut retry_count = NEW_CONNECTION_RETRY_COUNT;
|
||||
let network_manager = self.network_manager();
|
||||
|
||||
let prot_conn = network_result_try!(loop {
|
||||
veilid_log!(self debug "get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info);
|
||||
let nres = loop {
|
||||
veilid_log!(self trace "== get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info);
|
||||
let result_net_res = ProtocolNetworkConnection::connect(
|
||||
self.registry(),
|
||||
preferred_local_address,
|
||||
@ -493,6 +493,10 @@ impl ConnectionManager {
|
||||
// // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of
|
||||
// preferred_local_address = None;
|
||||
sleep(NEW_CONNECTION_RETRY_DELAY_MS).await;
|
||||
};
|
||||
|
||||
let prot_conn = network_result_value_or_log!(self target:"network_result", nres => [ format!("== get_or_create_connection failed {:?} -> {}", preferred_local_address, dial_info) ] {
|
||||
network_result_raise!(nres);
|
||||
});
|
||||
|
||||
// Add to the connection table
|
||||
|
@ -1011,7 +1011,12 @@ impl BucketEntryInner {
|
||||
|
||||
match latest_contact_time {
|
||||
None => {
|
||||
error!("Peer is reliable, but not seen!");
|
||||
// Peer may appear reliable from a previous attach/detach
|
||||
// But reliability uses last_seen_ts not the last_outbound_contact_time
|
||||
// Regardless, if we haven't pinged it, we need to ping it.
|
||||
// But if it was reliable before, and pings successfully, then it can
|
||||
// stay reliable, so we don't make it unreliable just because we haven't
|
||||
// contacted it yet during this attachment.
|
||||
true
|
||||
}
|
||||
Some(latest_contact_time) => {
|
||||
|
@ -1041,7 +1041,7 @@ impl RoutingTable {
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
pub async fn find_nodes_close_to_node_id(
|
||||
&self,
|
||||
node_ref: NodeRef,
|
||||
node_ref: FilteredNodeRef,
|
||||
node_id: TypedKey,
|
||||
capabilities: Vec<Capability>,
|
||||
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
||||
@ -1049,11 +1049,7 @@ impl RoutingTable {
|
||||
|
||||
let res = network_result_try!(
|
||||
rpc_processor
|
||||
.rpc_call_find_node(
|
||||
Destination::direct(node_ref.default_filtered()),
|
||||
node_id,
|
||||
capabilities
|
||||
)
|
||||
.rpc_call_find_node(Destination::direct(node_ref), node_id, capabilities)
|
||||
.await?
|
||||
);
|
||||
|
||||
@ -1069,7 +1065,7 @@ impl RoutingTable {
|
||||
pub async fn find_nodes_close_to_self(
|
||||
&self,
|
||||
crypto_kind: CryptoKind,
|
||||
node_ref: NodeRef,
|
||||
node_ref: FilteredNodeRef,
|
||||
capabilities: Vec<Capability>,
|
||||
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
||||
let self_node_id = self.node_id(crypto_kind);
|
||||
@ -1083,7 +1079,7 @@ impl RoutingTable {
|
||||
pub async fn find_nodes_close_to_node_ref(
|
||||
&self,
|
||||
crypto_kind: CryptoKind,
|
||||
node_ref: NodeRef,
|
||||
node_ref: FilteredNodeRef,
|
||||
capabilities: Vec<Capability>,
|
||||
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
||||
let Some(target_node_id) = node_ref.node_ids().get(crypto_kind) else {
|
||||
@ -1104,7 +1100,7 @@ impl RoutingTable {
|
||||
capabilities: Vec<Capability>,
|
||||
) {
|
||||
// Ask node for nodes closest to our own node
|
||||
let closest_nodes = network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, node_ref.clone(), capabilities.clone())).await {
|
||||
let closest_nodes = network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, node_ref.sequencing_filtered(Sequencing::PreferOrdered), capabilities.clone())).await {
|
||||
Err(e) => {
|
||||
veilid_log!(self error
|
||||
"find_self failed for {:?}: {:?}",
|
||||
@ -1120,7 +1116,7 @@ impl RoutingTable {
|
||||
// Ask each node near us to find us as well
|
||||
if wide {
|
||||
for closest_nr in closest_nodes {
|
||||
network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, closest_nr.clone(), capabilities.clone())).await {
|
||||
network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, closest_nr.sequencing_filtered(Sequencing::PreferOrdered), capabilities.clone())).await {
|
||||
Err(e) => {
|
||||
veilid_log!(self error
|
||||
"find_self failed for {:?}: {:?}",
|
||||
|
@ -716,7 +716,7 @@ impl RouteSpecStore {
|
||||
};
|
||||
|
||||
let Some(rsid) = inner.content.get_id_by_key(&public_key.value) else {
|
||||
veilid_log!(self debug "route id does not exist: {:?}", public_key.value);
|
||||
veilid_log!(self debug target: "network_result", "route id does not exist: {:?}", public_key.value);
|
||||
return None;
|
||||
};
|
||||
let Some(rssd) = inner.content.get_detail(&rsid) else {
|
||||
@ -753,7 +753,7 @@ impl RouteSpecStore {
|
||||
return None;
|
||||
}
|
||||
Err(e) => {
|
||||
veilid_log!(self debug "errir verifying signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e);
|
||||
veilid_log!(self debug "error verifying signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e);
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
@ -289,7 +289,7 @@ impl RoutingTable {
|
||||
|
||||
// Get what contact method would be used for contacting the bootstrap
|
||||
let bsdi = match network_manager
|
||||
.get_node_contact_method(nr.default_filtered())
|
||||
.get_node_contact_method(nr.sequencing_filtered(Sequencing::PreferOrdered))
|
||||
{
|
||||
Ok(Some(ncm)) if ncm.is_direct() => ncm.direct_dial_info().unwrap(),
|
||||
Ok(v) => {
|
||||
@ -307,7 +307,7 @@ impl RoutingTable {
|
||||
|
||||
// Need VALID signed peer info, so ask bootstrap to find_node of itself
|
||||
// which will ensure it has the bootstrap's signed peer info as part of the response
|
||||
let _ = routing_table.find_nodes_close_to_node_ref(crypto_kind, nr.clone(), vec![]).await;
|
||||
let _ = routing_table.find_nodes_close_to_node_ref(crypto_kind, nr.sequencing_filtered(Sequencing::PreferOrdered), vec![]).await;
|
||||
|
||||
// Ensure we got the signed peer info
|
||||
if !nr.signed_node_info_has_valid_signature(routing_domain) {
|
||||
|
@ -1471,11 +1471,24 @@ impl RPCProcessor {
|
||||
let operation = match self.decode_rpc_operation(&encoded_msg) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
// Debug on error
|
||||
veilid_log!(self debug "Dropping routed RPC: {}", e);
|
||||
match e {
|
||||
// Invalid messages that should be punished
|
||||
RPCError::Protocol(_) | RPCError::InvalidFormat(_) => {
|
||||
veilid_log!(self debug "Invalid routed RPC Operation: {}", e);
|
||||
|
||||
// XXX: Punish routes that send routed undecodable crap
|
||||
// self.network_manager().address_filter().punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage);
|
||||
}
|
||||
// Ignored messages that should be dropped
|
||||
RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => {
|
||||
veilid_log!(self trace "Dropping routed RPC Operation: {}", e);
|
||||
}
|
||||
// Internal errors that deserve louder logging
|
||||
RPCError::Unimplemented(_) | RPCError::Internal(_) => {
|
||||
veilid_log!(self error "Error decoding routed RPC operation: {}", e);
|
||||
}
|
||||
};
|
||||
|
||||
// XXX: Punish routes that send routed undecodable crap
|
||||
// self.network_manager().address_filter().punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage);
|
||||
return Ok(NetworkResult::invalid_message(e));
|
||||
}
|
||||
};
|
||||
@ -1593,16 +1606,16 @@ impl RPCProcessor {
|
||||
if let Err(e) = self.waiting_rpc_table.complete_op_waiter(op_id, msg) {
|
||||
match e {
|
||||
RPCError::Unimplemented(_) | RPCError::Internal(_) => {
|
||||
veilid_log!(self error "Could not complete rpc operation: id = {}: {}", op_id, e);
|
||||
veilid_log!(self error "Error in RPC operation: id = {}: {}", op_id, e);
|
||||
}
|
||||
RPCError::InvalidFormat(_)
|
||||
| RPCError::Protocol(_)
|
||||
| RPCError::Network(_)
|
||||
| RPCError::TryAgain(_) => {
|
||||
veilid_log!(self debug "Could not complete rpc operation: id = {}: {}", op_id, e);
|
||||
veilid_log!(self debug "Could not complete RPC operation: id = {}: {}", op_id, e);
|
||||
}
|
||||
RPCError::Ignore(_) => {
|
||||
veilid_log!(self debug "Answer late: id = {}", op_id);
|
||||
RPCError::Ignore(e) => {
|
||||
veilid_log!(self debug "RPC operation ignored: id = {}: {}", op_id, e);
|
||||
}
|
||||
};
|
||||
// Don't throw an error here because it's okay if the original operation timed out
|
||||
|
@ -8,7 +8,7 @@ where
|
||||
{
|
||||
waiter: OperationWaiter<T, C>,
|
||||
op_id: OperationId,
|
||||
result_receiver: Option<flume::Receiver<(Span, T)>>,
|
||||
result_receiver: flume::Receiver<(Span, T)>,
|
||||
}
|
||||
|
||||
impl<T, C> OperationWaitHandle<T, C>
|
||||
@ -27,9 +27,7 @@ where
|
||||
C: Unpin + Clone,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
if self.result_receiver.is_some() {
|
||||
self.waiter.cancel_op_waiter(self.op_id);
|
||||
}
|
||||
self.waiter.cancel_op_waiter(self.op_id);
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,7 +104,7 @@ where
|
||||
OperationWaitHandle {
|
||||
waiter: self.clone(),
|
||||
op_id,
|
||||
result_receiver: Some(result_receiver),
|
||||
result_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,65 +123,69 @@ where
|
||||
/// Get operation context
|
||||
pub fn get_op_context(&self, op_id: OperationId) -> Result<C, RPCError> {
|
||||
let inner = self.inner.lock();
|
||||
let Some(waiting_op) = inner.waiting_op_table.get(&op_id) else {
|
||||
return Err(RPCError::ignore(format!(
|
||||
"Missing operation id getting op context: id={}",
|
||||
op_id
|
||||
)));
|
||||
let res = {
|
||||
let Some(waiting_op) = inner.waiting_op_table.get(&op_id) else {
|
||||
return Err(RPCError::ignore(format!(
|
||||
"Missing operation id getting op context: id={}",
|
||||
op_id
|
||||
)));
|
||||
};
|
||||
Ok(waiting_op.context.clone())
|
||||
};
|
||||
Ok(waiting_op.context.clone())
|
||||
drop(inner);
|
||||
res
|
||||
}
|
||||
|
||||
/// Remove wait for op
|
||||
#[instrument(level = "trace", target = "rpc", skip_all)]
|
||||
fn cancel_op_waiter(&self, op_id: OperationId) {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.waiting_op_table.remove(&op_id);
|
||||
{
|
||||
let waiting_op = inner.waiting_op_table.remove(&op_id);
|
||||
drop(waiting_op);
|
||||
}
|
||||
drop(inner);
|
||||
}
|
||||
|
||||
/// Complete the waiting op
|
||||
#[instrument(level = "trace", target = "rpc", skip_all)]
|
||||
pub fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
|
||||
let waiting_op = {
|
||||
let mut inner = self.inner.lock();
|
||||
inner
|
||||
.waiting_op_table
|
||||
.remove(&op_id)
|
||||
.ok_or_else(RPCError::else_ignore(format!(
|
||||
"Unmatched operation id: {}",
|
||||
op_id
|
||||
)))?
|
||||
let mut inner = self.inner.lock();
|
||||
let res = {
|
||||
let waiting_op =
|
||||
inner
|
||||
.waiting_op_table
|
||||
.remove(&op_id)
|
||||
.ok_or_else(RPCError::else_ignore(format!(
|
||||
"Unmatched operation id: {}",
|
||||
op_id
|
||||
)))?;
|
||||
waiting_op
|
||||
.result_sender
|
||||
.send((Span::current(), message))
|
||||
.map_err(RPCError::ignore)
|
||||
};
|
||||
waiting_op
|
||||
.result_sender
|
||||
.send((Span::current(), message))
|
||||
.map_err(RPCError::ignore)
|
||||
drop(inner);
|
||||
res
|
||||
}
|
||||
|
||||
/// Wait for operation to complete
|
||||
#[instrument(level = "trace", target = "rpc", skip_all)]
|
||||
pub async fn wait_for_op(
|
||||
&self,
|
||||
mut handle: OperationWaitHandle<T, C>,
|
||||
handle: OperationWaitHandle<T, C>,
|
||||
timeout_us: TimestampDuration,
|
||||
) -> Result<TimeoutOr<(T, TimestampDuration)>, RPCError> {
|
||||
let timeout_ms = us_to_ms(timeout_us.as_u64()).map_err(RPCError::internal)?;
|
||||
|
||||
// Take the receiver
|
||||
// After this, we must manually cancel since the cancel on handle drop is disabled
|
||||
let result_receiver = handle.result_receiver.take().unwrap();
|
||||
|
||||
let result_fut = result_receiver.recv_async().in_current_span();
|
||||
let result_fut = handle.result_receiver.recv_async().in_current_span();
|
||||
|
||||
// wait for eventualvalue
|
||||
let start_ts = Timestamp::now();
|
||||
let res = timeout(timeout_ms, result_fut).await.into_timeout_or();
|
||||
|
||||
match res {
|
||||
TimeoutOr::Timeout => {
|
||||
self.cancel_op_waiter(handle.op_id);
|
||||
Ok(TimeoutOr::Timeout)
|
||||
}
|
||||
TimeoutOr::Timeout => Ok(TimeoutOr::Timeout),
|
||||
TimeoutOr::Value(Ok((_span_id, ret))) => {
|
||||
let end_ts = Timestamp::now();
|
||||
|
||||
@ -192,7 +194,10 @@ where
|
||||
|
||||
Ok(TimeoutOr::Value((ret, end_ts.saturating_sub(start_ts))))
|
||||
}
|
||||
TimeoutOr::Value(Err(e)) => Err(RPCError::ignore(e)),
|
||||
TimeoutOr::Value(Err(e)) => {
|
||||
//
|
||||
Err(RPCError::ignore(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -70,7 +70,7 @@ impl RPCProcessor {
|
||||
match request.kind {
|
||||
// Process RPC Message
|
||||
RPCWorkerRequestKind::Message { message_encoded } => {
|
||||
network_result_value_or_log!(self match self
|
||||
network_result_value_or_log!(self target:"network_result", match self
|
||||
.process_rpc_message(message_encoded).instrument(rpc_request_span)
|
||||
.await
|
||||
{
|
||||
|
@ -1120,7 +1120,7 @@ impl StorageManager {
|
||||
let dest = rpc_processor
|
||||
.resolve_target_to_destination(
|
||||
vc.target,
|
||||
SafetySelection::Unsafe(Sequencing::NoPreference),
|
||||
SafetySelection::Unsafe(Sequencing::PreferOrdered),
|
||||
)
|
||||
.await
|
||||
.map_err(VeilidAPIError::from)?;
|
||||
|
@ -278,85 +278,3 @@ macro_rules! network_result_try {
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! network_result_value_or_log {
|
||||
($self:ident $r:expr => $f:expr) => {
|
||||
network_result_value_or_log!($self $r => [ "" ] $f )
|
||||
};
|
||||
($self:ident $r:expr => [ $d:expr ] $f:expr) => { {
|
||||
let __extra_message = if debug_target_enabled!("network_result") {
|
||||
$d.to_string()
|
||||
} else {
|
||||
"".to_string()
|
||||
};
|
||||
match $r {
|
||||
NetworkResult::Timeout => {
|
||||
veilid_log!($self debug
|
||||
"{} at {}@{}:{} in {}{}",
|
||||
"Timeout",
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::ServiceUnavailable(ref s) => {
|
||||
veilid_log!($self debug
|
||||
"{}({}) at {}@{}:{} in {}{}",
|
||||
"ServiceUnavailable",
|
||||
s,
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::NoConnection(ref e) => {
|
||||
veilid_log!($self debug
|
||||
"{}({}) at {}@{}:{} in {}{}",
|
||||
"No connection",
|
||||
e.to_string(),
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::AlreadyExists(ref e) => {
|
||||
veilid_log!($self debug
|
||||
"{}({}) at {}@{}:{} in {}{}",
|
||||
"Already exists",
|
||||
e.to_string(),
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::InvalidMessage(ref s) => {
|
||||
veilid_log!($self debug
|
||||
"{}({}) at {}@{}:{} in {}{}",
|
||||
"Invalid message",
|
||||
s,
|
||||
file!(),
|
||||
line!(),
|
||||
column!(),
|
||||
fn_name::uninstantiated!(),
|
||||
__extra_message
|
||||
);
|
||||
$f
|
||||
}
|
||||
NetworkResult::Value(v) => v,
|
||||
}
|
||||
} };
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user