From 1d651a9b532bacfc11efe51d002aa8f39ce7e80d Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 2 Mar 2024 09:09:51 +0100 Subject: [PATCH] Added transfer limit awareness to message sync. Added ability to retain messages on node. --- LXMF/LXMPeer.py | 3 ++- LXMF/LXMRouter.py | 36 ++++++++++++++++++++++++++++++------ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index b1a49af..a0b61a5 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -169,7 +169,8 @@ class LXMPeer: transient_id = unhandled_entry[0] weight = unhandled_entry[1] lxm_size = unhandled_entry[2] - if self.propagation_transfer_limit != None and cumulative_size + lxm_size > (self.propagation_transfer_limit*1000): + next_size = cumulative_size + (lxm_size+per_message_overhead) + if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000): pass else: cumulative_size += (lxm_size+per_message_overhead) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index cedc2ea..d3a4b7d 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -72,6 +72,7 @@ class LXMRouter: self.ignored_list = [] self.allowed_list = [] self.auth_required = False + self.retain_synced_on_node = False self.processing_outbound = False self.processing_inbound = False @@ -200,6 +201,12 @@ class LXMRouter: def get_outbound_propagation_node(self): return self.outbound_propagation_node + def set_retain_node_lxms(self, retain): + if retain == True: + self.retain_synced_on_node = True + else: + self.retain_synced_on_node = False + def set_authentication(self, required=None): if required != None: self.auth_required = required @@ -247,7 +254,7 @@ class LXMRouter: self.outbound_propagation_link.identify(identity) self.outbound_propagation_link.request( LXMPeer.MESSAGE_GET_PATH, - [None, None], + [None, None], # Set both want and have fields to None to get message list response_callback=self.message_list_response, failed_callback=self.message_get_failed ) @@ -633,8 +640,7 @@ class LXMRouter: finally: i += 1 - RNS.log("LXMF message store size is now "+RNS.prettysize(self.message_storage_size()), RNS.LOG_EXTREME) - RNS.log("PE len "+str(len(self.propagation_entries))) + RNS.log("LXMF message store size is now "+RNS.prettysize(self.message_storage_size())+" for "+str(len(self.propagation_entries))+" items", RNS.LOG_EXTREME) except Exception as e: @@ -765,6 +771,16 @@ class LXMRouter: # Process wanted messages response_messages = [] if data[0] != None and len(data[0]) > 0: + client_transfer_limit = None + if len(data) >= 3: + try: + client_transfer_limit = float(data[2])*1000 + RNS.log("Client indicates transfer limit of "+RNS.prettysize(client_transfer_limit), RNS.LOG_DEBUG) + except: + pass + + per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now + cumulative_size = 24 # Initialised to highest reasonable binary structure overhead for transient_id in data[0]: if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash: try: @@ -773,9 +789,17 @@ class LXMRouter: message_file = open(filepath, "rb") lxmf_data = message_file.read() - response_messages.append(lxmf_data) message_file.close() + lxm_size = len(lxmf_data) + next_size = cumulative_size + (lxm_size+per_message_overhead) + + if client_transfer_limit != None and next_size > client_transfer_limit: + pass + else: + response_messages.append(lxmf_data) + cumulative_size += (lxm_size+per_message_overhead) + except Exception as e: RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -805,7 +829,7 @@ class LXMRouter: wants = [] if len(request_receipt.response) > 0: for transient_id in request_receipt.response: - if self.has_message(transient_id): + if not self.retain_synced_on_node and self.has_message(transient_id): haves.append(transient_id) else: if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages: @@ -813,7 +837,7 @@ class LXMRouter: request_receipt.link.request( LXMPeer.MESSAGE_GET_PATH, - [wants, haves], + [wants, haves, self.delivery_per_transfer_limit], response_callback=self.message_get_response, failed_callback=self.message_get_failed, progress_callback=self.message_get_progress