From b35b9213a6b92f9b54bf2defc31ca0fa04e74df1 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 1 Nov 2025 01:35:15 +0100 Subject: [PATCH] Implemented throttling for naughty propagation node peers --- LXMF/LXMPeer.py | 8 +++++++- LXMF/LXMRouter.py | 45 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index 786514d..0dbf8ce 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -286,7 +286,7 @@ class LXMPeer: threading.Thread(target=job, daemon=True).start() delay = self.next_sync_attempt-time.time() - postpone_delay = " for {RNS.prettytime({delay})}" if delay > 0 else "" + postpone_delay = f" for {RNS.prettytime(delay)}" if delay > 0 else "" RNS.log(f"Postponing sync with peer {RNS.prettyhexrep(self.destination_hash)}{postpone_delay}{postpone_reason}", RNS.LOG_DEBUG) except Exception as e: RNS.trace_exception(e) @@ -414,6 +414,12 @@ class LXMPeer: self.router.unpeer(self.destination_hash) return + elif response == LXMPeer.ERROR_THROTTLED: + throttle_time = self.router.PN_STAMP_THROTTLE + RNS.log(f"Remote indicated that we're throttled, postponing sync for {RNS.prettytime(throttle_time)}", RNS.LOG_VERBOSE) + self.next_sync_attempt = time.time()+throttle_time + return + elif response == False: # Peer already has all advertised messages for transient_id in self.last_offer: diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index c7c6051..f25c8f0 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -56,6 +56,7 @@ class LXMRouter: DELIVERY_LIMIT = 1000 PR_PATH_TIMEOUT = 10 + PN_STAMP_THROTTLE = 180 PR_IDLE = 0x00 PR_PATH_REQUESTED = 0x01 @@ -135,6 +136,7 @@ class LXMRouter: self.enforce_ratchets = enforce_ratchets self._enforce_stamps = enforce_stamps self.pending_deferred_stamps = {} + self.throttled_peers = {} if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit: self.propagation_per_sync_limit = self.propagation_per_transfer_limit @@ -850,20 +852,17 @@ class LXMRouter: self.clean_transient_id_caches() if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: - if self.propagation_node == True: - self.clean_message_store() + if self.propagation_node == True: self.clean_message_store() if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0: - if self.propagation_node == True: - self.flush_queues() + if self.propagation_node == True: self.flush_queues() if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0: - if self.propagation_node == True: - self.rotate_peers() + if self.propagation_node == True: self.rotate_peers() if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: - if self.propagation_node == True: - self.sync_peers() + if self.propagation_node == True: self.sync_peers() + self.clean_throttled_peers() def jobloop(self): while (True): @@ -1060,6 +1059,14 @@ class LXMRouter: else: return available_tickets + def clean_throttled_peers(self): + expired_entries = [] + now = time.time() + for peer_hash in self.throttled_peers: + if now > self.throttled_peers[peer_hash]: expired_entries.append(peer_hash) + + for peer_hash in expired_entries: self.throttled_peers.pop(peer_hash) + def clean_message_store(self): RNS.log("Cleaning message store", RNS.LOG_VERBOSE) # Check and remove expired messages @@ -2085,10 +2092,18 @@ class LXMRouter: if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY else: + remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + remote_hash = remote_destination.hash + remote_str = RNS.prettyhexrep(remote_hash) + + if remote_hash in self.throttled_peers: + throttle_remaining = self.throttled_peers[remote_hash]-time.time() + if throttle_remaining > 0: + RNS.log(f"Propagation offer from node {remote_str} rejected, throttled for {RNS.prettytime(throttle_remaining)} more", RNS.LOG_NOTICE) + return LXMPeer.ERROR_THROTTLED + else: self.throttled_peers.pop(remote_hash) + if self.from_static_only: - remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - remote_hash = remote_destination.hash - remote_str = RNS.prettyhexrep(remote_hash) if not remote_hash in self.static_peers: RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG) return LXMPeer.ERROR_NO_ACCESS @@ -2193,6 +2208,14 @@ class LXMRouter: self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value, stamp_data=stamp_data) if peer != None: peer.queue_handled_message(transient_id) + invalid_message_count = len(messages) - len(validated_messages) + if invalid_message_count > 0: + resource.link.teardown() + throttle_time = LXMRouter.PN_STAMP_THROTTLE + self.throttled_peers[remote_hash] = time.time()+throttle_time + ms = "" if invalid_message_count == 1 else "s" + RNS.log(f"Propagation transfer from {remote_str} contained {invalid_message_count} message{ms} with invalid stamps, throttled for {RNS.prettytime(throttle_time)}", RNS.LOG_NOTICE) + else: RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)