From 8f521099bd55c7131d39390b601de907900429fd Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 4 Mar 2025 21:56:55 -0500 Subject: [PATCH] rpc cleanup --- veilid-core/src/logging/facilities.rs | 88 +++++++++++++++++++ .../src/network_manager/connection_manager.rs | 8 +- veilid-core/src/routing_table/bucket_entry.rs | 7 +- veilid-core/src/routing_table/mod.rs | 16 ++-- .../src/routing_table/route_spec_store/mod.rs | 4 +- .../src/routing_table/tasks/bootstrap.rs | 4 +- veilid-core/src/rpc_processor/mod.rs | 29 ++++-- .../src/rpc_processor/operation_waiter.rs | 77 ++++++++-------- veilid-core/src/rpc_processor/rpc_worker.rs | 2 +- veilid-core/src/storage_manager/mod.rs | 2 +- veilid-tools/src/network_result.rs | 82 ----------------- 11 files changed, 174 insertions(+), 145 deletions(-) diff --git a/veilid-core/src/logging/facilities.rs b/veilid-core/src/logging/facilities.rs index c9ed341f..7963175b 100644 --- a/veilid-core/src/logging/facilities.rs +++ b/veilid-core/src/logging/facilities.rs @@ -289,3 +289,91 @@ macro_rules! veilid_log { $($k).+ = $($fields)* )}; } + +#[macro_export] +macro_rules! 
network_result_value_or_log { + ($self:ident $r:expr => $f:expr) => { + network_result_value_or_log!($self target: self::__VEILID_LOG_FACILITY, $r => [ "" ] $f ) + }; + ($self:ident $r:expr => [ $d:expr ] $f:expr) => { + network_result_value_or_log!($self target: self::__VEILID_LOG_FACILITY, $r => [ $d ] $f ) + }; + ($self:ident target: $target:expr, $r:expr => $f:expr) => { + network_result_value_or_log!($self target: $target, $r => [ "" ] $f ) + }; + ($self:ident target: $target:expr, $r:expr => [ $d:expr ] $f:expr) => { { + let __extra_message = if debug_target_enabled!("network_result") { + $d.to_string() + } else { + "".to_string() + }; + match $r { + NetworkResult::Timeout => { + veilid_log!($self debug target: $target, + "{} at {}@{}:{} in {}{}", + "Timeout", + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::ServiceUnavailable(ref s) => { + veilid_log!($self debug target: $target, + "{}({}) at {}@{}:{} in {}{}", + "ServiceUnavailable", + s, + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::NoConnection(ref e) => { + veilid_log!($self debug target: $target, + "{}({}) at {}@{}:{} in {}{}", + "No connection", + e.to_string(), + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::AlreadyExists(ref e) => { + veilid_log!($self debug target: $target, + "{}({}) at {}@{}:{} in {}{}", + "Already exists", + e.to_string(), + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::InvalidMessage(ref s) => { + veilid_log!($self debug target: $target, + "{}({}) at {}@{}:{} in {}{}", + "Invalid message", + s, + file!(), + line!(), + column!(), + fn_name::uninstantiated!(), + __extra_message + ); + $f + } + NetworkResult::Value(v) => v, + } + } }; + +} diff --git a/veilid-core/src/network_manager/connection_manager.rs 
b/veilid-core/src/network_manager/connection_manager.rs index 8cb1bdb7..c63e34be 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -461,8 +461,8 @@ impl ConnectionManager { let mut retry_count = NEW_CONNECTION_RETRY_COUNT; let network_manager = self.network_manager(); - let prot_conn = network_result_try!(loop { - veilid_log!(self debug "get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info); + let nres = loop { + veilid_log!(self trace "== get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info); let result_net_res = ProtocolNetworkConnection::connect( self.registry(), preferred_local_address, @@ -493,6 +493,10 @@ impl ConnectionManager { // // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of // preferred_local_address = None; sleep(NEW_CONNECTION_RETRY_DELAY_MS).await; + }; + + let prot_conn = network_result_value_or_log!(self target:"network_result", nres => [ format!("== get_or_create_connection failed {:?} -> {}", preferred_local_address, dial_info) ] { + network_result_raise!(nres); }); // Add to the connection table diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index c0f81088..d97ffcca 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -1011,7 +1011,12 @@ impl BucketEntryInner { match latest_contact_time { None => { - error!("Peer is reliable, but not seen!"); + // Peer may appear reliable from a previous attach/detach + // But reliability uses last_seen_ts not the last_outbound_contact_time + // Regardless, if we haven't pinged it, we need to ping it. 
+ // But if it was reliable before, and pings successfully then it can + // stay reliable, so we don't make it unreliable just because we haven't + // contacted it yet during this attachment. true } Some(latest_contact_time) => { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index be252fe0..7fe044bb 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -1041,7 +1041,7 @@ impl RoutingTable { #[instrument(level = "trace", skip(self), err)] pub async fn find_nodes_close_to_node_id( &self, - node_ref: NodeRef, + node_ref: FilteredNodeRef, node_id: TypedKey, capabilities: Vec, ) -> EyreResult>> { @@ -1049,11 +1049,7 @@ impl RoutingTable { let res = network_result_try!( rpc_processor - .rpc_call_find_node( - Destination::direct(node_ref.default_filtered()), - node_id, - capabilities - ) + .rpc_call_find_node(Destination::direct(node_ref), node_id, capabilities) .await? ); @@ -1069,7 +1065,7 @@ impl RoutingTable { pub async fn find_nodes_close_to_self( &self, crypto_kind: CryptoKind, - node_ref: NodeRef, + node_ref: FilteredNodeRef, capabilities: Vec, ) -> EyreResult>> { let self_node_id = self.node_id(crypto_kind); @@ -1083,7 +1079,7 @@ impl RoutingTable { pub async fn find_nodes_close_to_node_ref( &self, crypto_kind: CryptoKind, - node_ref: NodeRef, + node_ref: FilteredNodeRef, capabilities: Vec, ) -> EyreResult>> { let Some(target_node_id) = node_ref.node_ids().get(crypto_kind) else { @@ -1104,7 +1100,7 @@ impl RoutingTable { capabilities: Vec, ) { // Ask node for nodes closest to our own node - let closest_nodes = network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, node_ref.clone(), capabilities.clone())).await { + let closest_nodes = network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, node_ref.sequencing_filtered(Sequencing::PreferOrdered), capabilities.clone())).await { Err(e) => { veilid_log!(self 
error "find_self failed for {:?}: {:?}", @@ -1120,7 +1116,7 @@ impl RoutingTable { // Ask each node near us to find us as well if wide { for closest_nr in closest_nodes { - network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, closest_nr.clone(), capabilities.clone())).await { + network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_self(crypto_kind, closest_nr.sequencing_filtered(Sequencing::PreferOrdered), capabilities.clone())).await { Err(e) => { veilid_log!(self error "find_self failed for {:?}: {:?}", diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs index 45623c11..ad3b39b2 100644 --- a/veilid-core/src/routing_table/route_spec_store/mod.rs +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -716,7 +716,7 @@ impl RouteSpecStore { }; let Some(rsid) = inner.content.get_id_by_key(&public_key.value) else { - veilid_log!(self debug "route id does not exist: {:?}", public_key.value); + veilid_log!(self debug target: "network_result", "route id does not exist: {:?}", public_key.value); return None; }; let Some(rssd) = inner.content.get_detail(&rsid) else { @@ -753,7 +753,7 @@ impl RouteSpecStore { return None; } Err(e) => { - veilid_log!(self debug "errir verifying signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e); + veilid_log!(self debug "error verifying signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e); return None; } } diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index 8ab20079..31dc5089 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -289,7 +289,7 @@ impl RoutingTable { // Get what contact method would be used for contacting the bootstrap let bsdi = match network_manager - 
.get_node_contact_method(nr.default_filtered()) + .get_node_contact_method(nr.sequencing_filtered(Sequencing::PreferOrdered)) { Ok(Some(ncm)) if ncm.is_direct() => ncm.direct_dial_info().unwrap(), Ok(v) => { @@ -307,7 +307,7 @@ impl RoutingTable { // Need VALID signed peer info, so ask bootstrap to find_node of itself // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = routing_table.find_nodes_close_to_node_ref(crypto_kind, nr.clone(), vec![]).await; + let _ = routing_table.find_nodes_close_to_node_ref(crypto_kind, nr.sequencing_filtered(Sequencing::PreferOrdered), vec![]).await; // Ensure we got the signed peer info if !nr.signed_node_info_has_valid_signature(routing_domain) { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index b623f5ba..68bab229 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1471,11 +1471,24 @@ impl RPCProcessor { let operation = match self.decode_rpc_operation(&encoded_msg) { Ok(v) => v, Err(e) => { - // Debug on error - veilid_log!(self debug "Dropping routed RPC: {}", e); + match e { + // Invalid messages that should be punished + RPCError::Protocol(_) | RPCError::InvalidFormat(_) => { + veilid_log!(self debug "Invalid routed RPC Operation: {}", e); + + // XXX: Punish routes that send routed undecodable crap + // self.network_manager().address_filter().punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage); + } + // Ignored messages that should be dropped + RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => { + veilid_log!(self trace "Dropping routed RPC Operation: {}", e); + } + // Internal errors that deserve louder logging + RPCError::Unimplemented(_) | RPCError::Internal(_) => { + veilid_log!(self error "Error decoding routed RPC operation: {}", e); + } + }; - // XXX: Punish routes that send routed undecodable crap - // 
self.network_manager().address_filter().punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage); return Ok(NetworkResult::invalid_message(e)); } }; @@ -1593,16 +1606,16 @@ impl RPCProcessor { if let Err(e) = self.waiting_rpc_table.complete_op_waiter(op_id, msg) { match e { RPCError::Unimplemented(_) | RPCError::Internal(_) => { - veilid_log!(self error "Could not complete rpc operation: id = {}: {}", op_id, e); + veilid_log!(self error "Error in RPC operation: id = {}: {}", op_id, e); } RPCError::InvalidFormat(_) | RPCError::Protocol(_) | RPCError::Network(_) | RPCError::TryAgain(_) => { - veilid_log!(self debug "Could not complete rpc operation: id = {}: {}", op_id, e); + veilid_log!(self debug "Could not complete RPC operation: id = {}: {}", op_id, e); } - RPCError::Ignore(_) => { - veilid_log!(self debug "Answer late: id = {}", op_id); + RPCError::Ignore(e) => { + veilid_log!(self debug "RPC operation ignored: id = {}: {}", op_id, e); } }; // Don't throw an error here because it's okay if the original operation timed out diff --git a/veilid-core/src/rpc_processor/operation_waiter.rs b/veilid-core/src/rpc_processor/operation_waiter.rs index 205fa339..7e4ee05c 100644 --- a/veilid-core/src/rpc_processor/operation_waiter.rs +++ b/veilid-core/src/rpc_processor/operation_waiter.rs @@ -8,7 +8,7 @@ where { waiter: OperationWaiter, op_id: OperationId, - result_receiver: Option>, + result_receiver: flume::Receiver<(Span, T)>, } impl OperationWaitHandle @@ -27,9 +27,7 @@ where C: Unpin + Clone, { fn drop(&mut self) { - if self.result_receiver.is_some() { - self.waiter.cancel_op_waiter(self.op_id); - } + self.waiter.cancel_op_waiter(self.op_id); } } @@ -106,7 +104,7 @@ where OperationWaitHandle { waiter: self.clone(), op_id, - result_receiver: Some(result_receiver), + result_receiver, } } @@ -125,65 +123,69 @@ where /// Get operation context pub fn get_op_context(&self, op_id: OperationId) -> Result { let inner = self.inner.lock(); - let Some(waiting_op) = 
inner.waiting_op_table.get(&op_id) else { - return Err(RPCError::ignore(format!( - "Missing operation id getting op context: id={}", - op_id - ))); + let res = { + let Some(waiting_op) = inner.waiting_op_table.get(&op_id) else { + return Err(RPCError::ignore(format!( + "Missing operation id getting op context: id={}", + op_id + ))); + }; + Ok(waiting_op.context.clone()) }; - Ok(waiting_op.context.clone()) + drop(inner); + res } /// Remove wait for op #[instrument(level = "trace", target = "rpc", skip_all)] fn cancel_op_waiter(&self, op_id: OperationId) { let mut inner = self.inner.lock(); - inner.waiting_op_table.remove(&op_id); + { + let waiting_op = inner.waiting_op_table.remove(&op_id); + drop(waiting_op); + } + drop(inner); } /// Complete the waiting op #[instrument(level = "trace", target = "rpc", skip_all)] pub fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> { - let waiting_op = { - let mut inner = self.inner.lock(); - inner - .waiting_op_table - .remove(&op_id) - .ok_or_else(RPCError::else_ignore(format!( - "Unmatched operation id: {}", - op_id - )))? 
+ let mut inner = self.inner.lock(); + let res = { + let waiting_op = + inner + .waiting_op_table + .remove(&op_id) + .ok_or_else(RPCError::else_ignore(format!( + "Unmatched operation id: {}", + op_id + )))?; + waiting_op + .result_sender + .send((Span::current(), message)) + .map_err(RPCError::ignore) }; - waiting_op - .result_sender - .send((Span::current(), message)) - .map_err(RPCError::ignore) + drop(inner); + res } /// Wait for operation to complete #[instrument(level = "trace", target = "rpc", skip_all)] pub async fn wait_for_op( &self, - mut handle: OperationWaitHandle, + handle: OperationWaitHandle, timeout_us: TimestampDuration, ) -> Result, RPCError> { let timeout_ms = us_to_ms(timeout_us.as_u64()).map_err(RPCError::internal)?; - // Take the receiver - // After this, we must manually cancel since the cancel on handle drop is disabled - let result_receiver = handle.result_receiver.take().unwrap(); - - let result_fut = result_receiver.recv_async().in_current_span(); + let result_fut = handle.result_receiver.recv_async().in_current_span(); // wait for eventualvalue let start_ts = Timestamp::now(); let res = timeout(timeout_ms, result_fut).await.into_timeout_or(); match res { - TimeoutOr::Timeout => { - self.cancel_op_waiter(handle.op_id); - Ok(TimeoutOr::Timeout) - } + TimeoutOr::Timeout => Ok(TimeoutOr::Timeout), TimeoutOr::Value(Ok((_span_id, ret))) => { let end_ts = Timestamp::now(); @@ -192,7 +194,10 @@ where Ok(TimeoutOr::Value((ret, end_ts.saturating_sub(start_ts)))) } - TimeoutOr::Value(Err(e)) => Err(RPCError::ignore(e)), + TimeoutOr::Value(Err(e)) => { + // + Err(RPCError::ignore(e)) + } } } } diff --git a/veilid-core/src/rpc_processor/rpc_worker.rs b/veilid-core/src/rpc_processor/rpc_worker.rs index 8141f965..194a0f2f 100644 --- a/veilid-core/src/rpc_processor/rpc_worker.rs +++ b/veilid-core/src/rpc_processor/rpc_worker.rs @@ -70,7 +70,7 @@ impl RPCProcessor { match request.kind { // Process RPC Message RPCWorkerRequestKind::Message { 
message_encoded } => { - network_result_value_or_log!(self match self + network_result_value_or_log!(self target:"network_result", match self .process_rpc_message(message_encoded).instrument(rpc_request_span) .await { diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 73ea7ebf..722c41b9 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1120,7 +1120,7 @@ impl StorageManager { let dest = rpc_processor .resolve_target_to_destination( vc.target, - SafetySelection::Unsafe(Sequencing::NoPreference), + SafetySelection::Unsafe(Sequencing::PreferOrdered), ) .await .map_err(VeilidAPIError::from)?; diff --git a/veilid-tools/src/network_result.rs b/veilid-tools/src/network_result.rs index 88201b60..89ccf1f8 100644 --- a/veilid-tools/src/network_result.rs +++ b/veilid-tools/src/network_result.rs @@ -278,85 +278,3 @@ macro_rules! network_result_try { } }; } - -#[macro_export] -macro_rules! network_result_value_or_log { - ($self:ident $r:expr => $f:expr) => { - network_result_value_or_log!($self $r => [ "" ] $f ) - }; - ($self:ident $r:expr => [ $d:expr ] $f:expr) => { { - let __extra_message = if debug_target_enabled!("network_result") { - $d.to_string() - } else { - "".to_string() - }; - match $r { - NetworkResult::Timeout => { - veilid_log!($self debug - "{} at {}@{}:{} in {}{}", - "Timeout", - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::ServiceUnavailable(ref s) => { - veilid_log!($self debug - "{}({}) at {}@{}:{} in {}{}", - "ServiceUnavailable", - s, - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::NoConnection(ref e) => { - veilid_log!($self debug - "{}({}) at {}@{}:{} in {}{}", - "No connection", - e.to_string(), - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::AlreadyExists(ref 
e) => { - veilid_log!($self debug - "{}({}) at {}@{}:{} in {}{}", - "Already exists", - e.to_string(), - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::InvalidMessage(ref s) => { - veilid_log!($self debug - "{}({}) at {}@{}:{} in {}{}", - "Invalid message", - s, - file!(), - line!(), - column!(), - fn_name::uninstantiated!(), - __extra_message - ); - $f - } - NetworkResult::Value(v) => v, - } - } }; - -}