From edbb887d81f5f7845475a96fd44a06673aa8addc Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 19 Nov 2022 20:07:00 +0100 Subject: [PATCH] Implemented paper message handling --- LXMF/LXMRouter.py | 82 +++++++++++++++++++++++++++++++++++++++++++---- LXMF/LXMessage.py | 2 +- LXMF/_version.py | 2 +- 3 files changed, 78 insertions(+), 8 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index e7b90af..9be992c 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -1,6 +1,7 @@ import os import time import random +import base64 import atexit import threading @@ -85,6 +86,7 @@ class LXMRouter: self.propagation_transfer_last_result = None self.propagation_transfer_max_messages = None self.locally_delivered_transient_ids = {} + self.locally_processed_transient_ids = {} if identity == None: identity = RNS.Identity() @@ -105,7 +107,6 @@ class LXMRouter: self.peers = {} self.propagation_entries = {} - self.propagated_ids = {} RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self)) RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self)) @@ -118,7 +119,14 @@ class LXMRouter: data = locally_delivered_file.read() self.locally_delivered_transient_ids = msgpack.unpackb(data) locally_delivered_file.close() - self.clean_transient_id_cache() + + if os.path.isfile(self.storagepath+"/locally_processed"): + locally_processed_file = open(self.storagepath+"/locally_processed", "rb") + data = locally_processed_file.read() + self.locally_processed_transient_ids = msgpack.unpackb(data) + locally_processed_file.close() + + self.clean_transient_id_caches() except Exception as e: RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -424,7 +432,7 @@ class LXMRouter: self.clean_links() if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0: - self.clean_transient_id_cache() + self.clean_transient_id_caches() if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: self.clean_message_store() @@ -458,7 +466,7 @@ class LXMRouter: self.acknowledge_sync_completion() RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG) - def clean_transient_id_cache(self): + def clean_transient_id_caches(self): now = time.time() removed_entries = [] for transient_id in self.locally_delivered_transient_ids: @@ -470,6 +478,16 @@ class LXMRouter: self.locally_delivered_transient_ids.pop(transient_id) RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG) + removed_entries = [] + for transient_id in self.locally_processed_transient_ids: + timestamp = self.locally_processed_transient_ids[transient_id] + if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.25: + removed_entries.append(transient_id) + + for transient_id in removed_entries: + self.locally_processed_transient_ids.pop(transient_id) + RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from locally processed cache", RNS.LOG_DEBUG) + def clean_message_store(self): # Check and remove expired messages now = time.time() @@ -571,6 +589,18 @@ class LXMRouter: except Exception as e: RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + def save_locally_processed_transient_ids(self): + try: + if not os.path.isdir(self.storagepath): + os.makedirs(self.storagepath) + + locally_processed_file = open(self.storagepath+"/locally_processed", "wb") + locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids)) + locally_processed_file.close() + + except Exception as e: + RNS.log("Could not save locally processed message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + def exit_handler(self): if self.propagation_node: try: @@ -589,6 +619,7 @@ class LXMRouter: RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) self.save_locally_delivered_transient_ids() + self.save_locally_processed_transient_ids() def __str__(self): return "" @@ -1012,15 +1043,18 @@ class LXMRouter: except Exception as e: RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG) - def lxmf_propagation(self, lxmf_data): + def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None): try: if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD: transient_id = RNS.Identity.full_hash(lxmf_data) - if not transient_id in self.propagation_entries and not transient_id in self.propagated_ids: + + if not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids: received = time.time() propagation_entry = [transient_id, received, lxmf_data] destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH] + self.locally_processed_transient_ids[transient_id] = received + if destination_hash in self.delivery_destinations: delivery_destination = self.delivery_destinations[destination_hash] encrypted_lxmf_data = lxmf_data[LXMessage.DESTINATION_LENGTH:] @@ -1029,6 +1063,9 @@ class LXMRouter: self.lxmf_delivery(delivery_data, delivery_destination.type) self.locally_delivered_transient_ids[transient_id] = time.time() + if signal_local_delivery != None: + return signal_local_delivery + else: if self.propagation_node: file_path = self.messagepath+"/"+RNS.hexrep(transient_id, delimit=False)+"_"+str(received) @@ -1043,8 +1080,19 @@ class LXMRouter: peer = self.peers[peer_id] peer.handle_message(transient_id) + else: + # TODO: Add message to sneakernet queues when implemented + RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG) + return True + else: + if signal_duplicate != None: + return signal_duplicate + + else: + return False + return False except Exception as e: @@ -1052,6 +1100,28 @@ class LXMRouter: RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) return False + def ingest_lxm_url(self, url, signal_local_delivery=None, signal_duplicate=None): + try: + if not url.lower().startswith("lxm://"): + RNS.log("Cannot ingest LXM, invalid URL provided.", RNS.LOG_ERROR) + return False + + else: + lxmf_data = base64.urlsafe_b64decode(url.replace(LXMessage.URL_PROTO_SPECIFIER+"://", "").replace("/", "")+"==") + transient_id = RNS.Identity.full_hash(lxmf_data) + + router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate) + if router_propagation_result != False: + RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG) + return router_propagation_result + else: + RNS.log("No valid LXM could be ingested from the provided URL", RNS.LOG_DEBUG) + return False + + except Exception as e: + RNS.log("Error while decoding URL-encoded LXMF message. The contained exception was: "+str(e), RNS.LOG_ERROR) + return False + def fail_message(self, lxmessage): RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG) diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index d5dbb12..6e688a6 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -276,7 +276,7 @@ class LXMessage: paper_content_limit = LXMessage.PAPER_MDU encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:]) - self.paper_packed = msgpack.packb(self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data) + self.paper_packed = self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data content_size = len(self.paper_packed) if content_size <= paper_content_limit: diff --git a/LXMF/_version.py b/LXMF/_version.py index 788da1f..fe404ae 100644 --- a/LXMF/_version.py +++ b/LXMF/_version.py @@ -1 +1 @@ -__version__ = "0.2.4" +__version__ = "0.2.5"