From f1d060a92ef9ea9b0a0f3402ff46fc9d91fddd5c Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 29 Jan 2025 01:26:36 +0100 Subject: [PATCH] Added peer rotation --- LXMF/LXMPeer.py | 4 +++ LXMF/LXMRouter.py | 68 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index 5036528..e2f951a 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -469,6 +469,10 @@ class LXMPeer: return self._um_count + @property + def acceptance_rate(self): + return 0 if self.offered == 0 else (self.outgoing/self.offered) + def _update_counts(self): if not self._hm_counts_synced: hm = self.handled_messages; del hm diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 9eccedc..4bbd24c 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -41,6 +41,7 @@ class LXMRouter: AUTOPEER = True AUTOPEER_MAXDEPTH = 4 FASTEST_N_RANDOM_POOL = 2 + ROTATION_HEADROOM_PCT = 10 PROPAGATION_LIMIT = 256 DELIVERY_LIMIT = 1000 @@ -122,6 +123,7 @@ class LXMRouter: self.propagation_transfer_progress = 0.0 self.propagation_transfer_last_result = None self.propagation_transfer_max_messages = None + self.prioritise_rotating_unreachable_peers = False self.active_propagation_links = [] self.locally_delivered_transient_ids = {} self.locally_processed_transient_ids = {} @@ -783,17 +785,13 @@ class LXMRouter: if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0: if self.propagation_node == True: + self.rotate_peers() self.flush_queues() if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: if self.propagation_node == True: self.sync_peers() - # def syncstats(self): - # for peer_id in self.peers: - # p = self.peers[peer_id] - # RNS.log(f"{RNS.prettyhexrep(peer_id)} O={p.offered} S={p.outgoing} I={p.incoming} TX={RNS.prettysize(p.tx_bytes)} RX={RNS.prettysize(p.rx_bytes)}") - def jobloop(self): while (True): # TODO: Improve this to scheduling, so manual @@ -1805,6 +1803,66 @@ class LXMRouter: self.peers.pop(destination_hash) RNS.log("Broke peering with "+str(peer.destination)) + def rotate_peers(self): + try: + rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0))) + required_drops = len(self.peers) - (self.max_peers - rotation_headroom) + if required_drops > 0 and len(self.peers) - required_drops > 1: + peers = self.peers.copy() + untested_peers = [] + for peer_id in self.peers: + peer = self.peers[peer_id] + if peer.last_sync_attempt == 0: + untested_peers.append(peer) + + if len(untested_peers) >= rotation_headroom: + RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG) + return + + culled_peers = [] + waiting_peers = [] + unresponsive_peers = [] + for peer_id in peers: + peer = peers[peer_id] + if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE: + if peer.alive: + if peer.offered == 0: + # Don't consider for unpeering until at + # least one message has been offered + pass + else: + waiting_peers.append(peer) + else: + unresponsive_peers.append(peer) + + drop_pool = [] + if len(unresponsive_peers) > 0: + drop_pool.extend(unresponsive_peers) + if not self.prioritise_rotating_unreachable_peers: + drop_pool.extend(waiting_peers) + + else: + drop_pool.extend(waiting_peers) + + if len(drop_pool) > 0: + drop_count = min(required_drops, len(drop_pool)) + low_acceptance_rate_peers = sorted( + drop_pool, + key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ), + reverse=False + )[0:drop_count] + + ms = "" if len(low_acceptance_rate_peers) == 1 else "s" + RNS.log(f"Dropping {len(low_acceptance_rate_peers)} lowest acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG) + for peer in low_acceptance_rate_peers: + ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2) + RNS.log(f"Acceptance rate for {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing} / {peer.offered})", RNS.LOG_DEBUG) + self.unpeer(peer.destination_hash) + + except Exception as e: + RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + def sync_peers(self): culled_peers = [] waiting_peers = []