diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index 7cc5c20..585b2a3 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -24,6 +24,17 @@ class LXMPeer: # be unreachable before it is removed MAX_UNREACHABLE = 4*24*60*60 + # Everytime consecutive time a sync + # link fails to establish, add this + # amount off time to wait before the + # next sync is attempted. + SYNC_BACKOFF_STEP = 12*60 + + # How long to wait for an answer to + # peer path requests before deferring + # sync to later. + PATH_REQUEST_GRACE = 7.5 + @staticmethod def from_bytes(peer_bytes, router): dictionary = msgpack.unpackb(peer_bytes) @@ -66,6 +77,9 @@ class LXMPeer: def __init__(self, router, destination_hash): self.alive = False self.last_heard = 0 + self.next_sync_attempt = 0 + self.last_sync_attempt = 0 + self.sync_backoff = 0 self.peering_timebase = 0 self.link = None @@ -80,48 +94,61 @@ class LXMPeer: self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") def sync(self): + RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) + self.last_sync_attempt = time.time() - if not RNS.Transport.has_path(self.destination_hash): - RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG) - RNS.Transport.request_path(self.destination_hash) - RNS.log("Path requested, retrying sync later", RNS.LOG_DEBUG) - - else: - if self.identity == None: - self.identity = RNS.Identity.recall(destination_hash) - self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + if time.time() > self.next_sync_attempt: + if not RNS.Transport.has_path(self.destination_hash): + RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG) + RNS.Transport.request_path(self.destination_hash) + time.sleep(LXMPeer.PATH_REQUEST_GRACE) - if self.identity != None: - if len(self.unhandled_messages) > 0: - if self.state == LXMPeer.IDLE: - RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG) - self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed) - self.state = LXMPeer.LINK_ESTABLISHING - - else: - if self.state == LXMPeer.LINK_READY: - self.alive = True - self.last_heard = time.time() - - RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG) - unhandled_ids = [] - purged_ids = [] - for transient_id in self.unhandled_messages: - if transient_id in self.router.propagation_entries: - unhandled_ids.append(transient_id) - else: - purged_ids.append(transient_id) - - for transient_id in purged_ids: - RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) - self.unhandled_messages.pop(transient_id) - - RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) - self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) - self.state = LXMPeer.REQUEST_SENT + if not RNS.Transport.has_path(self.destination_hash): + RNS.log("Path request was not answered, retrying sync with peer "+RNS.prettyhexrep(self.destination_hash)+" later", RNS.LOG_DEBUG) + else: - RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) + if self.identity == None: + self.identity = RNS.Identity.recall(destination_hash) + self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + + if self.identity != None: + if len(self.unhandled_messages) > 0: + if self.state == LXMPeer.IDLE: + RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG) + self.sync_backoff += LXMPeer.SYNC_BACKOFF_STEP + self.next_sync_attempt = time.time() + self.sync_backoff + self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed) + self.state = LXMPeer.LINK_ESTABLISHING + + else: + if self.state == LXMPeer.LINK_READY: + self.alive = True + self.last_heard = time.time() + self.sync_backoff = 0 + + RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG) + unhandled_ids = [] + purged_ids = [] + for transient_id in self.unhandled_messages: + if transient_id in self.router.propagation_entries: + unhandled_ids.append(transient_id) + else: + purged_ids.append(transient_id) + + for transient_id in purged_ids: + RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) + self.unhandled_messages.pop(transient_id) + + RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) + self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) + self.state = LXMPeer.REQUEST_SENT + else: + RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) + else: + RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG) + if self.last_sync_attempt > self.last_heard: + self.alive = False def request_failed(self, request_receipt): RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) @@ -191,6 +218,7 @@ class LXMPeer: resource.transferred_messages = wanted_message_ids self.state = LXMPeer.RESOURCE_TRANSFERRING else: + RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG) self.state = LXMPeer.IDLE except Exception as e: @@ -226,6 +254,7 @@ class LXMPeer: def link_established(self, link): self.link.identify(self.router.identity) self.state = LXMPeer.LINK_READY + self.next_sync_attempt = 0 self.sync() def link_closed(self, link): diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index d1322ca..19df21d 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -910,7 +910,11 @@ class LXMRouter: def peer(self, destination_hash, timestamp): if destination_hash in self.peers: peer = self.peers[destination_hash] - peer.alive = True + if timestamp > peer.peering_timebase: + peer.alive = True + peer.sync_backoff = 0 + peer.next_sync_attempt = 0 + peer.peering_timebase = timestamp peer.last_heard = time.time() else: @@ -934,18 +938,33 @@ class LXMRouter: def sync_peers(self): culled_peers = [] waiting_peers = [] + unresponsive_peers = [] for peer_id in self.peers: peer = self.peers[peer_id] if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE: culled_peers.append(peer_id) else: if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: - waiting_peers.append(peer) + if peer.alive: + waiting_peers.append(peer) + else: + if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt: + unresponsive_peers.append(peer) + else: + pass + # RNS.log("Not adding peer "+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG) + peer_pool = [] if len(waiting_peers) > 0: RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG) - selected_index = random.randint(0,len(waiting_peers)-1) - selected_peer = waiting_peers[selected_index] + peer_pool = waiting_peers + elif len(unresponsive_peers) > 0: + RNS.log("No active peers available, randomly selecting peer to sync from "+str(len(unresponsive_peers))+" unresponsive peers.", RNS.LOG_DEBUG) + peer_pool = unresponsive_peers + + if len(peer_pool) > 0: + selected_index = random.randint(0,len(peer_pool)-1) + selected_peer = peer_pool[selected_index] RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG) selected_peer.sync() diff --git a/LXMF/_version.py b/LXMF/_version.py index 01ef120..6cd38b7 100644 --- a/LXMF/_version.py +++ b/LXMF/_version.py @@ -1 +1 @@ -__version__ = "0.2.6" +__version__ = "0.2.7"