From f18ce9ea99c6376872f07ce8d2629e44aa59fc92 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 3 Nov 2025 00:08:50 +0100 Subject: [PATCH 01/13] Cleanup --- LXMF/Utilities/lxmd.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index b09e7ae..ab8e30f 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 -# MIT License +# Reticulum License # -# Copyright (c) 2016-2022 Mark Qvist / unsigned.io +# Copyright (c) 2020-2025 Mark Qvist # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -11,8 +11,16 @@ # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. +# - The Software shall not be used in any kind of system which includes amongst +# its functions the ability to purposefully do harm to human beings. +# +# - The Software shall not be used, directly or indirectly, in the creation of +# an artificial intelligence, machine learning or language model training +# dataset, including but not limited to any use that contributes to the +# training or development of such a model or algorithm. +# +# - The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, From fa2d78c3510c47d4903cdfca1dacb49e69164ae5 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 3 Nov 2025 22:19:20 +0100 Subject: [PATCH 02/13] Fixed message stamps getting overwritten if propagation stamp was also present --- LXMF/LXMRouter.py | 2 +- LXMF/LXMessage.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 08c11f8..7afbc73 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -2424,7 +2424,7 @@ class LXMRouter: selected_lxm.stamp = generated_stamp selected_lxm.defer_stamp = False selected_lxm.packed = None - selected_lxm.pack() + selected_lxm.pack(payload_updated=True) stamp_generation_success = True RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) else: diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 0533f07..baf951a 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -357,7 +357,7 @@ class LXMessage: else: return None - def pack(self): + def pack(self, payload_updated=False): if not self.packed: if self.timestamp == None: self.timestamp = time.time() @@ -431,7 +431,7 @@ class LXMessage: elif self.desired_method == LXMessage.PROPAGATED: single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT - if self.__pn_encrypted_data == None: + if self.__pn_encrypted_data == None or payload_updated: self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:]) self.ratchet_id = self.__destination.latest_ratchet_id From 62038573f1b249ef92c7005ff66ecfc1ef47d566 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 3 Nov 2025 22:21:13 +0100 Subject: [PATCH 03/13] Updated version --- LXMF/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LXMF/_version.py b/LXMF/_version.py index 3e2f46a..d69d16e 100644 --- a/LXMF/_version.py +++ b/LXMF/_version.py @@ -1 +1 @@ -__version__ = "0.9.0" +__version__ = "0.9.1" From dca6cc2adc733ebfb9aaef721273982195ba36ae Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 7 Nov 2025 23:10:30 +0100 Subject: [PATCH 04/13] Ensure LXMF and RNS exit handlers are called on SIGINT and SIGTERM, since for some ungodly reason atexit events are not always called on some combinations of Python version and platforms, even though they have been registered. --- LXMF/LXMRouter.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 7afbc73..4c247be 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -1360,14 +1360,16 @@ class LXMRouter: def sigint_handler(self, signal, frame): if not self.exit_handler_running: RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING) - sys.exit(0) + self.exit_handler() + RNS.exit(0) else: RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) def sigterm_handler(self, signal, frame): if not self.exit_handler_running: RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING) - sys.exit(0) + self.exit_handler() + RNS.exit(0) else: RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) From 00ffbc09febd3df2a548c9c60625140e09b15c9b Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 8 Nov 2025 01:20:31 +0100 Subject: [PATCH 05/13] Using multiprocessing start method fork on Linux to avoid issues with Python 3.14. Fixes #35. --- LXMF/LXStamper.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index 56e2500..39b541b 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -15,6 +15,8 @@ PN_VALIDATION_POOL_MIN_SIZE = 256 active_jobs = {} +if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork") + def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): wb_st = time.time() workblock = b"" From ee15e9f0b6dfbe6c6f1f87dc5ca1de4ca9de8d31 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 8 Nov 2025 14:30:47 +0100 Subject: [PATCH 06/13] Updated version --- LXMF/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LXMF/_version.py b/LXMF/_version.py index d69d16e..a2fecb4 100644 --- a/LXMF/_version.py +++ b/LXMF/_version.py @@ -1 +1 @@ -__version__ = "0.9.1" +__version__ = "0.9.2" From 39e398be65f7c4a46c6d93badb072f335c4580d4 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 13 Nov 2025 17:48:10 +0100 Subject: [PATCH 07/13] Fixed missing PN config unpack on incoming sync auto-peering --- LXMF/LXMRouter.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 4c247be..3016e89 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -2200,14 +2200,15 @@ class LXMRouter: # 4: Limit for incoming propagation node syncs # 5: Propagation stamp costs for this node # 6: Node metadata - if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: - remote_timebase = remote_app_data[1] - remote_transfer_limit = remote_app_data[3] - remote_sync_limit = remote_app_data[4] - remote_stamp_cost = remote_app_data[5][0] - remote_stamp_flex = remote_app_data[5][1] - remote_peering_cost = remote_app_data[5][2] - remote_metadata = remote_app_data[6] + pn_config = msgpack.unpackb(remote_app_data) + if pn_config[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: + remote_timebase = pn_config[1] + remote_transfer_limit = pn_config[3] + remote_sync_limit = pn_config[4] + remote_stamp_cost = pn_config[5][0] + remote_stamp_flex = pn_config[5][1] + remote_peering_cost = pn_config[5][2] + remote_metadata = pn_config[6] RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata) From bc7522b63d9c1f4ca275c78998e6b7fda362158c Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 13 Nov 2025 19:42:24 +0100 Subject: [PATCH 08/13] Updated version --- LXMF/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LXMF/_version.py b/LXMF/_version.py index a2fecb4..c598173 100644 --- a/LXMF/_version.py +++ b/LXMF/_version.py @@ -1 +1 @@ -__version__ = "0.9.2" +__version__ = "0.9.3" From 7c71eb1df46b2f53481199d46a3f84dd195d8bea Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 24 Nov 2025 22:02:30 +0100 Subject: [PATCH 09/13] Cleanup --- LXMF/LXMRouter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 3016e89..9abef7c 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -219,7 +219,7 @@ class LXMRouter: self.locally_delivered_transient_ids = {} except Exception as e: - RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.log(f"Could not load locally delivered message ID cache from storage. The contained exception was: {e}", RNS.LOG_ERROR) self.locally_delivered_transient_ids = {} try: From f4c805ea35bcf549c35a4a2b713492b17188bf81 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 27 Nov 2025 18:38:52 +0100 Subject: [PATCH 10/13] Updated makefile --- Makefile | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index c0b53da..c00cde9 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,12 @@ create_symlinks: -ln -s ../../LXMF ./LXMF/Utilities/LXMF build_wheel: - python3 setup.py sdist bdist_wheel + python3 setup.py bdist_wheel + +build_sdist: + python3 setup.py sdist + +build_spkg: remove_symlinks build_sdist create_symlinks release: remove_symlinks build_wheel create_symlinks From a6f5a56a38f076708923f3a27ba30f88c6ab5def Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 2 Dec 2025 20:17:46 +0100 Subject: [PATCH 11/13] Improved outbound message processing speed --- LXMF/Handlers.py | 22 +-- LXMF/LXMRouter.py | 374 +++++++++++++++++++++++----------------------- 2 files changed, 198 insertions(+), 198 deletions(-) diff --git a/LXMF/Handlers.py b/LXMF/Handlers.py index 01841f9..871cc56 100644 --- a/LXMF/Handlers.py +++ b/LXMF/Handlers.py @@ -13,17 +13,6 @@ class LXMFDeliveryAnnounceHandler: self.lxmrouter = lxmrouter def received_announce(self, destination_hash, announced_identity, app_data): - for lxmessage in self.lxmrouter.pending_outbound: - if destination_hash == lxmessage.destination_hash: - if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC: - lxmessage.next_delivery_attempt = time.time() - - def outbound_trigger(): - while self.lxmrouter.processing_outbound: time.sleep(0.1) - self.lxmrouter.process_outbound() - - threading.Thread(target=outbound_trigger, daemon=True).start() - try: stamp_cost = stamp_cost_from_app_data(app_data) self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost) @@ -31,6 +20,17 @@ class LXMFDeliveryAnnounceHandler: except Exception as e: RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR) + for lxmessage in self.lxmrouter.pending_outbound: + if destination_hash == lxmessage.destination_hash: + if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC: + lxmessage.next_delivery_attempt = time.time() + + def outbound_trigger(): + while self.lxmrouter.outbound_processing_lock.locked(): time.sleep(0.1) + self.lxmrouter.process_outbound() + + threading.Thread(target=outbound_trigger, daemon=True).start() + class LXMFPropagationAnnounceHandler: def __init__(self, lxmrouter): diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 9abef7c..6f44f95 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -109,7 +109,6 @@ class LXMRouter: self.retain_synced_on_node = False self.default_sync_strategy = sync_strategy - self.processing_outbound = False self.processing_inbound = False self.processing_count = 0 self.name = name @@ -160,6 +159,7 @@ class LXMRouter: self.outbound_stamp_costs = {} self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}} + self.outbound_processing_lock = threading.Lock() self.cost_file_lock = threading.Lock() self.ticket_file_lock = threading.Lock() self.stamp_gen_lock = threading.Lock() @@ -1664,10 +1664,10 @@ class LXMRouter: lxmessage.defer_stamp = False if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp): - while not unknown_path_requested and self.processing_outbound: time.sleep(0.05) + while not unknown_path_requested and self.outbound_processing_lock.locked(): time.sleep(0.05) self.pending_outbound.append(lxmessage) - if not unknown_path_requested: self.process_outbound() + if not unknown_path_requested: threading.Thread(target=self.process_outbound, daemon=True).start() else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage @@ -2373,6 +2373,7 @@ class LXMRouter: def fail_message(self, lxmessage): RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG) + lxmessage.progress = 0.0 if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage) @@ -2494,198 +2495,141 @@ class LXMRouter: RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR) def process_outbound(self, sender = None): - if self.processing_outbound: - return + if self.outbound_processing_lock.locked(): return + with self.outbound_processing_lock: + for lxmessage in self.pending_outbound: + if lxmessage.state == LXMessage.DELIVERED: + RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + self.pending_outbound.remove(lxmessage) - for lxmessage in self.pending_outbound: - if lxmessage.state == LXMessage.DELIVERED: - RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - self.pending_outbound.remove(lxmessage) + # Udate ticket delivery stats + if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields: + RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG) + self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time() + self.save_available_tickets() - # Udate ticket delivery stats - if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields: - RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG) - self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time() - self.save_available_tickets() + # Prepare link for backchannel communications + delivery_destination_hash = lxmessage.get_destination().hash + if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links: + direct_link = self.direct_links[delivery_destination_hash] + if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False: + if direct_link.initiator == True: + source_destination_hash = lxmessage.get_source().hash + if source_destination_hash in self.delivery_destinations: + backchannel_identity = self.delivery_destinations[source_destination_hash].identity + backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity) + direct_link.identify(backchannel_identity) + direct_link.backchannel_identified = True + self.delivery_link_established(direct_link) + RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG) - # Prepare link for backchannel communications - delivery_destination_hash = lxmessage.get_destination().hash - if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links: - direct_link = self.direct_links[delivery_destination_hash] - if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False: - if direct_link.initiator == True: - source_destination_hash = lxmessage.get_source().hash - if source_destination_hash in self.delivery_destinations: - backchannel_identity = self.delivery_destinations[source_destination_hash].identity - backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity) - direct_link.identify(backchannel_identity) - direct_link.backchannel_identified = True - self.delivery_link_established(direct_link) - RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG) + elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT: + RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + self.pending_outbound.remove(lxmessage) - elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT: - RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - self.pending_outbound.remove(lxmessage) + elif lxmessage.state == LXMessage.CANCELLED: + RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + self.pending_outbound.remove(lxmessage) + if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): + lxmessage.failed_callback(lxmessage) - elif lxmessage.state == LXMessage.CANCELLED: - RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - self.pending_outbound.remove(lxmessage) - if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): - lxmessage.failed_callback(lxmessage) + elif lxmessage.state == LXMessage.REJECTED: + RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage) + if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): + lxmessage.failed_callback(lxmessage) - elif lxmessage.state == LXMessage.REJECTED: - RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage) - if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): - lxmessage.failed_callback(lxmessage) + else: + RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - else: - RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 - if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 - - # Outbound handling for opportunistic messages - if lxmessage.method == LXMessage.OPPORTUNISTIC: - if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: - if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash): - RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG) - lxmessage.delivery_attempts += 1 - RNS.Transport.request_path(lxmessage.get_destination().hash) - lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - lxmessage.progress = 0.01 - elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash): - RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG) - lxmessage.delivery_attempts += 1 - RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash) - def rediscover_job(): - time.sleep(0.5) - RNS.Transport.request_path(lxmessage.get_destination().hash) - threading.Thread(target=rediscover_job, daemon=True).start() - lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - lxmessage.progress = 0.01 - else: - if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: - lxmessage.delivery_attempts += 1 - lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT - RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - lxmessage.send() - else: - RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - self.fail_message(lxmessage) - - # Outbound handling for messages transferred - # over a direct link to the final recipient - elif lxmessage.method == LXMessage.DIRECT: - if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: - delivery_destination_hash = lxmessage.get_destination().hash - direct_link = None - - if delivery_destination_hash in self.direct_links: - # An established direct link already exists to - # the destination, so we'll try to use it for - # delivering the message - direct_link = self.direct_links[delivery_destination_hash] - RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) - - elif delivery_destination_hash in self.backchannel_links: - # An established backchannel link exists to - # the destination, so we'll try to use it for - # delivering the message - direct_link = self.backchannel_links[delivery_destination_hash] - RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) - - if direct_link != None: - if direct_link.status == RNS.Link.ACTIVE: - if lxmessage.progress == None or lxmessage.progress < 0.05: - lxmessage.progress = 0.05 - if lxmessage.state != LXMessage.SENDING: - RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG) - lxmessage.set_delivery_destination(direct_link) - lxmessage.send() - else: - if lxmessage.representation == LXMessage.RESOURCE: - RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) - else: - RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) - elif direct_link.status == RNS.Link.CLOSED: - if direct_link.activated_at != None: - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG) - RNS.Transport.request_path(lxmessage.get_destination().hash) - else: - if not hasattr(lxmessage, "path_request_retried"): - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG) - RNS.Transport.request_path(lxmessage.get_destination().hash) - lxmessage.path_request_retried = True - else: - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG) - - lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - - lxmessage.set_delivery_destination(None) - if delivery_destination_hash in self.direct_links: - self.direct_links.pop(delivery_destination_hash) - if delivery_destination_hash in self.backchannel_links: - self.backchannel_links.pop(delivery_destination_hash) - lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT - else: - # Simply wait for the link to become active or close - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) - else: - # No link exists, so we'll try to establish one, but - # only if we've never tried before, or the retry wait - # period has elapsed. - if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: - lxmessage.delivery_attempts += 1 - lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT - - if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: - if RNS.Transport.has_path(lxmessage.get_destination().hash): - RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - delivery_link = RNS.Link(lxmessage.get_destination()) - delivery_link.set_link_established_callback(self.process_outbound) - self.direct_links[delivery_destination_hash] = delivery_link - lxmessage.progress = 0.03 - else: - RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG) - RNS.Transport.request_path(lxmessage.get_destination().hash) - lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - lxmessage.progress = 0.01 - else: - RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - self.fail_message(lxmessage) - - # Outbound handling for messages transported via - # propagation to a LXMF router network. - elif lxmessage.method == LXMessage.PROPAGATED: - RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - - if self.outbound_propagation_node == None: - RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR) - self.fail_message(lxmessage) - else: + # Outbound handling for opportunistic messages + if lxmessage.method == LXMessage.OPPORTUNISTIC: if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: + if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash): + RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG) + lxmessage.delivery_attempts += 1 + RNS.Transport.request_path(lxmessage.get_destination().hash) + lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT + lxmessage.progress = 0.01 + elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash): + RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG) + lxmessage.delivery_attempts += 1 + RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash) + def rediscover_job(): + time.sleep(0.5) + RNS.Transport.request_path(lxmessage.get_destination().hash) + threading.Thread(target=rediscover_job, daemon=True).start() + lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT + lxmessage.progress = 0.01 + else: + if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: + lxmessage.delivery_attempts += 1 + lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT + RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + lxmessage.send() + else: + RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + self.fail_message(lxmessage) - if self.outbound_propagation_link != None: - # A link already exists, so we'll try to use it - # to deliver the message - if self.outbound_propagation_link.status == RNS.Link.ACTIVE: + # Outbound handling for messages transferred + # over a direct link to the final recipient + elif lxmessage.method == LXMessage.DIRECT: + if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: + delivery_destination_hash = lxmessage.get_destination().hash + direct_link = None + + if delivery_destination_hash in self.direct_links: + # An established direct link already exists to + # the destination, so we'll try to use it for + # delivering the message + direct_link = self.direct_links[delivery_destination_hash] + RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) + + elif delivery_destination_hash in self.backchannel_links: + # An established backchannel link exists to + # the destination, so we'll try to use it for + # delivering the message + direct_link = self.backchannel_links[delivery_destination_hash] + RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) + + if direct_link != None: + if direct_link.status == RNS.Link.ACTIVE: + if lxmessage.progress == None or lxmessage.progress < 0.05: + lxmessage.progress = 0.05 if lxmessage.state != LXMessage.SENDING: - RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG) - lxmessage.set_delivery_destination(self.outbound_propagation_link) + RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG) + lxmessage.set_delivery_destination(direct_link) lxmessage.send() else: if lxmessage.representation == LXMessage.RESOURCE: RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) else: RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) - elif self.outbound_propagation_link.status == RNS.Link.CLOSED: - RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG) - self.outbound_propagation_link = None + elif direct_link.status == RNS.Link.CLOSED: + if direct_link.activated_at != None: + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG) + RNS.Transport.request_path(lxmessage.get_destination().hash) + else: + if not hasattr(lxmessage, "path_request_retried"): + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG) + RNS.Transport.request_path(lxmessage.get_destination().hash) + lxmessage.path_request_retried = True + else: + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG) + + lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT + + lxmessage.set_delivery_destination(None) + if delivery_destination_hash in self.direct_links: + self.direct_links.pop(delivery_destination_hash) + if delivery_destination_hash in self.backchannel_links: + self.backchannel_links.pop(delivery_destination_hash) lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT else: - # Simply wait for the link to become - # active or close - RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) + # Simply wait for the link to become active or close + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) else: # No link exists, so we'll try to establish one, but # only if we've never tried before, or the retry wait @@ -2695,18 +2639,74 @@ class LXMRouter: lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: - if RNS.Transport.has_path(self.outbound_propagation_node): - RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) - propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) - self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet) - self.outbound_propagation_link.for_lxmessage = lxmessage + if RNS.Transport.has_path(lxmessage.get_destination().hash): + RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + delivery_link = RNS.Link(lxmessage.get_destination()) + delivery_link.set_link_established_callback(self.process_outbound) + self.direct_links[delivery_destination_hash] = delivery_link + lxmessage.progress = 0.03 else: - RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) - RNS.Transport.request_path(self.outbound_propagation_node) + RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG) + RNS.Transport.request_path(lxmessage.get_destination().hash) lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - + lxmessage.progress = 0.01 else: - RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) self.fail_message(lxmessage) + + # Outbound handling for messages transported via + # propagation to a LXMF router network. + elif lxmessage.method == LXMessage.PROPAGATED: + RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + + if self.outbound_propagation_node == None: + RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR) + self.fail_message(lxmessage) + else: + if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: + + if self.outbound_propagation_link != None: + # A link already exists, so we'll try to use it + # to deliver the message + if self.outbound_propagation_link.status == RNS.Link.ACTIVE: + if lxmessage.state != LXMessage.SENDING: + RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG) + lxmessage.set_delivery_destination(self.outbound_propagation_link) + lxmessage.send() + else: + if lxmessage.representation == LXMessage.RESOURCE: + RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) + else: + RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) + elif self.outbound_propagation_link.status == RNS.Link.CLOSED: + RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG) + self.outbound_propagation_link = None + lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT + else: + # Simply wait for the link to become + # active or close + RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) + else: + # No link exists, so we'll try to establish one, but + # only if we've never tried before, or the retry wait + # period has elapsed. + if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: + lxmessage.delivery_attempts += 1 + lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT + + if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: + if RNS.Transport.has_path(self.outbound_propagation_node): + RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) + propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) + self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet) + self.outbound_propagation_link.for_lxmessage = lxmessage + else: + RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) + RNS.Transport.request_path(self.outbound_propagation_node) + lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT + + else: + RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + self.fail_message(lxmessage) From 694f2413ea899f70b3bff7f079a879d2fa1d70d5 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 2 Dec 2025 20:43:44 +0100 Subject: [PATCH 12/13] Added more descriptive error if propagation node peers file is corrupt --- LXMF/LXMRouter.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 6f44f95..8506769 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -576,13 +576,20 @@ class LXMRouter: RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE) st = time.time() - if os.path.isfile(self.storagepath+"/peers"): - peers_file = open(self.storagepath+"/peers", "rb") + peers_storage_path = self.storagepath+"/peers" + if os.path.isfile(peers_storage_path): + peers_file = open(peers_storage_path, "rb") peers_data = peers_file.read() peers_file.close() if len(peers_data) > 0: - serialised_peers = msgpack.unpackb(peers_data) + try: serialised_peers = msgpack.unpackb(peers_data) + except Exception as e: + RNS.log(f"Could not load propagation node peering data from storage. The contained exception was: {e}", RNS.LOG_ERROR) + RNS.log(f"The peering data file located at {peers_storage_path} is likely corrupt.", RNS.LOG_ERROR) + RNS.log(f"You can delete this file and attempt starting the propagation node again, but peer synchronization states will be lost.", RNS.LOG_ERROR) + raise e + del peers_data while len(serialised_peers) > 0: From 6ecd271e48e9085031ef5a1c7d802958191170db Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 22 Dec 2025 22:22:17 +0100 Subject: [PATCH 13/13] Updated readme --- README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.md b/README.md index ed7e4f0..cd1a6c5 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,26 @@ options: Or run `lxmd --exampleconfig` to generate a commented example configuration documenting all the available configuration directives. +## Support LXMF Development +You can help support the continued development of open, free and private communications systems by donating via one of the following channels: + +- Monero: + ``` + 84FpY1QbxHcgdseePYNmhTHcrgMX4nFfBYtz2GKYToqHVVhJp8Eaw1Z1EedRnKD19b3B8NiLCGVxzKV17UMmmeEsCrPyA5w + ``` +- Bitcoin + ``` + bc1pgqgu8h8xvj4jtafslq396v7ju7hkgymyrzyqft4llfslz5vp99psqfk3a6 + ``` +- Ethereum + ``` + 0x91C421DdfB8a30a49A71d63447ddb54cEBe3465E + ``` +- Liberapay: https://liberapay.com/Reticulum/ + +- Ko-Fi: https://ko-fi.com/markqvist + + ## Caveat Emptor LXMF is beta software, and should be considered experimental. While it has been built with cryptography best practices very foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch.