From 4f9e19642ccb6745b3cda2db1d4ded91e145ffd6 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 28 Jun 2024 00:42:20 +0000 Subject: [PATCH] improve punishment tracking, make reasons for punishment accessible --- .../src/network_manager/address_filter.rs | 34 +++++----- veilid-core/src/network_manager/mod.rs | 9 ++- .../src/network_manager/network_connection.rs | 2 +- .../src/network_manager/types/punishment.rs | 23 +++++++ veilid-core/src/routing_table/bucket.rs | 14 +--- veilid-core/src/routing_table/bucket_entry.rs | 68 +++++++++++++------ veilid-core/src/routing_table/debug.rs | 43 +++++++++--- veilid-core/src/routing_table/mod.rs | 4 +- veilid-core/src/routing_table/node_ref.rs | 5 +- .../src/routing_table/routing_table_inner.rs | 47 ++++++------- .../src/routing_table/tasks/kick_buckets.rs | 6 +- .../routing_table/tasks/relay_management.rs | 9 ++- veilid-core/src/rpc_processor/mod.rs | 19 +++--- .../tasks/check_active_watches.rs | 5 +- veilid-core/src/veilid_api/debug.rs | 4 +- 15 files changed, 182 insertions(+), 110 deletions(-) create mode 100644 veilid-core/src/network_manager/types/punishment.rs diff --git a/veilid-core/src/network_manager/address_filter.rs b/veilid-core/src/network_manager/address_filter.rs index 7cd281b9..1e35f44b 100644 --- a/veilid-core/src/network_manager/address_filter.rs +++ b/veilid-core/src/network_manager/address_filter.rs @@ -151,7 +151,7 @@ impl AddressFilter { let mut dead_keys = Vec::::new(); for (key, value) in &mut inner.punishments_by_ip4 { // Drop punishments older than the punishment duration - if cur_ts.as_u64().saturating_sub(value.as_u64()) + if cur_ts.as_u64().saturating_sub(value.timestamp.as_u64()) > self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64 { dead_keys.push(*key); @@ -167,7 +167,7 @@ impl AddressFilter { let mut dead_keys = Vec::::new(); for (key, value) in &mut inner.punishments_by_ip6_prefix { // Drop punishments older than the punishment duration - if cur_ts.as_u64().saturating_sub(value.as_u64()) + if cur_ts.as_u64().saturating_sub(value.timestamp.as_u64()) > self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64 { dead_keys.push(*key); @@ -183,7 +183,7 @@ impl AddressFilter { let mut dead_keys = Vec::::new(); for (key, value) in &mut inner.punishments_by_node_id { // Drop punishments older than the punishment duration - if cur_ts.as_u64().saturating_sub(value.as_u64()) + if cur_ts.as_u64().saturating_sub(value.timestamp.as_u64()) > self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64 { dead_keys.push(*key); @@ -278,9 +278,10 @@ impl AddressFilter { inner.punishments_by_node_id.clear(); } - pub fn punish_ip_addr(&self, addr: IpAddr) { - log_net!(debug ">>> PUNISHED: {}", addr); - let ts = get_aligned_timestamp(); + pub fn punish_ip_addr(&self, addr: IpAddr, reason: PunishmentReason) { + log_net!(debug ">>> PUNISHED: {} for {:?}", addr, reason); + let timestamp = get_aligned_timestamp(); + let punishment = Punishment { reason, timestamp }; let ipblock = ip_to_ipblock( self.unlocked_inner.max_connections_per_ip6_prefix_size, @@ -292,13 +293,13 @@ impl AddressFilter { IpAddr::V4(v4) => inner .punishments_by_ip4 .entry(v4) - .and_modify(|v| *v = ts) - .or_insert(ts), + .and_modify(|v| *v = punishment) + .or_insert(punishment), IpAddr::V6(v6) => inner .punishments_by_ip6_prefix .entry(v6) - .and_modify(|v| *v = ts) - .or_insert(ts), + .and_modify(|v| *v = punishment) + .or_insert(punishment), }; } @@ -314,25 +315,26 @@ impl AddressFilter { self.is_node_id_punished_inner(&inner, node_id) } - pub fn punish_node_id(&self, node_id: TypedKey, punishment_reason: PunishmentReason) { + pub fn punish_node_id(&self, node_id: TypedKey, reason: PunishmentReason) { if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(node_id) { // make the entry dead if it's punished - nr.operate_mut(|_rti, e| e.set_punished(Some(punishment_reason))); + nr.operate_mut(|_rti, e| e.set_punished(Some(reason))); } - let ts = get_aligned_timestamp(); + let timestamp = get_aligned_timestamp(); + let punishment = Punishment { reason, timestamp }; let mut inner = self.inner.lock(); if inner.punishments_by_node_id.len() >= MAX_PUNISHMENTS_BY_NODE_ID { log_net!(debug ">>> PUNISHMENT TABLE FULL: {}", node_id); return; } - log_net!(debug ">>> PUNISHED: {} for {:?}", node_id, punishment_reason); + log_net!(debug ">>> PUNISHED: {} for {:?}", node_id, reason); inner .punishments_by_node_id .entry(node_id) - .and_modify(|v| *v = ts) - .or_insert(ts); + .and_modify(|v| *v = punishment) + .or_insert(punishment); } pub async fn address_filter_task_routine( diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index bb0d3c7e..ed4ee073 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -930,7 +930,8 @@ impl NetworkManager { // Ensure we can read the magic number if data.len() < 4 { log_net!(debug "short packet"); - self.address_filter().punish_ip_addr(remote_addr); + self.address_filter() + .punish_ip_addr(remote_addr, PunishmentReason::ShortPacket); return Ok(false); } @@ -966,7 +967,8 @@ impl NetworkManager { Err(e) => { log_net!(debug "envelope failed to decode: {}", e); // safe to punish here because relays also check here to ensure they arent forwarding things that don't decode - self.address_filter().punish_ip_addr(remote_addr); + self.address_filter() + .punish_ip_addr(remote_addr, PunishmentReason::FailedToDecodeEnvelope); return Ok(false); } }; @@ -1099,7 +1101,8 @@ impl NetworkManager { // Can't punish by ip address here because relaying can't decrypt envelope bodies to check // But because the envelope was properly signed by the time it gets here, it is safe to // punish by node id - self.address_filter().punish_node_id(sender_id); + self.address_filter() + .punish_node_id(sender_id, PunishmentReason::FailedToDecryptEnvelopeBody); return Ok(false); } }; diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index df4f03e4..7cad975d 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -362,7 +362,7 @@ impl NetworkConnection { // Punish invalid framing (tcp framing or websocket framing) if v.is_invalid_message() { - address_filter.punish_ip_addr(peer_address.socket_addr().ip()); + address_filter.punish_ip_addr(peer_address.socket_addr().ip(), PunishmentReason::InvalidFraming); return RecvLoopAction::Finish; } diff --git a/veilid-core/src/network_manager/types/punishment.rs b/veilid-core/src/network_manager/types/punishment.rs new file mode 100644 index 00000000..06e15204 --- /dev/null +++ b/veilid-core/src/network_manager/types/punishment.rs @@ -0,0 +1,23 @@ +use super::*; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PunishmentReason { + // IP-level punishments + FailedToDecryptEnvelopeBody, + FailedToDecodeEnvelope, + ShortPacket, + InvalidFraming, + // Node-level punishments + FailedToDecodeOperation, + WrongSenderPeerInfo, + FailedToVerifySenderPeerInfo, + FailedToRegisterSenderPeerInfo, + // Route-level punishments + // FailedToDecodeRoutedMessage, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct Punishment { + pub reason: PunishmentReason, + pub timestamp: Timestamp, +} diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 63a3d3d5..708a1898 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -26,14 +26,6 @@ struct SerializedBucketData { entries: Vec, } -fn state_ordering(state: BucketEntryState) -> usize { - match state { - BucketEntryState::Dead => 0, - BucketEntryState::Unreliable => 1, - BucketEntryState::Reliable => 2, - } -} - impl Bucket { pub fn new(kind: CryptoKind) -> Self { Self { @@ -142,9 +134,9 @@ impl Bucket { } a.1.with_inner(|ea| { b.1.with_inner(|eb| { - let astate = state_ordering(ea.state_reason(cur_ts)); - let bstate = state_ordering(eb.state_reason(cur_ts)); - // first kick dead nodes, then unreliable nodes + let astate = ea.state(cur_ts).ordering(); + let bstate = eb.state(cur_ts).ordering(); + // first kick punished nodes, then dead nodes, then unreliable nodes if astate < bstate { return core::cmp::Ordering::Less; } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 5a3218d0..1f288e0f 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -26,21 +26,15 @@ const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5; /// How many times do we try to ping a never-reached node before we call it dead const NEVER_REACHED_PING_COUNT: u32 = 3; -// Bucket entry state reasons #[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum BucketEntryPunishedReason { - Punished, -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum BucketEntryDeadReason { +pub(crate) enum BucketEntryDeadReason { FailedToSend, TooManyLostAnswers, NoPingResponse, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum BucketEntryUnreliableReason { +pub(crate) enum BucketEntryUnreliableReason { FailedToSend, LostAnswers, NotSeenConsecutively, @@ -49,7 +43,7 @@ enum BucketEntryUnreliableReason { #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum BucketEntryStateReason { - Punished(BucketEntryPunishedReason), + Punished(PunishmentReason), Dead(BucketEntryDeadReason), Unreliable(BucketEntryUnreliableReason), Reliable, @@ -64,10 +58,31 @@ pub(crate) enum BucketEntryState { Reliable, } +impl BucketEntryState { + pub fn is_alive(&self) -> bool { + match self { + BucketEntryState::Punished => false, + BucketEntryState::Dead => false, + BucketEntryState::Unreliable => true, + BucketEntryState::Reliable => true, + } + } + pub fn ordering(&self) -> usize { + match self { + BucketEntryState::Punished => 0, + BucketEntryState::Dead => 1, + BucketEntryState::Unreliable => 2, + BucketEntryState::Reliable => 3, + } + } + +} + impl From for BucketEntryState { fn from(value: BucketEntryStateReason) -> Self { match value { + BucketEntryStateReason::Punished(_) => BucketEntryState::Punished, BucketEntryStateReason::Dead(_) => BucketEntryState::Dead, BucketEntryStateReason::Unreliable(_) => BucketEntryState::Unreliable, BucketEntryStateReason::Reliable => BucketEntryState::Reliable, @@ -133,7 +148,7 @@ pub(crate) struct BucketEntryInner { transfer_stats_accounting: TransferStatsAccounting, /// If the entry is being punished and should be considered dead #[serde(skip)] - is_punished: Option, + punishment: Option, /// Tracking identifier for NodeRef debugging #[cfg(feature = "tracking")] #[serde(skip)] @@ -243,7 +258,7 @@ impl BucketEntryInner { // Less is more reliable then faster pub fn cmp_fastest_reliable(cur_ts: Timestamp, e1: &Self, e2: &Self) -> std::cmp::Ordering { // Reverse compare so most reliable is at front - let ret = e2.state_reason(cur_ts).cmp(&e1.state_reason(cur_ts)); + let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts)); if ret != std::cmp::Ordering::Equal { return ret; } @@ -265,7 +280,7 @@ impl BucketEntryInner { // Less is more reliable then older pub fn cmp_oldest_reliable(cur_ts: Timestamp, e1: &Self, e2: &Self) -> std::cmp::Ordering { // Reverse compare so most reliable is at front - let ret = e2.state_reason(cur_ts).cmp(&e1.state_reason(cur_ts)); + let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts)); if ret != std::cmp::Ordering::Equal { return ret; } @@ -467,7 +482,7 @@ impl BucketEntryInner { // Stores a flow in this entry's table of last flows pub fn set_last_flow(&mut self, last_flow: Flow, timestamp: Timestamp) { - if self.is_punished { + if self.punishment.is_some() { // Don't record connection if this entry is currently punished return; } @@ -600,18 +615,24 @@ impl BucketEntryInner { } pub fn state_reason(&self, cur_ts: Timestamp) -> BucketEntryStateReason { - - if Some(dead_reason) = self.check_dead(cur_ts) { + if let Some(punished_reason) = self.punishment { + BucketEntryStateReason::Punished(punished_reason) + } else if let Some(dead_reason) = self.check_dead(cur_ts) { BucketEntryStateReason::Dead(dead_reason) - } else if Some(unreliable_reason) = self.check_unreliable(cur_ts) { + } else if let Some(unreliable_reason) = self.check_unreliable(cur_ts) { BucketEntryStateReason::Unreliable(unreliable_reason) } else { BucketEntryStateReason::Reliable } } + + pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState { + self.state_reason(cur_ts).into() + } + pub fn set_punished(&mut self, punished: Option) { - self.is_punished = punished; - if punished { + self.punishment = punished; + if punished.is_some() { self.clear_last_flows(); } } @@ -769,7 +790,7 @@ impl BucketEntryInner { // Check if this node needs a ping right now to validate it is still reachable pub(super) fn needs_ping(&self, cur_ts: Timestamp) -> bool { // See which ping pattern we are to use - let state = self.state_reason(cur_ts); + let state = self.state(cur_ts); match state { BucketEntryState::Reliable => { @@ -809,6 +830,11 @@ impl BucketEntryInner { error!("Should not be asking this for dead nodes"); false } + BucketEntryState::Punished => { + error!("Should not be asking this for punished nodes"); + false + } + } } @@ -830,7 +856,7 @@ impl BucketEntryInner { self.peer_stats.rpc_stats.last_seen_ts = None; self.peer_stats.rpc_stats.failed_to_send = 0; self.peer_stats.rpc_stats.recent_lost_answers = 0; - assert!(!self.check_dead(cur_ts)); + assert!(self.check_dead(cur_ts).is_none()); } pub(super) fn _state_debug_info(&self, cur_ts: Timestamp) -> String { @@ -942,7 +968,7 @@ impl BucketEntry { }, latency_stats_accounting: LatencyStatsAccounting::new(), transfer_stats_accounting: TransferStatsAccounting::new(), - is_punished: false, + punishment: None, #[cfg(feature = "tracking")] next_track_id: 0, #[cfg(feature = "tracking")] diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index b58969e6..221c0eca 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -104,6 +104,35 @@ impl RoutingTable { out } + fn format_state_reason(state_reason: BucketEntryStateReason) -> &'static str { + match state_reason { + BucketEntryStateReason::Punished(p) => match p { + PunishmentReason::FailedToDecryptEnvelopeBody => "PCRYPT", + PunishmentReason::FailedToDecodeEnvelope => "PDECEN", + PunishmentReason::ShortPacket => "PSHORT", + PunishmentReason::InvalidFraming => "PFRAME", + PunishmentReason::FailedToDecodeOperation => "PDECOP", + PunishmentReason::WrongSenderPeerInfo => "PSPBAD", + PunishmentReason::FailedToVerifySenderPeerInfo => "PSPVER", + PunishmentReason::FailedToRegisterSenderPeerInfo => "PSPREG", + // + }, + BucketEntryStateReason::Dead(d) => match d { + BucketEntryDeadReason::FailedToSend => "DFSEND", + BucketEntryDeadReason::TooManyLostAnswers => "DALOST", + BucketEntryDeadReason::NoPingResponse => "DNOPNG", + }, + BucketEntryStateReason::Unreliable(u) => match u { + BucketEntryUnreliableReason::FailedToSend => "UFSEND", + BucketEntryUnreliableReason::LostAnswers => "UALOST", + BucketEntryUnreliableReason::NotSeenConsecutively => "UNSEEN", + BucketEntryUnreliableReason::InUnreliablePingSpan => "UUPING", + // + }, + BucketEntryStateReason::Reliable => "RELIBL", + } + } + pub(crate) fn debug_info_entries( &self, min_state: BucketEntryState, @@ -134,7 +163,7 @@ impl RoutingTable { let cap_match = e.1.with(inner, |_rti, e| { e.has_all_capabilities(RoutingDomain::PublicInternet, &capabilities) }); - let state = e.1.with(inner, |_rti, e| e.state_reason(cur_ts)); + let state = e.1.with(inner, |_rti, e| e.state(cur_ts)); state >= min_state && cap_match }) .collect(); @@ -142,15 +171,11 @@ impl RoutingTable { if !filtered_entries.is_empty() { out += &format!("{} Bucket #{}:\n", ck, b); for e in filtered_entries { - let state = e.1.with(inner, |_rti, e| e.state_reason(cur_ts)); + let state_reason = e.1.with(inner, |_rti, e| e.state_reason(cur_ts)); out += &format!( " {} [{}] {} [{}]\n", e.0.encode(), - match state { - BucketEntryState::Reliable => "R", - BucketEntryState::Unreliable => "U", - BucketEntryState::Dead => "D", - }, + Self::format_state_reason(state_reason), e.1.with(inner, |_rti, e| { e.peer_stats() .latency @@ -210,9 +235,7 @@ impl RoutingTable { while c < COLS { let mut cnt = 0; for e in inner.buckets[ck][b].entries() { - if e.1 - .with(inner, |_rti, e| e.state_reason(cur_ts) >= min_state) - { + if e.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) { cnt += 1; } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 0418a23c..161f34f1 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -750,8 +750,8 @@ impl RoutingTable { let cur_ts = get_aligned_timestamp(); self.inner .write() - .with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| { - e.with_mut(rti, |_rti, ei| ei.set_punished(false)); + .with_entries_mut(cur_ts, BucketEntryState::Punished, |rti, e| { + e.with_mut(rti, |_rti, ei| ei.set_punished(None)); Option::<()>::None }); } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 5165db4d..3fb31010 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -129,9 +129,12 @@ pub(crate) trait NodeRefBase: Sized { fn best_envelope_version(&self) -> Option { self.operate(|_rti, e| e.best_envelope_version()) } - fn state(&self, cur_ts: Timestamp) -> BucketEntryState { + fn state_reason(&self, cur_ts: Timestamp) -> BucketEntryStateReason { self.operate(|_rti, e| e.state_reason(cur_ts)) } + fn state(&self, cur_ts: Timestamp) -> BucketEntryState { + self.operate(|_rti, e| e.state(cur_ts)) + } fn peer_stats(&self) -> PeerStats { self.operate(|_rti, e| e.peer_stats().clone()) } diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index d912aa95..265f65b2 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -479,7 +479,7 @@ impl RoutingTableInner { mut f: F, ) -> Option { for entry in &self.all_entries { - if entry.with_inner(|e| e.state_reason(cur_ts) >= min_state) { + if entry.with_inner(|e| e.state(cur_ts) >= min_state) { if let Some(out) = f(self, entry) { return Some(out); } @@ -498,7 +498,7 @@ impl RoutingTableInner { ) -> Option { let mut entries = Vec::with_capacity(self.all_entries.len()); for entry in self.all_entries.iter() { - if entry.with_inner(|e| e.state_reason(cur_ts) >= min_state) { + if entry.with_inner(|e| e.state(cur_ts) >= min_state) { entries.push(entry); } } @@ -559,7 +559,7 @@ impl RoutingTableInner { } #[allow(dead_code)] - pub fn get_all_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec { + pub fn get_all_alive_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec { let mut node_refs = Vec::::with_capacity(self.bucket_entry_count()); self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, entry| { node_refs.push(NodeRef::new(outer_self.clone(), entry, None)); @@ -873,13 +873,14 @@ impl RoutingTableInner { // Routing Table Health Metrics pub fn get_routing_table_health(&self) -> RoutingTableHealth { + let mut _punished_entry_count: usize = 0; let mut reliable_entry_count: usize = 0; let mut unreliable_entry_count: usize = 0; let mut dead_entry_count: usize = 0; let cur_ts = get_aligned_timestamp(); for entry in self.all_entries.iter() { - match entry.with_inner(|e| e.state_reason(cur_ts)) { + match entry.with_inner(|e| e.state(cur_ts)) { BucketEntryState::Reliable => { reliable_entry_count += 1; } @@ -889,6 +890,9 @@ impl RoutingTableInner { BucketEntryState::Dead => { dead_entry_count += 1; } + BucketEntryState::Punished => { + _punished_entry_count += 1; + } } } @@ -1065,19 +1069,11 @@ impl RoutingTableInner { { let cur_ts = get_aligned_timestamp(); - // Add filter to remove dead nodes always - let filter_dead = Box::new( - move |_rti: &RoutingTableInner, v: Option>| { - if let Some(entry) = &v { - // always filter out dead nodes - !entry.with_inner(|e| e.state_reason(cur_ts) == BucketEntryState::Dead) - } else { - // always filter out self peer, as it is irrelevant to the 'fastest nodes' search - false - } - }, - ) as RoutingTableEntryFilter; - filters.push_front(filter_dead); + // always filter out self peer, as it is irrelevant to the 'fastest nodes' search + let filter_self = + Box::new(move |_rti: &RoutingTableInner, v: Option>| v.is_some()) + as RoutingTableEntryFilter; + filters.push_front(filter_self); // Fastest sort let sort = |_rti: &RoutingTableInner, @@ -1106,8 +1102,8 @@ impl RoutingTableInner { let be = b_entry.as_ref().unwrap(); ae.with_inner(|ae| { be.with_inner(|be| { - let ra = ae.check_unreliable(cur_ts); - let rb = be.check_unreliable(cur_ts); + let ra = ae.check_unreliable(cur_ts).is_none(); + let rb = be.check_unreliable(cur_ts).is_none(); if ra != rb { if ra { return core::cmp::Ordering::Less; @@ -1159,6 +1155,7 @@ impl RoutingTableInner { }; // Filter to ensure entries support the crypto kind in use + // always filter out dead and punished nodes let filter = Box::new( move |_rti: &RoutingTableInner, opt_entry: Option>| { if let Some(entry) = opt_entry { @@ -1187,12 +1184,12 @@ impl RoutingTableInner { } // reliable nodes come first, pessimistically treating our own node as unreliable - let ra = a_entry - .as_ref() - .map_or(false, |x| x.with_inner(|x| x.check_unreliable(cur_ts))); - let rb = b_entry - .as_ref() - .map_or(false, |x| x.with_inner(|x| x.check_unreliable(cur_ts))); + let ra = a_entry.as_ref().map_or(false, |x| { + x.with_inner(|x| x.check_unreliable(cur_ts).is_none()) + }); + let rb = b_entry.as_ref().map_or(false, |x| { + x.with_inner(|x| x.check_unreliable(cur_ts).is_none()) + }); if ra != rb { if ra { return core::cmp::Ordering::Less; diff --git a/veilid-core/src/routing_table/tasks/kick_buckets.rs b/veilid-core/src/routing_table/tasks/kick_buckets.rs index 39628492..4e56e314 100644 --- a/veilid-core/src/routing_table/tasks/kick_buckets.rs +++ b/veilid-core/src/routing_table/tasks/kick_buckets.rs @@ -52,10 +52,10 @@ impl RoutingTable { continue; } - let state = entry.with(&inner, |_rti, e| e.state_reason(cur_ts)); + let state = entry.with(&inner, |_rti, e| e.state(cur_ts)); match state { - BucketEntryState::Dead => { - // Do nothing with dead entries + BucketEntryState::Dead | BucketEntryState::Punished => { + // Do nothing with dead or punished entries } BucketEntryState::Unreliable => { // Add to closest unreliable nodes list diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs index f9977c05..e753f519 100644 --- a/veilid-core/src/routing_table/tasks/relay_management.rs +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -63,10 +63,13 @@ impl RoutingTable { // If we already have a relay, see if it is dead, or if we don't need it any more let has_relay = { if let Some(relay_node) = self.relay_node(RoutingDomain::PublicInternet) { - let state = relay_node.state(cur_ts); + let state_reason = relay_node.state_reason(cur_ts); // Relay node is dead or no longer needed - if matches!(state, BucketEntryState::Dead) { - log_rtab!(debug "Relay node died, dropping relay {}", relay_node); + if matches!( + state_reason, + BucketEntryStateReason::Dead(_) | BucketEntryStateReason::Punished(_) + ) { + log_rtab!(debug "Relay node is now {:?}, dropping relay {}", state_reason, relay_node); editor.clear_relay_node(); false } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index d942fec7..60234b05 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -504,7 +504,7 @@ impl RPCProcessor { // ensure we have some dial info for the entry already, // and that the node is still alive // if not, we should keep looking for better info - if !matches!(nr.state(get_aligned_timestamp()),BucketEntryState::Dead) && + if nr.state(get_aligned_timestamp()).is_alive() && nr.has_any_dial_info() { return Some(nr); } @@ -546,7 +546,7 @@ impl RPCProcessor { // ensure we have some dial info for the entry already, // and that the node is still alive // if not, we should do the find_node anyway - if !matches!(nr.state(get_aligned_timestamp()),BucketEntryState::Dead) && + if nr.state(get_aligned_timestamp()).is_alive() && nr.has_any_dial_info() { return Ok(Some(nr)); } @@ -1496,7 +1496,7 @@ impl RPCProcessor { log_rpc!(debug "Invalid RPC Operation: {}", e); // Punish nodes that send direct undecodable crap - address_filter.punish_node_id(sender_node_id); + address_filter.punish_node_id(sender_node_id, PunishmentReason::FailedToDecodeOperation); }, // Ignored messages that should be dropped RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => { @@ -1520,7 +1520,7 @@ impl RPCProcessor { // Ensure the sender peer info is for the actual sender specified in the envelope if !sender_peer_info.node_ids().contains(&sender_node_id) { // Attempted to update peer info for the wrong node id - address_filter.punish_node_id(sender_node_id); + address_filter.punish_node_id(sender_node_id, PunishmentReason::WrongSenderPeerInfo); return Ok(NetworkResult::invalid_message( "attempt to update peer info for non-sender node id", )); @@ -1532,7 +1532,7 @@ impl RPCProcessor { sender_peer_info.signed_node_info(), &[], ) { - address_filter.punish_node_id(sender_node_id); + address_filter.punish_node_id(sender_node_id, PunishmentReason::FailedToVerifySenderPeerInfo); return Ok(NetworkResult::invalid_message( format!("sender peerinfo has invalid peer scope: {:?}",sender_peer_info.signed_node_info()) )); @@ -1544,7 +1544,7 @@ impl RPCProcessor { ) { Ok(v) => Some(v), Err(e) => { - address_filter.punish_node_id(sender_node_id); + address_filter.punish_node_id(sender_node_id, PunishmentReason::FailedToRegisterSenderPeerInfo); return Ok(NetworkResult::invalid_message(e)); } } @@ -1555,8 +1555,9 @@ impl RPCProcessor { opt_sender_nr = match self.routing_table().lookup_node_ref(sender_node_id) { Ok(v) => v, Err(e) => { - address_filter.punish_node_id(sender_node_id); - return Ok(NetworkResult::invalid_message(e)); + // If this fails it's not the other node's fault. We should be able to look up a + // node ref for a registered sender node id that just sent a message to us + return Ok(NetworkResult::no_connection_other(e)); } } } @@ -1584,7 +1585,7 @@ impl RPCProcessor { log_rpc!(debug "Dropping RPC operation: {}", e); // XXX: Punish routes that send routed undecodable crap - // address_filter.punish_route_id(xxx); + // address_filter.punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage); return Ok(NetworkResult::invalid_message(e)); } }; diff --git a/veilid-core/src/storage_manager/tasks/check_active_watches.rs b/veilid-core/src/storage_manager/tasks/check_active_watches.rs index 2da4da81..f999749a 100644 --- a/veilid-core/src/storage_manager/tasks/check_active_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_active_watches.rs @@ -27,10 +27,7 @@ impl StorageManager { // See if the active watch's node is dead let mut is_dead = false; - if matches!( - active_watch.watch_node.state(cur_ts), - BucketEntryState::Dead - ) { + if !active_watch.watch_node.state(cur_ts).is_alive() { // Watched node is dead is_dead = true; } diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 287af751..7a30a12d 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -49,7 +49,9 @@ pub fn format_opt_bps(bps: Option) -> String { } fn get_bucket_entry_state(text: &str) -> Option { - if text == "dead" { + if text == "punished" { + Some(BucketEntryState::Punished) + } else if text == "dead" { Some(BucketEntryState::Dead) } else if text == "reliable" { Some(BucketEntryState::Reliable)