Improved duplicate message detection when syncing from multiple different PNs

This commit is contained in:
Mark Qvist 2024-11-23 13:20:24 +01:00
parent b172c7fcd4
commit c21da895b6

View File

@ -351,6 +351,7 @@ class LXMRouter:
if self.outbound_propagation_node != None: if self.outbound_propagation_node != None:
if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.ACTIVE: if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.ACTIVE:
self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHED self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHED
RNS.log("Requesting message list from propagation node", RNS.LOG_DEBUG)
self.outbound_propagation_link.identify(identity) self.outbound_propagation_link.identify(identity)
self.outbound_propagation_link.request( self.outbound_propagation_link.request(
LXMPeer.MESSAGE_GET_PATH, LXMPeer.MESSAGE_GET_PATH,
@ -643,7 +644,7 @@ class LXMRouter:
removed_entries = [] removed_entries = []
for transient_id in self.locally_delivered_transient_ids: for transient_id in self.locally_delivered_transient_ids:
timestamp = self.locally_delivered_transient_ids[transient_id] timestamp = self.locally_delivered_transient_ids[transient_id]
if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.25: if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0:
removed_entries.append(transient_id) removed_entries.append(transient_id)
for transient_id in removed_entries: for transient_id in removed_entries:
@ -653,7 +654,7 @@ class LXMRouter:
removed_entries = [] removed_entries = []
for transient_id in self.locally_processed_transient_ids: for transient_id in self.locally_processed_transient_ids:
timestamp = self.locally_processed_transient_ids[transient_id] timestamp = self.locally_processed_transient_ids[transient_id]
if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.25: if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0:
removed_entries.append(transient_id) removed_entries.append(transient_id)
for transient_id in removed_entries: for transient_id in removed_entries:
@ -854,9 +855,8 @@ class LXMRouter:
if not os.path.isdir(self.storagepath): if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath) os.makedirs(self.storagepath)
locally_delivered_file = open(self.storagepath+"/local_deliveries", "wb") with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file:
locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))
locally_delivered_file.close()
except Exception as e: except Exception as e:
RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -866,9 +866,8 @@ class LXMRouter:
if not os.path.isdir(self.storagepath): if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath) os.makedirs(self.storagepath)
locally_processed_file = open(self.storagepath+"/locally_processed", "wb") with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file:
locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids)) locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids))
locally_processed_file.close()
except Exception as e: except Exception as e:
RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -1124,24 +1123,32 @@ class LXMRouter:
wants = [] wants = []
if len(request_receipt.response) > 0: if len(request_receipt.response) > 0:
for transient_id in request_receipt.response: for transient_id in request_receipt.response:
if not self.retain_synced_on_node and self.has_message(transient_id): if self.has_message(transient_id):
haves.append(transient_id) if not self.retain_synced_on_node:
haves.append(transient_id)
else: else:
if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages: if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages:
wants.append(transient_id) wants.append(transient_id)
ms = "" if len(wants) == 1 else "s"
RNS.log(f"Requesting {len(wants)} message{ms} from propagation node", RNS.LOG_DEBUG)
request_receipt.link.request( request_receipt.link.request(
LXMPeer.MESSAGE_GET_PATH, LXMPeer.MESSAGE_GET_PATH,
[wants, haves, self.delivery_per_transfer_limit], [wants, haves, self.delivery_per_transfer_limit],
response_callback=self.message_get_response, response_callback=self.message_get_response,
failed_callback=self.message_get_failed, failed_callback=self.message_get_failed,
progress_callback=self.message_get_progress progress_callback=self.message_get_progress)
)
else: else:
self.propagation_transfer_state = LXMRouter.PR_COMPLETE self.propagation_transfer_state = LXMRouter.PR_COMPLETE
self.propagation_transfer_progress = 1.0 self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = 0 self.propagation_transfer_last_result = 0
else:
RNS.log("Invalid message list data received from propagation node", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
def message_get_response(self, request_receipt): def message_get_response(self, request_receipt):
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG) RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG)