Added transfer limit awareness to message sync. Added ability to retain messages on node.

This commit is contained in:
Mark Qvist 2024-03-02 09:09:51 +01:00
parent 22493005dc
commit 1d651a9b53
2 changed files with 32 additions and 7 deletions

View File

@ -169,7 +169,8 @@ class LXMPeer:
transient_id = unhandled_entry[0]
weight = unhandled_entry[1]
lxm_size = unhandled_entry[2]
if self.propagation_transfer_limit != None and cumulative_size + lxm_size > (self.propagation_transfer_limit*1000):
next_size = cumulative_size + (lxm_size+per_message_overhead)
if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000):
pass
else:
cumulative_size += (lxm_size+per_message_overhead)

View File

@ -72,6 +72,7 @@ class LXMRouter:
self.ignored_list = []
self.allowed_list = []
self.auth_required = False
self.retain_synced_on_node = False
self.processing_outbound = False
self.processing_inbound = False
@ -200,6 +201,12 @@ class LXMRouter:
def get_outbound_propagation_node(self):
return self.outbound_propagation_node
def set_retain_node_lxms(self, retain):
if retain == True:
self.retain_synced_on_node = True
else:
self.retain_synced_on_node = False
def set_authentication(self, required=None):
if required != None:
self.auth_required = required
@ -247,7 +254,7 @@ class LXMRouter:
self.outbound_propagation_link.identify(identity)
self.outbound_propagation_link.request(
LXMPeer.MESSAGE_GET_PATH,
[None, None],
[None, None], # Set both want and have fields to None to get message list
response_callback=self.message_list_response,
failed_callback=self.message_get_failed
)
@ -633,8 +640,7 @@ class LXMRouter:
finally:
i += 1
RNS.log("LXMF message store size is now "+RNS.prettysize(self.message_storage_size()), RNS.LOG_EXTREME)
RNS.log("PE len "+str(len(self.propagation_entries)))
RNS.log("LXMF message store size is now "+RNS.prettysize(self.message_storage_size())+" for "+str(len(self.propagation_entries))+" items", RNS.LOG_EXTREME)
except Exception as e:
@ -765,6 +771,16 @@ class LXMRouter:
# Process wanted messages
response_messages = []
if data[0] != None and len(data[0]) > 0:
client_transfer_limit = None
if len(data) >= 3:
try:
client_transfer_limit = float(data[2])*1000
RNS.log("Client indicates transfer limit of "+RNS.prettysize(client_transfer_limit), RNS.LOG_DEBUG)
except:
pass
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
for transient_id in data[0]:
if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash:
try:
@ -773,9 +789,17 @@ class LXMRouter:
message_file = open(filepath, "rb")
lxmf_data = message_file.read()
response_messages.append(lxmf_data)
message_file.close()
lxm_size = len(lxmf_data)
next_size = cumulative_size + (lxm_size+per_message_overhead)
if client_transfer_limit != None and next_size > client_transfer_limit:
pass
else:
response_messages.append(lxmf_data)
cumulative_size += (lxm_size+per_message_overhead)
except Exception as e:
RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -805,7 +829,7 @@ class LXMRouter:
wants = []
if len(request_receipt.response) > 0:
for transient_id in request_receipt.response:
if self.has_message(transient_id):
if not self.retain_synced_on_node and self.has_message(transient_id):
haves.append(transient_id)
else:
if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages:
@ -813,7 +837,7 @@ class LXMRouter:
request_receipt.link.request(
LXMPeer.MESSAGE_GET_PATH,
[wants, haves],
[wants, haves, self.delivery_per_transfer_limit],
response_callback=self.message_get_response,
failed_callback=self.message_get_failed,
progress_callback=self.message_get_progress