diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index 1dc19c7..1f3dc82 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -627,42 +627,47 @@ class LXMPeer: def sync(self, initiator=True): RNS.log("Attempting sync to peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) - if RNS.Transport.has_path(self.destination_hash): - RNS.log("Path to peer "+RNS.prettyhexrep(self.destination_hash)+" exist over "+str(RNS.Transport.hops_to(self.destination_hash))+" via "+str(RNS.Transport.next_hop_interface(self.destination_hash)), RNS.LOG_DEBUG) + if not RNS.Transport.has_path(self.destination_hash): + RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG) + RNS.Transport.request_path(self.destination_hash) + RNS.log("Path requested, retrying sync later", RNS.LOG_DEBUG) else: - RNS.log("Attempting sync to peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) + RNS.log("Path to peer "+RNS.prettyhexrep(self.destination_hash)+" exist over "+str(RNS.Transport.hops_to(self.destination_hash))+" hops via "+str(RNS.Transport.next_hop_interface(self.destination_hash)), RNS.LOG_DEBUG) - if self.identity == None: - RNS.log("Attempting to recall identity for peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) - self.identity = RNS.Identity.recall(destination_hash) - self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + if self.identity == None: + RNS.log("Attempting to recall identity for peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) + self.identity = RNS.Identity.recall(destination_hash) + self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - if self.identity != None: - if len(self.unhandled_messages) > 0: - if self.state == LXMPeer.IDLE: - RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG) - self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed) - self.state = LXMPeer.LINK_ESTABLISHING - else: - if self.state == LXMPeer.LINK_READY: - RNS.log("Sync link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG) - unhandled_ids = [] - purged_ids = [] - for transient_id in self.unhandled_messages: - if transient_id in self.router.propagation_entries: - unhandled_ids.append(transient_id) - else: - purged_ids.append(transient_id) + if self.identity != None: + if len(self.unhandled_messages) > 0: + if self.state == LXMPeer.IDLE: + RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG) + self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed) + self.state = LXMPeer.LINK_ESTABLISHING - for transient_id in purged_ids: - RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) - self.unhandled_messages.pop(transient_id) + # TODO: Remove + RNS.log("Establishment timeout is "+str(self.link.establishment_timeout)) + else: + if self.state == LXMPeer.LINK_READY: + RNS.log("Sync link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG) + unhandled_ids = [] + purged_ids = [] + for transient_id in self.unhandled_messages: + if transient_id in self.router.propagation_entries: + unhandled_ids.append(transient_id) + else: + purged_ids.append(transient_id) - RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) - self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) - self.state = LXMPeer.REQUEST_SENT - else: - RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) + for transient_id in purged_ids: + RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) + self.unhandled_messages.pop(transient_id) + + RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) + self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) + self.state = LXMPeer.REQUEST_SENT + else: + RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) def request_failed(self, request_receipt): RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) @@ -744,6 +749,8 @@ class LXMPeer: message = self.unhandled_messages.pop(transient_id) self.handled_messages[transient_id] = message self.state = LXMPeer.IDLE + self.link.teardown() + RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed", RNS.LOG_DEBUG) else: RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG) if self.link != None: @@ -759,6 +766,9 @@ class LXMPeer: self.sync() def link_closed(self, link): + # TODO: Remove + RNS.log("The sync link to peer "+RNS.prettyhexrep(self.destination_hash)+" was closed: "+str(link.teardown_reason), RNS.LOG_DEBUG) + self.link = None self.state = LXMPeer.IDLE @@ -1169,15 +1179,18 @@ class LXMRouter: if os.path.isfile(self.storagepath+"/peers"): peers_file = open(self.storagepath+"/peers", "rb") - serialised_peers = msgpack.unpackb(peers_file.read()) + peers_data = peers_file.read() - for serialised_peer in serialised_peers: - peer = LXMPeer.from_bytes(serialised_peer, self) - if peer.identity != None: - self.peers[peer.destination_hash] = peer - RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages", RNS.LOG_DEBUG) - else: - RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG) + if len(peers_data) > 0: + serialised_peers = msgpack.unpackb(peers_data) + + for serialised_peer in serialised_peers: + peer = LXMPeer.from_bytes(serialised_peer, self) + if peer.identity != None: + self.peers[peer.destination_hash] = peer + RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages", RNS.LOG_DEBUG) + else: + RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG) self.propagation_node = True @@ -1191,6 +1204,7 @@ class LXMRouter: except Exception as e: RNS.log("Could not enable propagation node. The contained exception was: "+str(e), RNS.LOG_ERROR) + raise e RNS.panic() def disable_propagation(self):