Improved peer sync sequence

This commit is contained in:
Mark Qvist 2021-10-08 17:05:11 +02:00
parent 2a6c602e9b
commit 831e91a87a

View File

@ -627,42 +627,47 @@ class LXMPeer:
def sync(self, initiator=True): def sync(self, initiator=True):
RNS.log("Attempting sync to peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) RNS.log("Attempting sync to peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
if RNS.Transport.has_path(self.destination_hash): if not RNS.Transport.has_path(self.destination_hash):
RNS.log("Path to peer "+RNS.prettyhexrep(self.destination_hash)+" exist over "+str(RNS.Transport.hops_to(self.destination_hash))+" via "+str(RNS.Transport.next_hop_interface(self.destination_hash)), RNS.LOG_DEBUG) RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.destination_hash)
RNS.log("Path requested, retrying sync later", RNS.LOG_DEBUG)
else: else:
RNS.log("Attempting sync to peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) RNS.log("Path to peer "+RNS.prettyhexrep(self.destination_hash)+" exist over "+str(RNS.Transport.hops_to(self.destination_hash))+" hops via "+str(RNS.Transport.next_hop_interface(self.destination_hash)), RNS.LOG_DEBUG)
if self.identity == None: if self.identity == None:
RNS.log("Attempting to recall identity for peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) RNS.log("Attempting to recall identity for peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
self.identity = RNS.Identity.recall(destination_hash) self.identity = RNS.Identity.recall(destination_hash)
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
if self.identity != None: if self.identity != None:
if len(self.unhandled_messages) > 0: if len(self.unhandled_messages) > 0:
if self.state == LXMPeer.IDLE: if self.state == LXMPeer.IDLE:
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG) RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed) self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed)
self.state = LXMPeer.LINK_ESTABLISHING self.state = LXMPeer.LINK_ESTABLISHING
else:
if self.state == LXMPeer.LINK_READY:
RNS.log("Sync link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
unhandled_ids = []
purged_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
unhandled_ids.append(transient_id)
else:
purged_ids.append(transient_id)
for transient_id in purged_ids: # TODO: Remove
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) RNS.log("Establishment timeout is "+str(self.link.establishment_timeout))
self.unhandled_messages.pop(transient_id) else:
if self.state == LXMPeer.LINK_READY:
RNS.log("Sync link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
unhandled_ids = []
purged_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
unhandled_ids.append(transient_id)
else:
purged_ids.append(transient_id)
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) for transient_id in purged_ids:
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
self.state = LXMPeer.REQUEST_SENT self.unhandled_messages.pop(transient_id)
else:
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT
else:
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
def request_failed(self, request_receipt): def request_failed(self, request_receipt):
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
@ -744,6 +749,8 @@ class LXMPeer:
message = self.unhandled_messages.pop(transient_id) message = self.unhandled_messages.pop(transient_id)
self.handled_messages[transient_id] = message self.handled_messages[transient_id] = message
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
self.link.teardown()
RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed", RNS.LOG_DEBUG)
else: else:
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG) RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
if self.link != None: if self.link != None:
@ -759,6 +766,9 @@ class LXMPeer:
self.sync() self.sync()
def link_closed(self, link): def link_closed(self, link):
# TODO: Remove
RNS.log("The sync link to peer "+RNS.prettyhexrep(self.destination_hash)+" was closed: "+str(link.teardown_reason), RNS.LOG_DEBUG)
self.link = None self.link = None
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
@ -1169,15 +1179,18 @@ class LXMRouter:
if os.path.isfile(self.storagepath+"/peers"): if os.path.isfile(self.storagepath+"/peers"):
peers_file = open(self.storagepath+"/peers", "rb") peers_file = open(self.storagepath+"/peers", "rb")
serialised_peers = msgpack.unpackb(peers_file.read()) peers_data = peers_file.read()
for serialised_peer in serialised_peers: if len(peers_data) > 0:
peer = LXMPeer.from_bytes(serialised_peer, self) serialised_peers = msgpack.unpackb(peers_data)
if peer.identity != None:
self.peers[peer.destination_hash] = peer for serialised_peer in serialised_peers:
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages", RNS.LOG_DEBUG) peer = LXMPeer.from_bytes(serialised_peer, self)
else: if peer.identity != None:
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG) self.peers[peer.destination_hash] = peer
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages", RNS.LOG_DEBUG)
else:
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
self.propagation_node = True self.propagation_node = True
@ -1191,6 +1204,7 @@ class LXMRouter:
except Exception as e: except Exception as e:
RNS.log("Could not enable propagation node. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Could not enable propagation node. The contained exception was: "+str(e), RNS.LOG_ERROR)
raise e
RNS.panic() RNS.panic()
def disable_propagation(self): def disable_propagation(self):