improve error handling for network errors

make relay requirements more aggressive
This commit is contained in:
Christien Rioux 2024-08-05 16:10:19 -05:00
parent f7ce5f93d0
commit e0c20ed4c9
7 changed files with 37 additions and 11 deletions

View File

@ -151,7 +151,7 @@ impl RawTcpProtocolHandler {
ps, ps,
)); ));
log_net!(debug "Connection accepted from: {} (TCP)", socket_addr); log_net!("Connection accepted from: {} (TCP)", socket_addr);
Ok(Some(conn)) Ok(Some(conn))
} }

View File

@ -286,7 +286,11 @@ impl WebsocketProtocolHandler {
ws_stream, ws_stream,
)); ));
log_net!(debug "Connection accepted from: {} ({})", socket_addr, if self.arc.tls { "WSS" } else { "WS" }); log_net!(
"Connection accepted from: {} ({})",
socket_addr,
if self.arc.tls { "WSS" } else { "WS" }
);
Ok(Some(conn)) Ok(Some(conn))
} }

View File

@ -279,7 +279,7 @@ impl NetworkConnection {
) -> SendPinBoxFuture<()> { ) -> SendPinBoxFuture<()> {
Box::pin(async move { Box::pin(async move {
log_net!( log_net!(
"== Starting process_connection loop for id={}, {:?}", connection_id, "Starting process_connection loop for id={}, {:?}", connection_id,
flow flow
); );
@ -294,7 +294,7 @@ impl NetworkConnection {
let new_timer = || { let new_timer = || {
sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async { sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async {
// timeout // timeout
log_net!("== Connection timeout on {:?}", flow); log_net!("Connection timeout on {:?}", flow);
RecvLoopAction::Timeout RecvLoopAction::Timeout
}) })
}; };
@ -354,7 +354,7 @@ impl NetworkConnection {
// Check for connection close // Check for connection close
if v.is_no_connection() { if v.is_no_connection() {
log_net!(debug "Connection closed from: {} ({})", peer_address.socket_addr(), peer_address.protocol_type()); log_net!("Connection closed from: {} ({})", peer_address.socket_addr(), peer_address.protocol_type());
return RecvLoopAction::Finish; return RecvLoopAction::Finish;
} }
@ -428,7 +428,7 @@ impl NetworkConnection {
} }
log_net!( log_net!(
"== Connection loop finished flow={:?}", "Connection loop finished flow={:?}",
flow flow
); );

View File

@ -172,6 +172,12 @@ impl RoutingTable {
return false; return false;
}; };
// Exclude any nodes that have 'failed to send' state indicating a
// connection drop or inability to reach the node
if e.peer_stats().rpc_stats.failed_to_send > 0 {
return false;
}
// Until we have a way of reducing a SignedRelayedNodeInfo to a SignedDirectNodeInfo // Until we have a way of reducing a SignedRelayedNodeInfo to a SignedDirectNodeInfo
// See https://gitlab.com/veilid/veilid/-/issues/381 // See https://gitlab.com/veilid/veilid/-/issues/381
// We should consider nodes with allocated relays as disqualified from being a relay themselves // We should consider nodes with allocated relays as disqualified from being a relay themselves

View File

@ -924,7 +924,7 @@ impl RPCProcessor {
remote_private_route: Option<PublicKey>, remote_private_route: Option<PublicKey>,
) { ) {
let wants_answer = matches!(rpc_kind, RPCKind::Question); let wants_answer = matches!(rpc_kind, RPCKind::Question);
// Record for node if this was not sent via a route // Record for node if this was not sent via a route
if safety_route.is_none() && remote_private_route.is_none() { if safety_route.is_none() && remote_private_route.is_none() {
node_ref.stats_failed_to_send(send_ts, wants_answer); node_ref.stats_failed_to_send(send_ts, wants_answer);
@ -1500,7 +1500,7 @@ impl RPCProcessor {
}, },
// Ignored messages that should be dropped // Ignored messages that should be dropped
RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => { RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => {
log_rpc!(debug "Dropping RPC Operation: {}", e); log_rpc!("Dropping RPC Operation: {}", e);
}, },
// Internal errors that deserve louder logging // Internal errors that deserve louder logging
RPCError::Unimplemented(_) | RPCError::Internal(_) => { RPCError::Unimplemented(_) | RPCError::Internal(_) => {
@ -1582,7 +1582,7 @@ impl RPCProcessor {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
// Debug on error // Debug on error
log_rpc!(debug "Dropping RPC operation: {}", e); log_rpc!(debug "Dropping routed RPC: {}", e);
// XXX: Punish routes that send routed undecodable crap // XXX: Punish routes that send routed undecodable crap
// address_filter.punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage); // address_filter.punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage);
@ -1666,7 +1666,21 @@ impl RPCProcessor {
if let Err(e) = self.unlocked_inner if let Err(e) = self.unlocked_inner
.waiting_rpc_table .waiting_rpc_table
.complete_op_waiter(op_id, msg) { .complete_op_waiter(op_id, msg) {
log_rpc!(debug "Operation id {} did not complete: {}", op_id, e); match e {
RPCError::Unimplemented(_) |
RPCError::Internal(_) => {
log_rpc!(error "Could not complete rpc operation: id = {}: {}", op_id, e);
},
RPCError::InvalidFormat(_) |
RPCError::Protocol(_) |
RPCError::Network(_) |
RPCError::TryAgain(_) => {
log_rpc!(debug "Could not complete rpc operation: id = {}: {}", op_id, e);
},
RPCError::Ignore(_) => {
log_rpc!("Answer late: id = {}", op_id);
},
};
// Don't throw an error here because it's okay if the original operation timed out // Don't throw an error here because it's okay if the original operation timed out
} }
Ok(NetworkResult::value(())) Ok(NetworkResult::value(()))

View File

@ -34,7 +34,7 @@ pub struct RPCStats {
pub last_seen_ts: Option<Timestamp>, // when the peer was last seen for any reason, including when we first attempted to reach out to it pub last_seen_ts: Option<Timestamp>, // when the peer was last seen for any reason, including when we first attempted to reach out to it
pub first_consecutive_seen_ts: Option<Timestamp>, // the timestamp of the first consecutive proof-of-life for this node (an answer or received question) pub first_consecutive_seen_ts: Option<Timestamp>, // the timestamp of the first consecutive proof-of-life for this node (an answer or received question)
pub recent_lost_answers: u32, // number of answers that have been lost since we lost reliability pub recent_lost_answers: u32, // number of answers that have been lost since we lost reliability
pub failed_to_send: u32, // number of messages that have failed to send since we last successfully sent one pub failed_to_send: u32, // number of messages that have failed to send or connections dropped since we last successfully sent one
} }
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]

View File

@ -32,6 +32,7 @@ impl<T> IoNetworkResultExt<T> for io::Result<T> {
// Err(e) => match e.kind() { // Err(e) => match e.kind() {
// io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout), // io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout),
// io::ErrorKind::UnexpectedEof // io::ErrorKind::UnexpectedEof
// | io::ErrorKind::NotConnected
// | io::ErrorKind::BrokenPipe // | io::ErrorKind::BrokenPipe
// | io::ErrorKind::ConnectionAborted // | io::ErrorKind::ConnectionAborted
// | io::ErrorKind::ConnectionRefused // | io::ErrorKind::ConnectionRefused
@ -52,6 +53,7 @@ impl<T> IoNetworkResultExt<T> for io::Result<T> {
match e.kind() { match e.kind() {
io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout), io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout),
io::ErrorKind::UnexpectedEof io::ErrorKind::UnexpectedEof
| io::ErrorKind::NotConnected
| io::ErrorKind::BrokenPipe | io::ErrorKind::BrokenPipe
| io::ErrorKind::ConnectionAborted | io::ErrorKind::ConnectionAborted
| io::ErrorKind::ConnectionRefused | io::ErrorKind::ConnectionRefused