diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index c2d7f8d..dd0c86e 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -637,8 +637,16 @@ class LXMPeer: else: if self.state == LXMPeer.LINK_READY: unhandled_ids = [] + purged_ids = [] for transient_id in self.unhandled_messages: - unhandled_ids.append(transient_id) + if transient_id in self.router.propagation_entries: + unhandled_ids.append(transient_id) + else: + purged_ids.append(transient_id) + + for transient_id in purged_ids: + RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_WARNING) + self.unhandled_messages.pop(transient_id) RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) @@ -763,6 +771,8 @@ class LXMRouter: PATH_REQUEST_WAIT = 5 LINK_MAX_INACTIVITY = 10*60 + MESSAGE_EXPIRY = 30*24*60*60 + AUTOPEER = True AUTOPEER_MAXDEPTH = 4 @@ -834,6 +844,7 @@ class LXMRouter: data = locally_delivered_file.read() self.locally_delivered_transient_ids = msgpack.unpackb(data) locally_delivered_file.close() + self.clean_transient_id_cache() except Exception as e: RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -918,7 +929,7 @@ class LXMRouter: def request_messages_from_propagation_node(self, identity, max_messages = PR_ALL_MESSAGES): if max_messages == None: max_messages = LXMRouter.PR_ALL_MESSAGES - + self.propagation_transfer_max_messages = max_messages if self.outbound_propagation_node != None: self.propagation_transfer_progress = 0.0 @@ -1408,21 +1419,29 @@ class LXMRouter: self.jobs() time.sleep(LXMRouter.PROCESSING_INTERVAL) - JOB_OUTBOUND_INTERVAL = 1 - JOB_LINKS_INTERVAL = 1 - JOB_PEERSYNC_INTERVAL = 12 + JOB_OUTBOUND_INTERVAL = 1 + JOB_LINKS_INTERVAL = 1 + JOB_TRANSIENT_INTERVAL = 60 + JOB_STORE_INTERVAL = 120 + JOB_PEERSYNC_INTERVAL = 12 def jobs(self): + self.processing_count += 1 + if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0: self.process_outbound() if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0: self.clean_links() + if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0: + self.clean_transient_id_cache() + + if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: + self.clean_message_store() + if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: self.sync_peers() - self.processing_count += 1 - def clean_links(self): closed_links = [] @@ -1443,6 +1462,53 @@ class LXMRouter: self.acknowledge_sync_completion() RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG) + def clean_transient_id_cache(self): + # TODO: Remove + RNS.log("Cleaning transient id cache") + + now = time.time() + removed_entries = [] + for transient_id in self.locally_delivered_transient_ids: + timestamp = self.locally_delivered_transient_ids[transient_id] + if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.1: + removed_entries.append(transient_id) + + for transient_id in removed_entries: + self.locally_delivered_transient_ids.pop(transient_id) + RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG) + + + def clean_message_store(self): + now = time.time() + removed_entries = {} + for transient_id in self.propagation_entries: + entry = self.propagation_entries[transient_id] + filepath = entry[1] + components = filepath.split("_") + + if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2: + timestamp = float(components[1]) + if now > timestamp+LXMRouter.MESSAGE_EXPIRY: + RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG) + removed_entries[transient_id] = filepath + else: + RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING) + removed_entries[transient_id] = filepath + + removed_count = 0 + for transient_id in removed_entries: + try: + filepath = removed_entries[transient_id] + self.propagation_entries.pop(transient_id) + if os.path.isfile(filepath): + os.unlink(filepath) + removed_count += 1 + except Exception as e: + RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) + + if removed_count > 0: + RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG) + def sync_peers(self): waiting_peers = []