Merge branch 'improved-punishment-and-state' into 'main'

Improved punishment and state

Closes #281

See merge request veilid/veilid!287
This commit is contained in:
Christien Rioux 2024-06-28 17:42:00 +00:00
commit be11b4543c
16 changed files with 248 additions and 116 deletions

View File

@ -27,9 +27,9 @@ struct AddressFilterInner {
conn_count_by_ip6_prefix: BTreeMap<Ipv6Addr, usize>, conn_count_by_ip6_prefix: BTreeMap<Ipv6Addr, usize>,
conn_timestamps_by_ip4: BTreeMap<Ipv4Addr, Vec<Timestamp>>, conn_timestamps_by_ip4: BTreeMap<Ipv4Addr, Vec<Timestamp>>,
conn_timestamps_by_ip6_prefix: BTreeMap<Ipv6Addr, Vec<Timestamp>>, conn_timestamps_by_ip6_prefix: BTreeMap<Ipv6Addr, Vec<Timestamp>>,
punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>, punishments_by_ip4: BTreeMap<Ipv4Addr, Punishment>,
punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>, punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Punishment>,
punishments_by_node_id: BTreeMap<TypedKey, Timestamp>, punishments_by_node_id: BTreeMap<TypedKey, Punishment>,
dial_info_failures: BTreeMap<DialInfo, Timestamp>, dial_info_failures: BTreeMap<DialInfo, Timestamp>,
} }
@ -151,7 +151,7 @@ impl AddressFilter {
let mut dead_keys = Vec::<Ipv4Addr>::new(); let mut dead_keys = Vec::<Ipv4Addr>::new();
for (key, value) in &mut inner.punishments_by_ip4 { for (key, value) in &mut inner.punishments_by_ip4 {
// Drop punishments older than the punishment duration // Drop punishments older than the punishment duration
if cur_ts.as_u64().saturating_sub(value.as_u64()) if cur_ts.as_u64().saturating_sub(value.timestamp.as_u64())
> self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64 > self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64
{ {
dead_keys.push(*key); dead_keys.push(*key);
@ -167,7 +167,7 @@ impl AddressFilter {
let mut dead_keys = Vec::<Ipv6Addr>::new(); let mut dead_keys = Vec::<Ipv6Addr>::new();
for (key, value) in &mut inner.punishments_by_ip6_prefix { for (key, value) in &mut inner.punishments_by_ip6_prefix {
// Drop punishments older than the punishment duration // Drop punishments older than the punishment duration
if cur_ts.as_u64().saturating_sub(value.as_u64()) if cur_ts.as_u64().saturating_sub(value.timestamp.as_u64())
> self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64 > self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64
{ {
dead_keys.push(*key); dead_keys.push(*key);
@ -183,7 +183,7 @@ impl AddressFilter {
let mut dead_keys = Vec::<TypedKey>::new(); let mut dead_keys = Vec::<TypedKey>::new();
for (key, value) in &mut inner.punishments_by_node_id { for (key, value) in &mut inner.punishments_by_node_id {
// Drop punishments older than the punishment duration // Drop punishments older than the punishment duration
if cur_ts.as_u64().saturating_sub(value.as_u64()) if cur_ts.as_u64().saturating_sub(value.timestamp.as_u64())
> self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64 > self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64
{ {
dead_keys.push(*key); dead_keys.push(*key);
@ -194,7 +194,7 @@ impl AddressFilter {
inner.punishments_by_node_id.remove(&key); inner.punishments_by_node_id.remove(&key);
// make the entry alive again if it's still here // make the entry alive again if it's still here
if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(key) { if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(key) {
nr.operate_mut(|_rti, e| e.set_punished(false)); nr.operate_mut(|_rti, e| e.set_punished(None));
} }
} }
} }
@ -278,9 +278,10 @@ impl AddressFilter {
inner.punishments_by_node_id.clear(); inner.punishments_by_node_id.clear();
} }
pub fn punish_ip_addr(&self, addr: IpAddr) { pub fn punish_ip_addr(&self, addr: IpAddr, reason: PunishmentReason) {
log_net!(debug ">>> PUNISHED: {}", addr); log_net!(debug ">>> PUNISHED: {} for {:?}", addr, reason);
let ts = get_aligned_timestamp(); let timestamp = get_aligned_timestamp();
let punishment = Punishment { reason, timestamp };
let ipblock = ip_to_ipblock( let ipblock = ip_to_ipblock(
self.unlocked_inner.max_connections_per_ip6_prefix_size, self.unlocked_inner.max_connections_per_ip6_prefix_size,
@ -292,13 +293,13 @@ impl AddressFilter {
IpAddr::V4(v4) => inner IpAddr::V4(v4) => inner
.punishments_by_ip4 .punishments_by_ip4
.entry(v4) .entry(v4)
.and_modify(|v| *v = ts) .and_modify(|v| *v = punishment)
.or_insert(ts), .or_insert(punishment),
IpAddr::V6(v6) => inner IpAddr::V6(v6) => inner
.punishments_by_ip6_prefix .punishments_by_ip6_prefix
.entry(v6) .entry(v6)
.and_modify(|v| *v = ts) .and_modify(|v| *v = punishment)
.or_insert(ts), .or_insert(punishment),
}; };
} }
@ -314,25 +315,26 @@ impl AddressFilter {
self.is_node_id_punished_inner(&inner, node_id) self.is_node_id_punished_inner(&inner, node_id)
} }
pub fn punish_node_id(&self, node_id: TypedKey) { pub fn punish_node_id(&self, node_id: TypedKey, reason: PunishmentReason) {
if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(node_id) { if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(node_id) {
// make the entry dead if it's punished // make the entry dead if it's punished
nr.operate_mut(|_rti, e| e.set_punished(true)); nr.operate_mut(|_rti, e| e.set_punished(Some(reason)));
} }
let ts = get_aligned_timestamp(); let timestamp = get_aligned_timestamp();
let punishment = Punishment { reason, timestamp };
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
if inner.punishments_by_node_id.len() >= MAX_PUNISHMENTS_BY_NODE_ID { if inner.punishments_by_node_id.len() >= MAX_PUNISHMENTS_BY_NODE_ID {
log_net!(debug ">>> PUNISHMENT TABLE FULL: {}", node_id); log_net!(debug ">>> PUNISHMENT TABLE FULL: {}", node_id);
return; return;
} }
log_net!(debug ">>> PUNISHED: {}", node_id); log_net!(debug ">>> PUNISHED: {} for {:?}", node_id, reason);
inner inner
.punishments_by_node_id .punishments_by_node_id
.entry(node_id) .entry(node_id)
.and_modify(|v| *v = ts) .and_modify(|v| *v = punishment)
.or_insert(ts); .or_insert(punishment);
} }
pub async fn address_filter_task_routine( pub async fn address_filter_task_routine(

View File

@ -930,7 +930,8 @@ impl NetworkManager {
// Ensure we can read the magic number // Ensure we can read the magic number
if data.len() < 4 { if data.len() < 4 {
log_net!(debug "short packet"); log_net!(debug "short packet");
self.address_filter().punish_ip_addr(remote_addr); self.address_filter()
.punish_ip_addr(remote_addr, PunishmentReason::ShortPacket);
return Ok(false); return Ok(false);
} }
@ -966,7 +967,8 @@ impl NetworkManager {
Err(e) => { Err(e) => {
log_net!(debug "envelope failed to decode: {}", e); log_net!(debug "envelope failed to decode: {}", e);
// safe to punish here because relays also check here to ensure they arent forwarding things that don't decode // safe to punish here because relays also check here to ensure they arent forwarding things that don't decode
self.address_filter().punish_ip_addr(remote_addr); self.address_filter()
.punish_ip_addr(remote_addr, PunishmentReason::FailedToDecodeEnvelope);
return Ok(false); return Ok(false);
} }
}; };
@ -1099,7 +1101,8 @@ impl NetworkManager {
// Can't punish by ip address here because relaying can't decrypt envelope bodies to check // Can't punish by ip address here because relaying can't decrypt envelope bodies to check
// But because the envelope was properly signed by the time it gets here, it is safe to // But because the envelope was properly signed by the time it gets here, it is safe to
// punish by node id // punish by node id
self.address_filter().punish_node_id(sender_id); self.address_filter()
.punish_node_id(sender_id, PunishmentReason::FailedToDecryptEnvelopeBody);
return Ok(false); return Ok(false);
} }
}; };

View File

@ -362,7 +362,7 @@ impl NetworkConnection {
// Punish invalid framing (tcp framing or websocket framing) // Punish invalid framing (tcp framing or websocket framing)
if v.is_invalid_message() { if v.is_invalid_message() {
address_filter.punish_ip_addr(peer_address.socket_addr().ip()); address_filter.punish_ip_addr(peer_address.socket_addr().ip(), PunishmentReason::InvalidFraming);
return RecvLoopAction::Finish; return RecvLoopAction::Finish;
} }

View File

@ -8,6 +8,7 @@ mod low_level_protocol_type;
mod network_class; mod network_class;
mod peer_address; mod peer_address;
mod protocol_type; mod protocol_type;
mod punishment;
mod relay_kind; mod relay_kind;
mod signal_info; mod signal_info;
mod socket_address; mod socket_address;
@ -24,6 +25,7 @@ pub use low_level_protocol_type::*;
pub use network_class::*; pub use network_class::*;
pub use peer_address::*; pub use peer_address::*;
pub use protocol_type::*; pub use protocol_type::*;
pub use punishment::*;
pub use relay_kind::*; pub use relay_kind::*;
pub use signal_info::*; pub use signal_info::*;
pub use socket_address::*; pub use socket_address::*;

View File

@ -0,0 +1,23 @@
use super::*;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PunishmentReason {
// IP-level punishments
FailedToDecryptEnvelopeBody,
FailedToDecodeEnvelope,
ShortPacket,
InvalidFraming,
// Node-level punishments
FailedToDecodeOperation,
WrongSenderPeerInfo,
FailedToVerifySenderPeerInfo,
FailedToRegisterSenderPeerInfo,
// Route-level punishments
// FailedToDecodeRoutedMessage,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Punishment {
pub reason: PunishmentReason,
pub timestamp: Timestamp,
}

View File

@ -26,14 +26,6 @@ struct SerializedBucketData {
entries: Vec<SerializedBucketEntryData>, entries: Vec<SerializedBucketEntryData>,
} }
fn state_ordering(state: BucketEntryState) -> usize {
match state {
BucketEntryState::Dead => 0,
BucketEntryState::Unreliable => 1,
BucketEntryState::Reliable => 2,
}
}
impl Bucket { impl Bucket {
pub fn new(kind: CryptoKind) -> Self { pub fn new(kind: CryptoKind) -> Self {
Self { Self {
@ -142,9 +134,9 @@ impl Bucket {
} }
a.1.with_inner(|ea| { a.1.with_inner(|ea| {
b.1.with_inner(|eb| { b.1.with_inner(|eb| {
let astate = state_ordering(ea.state(cur_ts)); let astate = ea.state(cur_ts).ordering();
let bstate = state_ordering(eb.state(cur_ts)); let bstate = eb.state(cur_ts).ordering();
// first kick dead nodes, then unreliable nodes // first kick punished nodes, then dead nodes, then unreliable nodes
if astate < bstate { if astate < bstate {
return core::cmp::Ordering::Less; return core::cmp::Ordering::Less;
} }

View File

@ -24,17 +24,73 @@ const UNRELIABLE_PING_SPAN_SECS: u32 = 60;
const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5; const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
/// How many times do we try to ping a never-reached node before we call it dead /// How many times do we try to ping a never-reached node before we call it dead
const NEVER_REACHED_PING_COUNT: u32 = 3; const NEVER_SEEN_PING_COUNT: u32 = 3;
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BucketEntryDeadReason {
FailedToSend,
TooManyLostAnswers,
NoPingResponse,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BucketEntryUnreliableReason {
FailedToSend,
LostAnswers,
NotSeenConsecutively,
InUnreliablePingSpan,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BucketEntryStateReason {
Punished(PunishmentReason),
Dead(BucketEntryDeadReason),
Unreliable(BucketEntryUnreliableReason),
Reliable,
}
// Do not change order here, it will mess up other sorts // Do not change order here, it will mess up other sorts
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum BucketEntryState { pub(crate) enum BucketEntryState {
Punished,
Dead, Dead,
Unreliable, Unreliable,
Reliable, Reliable,
} }
impl BucketEntryState {
pub fn is_alive(&self) -> bool {
match self {
BucketEntryState::Punished => false,
BucketEntryState::Dead => false,
BucketEntryState::Unreliable => true,
BucketEntryState::Reliable => true,
}
}
pub fn ordering(&self) -> usize {
match self {
BucketEntryState::Punished => 0,
BucketEntryState::Dead => 1,
BucketEntryState::Unreliable => 2,
BucketEntryState::Reliable => 3,
}
}
}
impl From<BucketEntryStateReason> for BucketEntryState {
fn from(value: BucketEntryStateReason) -> Self
{
match value {
BucketEntryStateReason::Punished(_) => BucketEntryState::Punished,
BucketEntryStateReason::Dead(_) => BucketEntryState::Dead,
BucketEntryStateReason::Unreliable(_) => BucketEntryState::Unreliable,
BucketEntryStateReason::Reliable => BucketEntryState::Reliable,
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub(crate) struct LastFlowKey(ProtocolType, AddressType); pub(crate) struct LastFlowKey(ProtocolType, AddressType);
@ -92,7 +148,7 @@ pub(crate) struct BucketEntryInner {
transfer_stats_accounting: TransferStatsAccounting, transfer_stats_accounting: TransferStatsAccounting,
/// If the entry is being punished and should be considered dead /// If the entry is being punished and should be considered dead
#[serde(skip)] #[serde(skip)]
is_punished: bool, punishment: Option<PunishmentReason>,
/// Tracking identifier for NodeRef debugging /// Tracking identifier for NodeRef debugging
#[cfg(feature = "tracking")] #[cfg(feature = "tracking")]
#[serde(skip)] #[serde(skip)]
@ -426,7 +482,7 @@ impl BucketEntryInner {
// Stores a flow in this entry's table of last flows // Stores a flow in this entry's table of last flows
pub fn set_last_flow(&mut self, last_flow: Flow, timestamp: Timestamp) { pub fn set_last_flow(&mut self, last_flow: Flow, timestamp: Timestamp) {
if self.is_punished { if self.punishment.is_some() {
// Don't record connection if this entry is currently punished // Don't record connection if this entry is currently punished
return; return;
} }
@ -558,21 +614,25 @@ impl BucketEntryInner {
self.envelope_support.iter().rev().find(|x| VALID_ENVELOPE_VERSIONS.contains(x)).copied() self.envelope_support.iter().rev().find(|x| VALID_ENVELOPE_VERSIONS.contains(x)).copied()
} }
pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState { pub fn state_reason(&self, cur_ts: Timestamp) -> BucketEntryStateReason {
if self.is_punished { if let Some(punished_reason) = self.punishment {
return BucketEntryState::Dead; BucketEntryStateReason::Punished(punished_reason)
} } else if let Some(dead_reason) = self.check_dead(cur_ts) {
if self.check_reliable(cur_ts) { BucketEntryStateReason::Dead(dead_reason)
BucketEntryState::Reliable } else if let Some(unreliable_reason) = self.check_unreliable(cur_ts) {
} else if self.check_dead(cur_ts) { BucketEntryStateReason::Unreliable(unreliable_reason)
BucketEntryState::Dead
} else { } else {
BucketEntryState::Unreliable BucketEntryStateReason::Reliable
} }
} }
pub fn set_punished(&mut self, punished: bool) {
self.is_punished = punished; pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
if punished { self.state_reason(cur_ts).into()
}
pub fn set_punished(&mut self, punished: Option<PunishmentReason>) {
self.punishment = punished;
if punished.is_some() {
self.clear_last_flows(); self.clear_last_flows();
} }
} }
@ -650,42 +710,59 @@ impl BucketEntryInner {
} }
///// state machine handling ///// state machine handling
pub(super) fn check_reliable(&self, cur_ts: Timestamp) -> bool { pub(super) fn check_unreliable(&self, cur_ts: Timestamp) -> Option<BucketEntryUnreliableReason> {
// If we have had any failures to send, this is not reliable // If we have had any failures to send, this is not reliable
if self.peer_stats.rpc_stats.failed_to_send > 0 { if self.peer_stats.rpc_stats.failed_to_send > 0 {
return false; return Some(BucketEntryUnreliableReason::FailedToSend);
} }
// If we have had any lost answers recently, this is not reliable // If we have had any lost answers recently, this is not reliable
if self.peer_stats.rpc_stats.recent_lost_answers > 0 { if self.peer_stats.rpc_stats.recent_lost_answers > 0 {
return false; return Some(BucketEntryUnreliableReason::LostAnswers);
} }
match self.peer_stats.rpc_stats.first_consecutive_seen_ts { match self.peer_stats.rpc_stats.first_consecutive_seen_ts {
// If we have not seen seen a node consecutively, it can't be reliable // If we have not seen seen a node consecutively, it can't be reliable
None => false, None => return Some(BucketEntryUnreliableReason::NotSeenConsecutively),
// If we have seen the node consistently for longer than UNRELIABLE_PING_SPAN_SECS then it is reliable // If not have seen the node consistently for longer than UNRELIABLE_PING_SPAN_SECS then it is unreliable
Some(ts) => { Some(ts) => {
cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) let seen_consecutively = cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1_000_000u64);
if !seen_consecutively {
return Some(BucketEntryUnreliableReason::InUnreliablePingSpan);
}
} }
} }
None
} }
pub(super) fn check_dead(&self, cur_ts: Timestamp) -> bool { pub(super) fn check_dead(&self, cur_ts: Timestamp) -> Option<BucketEntryDeadReason> {
// If we have failed to send NEVER_REACHED_PING_COUNT times in a row, the node is dead // If we have failed to send NEVER_REACHED_PING_COUNT times in a row, the node is dead
if self.peer_stats.rpc_stats.failed_to_send >= NEVER_REACHED_PING_COUNT { if self.peer_stats.rpc_stats.failed_to_send >= NEVER_SEEN_PING_COUNT {
return true; return Some(BucketEntryDeadReason::FailedToSend);
} }
match self.peer_stats.rpc_stats.last_seen_ts { match self.peer_stats.rpc_stats.last_seen_ts {
// a node is not dead if we haven't heard from it yet, // a node is not dead if we haven't heard from it yet,
// but we give it NEVER_REACHED_PING_COUNT chances to ping before we say it's dead // but we give it NEVER_REACHED_PING_COUNT chances to ping before we say it's dead
None => self.peer_stats.rpc_stats.recent_lost_answers >= NEVER_REACHED_PING_COUNT, None => {
let no_answers = self.peer_stats.rpc_stats.recent_lost_answers >= NEVER_SEEN_PING_COUNT;
if no_answers {
return Some(BucketEntryDeadReason::TooManyLostAnswers)
}
}
// return dead if we have not heard from the node at all for the duration of the unreliable ping span // return dead if we have not heard from the node at all for the duration of the unreliable ping span
// and we have tried to reach it and failed the entire time of unreliable ping span
Some(ts) => { Some(ts) => {
cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) let not_seen = cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1_000_000u64);
let no_answers = self.peer_stats.rpc_stats.recent_lost_answers >= (UNRELIABLE_PING_SPAN_SECS / UNRELIABLE_PING_INTERVAL_SECS);
if not_seen && no_answers {
return Some(BucketEntryDeadReason::NoPingResponse)
}
} }
} }
None
} }
/// Return the last time we either saw a node, or asked it a question /// Return the last time we either saw a node, or asked it a question
@ -746,12 +823,17 @@ impl BucketEntryInner {
} }
BucketEntryState::Unreliable => { BucketEntryState::Unreliable => {
// If we are in an unreliable state, we need a ping every UNRELIABLE_PING_INTERVAL_SECS seconds // If we are in an unreliable state, we need a ping every UNRELIABLE_PING_INTERVAL_SECS seconds
self.needs_constant_ping(cur_ts, TimestampDuration::new(UNRELIABLE_PING_INTERVAL_SECS as u64 * 1000000u64)) self.needs_constant_ping(cur_ts, TimestampDuration::new(UNRELIABLE_PING_INTERVAL_SECS as u64 * 1_000_000u64))
} }
BucketEntryState::Dead => { BucketEntryState::Dead => {
error!("Should not be asking this for dead nodes"); error!("Should not be asking this for dead nodes");
false false
} }
BucketEntryState::Punished => {
error!("Should not be asking this for punished nodes");
false
}
} }
} }
@ -773,7 +855,7 @@ impl BucketEntryInner {
self.peer_stats.rpc_stats.last_seen_ts = None; self.peer_stats.rpc_stats.last_seen_ts = None;
self.peer_stats.rpc_stats.failed_to_send = 0; self.peer_stats.rpc_stats.failed_to_send = 0;
self.peer_stats.rpc_stats.recent_lost_answers = 0; self.peer_stats.rpc_stats.recent_lost_answers = 0;
assert!(!self.check_dead(cur_ts)); assert!(self.check_dead(cur_ts).is_none());
} }
pub(super) fn _state_debug_info(&self, cur_ts: Timestamp) -> String { pub(super) fn _state_debug_info(&self, cur_ts: Timestamp) -> String {
@ -798,7 +880,7 @@ impl BucketEntryInner {
format!( format!(
"state: {:?}, first_consecutive_seen_ts: {}, last_seen_ts: {}", "state: {:?}, first_consecutive_seen_ts: {}, last_seen_ts: {}",
self.state(cur_ts), self.state_reason(cur_ts),
first_consecutive_seen_ts, first_consecutive_seen_ts,
last_seen_ts_str last_seen_ts_str
) )
@ -885,7 +967,7 @@ impl BucketEntry {
}, },
latency_stats_accounting: LatencyStatsAccounting::new(), latency_stats_accounting: LatencyStatsAccounting::new(),
transfer_stats_accounting: TransferStatsAccounting::new(), transfer_stats_accounting: TransferStatsAccounting::new(),
is_punished: false, punishment: None,
#[cfg(feature = "tracking")] #[cfg(feature = "tracking")]
next_track_id: 0, next_track_id: 0,
#[cfg(feature = "tracking")] #[cfg(feature = "tracking")]

View File

@ -104,6 +104,35 @@ impl RoutingTable {
out out
} }
fn format_state_reason(state_reason: BucketEntryStateReason) -> &'static str {
match state_reason {
BucketEntryStateReason::Punished(p) => match p {
PunishmentReason::FailedToDecryptEnvelopeBody => "PCRYPT",
PunishmentReason::FailedToDecodeEnvelope => "PDECEN",
PunishmentReason::ShortPacket => "PSHORT",
PunishmentReason::InvalidFraming => "PFRAME",
PunishmentReason::FailedToDecodeOperation => "PDECOP",
PunishmentReason::WrongSenderPeerInfo => "PSPBAD",
PunishmentReason::FailedToVerifySenderPeerInfo => "PSPVER",
PunishmentReason::FailedToRegisterSenderPeerInfo => "PSPREG",
//
},
BucketEntryStateReason::Dead(d) => match d {
BucketEntryDeadReason::FailedToSend => "DFSEND",
BucketEntryDeadReason::TooManyLostAnswers => "DALOST",
BucketEntryDeadReason::NoPingResponse => "DNOPNG",
},
BucketEntryStateReason::Unreliable(u) => match u {
BucketEntryUnreliableReason::FailedToSend => "UFSEND",
BucketEntryUnreliableReason::LostAnswers => "UALOST",
BucketEntryUnreliableReason::NotSeenConsecutively => "UNSEEN",
BucketEntryUnreliableReason::InUnreliablePingSpan => "UUPING",
//
},
BucketEntryStateReason::Reliable => "RELIBL",
}
}
pub(crate) fn debug_info_entries( pub(crate) fn debug_info_entries(
&self, &self,
min_state: BucketEntryState, min_state: BucketEntryState,
@ -142,15 +171,11 @@ impl RoutingTable {
if !filtered_entries.is_empty() { if !filtered_entries.is_empty() {
out += &format!("{} Bucket #{}:\n", ck, b); out += &format!("{} Bucket #{}:\n", ck, b);
for e in filtered_entries { for e in filtered_entries {
let state = e.1.with(inner, |_rti, e| e.state(cur_ts)); let state_reason = e.1.with(inner, |_rti, e| e.state_reason(cur_ts));
out += &format!( out += &format!(
" {} [{}] {} [{}]\n", " {} [{}] {} [{}]\n",
e.0.encode(), e.0.encode(),
match state { Self::format_state_reason(state_reason),
BucketEntryState::Reliable => "R",
BucketEntryState::Unreliable => "U",
BucketEntryState::Dead => "D",
},
e.1.with(inner, |_rti, e| { e.1.with(inner, |_rti, e| {
e.peer_stats() e.peer_stats()
.latency .latency

View File

@ -750,8 +750,8 @@ impl RoutingTable {
let cur_ts = get_aligned_timestamp(); let cur_ts = get_aligned_timestamp();
self.inner self.inner
.write() .write()
.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| { .with_entries_mut(cur_ts, BucketEntryState::Punished, |rti, e| {
e.with_mut(rti, |_rti, ei| ei.set_punished(false)); e.with_mut(rti, |_rti, ei| ei.set_punished(None));
Option::<()>::None Option::<()>::None
}); });
} }

View File

@ -129,6 +129,9 @@ pub(crate) trait NodeRefBase: Sized {
fn best_envelope_version(&self) -> Option<u8> { fn best_envelope_version(&self) -> Option<u8> {
self.operate(|_rti, e| e.best_envelope_version()) self.operate(|_rti, e| e.best_envelope_version())
} }
fn state_reason(&self, cur_ts: Timestamp) -> BucketEntryStateReason {
self.operate(|_rti, e| e.state_reason(cur_ts))
}
fn state(&self, cur_ts: Timestamp) -> BucketEntryState { fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
self.operate(|_rti, e| e.state(cur_ts)) self.operate(|_rti, e| e.state(cur_ts))
} }

View File

@ -559,7 +559,7 @@ impl RoutingTableInner {
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn get_all_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec<NodeRef> { pub fn get_all_alive_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec<NodeRef> {
let mut node_refs = Vec::<NodeRef>::with_capacity(self.bucket_entry_count()); let mut node_refs = Vec::<NodeRef>::with_capacity(self.bucket_entry_count());
self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, entry| { self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, entry| {
node_refs.push(NodeRef::new(outer_self.clone(), entry, None)); node_refs.push(NodeRef::new(outer_self.clone(), entry, None));
@ -873,6 +873,7 @@ impl RoutingTableInner {
// Routing Table Health Metrics // Routing Table Health Metrics
pub fn get_routing_table_health(&self) -> RoutingTableHealth { pub fn get_routing_table_health(&self) -> RoutingTableHealth {
let mut _punished_entry_count: usize = 0;
let mut reliable_entry_count: usize = 0; let mut reliable_entry_count: usize = 0;
let mut unreliable_entry_count: usize = 0; let mut unreliable_entry_count: usize = 0;
let mut dead_entry_count: usize = 0; let mut dead_entry_count: usize = 0;
@ -889,6 +890,9 @@ impl RoutingTableInner {
BucketEntryState::Dead => { BucketEntryState::Dead => {
dead_entry_count += 1; dead_entry_count += 1;
} }
BucketEntryState::Punished => {
_punished_entry_count += 1;
}
} }
} }
@ -1065,19 +1069,11 @@ impl RoutingTableInner {
{ {
let cur_ts = get_aligned_timestamp(); let cur_ts = get_aligned_timestamp();
// Add filter to remove dead nodes always // always filter out self peer, as it is irrelevant to the 'fastest nodes' search
let filter_dead = Box::new( let filter_self =
move |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| { Box::new(move |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| v.is_some())
if let Some(entry) = &v { as RoutingTableEntryFilter;
// always filter out dead nodes filters.push_front(filter_self);
!entry.with_inner(|e| e.state(cur_ts) == BucketEntryState::Dead)
} else {
// always filter out self peer, as it is irrelevant to the 'fastest nodes' search
false
}
},
) as RoutingTableEntryFilter;
filters.push_front(filter_dead);
// Fastest sort // Fastest sort
let sort = |_rti: &RoutingTableInner, let sort = |_rti: &RoutingTableInner,
@ -1106,8 +1102,8 @@ impl RoutingTableInner {
let be = b_entry.as_ref().unwrap(); let be = b_entry.as_ref().unwrap();
ae.with_inner(|ae| { ae.with_inner(|ae| {
be.with_inner(|be| { be.with_inner(|be| {
let ra = ae.check_reliable(cur_ts); let ra = ae.check_unreliable(cur_ts).is_none();
let rb = be.check_reliable(cur_ts); let rb = be.check_unreliable(cur_ts).is_none();
if ra != rb { if ra != rb {
if ra { if ra {
return core::cmp::Ordering::Less; return core::cmp::Ordering::Less;
@ -1159,6 +1155,7 @@ impl RoutingTableInner {
}; };
// Filter to ensure entries support the crypto kind in use // Filter to ensure entries support the crypto kind in use
// always filter out dead and punished nodes
let filter = Box::new( let filter = Box::new(
move |_rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| { move |_rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
if let Some(entry) = opt_entry { if let Some(entry) = opt_entry {
@ -1187,12 +1184,12 @@ impl RoutingTableInner {
} }
// reliable nodes come first, pessimistically treating our own node as unreliable // reliable nodes come first, pessimistically treating our own node as unreliable
let ra = a_entry let ra = a_entry.as_ref().map_or(false, |x| {
.as_ref() x.with_inner(|x| x.check_unreliable(cur_ts).is_none())
.map_or(false, |x| x.with_inner(|x| x.check_reliable(cur_ts))); });
let rb = b_entry let rb = b_entry.as_ref().map_or(false, |x| {
.as_ref() x.with_inner(|x| x.check_unreliable(cur_ts).is_none())
.map_or(false, |x| x.with_inner(|x| x.check_reliable(cur_ts))); });
if ra != rb { if ra != rb {
if ra { if ra {
return core::cmp::Ordering::Less; return core::cmp::Ordering::Less;

View File

@ -54,8 +54,8 @@ impl RoutingTable {
let state = entry.with(&inner, |_rti, e| e.state(cur_ts)); let state = entry.with(&inner, |_rti, e| e.state(cur_ts));
match state { match state {
BucketEntryState::Dead => { BucketEntryState::Dead | BucketEntryState::Punished => {
// Do nothing with dead entries // Do nothing with dead or punished entries
} }
BucketEntryState::Unreliable => { BucketEntryState::Unreliable => {
// Add to closest unreliable nodes list // Add to closest unreliable nodes list

View File

@ -63,10 +63,13 @@ impl RoutingTable {
// If we already have a relay, see if it is dead, or if we don't need it any more // If we already have a relay, see if it is dead, or if we don't need it any more
let has_relay = { let has_relay = {
if let Some(relay_node) = self.relay_node(RoutingDomain::PublicInternet) { if let Some(relay_node) = self.relay_node(RoutingDomain::PublicInternet) {
let state = relay_node.state(cur_ts); let state_reason = relay_node.state_reason(cur_ts);
// Relay node is dead or no longer needed // Relay node is dead or no longer needed
if matches!(state, BucketEntryState::Dead) { if matches!(
log_rtab!(debug "Relay node died, dropping relay {}", relay_node); state_reason,
BucketEntryStateReason::Dead(_) | BucketEntryStateReason::Punished(_)
) {
log_rtab!(debug "Relay node is now {:?}, dropping relay {}", state_reason, relay_node);
editor.clear_relay_node(); editor.clear_relay_node();
false false
} }

View File

@ -504,7 +504,7 @@ impl RPCProcessor {
// ensure we have some dial info for the entry already, // ensure we have some dial info for the entry already,
// and that the node is still alive // and that the node is still alive
// if not, we should keep looking for better info // if not, we should keep looking for better info
if !matches!(nr.state(get_aligned_timestamp()),BucketEntryState::Dead) && if nr.state(get_aligned_timestamp()).is_alive() &&
nr.has_any_dial_info() { nr.has_any_dial_info() {
return Some(nr); return Some(nr);
} }
@ -546,7 +546,7 @@ impl RPCProcessor {
// ensure we have some dial info for the entry already, // ensure we have some dial info for the entry already,
// and that the node is still alive // and that the node is still alive
// if not, we should do the find_node anyway // if not, we should do the find_node anyway
if !matches!(nr.state(get_aligned_timestamp()),BucketEntryState::Dead) && if nr.state(get_aligned_timestamp()).is_alive() &&
nr.has_any_dial_info() { nr.has_any_dial_info() {
return Ok(Some(nr)); return Ok(Some(nr));
} }
@ -1496,7 +1496,7 @@ impl RPCProcessor {
log_rpc!(debug "Invalid RPC Operation: {}", e); log_rpc!(debug "Invalid RPC Operation: {}", e);
// Punish nodes that send direct undecodable crap // Punish nodes that send direct undecodable crap
address_filter.punish_node_id(sender_node_id); address_filter.punish_node_id(sender_node_id, PunishmentReason::FailedToDecodeOperation);
}, },
// Ignored messages that should be dropped // Ignored messages that should be dropped
RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => { RPCError::Ignore(_) | RPCError::Network(_) | RPCError::TryAgain(_) => {
@ -1520,7 +1520,7 @@ impl RPCProcessor {
// Ensure the sender peer info is for the actual sender specified in the envelope // Ensure the sender peer info is for the actual sender specified in the envelope
if !sender_peer_info.node_ids().contains(&sender_node_id) { if !sender_peer_info.node_ids().contains(&sender_node_id) {
// Attempted to update peer info for the wrong node id // Attempted to update peer info for the wrong node id
address_filter.punish_node_id(sender_node_id); address_filter.punish_node_id(sender_node_id, PunishmentReason::WrongSenderPeerInfo);
return Ok(NetworkResult::invalid_message( return Ok(NetworkResult::invalid_message(
"attempt to update peer info for non-sender node id", "attempt to update peer info for non-sender node id",
)); ));
@ -1532,7 +1532,7 @@ impl RPCProcessor {
sender_peer_info.signed_node_info(), sender_peer_info.signed_node_info(),
&[], &[],
) { ) {
address_filter.punish_node_id(sender_node_id); address_filter.punish_node_id(sender_node_id, PunishmentReason::FailedToVerifySenderPeerInfo);
return Ok(NetworkResult::invalid_message( return Ok(NetworkResult::invalid_message(
format!("sender peerinfo has invalid peer scope: {:?}",sender_peer_info.signed_node_info()) format!("sender peerinfo has invalid peer scope: {:?}",sender_peer_info.signed_node_info())
)); ));
@ -1544,7 +1544,7 @@ impl RPCProcessor {
) { ) {
Ok(v) => Some(v), Ok(v) => Some(v),
Err(e) => { Err(e) => {
address_filter.punish_node_id(sender_node_id); address_filter.punish_node_id(sender_node_id, PunishmentReason::FailedToRegisterSenderPeerInfo);
return Ok(NetworkResult::invalid_message(e)); return Ok(NetworkResult::invalid_message(e));
} }
} }
@ -1555,8 +1555,9 @@ impl RPCProcessor {
opt_sender_nr = match self.routing_table().lookup_node_ref(sender_node_id) { opt_sender_nr = match self.routing_table().lookup_node_ref(sender_node_id) {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
address_filter.punish_node_id(sender_node_id); // If this fails it's not the other node's fault. We should be able to look up a
return Ok(NetworkResult::invalid_message(e)); // node ref for a registered sender node id that just sent a message to us
return Ok(NetworkResult::no_connection_other(e));
} }
} }
} }
@ -1584,7 +1585,7 @@ impl RPCProcessor {
log_rpc!(debug "Dropping RPC operation: {}", e); log_rpc!(debug "Dropping RPC operation: {}", e);
// XXX: Punish routes that send routed undecodable crap // XXX: Punish routes that send routed undecodable crap
// address_filter.punish_route_id(xxx); // address_filter.punish_route_id(xxx, PunishmentReason::FailedToDecodeRoutedMessage);
return Ok(NetworkResult::invalid_message(e)); return Ok(NetworkResult::invalid_message(e));
} }
}; };

View File

@ -27,10 +27,7 @@ impl StorageManager {
// See if the active watch's node is dead // See if the active watch's node is dead
let mut is_dead = false; let mut is_dead = false;
if matches!( if !active_watch.watch_node.state(cur_ts).is_alive() {
active_watch.watch_node.state(cur_ts),
BucketEntryState::Dead
) {
// Watched node is dead // Watched node is dead
is_dead = true; is_dead = true;
} }

View File

@ -49,7 +49,9 @@ pub fn format_opt_bps(bps: Option<ByteCount>) -> String {
} }
fn get_bucket_entry_state(text: &str) -> Option<BucketEntryState> { fn get_bucket_entry_state(text: &str) -> Option<BucketEntryState> {
if text == "dead" { if text == "punished" {
Some(BucketEntryState::Punished)
} else if text == "dead" {
Some(BucketEntryState::Dead) Some(BucketEntryState::Dead)
} else if text == "reliable" { } else if text == "reliable" {
Some(BucketEntryState::Reliable) Some(BucketEntryState::Reliable)