mirror of
https://github.com/markqvist/LXMF.git
synced 2025-11-23 17:03:08 -05:00
Implemented throttling for naughty propagation node peers
This commit is contained in:
parent
df6271a026
commit
b35b9213a6
2 changed files with 41 additions and 12 deletions
|
|
@ -286,7 +286,7 @@ class LXMPeer:
|
|||
threading.Thread(target=job, daemon=True).start()
|
||||
|
||||
delay = self.next_sync_attempt-time.time()
|
||||
postpone_delay = " for {RNS.prettytime({delay})}" if delay > 0 else ""
|
||||
postpone_delay = f" for {RNS.prettytime(delay)}" if delay > 0 else ""
|
||||
RNS.log(f"Postponing sync with peer {RNS.prettyhexrep(self.destination_hash)}{postpone_delay}{postpone_reason}", RNS.LOG_DEBUG)
|
||||
except Exception as e:
|
||||
RNS.trace_exception(e)
|
||||
|
|
@ -414,6 +414,12 @@ class LXMPeer:
|
|||
self.router.unpeer(self.destination_hash)
|
||||
return
|
||||
|
||||
elif response == LXMPeer.ERROR_THROTTLED:
|
||||
throttle_time = self.router.PN_STAMP_THROTTLE
|
||||
RNS.log(f"Remote indicated that we're throttled, postponing sync for {RNS.prettytime(throttle_time)}", RNS.LOG_VERBOSE)
|
||||
self.next_sync_attempt = time.time()+throttle_time
|
||||
return
|
||||
|
||||
elif response == False:
|
||||
# Peer already has all advertised messages
|
||||
for transient_id in self.last_offer:
|
||||
|
|
|
|||
|
|
@ -56,6 +56,7 @@ class LXMRouter:
|
|||
DELIVERY_LIMIT = 1000
|
||||
|
||||
PR_PATH_TIMEOUT = 10
|
||||
PN_STAMP_THROTTLE = 180
|
||||
|
||||
PR_IDLE = 0x00
|
||||
PR_PATH_REQUESTED = 0x01
|
||||
|
|
@ -135,6 +136,7 @@ class LXMRouter:
|
|||
self.enforce_ratchets = enforce_ratchets
|
||||
self._enforce_stamps = enforce_stamps
|
||||
self.pending_deferred_stamps = {}
|
||||
self.throttled_peers = {}
|
||||
|
||||
if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit:
|
||||
self.propagation_per_sync_limit = self.propagation_per_transfer_limit
|
||||
|
|
@ -850,20 +852,17 @@ class LXMRouter:
|
|||
self.clean_transient_id_caches()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
|
||||
if self.propagation_node == True:
|
||||
self.clean_message_store()
|
||||
if self.propagation_node == True: self.clean_message_store()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0:
|
||||
if self.propagation_node == True:
|
||||
self.flush_queues()
|
||||
if self.propagation_node == True: self.flush_queues()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0:
|
||||
if self.propagation_node == True:
|
||||
self.rotate_peers()
|
||||
if self.propagation_node == True: self.rotate_peers()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
|
||||
if self.propagation_node == True:
|
||||
self.sync_peers()
|
||||
if self.propagation_node == True: self.sync_peers()
|
||||
self.clean_throttled_peers()
|
||||
|
||||
def jobloop(self):
|
||||
while (True):
|
||||
|
|
@ -1060,6 +1059,14 @@ class LXMRouter:
|
|||
else:
|
||||
return available_tickets
|
||||
|
||||
def clean_throttled_peers(self):
|
||||
expired_entries = []
|
||||
now = time.time()
|
||||
for peer_hash in self.throttled_peers:
|
||||
if now > self.throttled_peers[peer_hash]: expired_entries.append(peer_hash)
|
||||
|
||||
for peer_hash in expired_entries: self.throttled_peers.pop(peer_hash)
|
||||
|
||||
def clean_message_store(self):
|
||||
RNS.log("Cleaning message store", RNS.LOG_VERBOSE)
|
||||
# Check and remove expired messages
|
||||
|
|
@ -2085,10 +2092,18 @@ class LXMRouter:
|
|||
if remote_identity == None:
|
||||
return LXMPeer.ERROR_NO_IDENTITY
|
||||
else:
|
||||
if self.from_static_only:
|
||||
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
remote_hash = remote_destination.hash
|
||||
remote_str = RNS.prettyhexrep(remote_hash)
|
||||
|
||||
if remote_hash in self.throttled_peers:
|
||||
throttle_remaining = self.throttled_peers[remote_hash]-time.time()
|
||||
if throttle_remaining > 0:
|
||||
RNS.log(f"Propagation offer from node {remote_str} rejected, throttled for {RNS.prettytime(throttle_remaining)} more", RNS.LOG_NOTICE)
|
||||
return LXMPeer.ERROR_THROTTLED
|
||||
else: self.throttled_peers.pop(remote_hash)
|
||||
|
||||
if self.from_static_only:
|
||||
if not remote_hash in self.static_peers:
|
||||
RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG)
|
||||
return LXMPeer.ERROR_NO_ACCESS
|
||||
|
|
@ -2193,6 +2208,14 @@ class LXMRouter:
|
|||
self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value, stamp_data=stamp_data)
|
||||
if peer != None: peer.queue_handled_message(transient_id)
|
||||
|
||||
invalid_message_count = len(messages) - len(validated_messages)
|
||||
if invalid_message_count > 0:
|
||||
resource.link.teardown()
|
||||
throttle_time = LXMRouter.PN_STAMP_THROTTLE
|
||||
self.throttled_peers[remote_hash] = time.time()+throttle_time
|
||||
ms = "" if invalid_message_count == 1 else "s"
|
||||
RNS.log(f"Propagation transfer from {remote_str} contained {invalid_message_count} message{ms} with invalid stamps, throttled for {RNS.prettytime(throttle_time)}", RNS.LOG_NOTICE)
|
||||
|
||||
else:
|
||||
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue