Implemented message store cleaning jobs

This commit is contained in:
Mark Qvist 2021-10-08 13:03:14 +02:00
parent 25bcba84d2
commit 0c34dae1af

View File

@ -637,8 +637,16 @@ class LXMPeer:
else:
if self.state == LXMPeer.LINK_READY:
unhandled_ids = []
purged_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
unhandled_ids.append(transient_id)
else:
purged_ids.append(transient_id)
for transient_id in purged_ids:
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_WARNING)
self.unhandled_messages.pop(transient_id)
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
@ -763,6 +771,8 @@ class LXMRouter:
PATH_REQUEST_WAIT = 5
LINK_MAX_INACTIVITY = 10*60
MESSAGE_EXPIRY = 30*24*60*60
AUTOPEER = True
AUTOPEER_MAXDEPTH = 4
@ -834,6 +844,7 @@ class LXMRouter:
data = locally_delivered_file.read()
self.locally_delivered_transient_ids = msgpack.unpackb(data)
locally_delivered_file.close()
self.clean_transient_id_cache()
except Exception as e:
RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -1410,19 +1421,27 @@ class LXMRouter:
JOB_OUTBOUND_INTERVAL = 1
JOB_LINKS_INTERVAL = 1
JOB_TRANSIENT_INTERVAL = 60
JOB_STORE_INTERVAL = 120
JOB_PEERSYNC_INTERVAL = 12
def jobs(self):
self.processing_count += 1
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
self.process_outbound()
if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
self.clean_links()
if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0:
self.clean_transient_id_cache()
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
self.clean_message_store()
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
self.sync_peers()
self.processing_count += 1
def clean_links(self):
closed_links = []
@ -1443,6 +1462,53 @@ class LXMRouter:
self.acknowledge_sync_completion()
RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG)
def clean_transient_id_cache(self):
# TODO: Remove
RNS.log("Cleaning transient id cache")
now = time.time()
removed_entries = []
for transient_id in self.locally_delivered_transient_ids:
timestamp = self.locally_delivered_transient_ids[transient_id]
if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.1:
removed_entries.append(transient_id)
for transient_id in removed_entries:
self.locally_delivered_transient_ids.pop(transient_id)
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG)
def clean_message_store(self):
now = time.time()
removed_entries = {}
for transient_id in self.propagation_entries:
entry = self.propagation_entries[transient_id]
filepath = entry[1]
components = filepath.split("_")
if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2:
timestamp = float(components[1])
if now > timestamp+LXMRouter.MESSAGE_EXPIRY:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG)
removed_entries[transient_id] = filepath
else:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
removed_entries[transient_id] = filepath
removed_count = 0
for transient_id in removed_entries:
try:
filepath = removed_entries[transient_id]
self.propagation_entries.pop(transient_id)
if os.path.isfile(filepath):
os.unlink(filepath)
removed_count += 1
except Exception as e:
RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
if removed_count > 0:
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG)
def sync_peers(self):
waiting_peers = []