Implemented storage limit and weight-based message store culling

Mark Qvist 2022-06-17 12:37:00 +02:00
parent ca84f4c8fe
commit 3dfb05a25c

LXMF/LXMRouter.py
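In outline, the new culling behaves as sketched below (a simplified, self-contained illustration rather than the committed code; the entries layout and function name are made up for the example):

import time

def cull_message_store(entries, limit_bytes):
    # entries: {transient_id: (filepath, received_timestamp, size_bytes)}, illustrative layout
    total = sum(size for _, _, size in entries.values())
    if limit_bytes is None or total <= limit_bytes:
        return
    now = time.time()
    def weight(item):
        _, (filepath, received, size) = item
        # Age in four-day units, never below 1, scaled by size:
        # large and old messages weigh the most and are removed first.
        return max(1, (now - received)/60/60/24/4) * size
    bytes_needed = total - limit_bytes
    bytes_cleaned = 0
    for transient_id, (filepath, received, size) in sorted(entries.items(), key=weight, reverse=True):
        if bytes_cleaned >= bytes_needed:
            break
        del entries[transient_id]  # the real implementation also unlinks the file on disk
        bytes_cleaned += size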

@@ -70,6 +70,9 @@ class LXMRouter:
self.outbound_propagation_node = None
self.outbound_propagation_link = None
self.message_storage_limit = None
self.information_storage_limit = None
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
self.propagation_transfer_state = LXMRouter.PR_IDLE
@@ -223,15 +226,25 @@ class LXMRouter:
if len(components) == 2:
if float(components[1]) > 0:
if len(components[0]) == RNS.Identity.HASHLENGTH//8*2:
transient_id = bytes.fromhex(components[0])
received = components[1]
try:
transient_id = bytes.fromhex(components[0])
received = float(components[1])
filepath = self.messagepath+"/"+filename
file = open(filepath, "rb")
destination_hash = file.read(LXMessage.DESTINATION_LENGTH)
file.close()
filepath = self.messagepath+"/"+filename
msg_size = os.path.getsize(filepath)
file = open(filepath, "rb")
destination_hash = file.read(LXMessage.DESTINATION_LENGTH)
file.close()
self.propagation_entries[transient_id] = [destination_hash, filepath]
self.propagation_entries[transient_id] = [
destination_hash,
filepath,
received,
msg_size,
]
except Exception as e:
RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
if os.path.isfile(self.storagepath+"/peers"):
peers_file = open(self.storagepath+"/peers", "rb")
@@ -256,6 +269,8 @@ class LXMRouter:
self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)
RNS.log("LXMF Propagation Node message store size is "+RNS.prettysize(self.message_storage_size()), RNS.LOG_DEBUG)
self.announce_propagation_node()
except Exception as e:
@@ -275,17 +290,73 @@ class LXMRouter:
if destination_hash in self.ignored_list:
self.ignored_list.remove(destination_hash)
def set_message_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None):
limit_bytes = 0
if kilobytes != None:
limit_bytes += kilobytes*1000
if megabytes != None:
limit_bytes += megabytes*1000*1000
if gigabytes != None:
limit_bytes += gigabytes*1000*1000*1000
if limit_bytes == 0:
limit_bytes = None
try:
if limit_bytes == None or int(limit_bytes) > 0:
# A limit of None means the message store size is unbounded
self.message_storage_limit = limit_bytes if limit_bytes == None else int(limit_bytes)
else:
raise ValueError("Cannot set LXMF message storage limit to "+str(limit_bytes))
except Exception as e:
raise ValueError("Cannot set LXMF message storage limit to "+str(limit_bytes))
def message_storage_limit(self):
# Note: the message_storage_limit attribute assigned in __init__ shadows this accessor on instances
return self.message_storage_limit
def message_storage_size(self):
if self.propagation_node:
return sum(self.propagation_entries[f][3] for f in self.propagation_entries)
else:
return None
def set_information_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None):
limit_bytes = 0
if kilobytes != None:
limit_bytes += kilobytes*1000
if megabytes != None:
limit_bytes += megabytes*1000*1000
if gigabytes != None:
limit_bytes += gigabytes*1000*1000*1000
if limit_bytes == 0:
limit_bytes = None
try:
if limit_bytes == None or int(limit_bytes) > 0:
# A limit of None means information storage is unbounded
self.information_storage_limit = limit_bytes if limit_bytes == None else int(limit_bytes)
else:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
except Exception as e:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
def information_storage_limit(self):
# Note: the information_storage_limit attribute assigned in __init__ shadows this accessor on instances
return self.information_storage_limit
def information_storage_size(self):
# Information storage accounting is not implemented yet
pass
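A minimal usage sketch for the new limit setters (not part of this commit; assumes a router running as a propagation node and an existing storage path):

import RNS
import LXMF

reticulum = RNS.Reticulum()
router = LXMF.LXMRouter(storagepath="./lxmf_storage")  # example path
router.enable_propagation()

# Cap the propagation message store at 500 MB; the kilobytes, megabytes
# and gigabytes keyword arguments can also be combined.
router.set_message_storage_limit(megabytes=500)

# Calling the setter with no arguments leaves limit_bytes at None,
# which disables the cap.
router.set_message_storage_limit()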
### Utility & Maintenance #############################
#######################################################
def jobloop(self):
while (True):
# TODO: Improve this to scheduling, so manual
# triggers can delay next run
self.jobs()
time.sleep(LXMRouter.PROCESSING_INTERVAL)
JOB_OUTBOUND_INTERVAL = 1
JOB_LINKS_INTERVAL = 1
JOB_TRANSIENT_INTERVAL = 60
@@ -309,6 +380,13 @@ class LXMRouter:
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
self.sync_peers()
def jobloop(self):
while (True):
# TODO: Improve this to scheduling, so manual
# triggers can delay next run
self.jobs()
time.sleep(LXMRouter.PROCESSING_INTERVAL)
def clean_links(self):
closed_links = []
for link_hash in self.direct_links:
@@ -341,6 +419,7 @@ class LXMRouter:
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG)
def clean_message_store(self):
# Check and remove expired messages
now = time.time()
removed_entries = {}
for transient_id in self.propagation_entries:
@@ -371,6 +450,58 @@ class LXMRouter:
if removed_count > 0:
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG)
# Check size of message store and cull if needed
try:
message_storage_size = self.message_storage_size()
if message_storage_size != None:
if self.message_storage_limit != None and message_storage_size > self.message_storage_limit:
# Clean the message storage according to priorities
bytes_needed = message_storage_size - self.message_storage_limit
bytes_cleaned = 0
now = time.time()
weighted_entries = []
for transient_id in self.propagation_entries:
entry = self.propagation_entries[transient_id]
lxm_rcvd = entry[2]
lxm_size = entry[3]
age_weight = max(1, (now - lxm_rcvd)/60/60/24/4)
weight = age_weight * lxm_size
weighted_entries.append([entry, weight, transient_id])
weighted_entries.sort(key=lambda we: we[1], reverse=True)
i = 0
while i < len(weighted_entries) and bytes_cleaned < bytes_needed:
try:
w = weighted_entries[i]
entry = w[0]
transient_id = w[2]
filepath = entry[1]
if os.path.isfile(filepath):
os.unlink(filepath)
self.propagation_entries.pop(transient_id)
bytes_cleaned += entry[3]
RNS.log("Removed "+RNS.prettyhexrep(transient_id)+" with weight "+str(w[1])+" to clear up "+RNS.prettysize(entry[3])+", now cleaned "+RNS.prettysize(bytes_cleaned)+" out of "+RNS.prettysize(bytes_needed)+" needed", RNS.LOG_EXTREME)
except Exception as e:
RNS.log("Error while cleaning LXMF message from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
finally:
i += 1
RNS.log("LXMF message store size is now "+RNS.prettysize(self.message_storage_size()), RNS.LOG_EXTREME)
RNS.log("PE len "+str(len(self.propagation_entries)))
except Exception as e:
RNS.log("Could not clean the LXMF message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
def exit_handler(self):
if self.propagation_node:
try:
@@ -811,7 +942,7 @@ class LXMRouter:
msg_file.write(lxmf_data)
msg_file.close()
self.propagation_entries[transient_id] = [destination_hash, file_path]
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data)]
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG)
for peer_id in self.peers: