diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 0cfdfa7..932e4cf 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -70,6 +70,9 @@ class LXMRouter: self.outbound_propagation_node = None self.outbound_propagation_link = None + self.message_storage_limit = None + self.information_storage_limit = None + self.wants_download_on_path_available_from = None self.wants_download_on_path_available_to = None self.propagation_transfer_state = LXMRouter.PR_IDLE @@ -223,15 +226,25 @@ class LXMRouter: if len(components) == 2: if float(components[1]) > 0: if len(components[0]) == RNS.Identity.HASHLENGTH//8*2: - transient_id = bytes.fromhex(components[0]) - received = components[1] + try: + transient_id = bytes.fromhex(components[0]) + received = float(components[1]) - filepath = self.messagepath+"/"+filename - file = open(filepath, "rb") - destination_hash = file.read(LXMessage.DESTINATION_LENGTH) - file.close() + filepath = self.messagepath+"/"+filename + msg_size = os.path.getsize(filepath) + file = open(filepath, "rb") + destination_hash = file.read(LXMessage.DESTINATION_LENGTH) + file.close() - self.propagation_entries[transient_id] = [destination_hash, filepath] + self.propagation_entries[transient_id] = [ + destination_hash, + filepath, + received, + msg_size, + ] + + except Exception as e: + RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) if os.path.isfile(self.storagepath+"/peers"): peers_file = open(self.storagepath+"/peers", "rb") @@ -256,6 +269,8 @@ class LXMRouter: self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL) + RNS.log("LXMF Propagation Node message store size is "+RNS.prettysize(self.message_storage_size()), RNS.LOG_DEBUG) + self.announce_propagation_node() except Exception as e: @@ -275,17 +290,73 @@ class LXMRouter: if destination_hash in self.ignored_list: self.ignored_list.remove(destination_hash) + def set_message_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None): + limit_bytes = 0 + + if kilobytes != None: + limit_bytes += gigabytes*1000 + + if megabytes != None: + limit_bytes += gigabytes*1000*1000 + + if gigabytes != None: + limit_bytes += gigabytes*1000*1000*1000 + + if limit_bytes == 0: + limit_bytes = None + + try: + if limit_bytes == None or int(limit_bytes) > 0: + self.message_storage_limit = int(limit_bytes) + else: + raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) + + except Exception as e: + raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) + + def message_storage_limit(self): + return self.message_storage_limit + + def message_storage_size(self): + if self.propagation_node: + return sum(self.propagation_entries[f][3] for f in self.propagation_entries) + else: + return None + + def set_information_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None): + limit_bytes = 0 + + if kilobytes != None: + limit_bytes += gigabytes*1000 + + if megabytes != None: + limit_bytes += gigabytes*1000*1000 + + if gigabytes != None: + limit_bytes += gigabytes*1000*1000*1000 + + if limit_bytes == 0: + limit_bytes = None + + try: + if limit_bytes == None or int(limit_bytes) > 0: + self.information_storage_limit = int(limit_bytes) + else: + raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) + + except Exception as e: + raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) + + def information_storage_limit(self): + return self.information_storage_limit + + def information_storage_size(self): + pass + ### Utility & Maintenance ############################# ####################################################### - def jobloop(self): - while (True): - # TODO: Improve this to scheduling, so manual - # triggers can delay next run - self.jobs() - time.sleep(LXMRouter.PROCESSING_INTERVAL) - JOB_OUTBOUND_INTERVAL = 1 JOB_LINKS_INTERVAL = 1 JOB_TRANSIENT_INTERVAL = 60 @@ -309,6 +380,13 @@ class LXMRouter: if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: self.sync_peers() + def jobloop(self): + while (True): + # TODO: Improve this to scheduling, so manual + # triggers can delay next run + self.jobs() + time.sleep(LXMRouter.PROCESSING_INTERVAL) + def clean_links(self): closed_links = [] for link_hash in self.direct_links: @@ -341,6 +419,7 @@ class LXMRouter: RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG) def clean_message_store(self): + # Check and remove expired messages now = time.time() removed_entries = {} for transient_id in self.propagation_entries: @@ -371,6 +450,58 @@ class LXMRouter: if removed_count > 0: RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG) + # Check size of message store and cull if needed + try: + message_storage_size = self.message_storage_size() + if message_storage_size != None: + if self.message_storage_limit != None and message_storage_size > self.message_storage_limit: + # Clean the message storage according to priorities + bytes_needed = message_storage_size - self.message_storage_limit + bytes_cleaned = 0 + + now = time.time() + weighted_entries = [] + for transient_id in self.propagation_entries: + entry = self.propagation_entries[transient_id] + + lxm_rcvd = entry[2] + lxm_size = entry[3] + age_weight = max(1, (now - lxm_rcvd)/60/60/24/4) + + weight = age_weight * lxm_size + weighted_entries.append([entry, weight, transient_id]) + + weighted_entries.sort(key=lambda we: we[1], reverse=True) + + i = 0 + while i < len(weighted_entries) and bytes_cleaned < bytes_needed: + try: + w = weighted_entries[i] + entry = w[0] + transient_id = w[2] + filepath = entry[1] + + if os.path.isfile(filepath): + os.unlink(filepath) + + self.propagation_entries.pop(transient_id) + bytes_cleaned += entry[3] + + RNS.log("Removed "+RNS.prettyhexrep(transient_id)+" with weight "+str(w[1])+" to clear up "+RNS.prettysize(entry[3])+", now cleaned "+RNS.prettysize(bytes_cleaned)+" out of "+RNS.prettysize(bytes_needed)+" needed", RNS.LOG_EXTREME) + + except Exception as e: + RNS.log("Error while cleaning LXMF message from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) + + finally: + i += 1 + + RNS.log("LXMF message store size is now "+RNS.prettysize(self.message_storage_size()), RNS.LOG_EXTREME) + RNS.log("PE len "+str(len(self.propagation_entries))) + + + except Exception as e: + RNS.log("Could not clean the LXMF message store. The contained exception was: "+str(e), RNS.LOG_ERROR) + def exit_handler(self): if self.propagation_node: try: @@ -811,7 +942,7 @@ class LXMRouter: msg_file.write(lxmf_data) msg_file.close() - self.propagation_entries[transient_id] = [destination_hash, file_path] + self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data)] RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG) for peer_id in self.peers: