diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 56af514f..7f4bb9b4 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -1163,12 +1163,21 @@ impl BucketEntryInner { self.answer_stats_accounting_ordered .record_lost_answer(cur_ts); self.peer_stats.rpc_stats.recent_lost_answers_ordered += 1; + if self.peer_stats.rpc_stats.recent_lost_answers_ordered + > UNRELIABLE_LOST_ANSWERS_ORDERED + { + self.peer_stats.rpc_stats.first_consecutive_seen_ts = None; + } } else { self.answer_stats_accounting_unordered .record_lost_answer(cur_ts); self.peer_stats.rpc_stats.recent_lost_answers_unordered += 1; + if self.peer_stats.rpc_stats.recent_lost_answers_unordered + > UNRELIABLE_LOST_ANSWERS_UNORDERED + { + self.peer_stats.rpc_stats.first_consecutive_seen_ts = None; + } } - self.peer_stats.rpc_stats.first_consecutive_seen_ts = None; self.peer_stats.rpc_stats.questions_in_flight -= 1; } pub(super) fn failed_to_send(&mut self, ts: Timestamp, expects_answer: bool) { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 300f35dc..49260ae3 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -527,11 +527,11 @@ impl RPCProcessor { .await; match &out { Err(e) => { - veilid_log!(self debug "RPC Lost (id={} {}): {}", id, debug_string, e); + veilid_log!(self debug "RPC Lost (id={} {}): {} ({}) ", id, debug_string, e, waitable_reply.context.send_data_result.unique_flow().flow); self.record_lost_answer(&waitable_reply.context); } Ok(TimeoutOr::Timeout) => { - veilid_log!(self debug "RPC Lost (id={} {}): Timeout", id, debug_string); + veilid_log!(self debug "RPC Lost (id={} {}): Timeout ({})", id, debug_string, waitable_reply.context.send_data_result.unique_flow().flow); self.record_lost_answer(&waitable_reply.context); } Ok(TimeoutOr::Value((rpcreader, _))) => { @@ -1130,7 +1130,6 @@ impl RPCProcessor { // Send question let bytes: ByteCount = (message.len() as u64).into(); - let send_ts = Timestamp::now(); #[allow(unused_variables)] let message_len = message.len(); let res = self @@ -1143,6 +1142,7 @@ impl RPCProcessor { .await .map_err(|e| { // If we're returning an error, clean up + let send_ts = Timestamp::now(); self.record_send_failure( RPCKind::Question, send_ts, @@ -1152,6 +1152,9 @@ impl RPCProcessor { ); RPCError::network(e) })?; + // Take send timestamp -after- send is attempted to exclude TCP connection time which + // may unfairly punish some nodes, randomly, based on their being in the connection table or not + let send_ts = Timestamp::now(); let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { // If we couldn't send we're still cleaning up self.record_send_failure(RPCKind::Question, send_ts, node_ref.unfiltered(), safety_route, remote_private_route); @@ -1219,7 +1222,6 @@ impl RPCProcessor { // Send statement let bytes: ByteCount = (message.len() as u64).into(); - let send_ts = Timestamp::now(); #[allow(unused_variables)] let message_len = message.len(); let res = self @@ -1232,6 +1234,7 @@ impl RPCProcessor { .await .map_err(|e| { // If we're returning an error, clean up + let send_ts = Timestamp::now(); self.record_send_failure( RPCKind::Statement, send_ts, @@ -1241,6 +1244,9 @@ impl RPCProcessor { ); RPCError::network(e) })?; + // Take send timestamp -after- send is attempted to exclude TCP connection time which + // may unfairly punish some nodes, randomly, based on their being in the connection table or not + let send_ts = Timestamp::now(); let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { // If we couldn't send we're still cleaning up self.record_send_failure(RPCKind::Statement, send_ts, node_ref.unfiltered(), safety_route, remote_private_route); @@ -1290,7 +1296,6 @@ impl RPCProcessor { // Send the reply let bytes: ByteCount = (message.len() as u64).into(); - let send_ts = Timestamp::now(); #[allow(unused_variables)] let message_len = message.len(); let res = self @@ -1303,6 +1308,7 @@ impl RPCProcessor { .await .map_err(|e| { // If we're returning an error, clean up + let send_ts = Timestamp::now(); self.record_send_failure( RPCKind::Answer, send_ts, @@ -1312,6 +1318,9 @@ impl RPCProcessor { ); RPCError::network(e) })?; + // Take send timestamp -after- send is attempted to exclude TCP connection time which + // may unfairly punish some nodes, randomly, based on their being in the connection table or not + let send_ts = Timestamp::now(); let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { // If we couldn't send we're still cleaning up self.record_send_failure(RPCKind::Answer, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);