mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-10-01 01:26:08 -04:00
punish by node id
This commit is contained in:
parent
80cb23c0c6
commit
3264b568d0
@ -3,6 +3,7 @@ use alloc::collections::btree_map::Entry;
|
|||||||
|
|
||||||
// XXX: Move to config eventually?
|
// XXX: Move to config eventually?
|
||||||
const PUNISHMENT_DURATION_MIN: usize = 60;
|
const PUNISHMENT_DURATION_MIN: usize = 60;
|
||||||
|
const MAX_PUNISHMENTS_BY_NODE_ID: usize = 65536;
|
||||||
|
|
||||||
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub enum AddressFilterError {
|
pub enum AddressFilterError {
|
||||||
@ -26,15 +27,37 @@ struct AddressFilterInner {
|
|||||||
conn_timestamps_by_ip6_prefix: BTreeMap<Ipv6Addr, Vec<Timestamp>>,
|
conn_timestamps_by_ip6_prefix: BTreeMap<Ipv6Addr, Vec<Timestamp>>,
|
||||||
punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>,
|
punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>,
|
||||||
punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>,
|
punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>,
|
||||||
|
punishments_by_node_id: BTreeMap<TypedKey, Timestamp>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct AddressFilterUnlockedInner {
|
struct AddressFilterUnlockedInner {
|
||||||
max_connections_per_ip4: usize,
|
max_connections_per_ip4: usize,
|
||||||
max_connections_per_ip6_prefix: usize,
|
max_connections_per_ip6_prefix: usize,
|
||||||
max_connections_per_ip6_prefix_size: usize,
|
max_connections_per_ip6_prefix_size: usize,
|
||||||
max_connection_frequency_per_min: usize,
|
max_connection_frequency_per_min: usize,
|
||||||
punishment_duration_min: usize,
|
punishment_duration_min: usize,
|
||||||
|
routing_table: RoutingTable,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for AddressFilterUnlockedInner {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("AddressFilterUnlockedInner")
|
||||||
|
.field("max_connections_per_ip4", &self.max_connections_per_ip4)
|
||||||
|
.field(
|
||||||
|
"max_connections_per_ip6_prefix",
|
||||||
|
&self.max_connections_per_ip6_prefix,
|
||||||
|
)
|
||||||
|
.field(
|
||||||
|
"max_connections_per_ip6_prefix_size",
|
||||||
|
&self.max_connections_per_ip6_prefix_size,
|
||||||
|
)
|
||||||
|
.field(
|
||||||
|
"max_connection_frequency_per_min",
|
||||||
|
&self.max_connection_frequency_per_min,
|
||||||
|
)
|
||||||
|
.field("punishment_duration_min", &self.punishment_duration_min)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
@ -44,7 +67,7 @@ pub struct AddressFilter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl AddressFilter {
|
impl AddressFilter {
|
||||||
pub fn new(config: VeilidConfig) -> Self {
|
pub fn new(config: VeilidConfig, routing_table: RoutingTable) -> Self {
|
||||||
let c = config.get();
|
let c = config.get();
|
||||||
Self {
|
Self {
|
||||||
unlocked_inner: Arc::new(AddressFilterUnlockedInner {
|
unlocked_inner: Arc::new(AddressFilterUnlockedInner {
|
||||||
@ -55,6 +78,7 @@ impl AddressFilter {
|
|||||||
max_connection_frequency_per_min: c.network.max_connection_frequency_per_min
|
max_connection_frequency_per_min: c.network.max_connection_frequency_per_min
|
||||||
as usize,
|
as usize,
|
||||||
punishment_duration_min: PUNISHMENT_DURATION_MIN,
|
punishment_duration_min: PUNISHMENT_DURATION_MIN,
|
||||||
|
routing_table,
|
||||||
}),
|
}),
|
||||||
inner: Arc::new(Mutex::new(AddressFilterInner {
|
inner: Arc::new(Mutex::new(AddressFilterInner {
|
||||||
conn_count_by_ip4: BTreeMap::new(),
|
conn_count_by_ip4: BTreeMap::new(),
|
||||||
@ -63,6 +87,7 @@ impl AddressFilter {
|
|||||||
conn_timestamps_by_ip6_prefix: BTreeMap::new(),
|
conn_timestamps_by_ip6_prefix: BTreeMap::new(),
|
||||||
punishments_by_ip4: BTreeMap::new(),
|
punishments_by_ip4: BTreeMap::new(),
|
||||||
punishments_by_ip6_prefix: BTreeMap::new(),
|
punishments_by_ip6_prefix: BTreeMap::new(),
|
||||||
|
punishments_by_node_id: BTreeMap::new(),
|
||||||
})),
|
})),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -135,9 +160,29 @@ impl AddressFilter {
|
|||||||
inner.punishments_by_ip6_prefix.remove(&key);
|
inner.punishments_by_ip6_prefix.remove(&key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// node id
|
||||||
|
{
|
||||||
|
let mut dead_keys = Vec::<TypedKey>::new();
|
||||||
|
for (key, value) in &mut inner.punishments_by_node_id {
|
||||||
|
// Drop punishments older than the punishment duration
|
||||||
|
if cur_ts.as_u64().saturating_sub(value.as_u64())
|
||||||
|
> self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64
|
||||||
|
{
|
||||||
|
dead_keys.push(*key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for key in dead_keys {
|
||||||
|
log_net!(debug ">>> FORGIVING: {}", key);
|
||||||
|
inner.punishments_by_node_id.remove(&key);
|
||||||
|
// make the entry alive again if it's still here
|
||||||
|
if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(key) {
|
||||||
|
nr.operate_mut(|_rti, e| e.set_punished(false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool {
|
fn is_ip_addr_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool {
|
||||||
match ipblock {
|
match ipblock {
|
||||||
IpAddr::V4(v4) => {
|
IpAddr::V4(v4) => {
|
||||||
if inner.punishments_by_ip4.contains_key(&v4) {
|
if inner.punishments_by_ip4.contains_key(&v4) {
|
||||||
@ -153,16 +198,16 @@ impl AddressFilter {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_punished(&self, addr: IpAddr) -> bool {
|
pub fn is_ip_addr_punished(&self, addr: IpAddr) -> bool {
|
||||||
let inner = self.inner.lock();
|
let inner = self.inner.lock();
|
||||||
let ipblock = ip_to_ipblock(
|
let ipblock = ip_to_ipblock(
|
||||||
self.unlocked_inner.max_connections_per_ip6_prefix_size,
|
self.unlocked_inner.max_connections_per_ip6_prefix_size,
|
||||||
addr,
|
addr,
|
||||||
);
|
);
|
||||||
self.is_punished_inner(&*inner, ipblock)
|
self.is_ip_addr_punished_inner(&*inner, ipblock)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn punish(&self, addr: IpAddr) {
|
pub fn punish_ip_addr(&self, addr: IpAddr) {
|
||||||
log_net!(debug ">>> PUNISHED: {}", addr);
|
log_net!(debug ">>> PUNISHED: {}", addr);
|
||||||
let ts = get_aligned_timestamp();
|
let ts = get_aligned_timestamp();
|
||||||
|
|
||||||
@ -186,6 +231,39 @@ impl AddressFilter {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_node_id_punished_inner(&self, inner: &AddressFilterInner, node_id: TypedKey) -> bool {
|
||||||
|
if inner.punishments_by_node_id.contains_key(&node_id) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_node_id_punished(&self, node_id: TypedKey) -> bool {
|
||||||
|
let inner = self.inner.lock();
|
||||||
|
self.is_node_id_punished_inner(&*inner, node_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn punish_node_id(&self, node_id: TypedKey) {
|
||||||
|
if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(node_id) {
|
||||||
|
// make the entry dead if it's punished
|
||||||
|
nr.operate_mut(|_rti, e| e.set_punished(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
let ts = get_aligned_timestamp();
|
||||||
|
|
||||||
|
let mut inner = self.inner.lock();
|
||||||
|
if inner.punishments_by_node_id.len() >= MAX_PUNISHMENTS_BY_NODE_ID {
|
||||||
|
log_net!(debug ">>> PUNISHMENT TABLE FULL: {}", node_id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log_net!(debug ">>> PUNISHED: {}", node_id);
|
||||||
|
inner
|
||||||
|
.punishments_by_node_id
|
||||||
|
.entry(node_id)
|
||||||
|
.and_modify(|v| *v = ts)
|
||||||
|
.or_insert(ts);
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn address_filter_task_routine(
|
pub async fn address_filter_task_routine(
|
||||||
self,
|
self,
|
||||||
_stop_token: StopToken,
|
_stop_token: StopToken,
|
||||||
@ -207,7 +285,7 @@ impl AddressFilter {
|
|||||||
self.unlocked_inner.max_connections_per_ip6_prefix_size,
|
self.unlocked_inner.max_connections_per_ip6_prefix_size,
|
||||||
addr,
|
addr,
|
||||||
);
|
);
|
||||||
if self.is_punished_inner(inner, ipblock) {
|
if self.is_ip_addr_punished_inner(inner, ipblock) {
|
||||||
return Err(AddressFilterError::Punished);
|
return Err(AddressFilterError::Punished);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,9 +143,9 @@ struct NetworkManagerUnlockedInner {
|
|||||||
#[cfg(feature = "unstable-blockstore")]
|
#[cfg(feature = "unstable-blockstore")]
|
||||||
block_store: BlockStore,
|
block_store: BlockStore,
|
||||||
crypto: Crypto,
|
crypto: Crypto,
|
||||||
address_filter: AddressFilter,
|
|
||||||
// Accessors
|
// Accessors
|
||||||
routing_table: RwLock<Option<RoutingTable>>,
|
routing_table: RwLock<Option<RoutingTable>>,
|
||||||
|
address_filter: RwLock<Option<AddressFilter>>,
|
||||||
components: RwLock<Option<NetworkComponents>>,
|
components: RwLock<Option<NetworkComponents>>,
|
||||||
update_callback: RwLock<Option<UpdateCallback>>,
|
update_callback: RwLock<Option<UpdateCallback>>,
|
||||||
// Background processes
|
// Background processes
|
||||||
@ -189,7 +189,7 @@ impl NetworkManager {
|
|||||||
#[cfg(feature = "unstable-blockstore")]
|
#[cfg(feature = "unstable-blockstore")]
|
||||||
block_store,
|
block_store,
|
||||||
crypto,
|
crypto,
|
||||||
address_filter: AddressFilter::new(config),
|
address_filter: RwLock::new(None),
|
||||||
routing_table: RwLock::new(None),
|
routing_table: RwLock::new(None),
|
||||||
components: RwLock::new(None),
|
components: RwLock::new(None),
|
||||||
update_callback: RwLock::new(None),
|
update_callback: RwLock::new(None),
|
||||||
@ -292,7 +292,12 @@ impl NetworkManager {
|
|||||||
self.unlocked_inner.crypto.clone()
|
self.unlocked_inner.crypto.clone()
|
||||||
}
|
}
|
||||||
pub fn address_filter(&self) -> AddressFilter {
|
pub fn address_filter(&self) -> AddressFilter {
|
||||||
self.unlocked_inner.address_filter.clone()
|
self.unlocked_inner
|
||||||
|
.address_filter
|
||||||
|
.read()
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.clone()
|
||||||
}
|
}
|
||||||
pub fn routing_table(&self) -> RoutingTable {
|
pub fn routing_table(&self) -> RoutingTable {
|
||||||
self.unlocked_inner
|
self.unlocked_inner
|
||||||
@ -351,7 +356,9 @@ impl NetworkManager {
|
|||||||
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
|
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
|
||||||
let routing_table = RoutingTable::new(self.clone());
|
let routing_table = RoutingTable::new(self.clone());
|
||||||
routing_table.init().await?;
|
routing_table.init().await?;
|
||||||
|
let address_filter = AddressFilter::new(self.config(), routing_table.clone());
|
||||||
*self.unlocked_inner.routing_table.write() = Some(routing_table.clone());
|
*self.unlocked_inner.routing_table.write() = Some(routing_table.clone());
|
||||||
|
*self.unlocked_inner.address_filter.write() = Some(address_filter);
|
||||||
*self.unlocked_inner.update_callback.write() = Some(update_callback);
|
*self.unlocked_inner.update_callback.write() = Some(update_callback);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -904,7 +911,7 @@ impl NetworkManager {
|
|||||||
// Ensure we can read the magic number
|
// Ensure we can read the magic number
|
||||||
if data.len() < 4 {
|
if data.len() < 4 {
|
||||||
log_net!(debug "short packet");
|
log_net!(debug "short packet");
|
||||||
self.address_filter().punish(remote_addr);
|
self.address_filter().punish_ip_addr(remote_addr);
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -939,7 +946,7 @@ impl NetworkManager {
|
|||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log_net!(debug "envelope failed to decode: {}", e);
|
log_net!(debug "envelope failed to decode: {}", e);
|
||||||
self.address_filter().punish(remote_addr);
|
self.address_filter().punish_ip_addr(remote_addr);
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -991,6 +998,10 @@ impl NetworkManager {
|
|||||||
// Peek at header and see if we need to relay this
|
// Peek at header and see if we need to relay this
|
||||||
// If the recipient id is not our node id, then it needs relaying
|
// If the recipient id is not our node id, then it needs relaying
|
||||||
let sender_id = TypedKey::new(envelope.get_crypto_kind(), envelope.get_sender_id());
|
let sender_id = TypedKey::new(envelope.get_crypto_kind(), envelope.get_sender_id());
|
||||||
|
if self.address_filter().is_node_id_punished(sender_id) {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
let recipient_id = TypedKey::new(envelope.get_crypto_kind(), envelope.get_recipient_id());
|
let recipient_id = TypedKey::new(envelope.get_crypto_kind(), envelope.get_recipient_id());
|
||||||
if !routing_table.matches_own_node_id(&[recipient_id]) {
|
if !routing_table.matches_own_node_id(&[recipient_id]) {
|
||||||
// See if the source node is allowed to resolve nodes
|
// See if the source node is allowed to resolve nodes
|
||||||
@ -1070,7 +1081,7 @@ impl NetworkManager {
|
|||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log_net!(debug "failed to decrypt envelope body: {}",e);
|
log_net!(debug "failed to decrypt envelope body: {}",e);
|
||||||
self.address_filter().punish(remote_addr);
|
self.address_filter().punish_ip_addr(remote_addr);
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -410,7 +410,7 @@ impl Network {
|
|||||||
if self
|
if self
|
||||||
.network_manager()
|
.network_manager()
|
||||||
.address_filter()
|
.address_filter()
|
||||||
.is_punished(dial_info.address().to_ip_addr())
|
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||||
{
|
{
|
||||||
return Ok(NetworkResult::no_connection_other("punished"));
|
return Ok(NetworkResult::no_connection_other("punished"));
|
||||||
}
|
}
|
||||||
@ -477,7 +477,7 @@ impl Network {
|
|||||||
if self
|
if self
|
||||||
.network_manager()
|
.network_manager()
|
||||||
.address_filter()
|
.address_filter()
|
||||||
.is_punished(dial_info.address().to_ip_addr())
|
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||||
{
|
{
|
||||||
return Ok(NetworkResult::no_connection_other("punished"));
|
return Ok(NetworkResult::no_connection_other("punished"));
|
||||||
}
|
}
|
||||||
|
@ -120,7 +120,7 @@ impl Network {
|
|||||||
};
|
};
|
||||||
// Check to see if it is punished
|
// Check to see if it is punished
|
||||||
let address_filter = self.network_manager().address_filter();
|
let address_filter = self.network_manager().address_filter();
|
||||||
if address_filter.is_punished(peer_addr.ip()) {
|
if address_filter.is_ip_addr_punished(peer_addr.ip()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,7 +24,7 @@ impl ProtocolNetworkConnection {
|
|||||||
timeout_ms: u32,
|
timeout_ms: u32,
|
||||||
address_filter: AddressFilter,
|
address_filter: AddressFilter,
|
||||||
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
|
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
|
||||||
if address_filter.is_punished(dial_info.address().to_ip_addr()) {
|
if address_filter.is_ip_addr_punished(dial_info.address().to_ip_addr()) {
|
||||||
return Ok(NetworkResult::no_connection_other("punished"));
|
return Ok(NetworkResult::no_connection_other("punished"));
|
||||||
}
|
}
|
||||||
match dial_info.protocol_type() {
|
match dial_info.protocol_type() {
|
||||||
|
@ -25,7 +25,7 @@ impl RawUdpProtocolHandler {
|
|||||||
|
|
||||||
// Check to see if it is punished
|
// Check to see if it is punished
|
||||||
if let Some(af) = self.address_filter.as_ref() {
|
if let Some(af) = self.address_filter.as_ref() {
|
||||||
if af.is_punished(remote_addr.ip()) {
|
if af.is_ip_addr_punished(remote_addr.ip()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -97,7 +97,7 @@ impl RawUdpProtocolHandler {
|
|||||||
|
|
||||||
// Check to see if it is punished
|
// Check to see if it is punished
|
||||||
if let Some(af) = self.address_filter.as_ref() {
|
if let Some(af) = self.address_filter.as_ref() {
|
||||||
if af.is_punished(remote_addr.ip()) {
|
if af.is_ip_addr_punished(remote_addr.ip()) {
|
||||||
return Ok(NetworkResult::no_connection_other("punished"));
|
return Ok(NetworkResult::no_connection_other("punished"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -305,7 +305,7 @@ impl NetworkConnection {
|
|||||||
let peer_address = protocol_connection.descriptor().remote();
|
let peer_address = protocol_connection.descriptor().remote();
|
||||||
|
|
||||||
// Check to see if it is punished
|
// Check to see if it is punished
|
||||||
if address_filter.is_punished(peer_address.to_socket_addr().ip()) {
|
if address_filter.is_ip_addr_punished(peer_address.to_socket_addr().ip()) {
|
||||||
return RecvLoopAction::Finish;
|
return RecvLoopAction::Finish;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -307,6 +307,11 @@ impl NetworkManager {
|
|||||||
) -> EyreResult<NodeContactMethod> {
|
) -> EyreResult<NodeContactMethod> {
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
|
|
||||||
|
// If a node is punished, then don't try to contact it
|
||||||
|
if target_node_ref.node_ids().iter().find(|nid| self.address_filter().is_node_id_punished(**nid)).is_some() {
|
||||||
|
return Ok(NodeContactMethod::Unreachable);
|
||||||
|
}
|
||||||
|
|
||||||
// Figure out the best routing domain to get the contact method over
|
// Figure out the best routing domain to get the contact method over
|
||||||
let routing_domain = match target_node_ref.best_routing_domain() {
|
let routing_domain = match target_node_ref.best_routing_domain() {
|
||||||
Some(rd) => rd,
|
Some(rd) => rd,
|
||||||
|
@ -2,10 +2,11 @@ use super::*;
|
|||||||
|
|
||||||
use super::connection_table::*;
|
use super::connection_table::*;
|
||||||
use crate::tests::common::test_veilid_config::*;
|
use crate::tests::common::test_veilid_config::*;
|
||||||
|
use crate::tests::mock_routing_table;
|
||||||
|
|
||||||
pub async fn test_add_get_remove() {
|
pub async fn test_add_get_remove() {
|
||||||
let config = get_config();
|
let config = get_config();
|
||||||
let address_filter = AddressFilter::new(config.clone());
|
let address_filter = AddressFilter::new(config.clone(), mock_routing_table());
|
||||||
let table = ConnectionTable::new(config, address_filter);
|
let table = ConnectionTable::new(config, address_filter);
|
||||||
|
|
||||||
let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new(
|
let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new(
|
||||||
|
@ -133,7 +133,7 @@ impl Network {
|
|||||||
if self
|
if self
|
||||||
.network_manager()
|
.network_manager()
|
||||||
.address_filter()
|
.address_filter()
|
||||||
.is_punished(dial_info.address().to_ip_addr())
|
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||||
{
|
{
|
||||||
return Ok(NetworkResult::no_connection_other("punished"));
|
return Ok(NetworkResult::no_connection_other("punished"));
|
||||||
}
|
}
|
||||||
@ -182,7 +182,7 @@ impl Network {
|
|||||||
if self
|
if self
|
||||||
.network_manager()
|
.network_manager()
|
||||||
.address_filter()
|
.address_filter()
|
||||||
.is_punished(dial_info.address().to_ip_addr())
|
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||||
{
|
{
|
||||||
return Ok(NetworkResult::no_connection_other("punished"));
|
return Ok(NetworkResult::no_connection_other("punished"));
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,7 @@ impl ProtocolNetworkConnection {
|
|||||||
timeout_ms: u32,
|
timeout_ms: u32,
|
||||||
address_filter: AddressFilter,
|
address_filter: AddressFilter,
|
||||||
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
|
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
|
||||||
if address_filter.is_punished(dial_info.address().to_ip_addr()) {
|
if address_filter.is_ip_addr_punished(dial_info.address().to_ip_addr()) {
|
||||||
return Ok(NetworkResult::no_connection_other("punished"));
|
return Ok(NetworkResult::no_connection_other("punished"));
|
||||||
}
|
}
|
||||||
match dial_info.protocol_type() {
|
match dial_info.protocol_type() {
|
||||||
|
@ -90,6 +90,9 @@ pub struct BucketEntryInner {
|
|||||||
/// The accounting for the transfer statistics
|
/// The accounting for the transfer statistics
|
||||||
#[serde(skip)]
|
#[serde(skip)]
|
||||||
transfer_stats_accounting: TransferStatsAccounting,
|
transfer_stats_accounting: TransferStatsAccounting,
|
||||||
|
/// If the entry is being punished and should be considered dead
|
||||||
|
#[serde(skip)]
|
||||||
|
is_punished: bool,
|
||||||
/// Tracking identifier for NodeRef debugging
|
/// Tracking identifier for NodeRef debugging
|
||||||
#[cfg(feature = "tracking")]
|
#[cfg(feature = "tracking")]
|
||||||
#[serde(skip)]
|
#[serde(skip)]
|
||||||
@ -403,6 +406,10 @@ impl BucketEntryInner {
|
|||||||
|
|
||||||
// Stores a connection descriptor in this entry's table of last connections
|
// Stores a connection descriptor in this entry's table of last connections
|
||||||
pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: Timestamp) {
|
pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: Timestamp) {
|
||||||
|
if self.is_punished {
|
||||||
|
// Don't record connection if this entry is currently punished
|
||||||
|
return;
|
||||||
|
}
|
||||||
let key = self.descriptor_to_key(last_connection);
|
let key = self.descriptor_to_key(last_connection);
|
||||||
self.last_connections
|
self.last_connections
|
||||||
.insert(key, (last_connection, timestamp));
|
.insert(key, (last_connection, timestamp));
|
||||||
@ -531,6 +538,9 @@ impl BucketEntryInner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
|
pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
|
||||||
|
if self.is_punished {
|
||||||
|
return BucketEntryState::Dead;
|
||||||
|
}
|
||||||
if self.check_reliable(cur_ts) {
|
if self.check_reliable(cur_ts) {
|
||||||
BucketEntryState::Reliable
|
BucketEntryState::Reliable
|
||||||
} else if self.check_dead(cur_ts) {
|
} else if self.check_dead(cur_ts) {
|
||||||
@ -539,6 +549,12 @@ impl BucketEntryInner {
|
|||||||
BucketEntryState::Unreliable
|
BucketEntryState::Unreliable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn set_punished(&mut self, punished: bool) {
|
||||||
|
self.is_punished = punished;
|
||||||
|
if punished {
|
||||||
|
self.clear_last_connections();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn peer_stats(&self) -> &PeerStats {
|
pub fn peer_stats(&self) -> &PeerStats {
|
||||||
&self.peer_stats
|
&self.peer_stats
|
||||||
@ -845,6 +861,7 @@ impl BucketEntry {
|
|||||||
},
|
},
|
||||||
latency_stats_accounting: LatencyStatsAccounting::new(),
|
latency_stats_accounting: LatencyStatsAccounting::new(),
|
||||||
transfer_stats_accounting: TransferStatsAccounting::new(),
|
transfer_stats_accounting: TransferStatsAccounting::new(),
|
||||||
|
is_punished: false,
|
||||||
#[cfg(feature = "tracking")]
|
#[cfg(feature = "tracking")]
|
||||||
next_track_id: 0,
|
next_track_id: 0,
|
||||||
#[cfg(feature = "tracking")]
|
#[cfg(feature = "tracking")]
|
||||||
|
@ -1 +1,29 @@
|
|||||||
|
use super::*;
|
||||||
|
|
||||||
pub mod test_serialize_routing_table;
|
pub mod test_serialize_routing_table;
|
||||||
|
|
||||||
|
pub(crate) fn mock_routing_table() -> routing_table::RoutingTable {
|
||||||
|
let veilid_config = VeilidConfig::new();
|
||||||
|
#[cfg(feature = "unstable-blockstore")]
|
||||||
|
let block_store = BlockStore::new(veilid_config.clone());
|
||||||
|
let protected_store = ProtectedStore::new(veilid_config.clone());
|
||||||
|
let table_store = TableStore::new(veilid_config.clone(), protected_store.clone());
|
||||||
|
let crypto = Crypto::new(veilid_config.clone(), table_store.clone());
|
||||||
|
let storage_manager = storage_manager::StorageManager::new(
|
||||||
|
veilid_config.clone(),
|
||||||
|
crypto.clone(),
|
||||||
|
table_store.clone(),
|
||||||
|
#[cfg(feature = "unstable-blockstore")]
|
||||||
|
block_store.clone(),
|
||||||
|
);
|
||||||
|
let network_manager = network_manager::NetworkManager::new(
|
||||||
|
veilid_config.clone(),
|
||||||
|
storage_manager,
|
||||||
|
protected_store.clone(),
|
||||||
|
table_store.clone(),
|
||||||
|
#[cfg(feature = "unstable-blockstore")]
|
||||||
|
block_store.clone(),
|
||||||
|
crypto.clone(),
|
||||||
|
);
|
||||||
|
RoutingTable::new(network_manager)
|
||||||
|
}
|
||||||
|
@ -1,35 +1,8 @@
|
|||||||
use crate::*;
|
use super::*;
|
||||||
use routing_table::*;
|
|
||||||
|
|
||||||
fn fake_routing_table() -> routing_table::RoutingTable {
|
|
||||||
let veilid_config = VeilidConfig::new();
|
|
||||||
#[cfg(feature = "unstable-blockstore")]
|
|
||||||
let block_store = BlockStore::new(veilid_config.clone());
|
|
||||||
let protected_store = ProtectedStore::new(veilid_config.clone());
|
|
||||||
let table_store = TableStore::new(veilid_config.clone(), protected_store.clone());
|
|
||||||
let crypto = Crypto::new(veilid_config.clone(), table_store.clone());
|
|
||||||
let storage_manager = storage_manager::StorageManager::new(
|
|
||||||
veilid_config.clone(),
|
|
||||||
crypto.clone(),
|
|
||||||
table_store.clone(),
|
|
||||||
#[cfg(feature = "unstable-blockstore")]
|
|
||||||
block_store.clone(),
|
|
||||||
);
|
|
||||||
let network_manager = network_manager::NetworkManager::new(
|
|
||||||
veilid_config.clone(),
|
|
||||||
storage_manager,
|
|
||||||
protected_store.clone(),
|
|
||||||
table_store.clone(),
|
|
||||||
#[cfg(feature = "unstable-blockstore")]
|
|
||||||
block_store.clone(),
|
|
||||||
crypto.clone(),
|
|
||||||
);
|
|
||||||
RoutingTable::new(network_manager)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn test_routingtable_buckets_round_trip() {
|
pub async fn test_routingtable_buckets_round_trip() {
|
||||||
let original = fake_routing_table();
|
let original = mock_routing_table();
|
||||||
let copy = fake_routing_table();
|
let copy = mock_routing_table();
|
||||||
original.init().await.unwrap();
|
original.init().await.unwrap();
|
||||||
copy.init().await.unwrap();
|
copy.init().await.unwrap();
|
||||||
|
|
||||||
|
@ -1454,28 +1454,37 @@ impl RPCProcessor {
|
|||||||
&self,
|
&self,
|
||||||
encoded_msg: RPCMessageEncoded,
|
encoded_msg: RPCMessageEncoded,
|
||||||
) -> Result<NetworkResult<()>, RPCError> {
|
) -> Result<NetworkResult<()>, RPCError> {
|
||||||
|
let address_filter = self.network_manager.address_filter();
|
||||||
|
|
||||||
// Decode operation appropriately based on header detail
|
// Decode operation appropriately based on header detail
|
||||||
let msg = match &encoded_msg.header.detail {
|
let msg = match &encoded_msg.header.detail {
|
||||||
RPCMessageHeaderDetail::Direct(detail) => {
|
RPCMessageHeaderDetail::Direct(detail) => {
|
||||||
|
// Get sender node id
|
||||||
|
let sender_node_id = TypedKey::new(
|
||||||
|
detail.envelope.get_crypto_kind(),
|
||||||
|
detail.envelope.get_sender_id(),
|
||||||
|
);
|
||||||
|
|
||||||
// Decode and validate the RPC operation
|
// Decode and validate the RPC operation
|
||||||
let operation = match self.decode_rpc_operation(&encoded_msg) {
|
let operation = match self.decode_rpc_operation(&encoded_msg) {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => return Ok(NetworkResult::invalid_message(e)),
|
Err(e) => {
|
||||||
|
// Punish nodes that send direct undecodable crap
|
||||||
|
address_filter.punish_node_id(sender_node_id);
|
||||||
|
return Ok(NetworkResult::invalid_message(e));
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Get the routing domain this message came over
|
// Get the routing domain this message came over
|
||||||
let routing_domain = detail.routing_domain;
|
let routing_domain = detail.routing_domain;
|
||||||
|
|
||||||
// Get the sender noderef, incorporating sender's peer info
|
// Get the sender noderef, incorporating sender's peer info
|
||||||
let sender_node_id = TypedKey::new(
|
|
||||||
detail.envelope.get_crypto_kind(),
|
|
||||||
detail.envelope.get_sender_id(),
|
|
||||||
);
|
|
||||||
let mut opt_sender_nr: Option<NodeRef> = None;
|
let mut opt_sender_nr: Option<NodeRef> = None;
|
||||||
if let Some(sender_peer_info) = operation.sender_peer_info() {
|
if let Some(sender_peer_info) = operation.sender_peer_info() {
|
||||||
// Ensure the sender peer info is for the actual sender specified in the envelope
|
// Ensure the sender peer info is for the actual sender specified in the envelope
|
||||||
if !sender_peer_info.node_ids().contains(&sender_node_id) {
|
if !sender_peer_info.node_ids().contains(&sender_node_id) {
|
||||||
// Attempted to update peer info for the wrong node id
|
// Attempted to update peer info for the wrong node id
|
||||||
|
address_filter.punish_node_id(sender_node_id);
|
||||||
return Ok(NetworkResult::invalid_message(
|
return Ok(NetworkResult::invalid_message(
|
||||||
"attempt to update peer info for non-sender node id",
|
"attempt to update peer info for non-sender node id",
|
||||||
));
|
));
|
||||||
@ -1487,6 +1496,7 @@ impl RPCProcessor {
|
|||||||
sender_peer_info.signed_node_info(),
|
sender_peer_info.signed_node_info(),
|
||||||
&[],
|
&[],
|
||||||
) {
|
) {
|
||||||
|
address_filter.punish_node_id(sender_node_id);
|
||||||
return Ok(NetworkResult::invalid_message(
|
return Ok(NetworkResult::invalid_message(
|
||||||
"sender peerinfo has invalid peer scope",
|
"sender peerinfo has invalid peer scope",
|
||||||
));
|
));
|
||||||
@ -1497,7 +1507,10 @@ impl RPCProcessor {
|
|||||||
false,
|
false,
|
||||||
) {
|
) {
|
||||||
Ok(v) => Some(v),
|
Ok(v) => Some(v),
|
||||||
Err(e) => return Ok(NetworkResult::invalid_message(e)),
|
Err(e) => {
|
||||||
|
address_filter.punish_node_id(sender_node_id);
|
||||||
|
return Ok(NetworkResult::invalid_message(e));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1505,7 +1518,10 @@ impl RPCProcessor {
|
|||||||
if opt_sender_nr.is_none() {
|
if opt_sender_nr.is_none() {
|
||||||
opt_sender_nr = match self.routing_table().lookup_node_ref(sender_node_id) {
|
opt_sender_nr = match self.routing_table().lookup_node_ref(sender_node_id) {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => return Ok(NetworkResult::invalid_message(e)),
|
Err(e) => {
|
||||||
|
address_filter.punish_node_id(sender_node_id);
|
||||||
|
return Ok(NetworkResult::invalid_message(e));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1525,7 +1541,14 @@ impl RPCProcessor {
|
|||||||
}
|
}
|
||||||
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
|
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
|
||||||
// Decode and validate the RPC operation
|
// Decode and validate the RPC operation
|
||||||
let operation = self.decode_rpc_operation(&encoded_msg)?;
|
let operation = match self.decode_rpc_operation(&encoded_msg) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
// Punish routes that send routed undecodable crap
|
||||||
|
// address_filter.punish_route_id(xxx);
|
||||||
|
return Ok(NetworkResult::invalid_message(e));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Make the RPC message
|
// Make the RPC message
|
||||||
RPCMessage {
|
RPCMessage {
|
||||||
@ -1623,7 +1646,9 @@ impl RPCProcessor {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(v) => v,
|
Ok(v) => {
|
||||||
|
v
|
||||||
|
}
|
||||||
} => [ format!(": msg.header={:?}", msg.header) ] {});
|
} => [ format!(": msg.header={:?}", msg.header) ] {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user