From c21da895b6510952e1ae3771c13c84b5b3b04ba4 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 23 Nov 2024 13:20:24 +0100 Subject: [PATCH] Improved duplicate message detection when syncing from multiple different PNs --- LXMF/LXMRouter.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 068dc44..743cfa8 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -351,6 +351,7 @@ class LXMRouter: if self.outbound_propagation_node != None: if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.ACTIVE: self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHED + RNS.log("Requesting message list from propagation node", RNS.LOG_DEBUG) self.outbound_propagation_link.identify(identity) self.outbound_propagation_link.request( LXMPeer.MESSAGE_GET_PATH, @@ -643,7 +644,7 @@ class LXMRouter: removed_entries = [] for transient_id in self.locally_delivered_transient_ids: timestamp = self.locally_delivered_transient_ids[transient_id] - if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.25: + if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0: removed_entries.append(transient_id) for transient_id in removed_entries: @@ -653,7 +654,7 @@ class LXMRouter: removed_entries = [] for transient_id in self.locally_processed_transient_ids: timestamp = self.locally_processed_transient_ids[transient_id] - if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.25: + if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0: removed_entries.append(transient_id) for transient_id in removed_entries: @@ -854,9 +855,8 @@ class LXMRouter: if not os.path.isdir(self.storagepath): os.makedirs(self.storagepath) - locally_delivered_file = open(self.storagepath+"/local_deliveries", "wb") - locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) - locally_delivered_file.close() + with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file: + locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) except Exception as e: RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -866,9 +866,8 @@ class LXMRouter: if not os.path.isdir(self.storagepath): os.makedirs(self.storagepath) - locally_processed_file = open(self.storagepath+"/locally_processed", "wb") - locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids)) - locally_processed_file.close() + with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file: + locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids)) except Exception as e: RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -1124,24 +1123,32 @@ class LXMRouter: wants = [] if len(request_receipt.response) > 0: for transient_id in request_receipt.response: - if not self.retain_synced_on_node and self.has_message(transient_id): - haves.append(transient_id) + if self.has_message(transient_id): + if not self.retain_synced_on_node: + haves.append(transient_id) else: if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages: wants.append(transient_id) + ms = "" if len(wants) == 1 else "s" + RNS.log(f"Requesting {len(wants)} message{ms} from propagation node", RNS.LOG_DEBUG) request_receipt.link.request( LXMPeer.MESSAGE_GET_PATH, [wants, haves, self.delivery_per_transfer_limit], response_callback=self.message_get_response, failed_callback=self.message_get_failed, - progress_callback=self.message_get_progress - ) + progress_callback=self.message_get_progress) + else: self.propagation_transfer_state = LXMRouter.PR_COMPLETE self.propagation_transfer_progress = 1.0 self.propagation_transfer_last_result = 0 + else: + RNS.log("Invalid message list data received from propagation node", RNS.LOG_DEBUG) + if self.outbound_propagation_link != None: + self.outbound_propagation_link.teardown() + def message_get_response(self, request_receipt): if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG)