Merge branch 'dht-performance' into 'main'

correct consistency check, exclude initial connection time from latency

See merge request veilid/veilid!358
Christien Rioux 2025-03-01 01:28:14 +00:00
commit f79198c545
2 changed files with 24 additions and 6 deletions

View File

@@ -1163,12 +1163,21 @@ impl BucketEntryInner {
             self.answer_stats_accounting_ordered
                 .record_lost_answer(cur_ts);
             self.peer_stats.rpc_stats.recent_lost_answers_ordered += 1;
+            if self.peer_stats.rpc_stats.recent_lost_answers_ordered
+                > UNRELIABLE_LOST_ANSWERS_ORDERED
+            {
+                self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
+            }
         } else {
             self.answer_stats_accounting_unordered
                 .record_lost_answer(cur_ts);
             self.peer_stats.rpc_stats.recent_lost_answers_unordered += 1;
+            if self.peer_stats.rpc_stats.recent_lost_answers_unordered
+                > UNRELIABLE_LOST_ANSWERS_UNORDERED
+            {
+                self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
+            }
         }
-        self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
         self.peer_stats.rpc_stats.questions_in_flight -= 1;
     }
     pub(super) fn failed_to_send(&mut self, ts: Timestamp, expects_answer: bool) {
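The hunk above changes the lost-answer accounting so that first_consecutive_seen_ts is cleared only once the per-class lost-answer count exceeds its unreliability threshold, instead of unconditionally on every lost answer. A minimal standalone sketch of the corrected check, using illustrative stand-in types and threshold values rather than the real BucketEntryInner internals:

// Sketch only: simplified stand-ins for BucketEntryInner's peer stats.
const UNRELIABLE_LOST_ANSWERS_ORDERED: u32 = 2; // assumed value, for illustration
const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 2; // assumed value, for illustration

#[derive(Default)]
struct RpcStatsSketch {
    recent_lost_answers_ordered: u32,
    recent_lost_answers_unordered: u32,
    first_consecutive_seen_ts: Option<u64>,
}

impl RpcStatsSketch {
    fn record_lost_answer(&mut self, ordered: bool) {
        // Count the loss in its class, then clear the "first consecutively seen"
        // marker only after that class's count exceeds its threshold.
        if ordered {
            self.recent_lost_answers_ordered += 1;
            if self.recent_lost_answers_ordered > UNRELIABLE_LOST_ANSWERS_ORDERED {
                self.first_consecutive_seen_ts = None;
            }
        } else {
            self.recent_lost_answers_unordered += 1;
            if self.recent_lost_answers_unordered > UNRELIABLE_LOST_ANSWERS_UNORDERED {
                self.first_consecutive_seen_ts = None;
            }
        }
    }
}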

View File

@@ -527,11 +527,11 @@ impl RPCProcessor {
             .await;
         match &out {
             Err(e) => {
-                veilid_log!(self debug "RPC Lost (id={} {}): {}", id, debug_string, e);
+                veilid_log!(self debug "RPC Lost (id={} {}): {} ({}) ", id, debug_string, e, waitable_reply.context.send_data_result.unique_flow().flow);
                 self.record_lost_answer(&waitable_reply.context);
             }
             Ok(TimeoutOr::Timeout) => {
-                veilid_log!(self debug "RPC Lost (id={} {}): Timeout", id, debug_string);
+                veilid_log!(self debug "RPC Lost (id={} {}): Timeout ({})", id, debug_string, waitable_reply.context.send_data_result.unique_flow().flow);
                 self.record_lost_answer(&waitable_reply.context);
             }
             Ok(TimeoutOr::Value((rpcreader, _))) => {
@@ -1130,7 +1130,6 @@ impl RPCProcessor {
 
         // Send question
         let bytes: ByteCount = (message.len() as u64).into();
-        let send_ts = Timestamp::now();
         #[allow(unused_variables)]
         let message_len = message.len();
         let res = self
@@ -1143,6 +1142,7 @@ impl RPCProcessor {
             .await
             .map_err(|e| {
                 // If we're returning an error, clean up
+                let send_ts = Timestamp::now();
                 self.record_send_failure(
                     RPCKind::Question,
                     send_ts,
@@ -1152,6 +1152,9 @@ impl RPCProcessor {
                 );
                 RPCError::network(e)
             })?;
+        // Take send timestamp -after- send is attempted to exclude TCP connection time which
+        // may unfairly punish some nodes, randomly, based on their being in the connection table or not
+        let send_ts = Timestamp::now();
         let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
             // If we couldn't send we're still cleaning up
             self.record_send_failure(RPCKind::Question, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);
@@ -1219,7 +1222,6 @@ impl RPCProcessor {
 
         // Send statement
         let bytes: ByteCount = (message.len() as u64).into();
-        let send_ts = Timestamp::now();
         #[allow(unused_variables)]
         let message_len = message.len();
         let res = self
@@ -1232,6 +1234,7 @@ impl RPCProcessor {
             .await
             .map_err(|e| {
                 // If we're returning an error, clean up
+                let send_ts = Timestamp::now();
                 self.record_send_failure(
                     RPCKind::Statement,
                     send_ts,
@@ -1241,6 +1244,9 @@ impl RPCProcessor {
                 );
                 RPCError::network(e)
             })?;
+        // Take send timestamp -after- send is attempted to exclude TCP connection time which
+        // may unfairly punish some nodes, randomly, based on their being in the connection table or not
+        let send_ts = Timestamp::now();
         let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
             // If we couldn't send we're still cleaning up
             self.record_send_failure(RPCKind::Statement, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);
@@ -1290,7 +1296,6 @@ impl RPCProcessor {
 
         // Send the reply
        let bytes: ByteCount = (message.len() as u64).into();
-        let send_ts = Timestamp::now();
         #[allow(unused_variables)]
         let message_len = message.len();
         let res = self
@@ -1303,6 +1308,7 @@ impl RPCProcessor {
             .await
             .map_err(|e| {
                 // If we're returning an error, clean up
+                let send_ts = Timestamp::now();
                 self.record_send_failure(
                     RPCKind::Answer,
                     send_ts,
@@ -1312,6 +1318,9 @@ impl RPCProcessor {
                 );
                 RPCError::network(e)
             })?;
+        // Take send timestamp -after- send is attempted to exclude TCP connection time which
+        // may unfairly punish some nodes, randomly, based on their being in the connection table or not
+        let send_ts = Timestamp::now();
         let send_data_result = network_result_value_or_log!(self res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
             // If we couldn't send we're still cleaning up
             self.record_send_failure(RPCKind::Answer, send_ts, node_ref.unfiltered(), safety_route, remote_private_route);
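All three send paths (question, statement, answer) apply the same relocation: send_ts is now taken after the send is attempted (or at failure time inside the map_err handler), so time spent establishing a new connection is excluded from the latency later measured against that timestamp. A simplified sketch of the idea; send_to_node and record_question are hypothetical helpers standing in for the real send and stats-recording calls:

use std::time::Instant;

// Sketch only: timestamp after the send attempt so connection-establishment
// time (for nodes not already in the connection table) is not folded into
// the measured round-trip latency.
async fn send_question_sketch(node: &str, message: &[u8]) -> Result<Instant, String> {
    // This may have to open a brand-new connection, which can take far longer
    // than sending over an already-established one.
    send_to_node(node, message).await?;

    // Latency for the eventual answer is measured from here, not from before
    // any connection setup began.
    let send_ts = Instant::now();
    record_question(node, send_ts, message.len());
    Ok(send_ts)
}

// Hypothetical placeholders so the sketch is self-contained.
async fn send_to_node(_node: &str, _message: &[u8]) -> Result<(), String> {
    Ok(())
}
fn record_question(_node: &str, _send_ts: Instant, _message_len: usize) {}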