diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index c7d852d..52d01d6 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -569,6 +569,10 @@ class LXMPeer: ERROR_NO_IDENTITY = 0xf0 + # Maximum amount of time a peer can + # be unreachable before it is removed + MAX_UNREACHABLE = 4*24*60*60 + @staticmethod def from_bytes(peer_bytes, router): dictionary = msgpack.unpackb(peer_bytes) @@ -610,7 +614,7 @@ class LXMPeer: def __init__(self, router, destination_hash): self.alive = False - self.last_heard = None + self.last_heard = 0 self.peering_timebase = 0 self.link = None @@ -648,6 +652,9 @@ class LXMPeer: else: if self.state == LXMPeer.LINK_READY: + self.alive = True + self.last_heard = time.time() + RNS.log("Sync link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG) unhandled_ids = [] purged_ids = [] @@ -756,6 +763,8 @@ class LXMPeer: self.state = LXMPeer.IDLE self.link.teardown() RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed", RNS.LOG_DEBUG) + self.alive = True + self.last_heard = time.time() else: RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG) if self.link != None: @@ -780,6 +789,12 @@ class LXMPeer: RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was added to distribution queue for "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_EXTREME) self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id] + def __str__(self): + if self.destination_hash: + return RNS.prettyhexrep(self.destination_hash) + else: + return "" + class LXMRouter: MAX_DELIVERY_ATTEMPTS = 3 PROCESSING_INTERVAL = 5 @@ -1528,11 +1543,15 @@ class LXMRouter: def sync_peers(self): + culled_peers = [] waiting_peers = [] for peer_id in self.peers: peer = self.peers[peer_id] - if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: - waiting_peers.append(peer) + if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE: + culled_peers.append(peer_id) + else: + if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: + waiting_peers.append(peer) if len(waiting_peers) > 0: RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG) @@ -1541,6 +1560,10 @@ class LXMRouter: RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG) selected_peer.sync() + for peer in culled_peers: + RNS.log("Removing peer "+RNS.prettyhexrep(peer)+" due to excessive unreachability", RNS.LOG_WARNING) + self.peers.pop(peer_id) + def fail_message(self, lxmessage): RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG) diff --git a/setup.py b/setup.py index 1b05948..7c38ec0 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="lxmf", - version="0.1.0", + version="0.1.1", author="Mark Qvist", author_email="mark@unsigned.io", description="Lightweight Extensible Message Format for Reticulum", @@ -18,6 +18,6 @@ setuptools.setup( "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ], - install_requires=['rns>=0.2.4'], + install_requires=['rns>=0.2.8'], python_requires='>=3.6', ) \ No newline at end of file