punishment work

Christien Rioux 2024-06-27 20:19:48 +00:00
parent 7368e5d5d3
commit 9944c51368
8 changed files with 109 additions and 48 deletions

View File

@ -27,9 +27,9 @@ struct AddressFilterInner {
conn_count_by_ip6_prefix: BTreeMap<Ipv6Addr, usize>,
conn_timestamps_by_ip4: BTreeMap<Ipv4Addr, Vec<Timestamp>>,
conn_timestamps_by_ip6_prefix: BTreeMap<Ipv6Addr, Vec<Timestamp>>,
punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>,
punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>,
punishments_by_node_id: BTreeMap<TypedKey, Timestamp>,
punishments_by_ip4: BTreeMap<Ipv4Addr, Punishment>,
punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Punishment>,
punishments_by_node_id: BTreeMap<TypedKey, Punishment>,
dial_info_failures: BTreeMap<DialInfo, Timestamp>,
}
@ -194,7 +194,7 @@ impl AddressFilter {
inner.punishments_by_node_id.remove(&key);
// make the entry alive again if it's still here
if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(key) {
nr.operate_mut(|_rti, e| e.set_punished(false));
nr.operate_mut(|_rti, e| e.set_punished(None));
}
}
}
@ -314,10 +314,10 @@ impl AddressFilter {
self.is_node_id_punished_inner(&inner, node_id)
}
pub fn punish_node_id(&self, node_id: TypedKey) {
pub fn punish_node_id(&self, node_id: TypedKey, punishment_reason: PunishmentReason) {
if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(node_id) {
// make the entry dead if it's punished
nr.operate_mut(|_rti, e| e.set_punished(true));
nr.operate_mut(|_rti, e| e.set_punished(Some(punishment_reason)));
}
let ts = get_aligned_timestamp();
@ -327,7 +327,7 @@ impl AddressFilter {
log_net!(debug ">>> PUNISHMENT TABLE FULL: {}", node_id);
return;
}
log_net!(debug ">>> PUNISHED: {}", node_id);
log_net!(debug ">>> PUNISHED: {} for {:?}", node_id, punishment_reason);
inner
.punishments_by_node_id
.entry(node_id)

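The hunk above is truncated in the middle of the entry update, but the maps now hold `Punishment` values instead of bare `Timestamp`s, so the insertion presumably records both the reason and the time. A hedged sketch of how that update could complete (the field names and the `and_modify`/`or_insert` shape are assumptions based on the new `punishment` module below, not lines shown in this diff):

```rust
// Sketch only: a plausible completion of the truncated entry update above,
// storing a Punishment (reason + timestamp) rather than a bare Timestamp.
inner
    .punishments_by_node_id
    .entry(node_id)
    .and_modify(|p| {
        *p = Punishment {
            reason: punishment_reason,
            timestamp: ts,
        }
    })
    .or_insert(Punishment {
        reason: punishment_reason,
        timestamp: ts,
    });
```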
View File

@ -8,6 +8,7 @@ mod low_level_protocol_type;
mod network_class;
mod peer_address;
mod protocol_type;
mod punishment;
mod relay_kind;
mod signal_info;
mod socket_address;
@ -24,6 +25,7 @@ pub use low_level_protocol_type::*;
pub use network_class::*;
pub use peer_address::*;
pub use protocol_type::*;
pub use punishment::*;
pub use relay_kind::*;
pub use signal_info::*;
pub use socket_address::*;

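The body of the new `punishment` module is not shown in this diff. Inferred from how `Punishment` and `PunishmentReason` are used in the address filter above, a plausible shape might be the following; the variant and field names are illustrative assumptions only:

```rust
// Sketch of the new `punishment` module, inferred from its call sites.
use super::*; // Timestamp and the other shared types

/// Why a node or address was punished (illustrative variants only).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PunishmentReason {
    FailedToDecodeEnvelope,
    FailedToDecodeOperation,
    InvalidFraming,
}

/// A punishment pairs the reason with the time it was applied,
/// replacing the bare Timestamp the address filter stored before.
#[derive(Copy, Clone, Debug)]
pub struct Punishment {
    pub reason: PunishmentReason,
    pub timestamp: Timestamp,
}
```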
View File

@ -142,8 +142,8 @@ impl Bucket {
}
a.1.with_inner(|ea| {
b.1.with_inner(|eb| {
let astate = state_ordering(ea.state(cur_ts));
let bstate = state_ordering(eb.state(cur_ts));
let astate = state_ordering(ea.state_reason(cur_ts));
let bstate = state_ordering(eb.state_reason(cur_ts));
// first kick dead nodes, then unreliable nodes
if astate < bstate {
return core::cmp::Ordering::Less;

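The kick comparator above now passes a `BucketEntryStateReason` to `state_ordering`, whose body falls outside this hunk. A sketch of how such a helper could rank the reasons, assuming it collapses them to the same coarse severity order as `BucketEntryState` (punished and dead kicked first, reliable kept last):

```rust
// Sketch: rank detailed state reasons by coarse severity so the sort above
// kicks punished and dead entries before unreliable and reliable ones.
fn state_ordering(state_reason: BucketEntryStateReason) -> usize {
    match state_reason {
        BucketEntryStateReason::Punished(_) => 0,
        BucketEntryStateReason::Dead(_) => 1,
        BucketEntryStateReason::Unreliable(_) => 2,
        BucketEntryStateReason::Reliable => 3,
    }
}
```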
View File

@ -26,15 +26,56 @@ const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
/// How many times do we try to ping a never-reached node before we call it dead
const NEVER_REACHED_PING_COUNT: u32 = 3;
// Do not change order here, it will mess up other sorts
// Bucket entry state reasons
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum BucketEntryPunishedReason {
Punished,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum BucketEntryDeadReason {
FailedToSend,
TooManyLostAnswers,
NoPingResponse,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum BucketEntryUnreliableReason {
FailedToSend,
LostAnswers,
NotSeenConsecutively,
InUnreliablePingSpan,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BucketEntryStateReason {
Punished(BucketEntryPunishedReason),
Dead(BucketEntryDeadReason),
Unreliable(BucketEntryUnreliableReason),
Reliable,
}
// Do not change order here, it will mess up other sorts
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum BucketEntryState {
Punished,
Dead,
Unreliable,
Reliable,
}
impl From<BucketEntryStateReason> for BucketEntryState {
fn from(value: BucketEntryStateReason) -> Self {
match value {
BucketEntryStateReason::Punished(_) => BucketEntryState::Punished,
BucketEntryStateReason::Dead(_) => BucketEntryState::Dead,
BucketEntryStateReason::Unreliable(_) => BucketEntryState::Unreliable,
BucketEntryStateReason::Reliable => BucketEntryState::Reliable,
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub(crate) struct LastFlowKey(ProtocolType, AddressType);
@ -92,7 +133,7 @@ pub(crate) struct BucketEntryInner {
transfer_stats_accounting: TransferStatsAccounting,
/// If the entry is being punished and should be considered dead
#[serde(skip)]
is_punished: bool,
is_punished: Option<PunishmentReason>,
/// Tracking identifier for NodeRef debugging
#[cfg(feature = "tracking")]
#[serde(skip)]
@ -202,7 +243,7 @@ impl BucketEntryInner {
// Less is more reliable, then faster
pub fn cmp_fastest_reliable(cur_ts: Timestamp, e1: &Self, e2: &Self) -> std::cmp::Ordering {
// Reverse compare so most reliable is at front
let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts));
let ret = e2.state_reason(cur_ts).cmp(&e1.state_reason(cur_ts));
if ret != std::cmp::Ordering::Equal {
return ret;
}
@ -224,7 +265,7 @@ impl BucketEntryInner {
// Less is more reliable, then older
pub fn cmp_oldest_reliable(cur_ts: Timestamp, e1: &Self, e2: &Self) -> std::cmp::Ordering {
// Reverse compare so most reliable is at front
let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts));
let ret = e2.state_reason(cur_ts).cmp(&e1.state_reason(cur_ts));
if ret != std::cmp::Ordering::Equal {
return ret;
}
@ -558,19 +599,17 @@ impl BucketEntryInner {
self.envelope_support.iter().rev().find(|x| VALID_ENVELOPE_VERSIONS.contains(x)).copied()
}
pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
if self.is_punished {
return BucketEntryState::Dead;
}
if self.check_reliable(cur_ts) {
BucketEntryState::Reliable
} else if self.check_dead(cur_ts) {
BucketEntryState::Dead
pub fn state_reason(&self, cur_ts: Timestamp) -> BucketEntryStateReason {
if let Some(dead_reason) = self.check_dead(cur_ts) {
BucketEntryStateReason::Dead(dead_reason)
} else if let Some(unreliable_reason) = self.check_unreliable(cur_ts) {
BucketEntryStateReason::Unreliable(unreliable_reason)
} else {
BucketEntryState::Unreliable
BucketEntryStateReason::Reliable
}
}
pub fn set_punished(&mut self, punished: bool) {
pub fn set_punished(&mut self, punished: Option<PunishmentReason>) {
self.is_punished = punished;
if punished.is_some() {
self.clear_last_flows();
@ -650,40 +689,58 @@ impl BucketEntryInner {
}
///// state machine handling
pub(super) fn check_reliable(&self, cur_ts: Timestamp) -> bool {
pub(super) fn check_unreliable(&self, cur_ts: Timestamp) -> Option<BucketEntryUnreliableReason> {
// If we have had any failures to send, this is not reliable
if self.peer_stats.rpc_stats.failed_to_send > 0 {
return false;
return Some(BucketEntryUnreliableReason::FailedToSend);
}
// If we have had any lost answers recently, this is not reliable
if self.peer_stats.rpc_stats.recent_lost_answers > 0 {
return false;
return Some(BucketEntryUnreliableReason::LostAnswers);
}
match self.peer_stats.rpc_stats.first_consecutive_seen_ts {
// If we have not seen a node consecutively, it can't be reliable
None => false,
None => Some(BucketEntryUnreliableReason::NotSeenConsecutively),
// If we have seen the node consistently for longer than UNRELIABLE_PING_SPAN_SECS then it is reliable
Some(ts) => {
cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
let is_reliable = cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64);
if is_reliable {
None
} else {
Some(BucketEntryUnreliableReason::InUnreliablePingSpan)
}
}
}
}
pub(super) fn check_dead(&self, cur_ts: Timestamp) -> bool {
pub(super) fn check_dead(&self, cur_ts: Timestamp) -> Option<BucketEntryDeadReason> {
// If we have failed to send NEVER_REACHED_PING_COUNT times in a row, the node is dead
if self.peer_stats.rpc_stats.failed_to_send >= NEVER_REACHED_PING_COUNT {
return true;
return Some(BucketEntryDeadReason::FailedToSend);
}
match self.peer_stats.rpc_stats.last_seen_ts {
// a node is not dead if we haven't heard from it yet,
// but we give it NEVER_REACHED_PING_COUNT chances to ping before we say it's dead
None => self.peer_stats.rpc_stats.recent_lost_answers >= NEVER_REACHED_PING_COUNT,
None => {
let is_dead = self.peer_stats.rpc_stats.recent_lost_answers >= NEVER_REACHED_PING_COUNT;
if is_dead {
Some(BucketEntryDeadReason::TooManyLostAnswers)
} else {
None
}
}
// return dead if we have not heard from the node at all for the duration of the unreliable ping span
Some(ts) => {
cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
let is_dead = cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64);
if is_dead {
Some(BucketEntryDeadReason::NoPingResponse)
} else {
None
}
}
}
}
@ -712,7 +769,7 @@ impl BucketEntryInner {
// Check if this node needs a ping right now to validate it is still reachable
pub(super) fn needs_ping(&self, cur_ts: Timestamp) -> bool {
// See which ping pattern we are to use
let state = self.state(cur_ts);
let state = self.state_reason(cur_ts);
match state {
BucketEntryState::Reliable => {
@ -798,7 +855,7 @@ impl BucketEntryInner {
format!(
"state: {:?}, first_consecutive_seen_ts: {}, last_seen_ts: {}",
self.state(cur_ts),
self.state_reason(cur_ts),
first_consecutive_seen_ts,
last_seen_ts_str
)

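`BucketEntryStateReason` carries payloads and derives no ordering, while the comparators above still call `.cmp()` on it; presumably the ordering-sensitive paths collapse the reason back to the coarse `BucketEntryState` via the `From` impl introduced in this commit. A sketch of that pattern (an assumption about how those comparisons are meant to resolve, not a line from the diff):

```rust
// Sketch: collapse the detailed reason to the orderable coarse state before
// comparing, preserving "reverse compare so most reliable is at front".
let a_state = BucketEntryState::from(e1.state_reason(cur_ts));
let b_state = BucketEntryState::from(e2.state_reason(cur_ts));
let ret = b_state.cmp(&a_state);
if ret != std::cmp::Ordering::Equal {
    return ret;
}
```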
View File

@ -134,7 +134,7 @@ impl RoutingTable {
let cap_match = e.1.with(inner, |_rti, e| {
e.has_all_capabilities(RoutingDomain::PublicInternet, &capabilities)
});
let state = e.1.with(inner, |_rti, e| e.state(cur_ts));
let state = e.1.with(inner, |_rti, e| e.state_reason(cur_ts));
state >= min_state && cap_match
})
.collect();
@ -142,7 +142,7 @@ impl RoutingTable {
if !filtered_entries.is_empty() {
out += &format!("{} Bucket #{}:\n", ck, b);
for e in filtered_entries {
let state = e.1.with(inner, |_rti, e| e.state(cur_ts));
let state = e.1.with(inner, |_rti, e| e.state_reason(cur_ts));
out += &format!(
" {} [{}] {} [{}]\n",
e.0.encode(),
@ -210,7 +210,9 @@ impl RoutingTable {
while c < COLS {
let mut cnt = 0;
for e in inner.buckets[ck][b].entries() {
if e.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) {
if e.1
.with(inner, |_rti, e| e.state_reason(cur_ts) >= min_state)
{
cnt += 1;
}
}

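The debug listing is an obvious consumer of the new detail: the `>= min_state` filter presumably compares the coarse state (again via the `From` conversion), while the reason itself can be printed. A hypothetical formatter, not part of this diff, could render both:

```rust
// Hypothetical helper: show the coarse state together with the detailed
// reason in the bucket printout; every reason enum in this commit derives Debug.
fn format_entry_state(reason: BucketEntryStateReason) -> String {
    match reason {
        BucketEntryStateReason::Punished(p) => format!("Punished({:?})", p),
        BucketEntryStateReason::Dead(d) => format!("Dead({:?})", d),
        BucketEntryStateReason::Unreliable(u) => format!("Unreliable({:?})", u),
        BucketEntryStateReason::Reliable => "Reliable".to_owned(),
    }
}
```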
View File

@ -130,7 +130,7 @@ pub(crate) trait NodeRefBase: Sized {
self.operate(|_rti, e| e.best_envelope_version())
}
fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
self.operate(|_rti, e| e.state(cur_ts))
self.operate(|_rti, e| e.state_reason(cur_ts))
}
fn peer_stats(&self) -> PeerStats {
self.operate(|_rti, e| e.peer_stats().clone())

View File

@ -479,7 +479,7 @@ impl RoutingTableInner {
mut f: F,
) -> Option<T> {
for entry in &self.all_entries {
if entry.with_inner(|e| e.state(cur_ts) >= min_state) {
if entry.with_inner(|e| e.state_reason(cur_ts) >= min_state) {
if let Some(out) = f(self, entry) {
return Some(out);
}
@ -498,7 +498,7 @@ impl RoutingTableInner {
) -> Option<T> {
let mut entries = Vec::with_capacity(self.all_entries.len());
for entry in self.all_entries.iter() {
if entry.with_inner(|e| e.state(cur_ts) >= min_state) {
if entry.with_inner(|e| e.state_reason(cur_ts) >= min_state) {
entries.push(entry);
}
}
@ -879,7 +879,7 @@ impl RoutingTableInner {
let cur_ts = get_aligned_timestamp();
for entry in self.all_entries.iter() {
match entry.with_inner(|e| e.state(cur_ts)) {
match entry.with_inner(|e| e.state_reason(cur_ts)) {
BucketEntryState::Reliable => {
reliable_entry_count += 1;
}
@ -1070,7 +1070,7 @@ impl RoutingTableInner {
move |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
if let Some(entry) = &v {
// always filter out dead nodes
!entry.with_inner(|e| e.state(cur_ts) == BucketEntryState::Dead)
!entry.with_inner(|e| e.state_reason(cur_ts) == BucketEntryState::Dead)
} else {
// always filter out self peer, as it is irrelevant to the 'fastest nodes' search
false
@ -1106,8 +1106,8 @@ impl RoutingTableInner {
let be = b_entry.as_ref().unwrap();
ae.with_inner(|ae| {
be.with_inner(|be| {
let ra = ae.check_reliable(cur_ts);
let rb = be.check_reliable(cur_ts);
let ra = ae.check_unreliable(cur_ts).is_none();
let rb = be.check_unreliable(cur_ts).is_none();
if ra != rb {
if ra {
return core::cmp::Ordering::Less;
@ -1189,10 +1189,10 @@ impl RoutingTableInner {
// reliable nodes come first, pessimistically treating our own node as unreliable
let ra = a_entry
.as_ref()
.map_or(false, |x| x.with_inner(|x| x.check_reliable(cur_ts)));
.map_or(false, |x| x.with_inner(|x| x.check_unreliable(cur_ts).is_none()));
let rb = b_entry
.as_ref()
.map_or(false, |x| x.with_inner(|x| x.check_reliable(cur_ts)));
.map_or(false, |x| x.with_inner(|x| x.check_unreliable(cur_ts).is_none()));
if ra != rb {
if ra {
return core::cmp::Ordering::Less;

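Since `check_unreliable` returns `Option<BucketEntryUnreliableReason>` where `check_reliable` returned `bool`, the sort predicates above reduce the result back to a boolean with `.is_none()`, so `true` still means "reliable" and reliable nodes keep sorting first. In isolation, the translation looks like this:

```rust
// Old: true meant the entry was reliable.
// let ra = ae.check_reliable(cur_ts);
// New: reliable is the absence of an unreliable reason.
let ra = ae.check_unreliable(cur_ts).is_none();
if ra {
    // reliable nodes come first
}
```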
View File

@ -52,7 +52,7 @@ impl RoutingTable {
continue;
}
let state = entry.with(&inner, |_rti, e| e.state(cur_ts));
let state = entry.with(&inner, |_rti, e| e.state_reason(cur_ts));
match state {
BucketEntryState::Dead => {
// Do nothing with dead entries