Added peer rotation

This commit is contained in:
Mark Qvist 2025-01-29 01:26:36 +01:00
parent e0e901291e
commit f1d060a92e
2 changed files with 67 additions and 5 deletions

View File

@ -469,6 +469,10 @@ class LXMPeer:
return self._um_count
@property
def acceptance_rate(self):
return 0 if self.offered == 0 else (self.outgoing/self.offered)
def _update_counts(self):
if not self._hm_counts_synced:
hm = self.handled_messages; del hm

View File

@ -41,6 +41,7 @@ class LXMRouter:
AUTOPEER = True
AUTOPEER_MAXDEPTH = 4
FASTEST_N_RANDOM_POOL = 2
ROTATION_HEADROOM_PCT = 10
PROPAGATION_LIMIT = 256
DELIVERY_LIMIT = 1000
@ -122,6 +123,7 @@ class LXMRouter:
self.propagation_transfer_progress = 0.0
self.propagation_transfer_last_result = None
self.propagation_transfer_max_messages = None
self.prioritise_rotating_unreachable_peers = False
self.active_propagation_links = []
self.locally_delivered_transient_ids = {}
self.locally_processed_transient_ids = {}
@ -783,17 +785,13 @@ class LXMRouter:
if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0:
if self.propagation_node == True:
self.rotate_peers()
self.flush_queues()
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
if self.propagation_node == True:
self.sync_peers()
# def syncstats(self):
# for peer_id in self.peers:
# p = self.peers[peer_id]
# RNS.log(f"{RNS.prettyhexrep(peer_id)} O={p.offered} S={p.outgoing} I={p.incoming} TX={RNS.prettysize(p.tx_bytes)} RX={RNS.prettysize(p.rx_bytes)}")
def jobloop(self):
while (True):
# TODO: Improve this to scheduling, so manual
@ -1805,6 +1803,66 @@ class LXMRouter:
self.peers.pop(destination_hash)
RNS.log("Broke peering with "+str(peer.destination))
def rotate_peers(self):
try:
rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0)))
required_drops = len(self.peers) - (self.max_peers - rotation_headroom)
if required_drops > 0 and len(self.peers) - required_drops > 1:
peers = self.peers.copy()
untested_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
if peer.last_sync_attempt == 0:
untested_peers.append(peer)
if len(untested_peers) >= rotation_headroom:
RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG)
return
culled_peers = []
waiting_peers = []
unresponsive_peers = []
for peer_id in peers:
peer = peers[peer_id]
if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE:
if peer.alive:
if peer.offered == 0:
# Don't consider for unpeering until at
# least one message has been offered
pass
else:
waiting_peers.append(peer)
else:
unresponsive_peers.append(peer)
drop_pool = []
if len(unresponsive_peers) > 0:
drop_pool.extend(unresponsive_peers)
if not self.prioritise_rotating_unreachable_peers:
drop_pool.extend(waiting_peers)
else:
drop_pool.extend(waiting_peers)
if len(drop_pool) > 0:
drop_count = min(required_drops, len(drop_pool))
low_acceptance_rate_peers = sorted(
drop_pool,
key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ),
reverse=False
)[0:drop_count]
ms = "" if len(low_acceptance_rate_peers) == 1 else "s"
RNS.log(f"Dropping {len(low_acceptance_rate_peers)} lowest acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG)
for peer in low_acceptance_rate_peers:
ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2)
RNS.log(f"Acceptance rate for {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing} / {peer.offered})", RNS.LOG_DEBUG)
self.unpeer(peer.destination_hash)
except Exception as e:
RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def sync_peers(self):
culled_peers = []
waiting_peers = []