[ci skip] correct consistency check for 'seen consecutively', ensure latency timing is done after calls are issued, not before connections are established

This commit is contained in:
Christien Rioux 2025-02-28 20:06:20 -05:00
parent a3cf47ac33
commit 76fa9e4507
2 changed files with 22 additions and 4 deletions

View File

@ -1163,12 +1163,21 @@ impl BucketEntryInner {
self.answer_stats_accounting_ordered
.record_lost_answer(cur_ts);
self.peer_stats.rpc_stats.recent_lost_answers_ordered += 1;
if self.peer_stats.rpc_stats.recent_lost_answers_ordered
> UNRELIABLE_LOST_ANSWERS_ORDERED
{
self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
}
} else {
self.answer_stats_accounting_unordered
.record_lost_answer(cur_ts);
self.peer_stats.rpc_stats.recent_lost_answers_unordered += 1;
if self.peer_stats.rpc_stats.recent_lost_answers_unordered
> UNRELIABLE_LOST_ANSWERS_UNORDERED
{
self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
}
}
self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
self.peer_stats.rpc_stats.questions_in_flight -= 1;
}
pub(super) fn failed_to_send(&mut self, ts: Timestamp, expects_answer: bool) {

View File

@ -1130,7 +1130,6 @@ impl RPCProcessor {
// Send question
let bytes: ByteCount = (message.len() as u64).into();
let send_ts = Timestamp::now();
#[allow(unused_variables)]
let message_len = message.len();
let res = self
@ -1143,6 +1142,7 @@ impl RPCProcessor {
.await
.map_err(|e| {
// If we're returning an error, clean up
let send_ts = Timestamp::now();
self.record_send_failure(
RPCKind::Question,
send_ts,
@ -1152,6 +1152,9 @@ impl RPCProcessor {
);
RPCError::network(e)
})?;
// Take send timestamp -after- send is attempted to exclude TCP connection time which
// may unfairly punish some nodes, randomly, based on their being in the connection table or not
let send_ts = Timestamp::now();
let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Question, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);
@ -1219,7 +1222,6 @@ impl RPCProcessor {
// Send statement
let bytes: ByteCount = (message.len() as u64).into();
let send_ts = Timestamp::now();
#[allow(unused_variables)]
let message_len = message.len();
let res = self
@ -1232,6 +1234,7 @@ impl RPCProcessor {
.await
.map_err(|e| {
// If we're returning an error, clean up
let send_ts = Timestamp::now();
self.record_send_failure(
RPCKind::Statement,
send_ts,
@ -1241,6 +1244,9 @@ impl RPCProcessor {
);
RPCError::network(e)
})?;
// Take send timestamp -after- send is attempted to exclude TCP connection time which
// may unfairly punish some nodes, randomly, based on their being in the connection table or not
let send_ts = Timestamp::now();
let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Statement, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);
@ -1290,7 +1296,6 @@ impl RPCProcessor {
// Send the reply
let bytes: ByteCount = (message.len() as u64).into();
let send_ts = Timestamp::now();
#[allow(unused_variables)]
let message_len = message.len();
let res = self
@ -1303,6 +1308,7 @@ impl RPCProcessor {
.await
.map_err(|e| {
// If we're returning an error, clean up
let send_ts = Timestamp::now();
self.record_send_failure(
RPCKind::Answer,
send_ts,
@ -1312,6 +1318,9 @@ impl RPCProcessor {
);
RPCError::network(e)
})?;
// Take send timestamp -after- send is attempted to exclude TCP connection time which
// may unfairly punish some nodes, randomly, based on their being in the connection table or not
let send_ts = Timestamp::now();
let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Answer, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);