From 25bcba84d2f270a4b62e948d8c5bdc6c01e11397 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 8 Oct 2021 11:57:52 +0200 Subject: [PATCH] Added max messages per sync option --- LXMF/LXMF.py | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index 532449b..c2d7f8d 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -625,19 +625,26 @@ class LXMPeer: self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") def sync(self, initiator=True): - if len(self.unhandled_messages) > 0: - if self.state == LXMPeer.IDLE: - self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed) - self.state = LXMPeer.LINK_ESTABLISHING - else: - if self.state == LXMPeer.LINK_READY: - unhandled_ids = [] - for transient_id in self.unhandled_messages: - unhandled_ids.append(transient_id) + if self.identity == None: + self.identity = RNS.Identity.recall(destination_hash) + self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) - self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) - self.state = LXMPeer.REQUEST_SENT + if self.identity != None: + if len(self.unhandled_messages) > 0: + if self.state == LXMPeer.IDLE: + self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed) + self.state = LXMPeer.LINK_ESTABLISHING + else: + if self.state == LXMPeer.LINK_READY: + unhandled_ids = [] + for transient_id in self.unhandled_messages: + unhandled_ids.append(transient_id) + + RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) + self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) + self.state = LXMPeer.REQUEST_SENT + else: + RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) def request_failed(self, request_receipt): RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) @@ -799,6 +806,7 @@ class LXMRouter: self.propagation_transfer_state = LXMRouter.PR_IDLE self.propagation_transfer_progress = 0.0 self.propagation_transfer_last_result = None + self.propagation_transfer_max_messages = None self.locally_delivered_transient_ids = {} if identity == None: @@ -908,6 +916,10 @@ class LXMRouter: self.wants_download_on_path_available_to = None def request_messages_from_propagation_node(self, identity, max_messages = PR_ALL_MESSAGES): + if max_messages == None: + max_messages = LXMRouter.PR_ALL_MESSAGES + + self.propagation_transfer_max_messages = max_messages if self.outbound_propagation_node != None: self.propagation_transfer_progress = 0.0 if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.ACTIVE: @@ -987,7 +999,8 @@ class LXMRouter: if self.has_message(transient_id): haves.append(transient_id) else: - wants.append(transient_id) + if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages: + wants.append(transient_id) request_receipt.link.request( LXMPeer.MESSAGE_GET_PATH,