diff --git a/LXMF/Handlers.py b/LXMF/Handlers.py index 01841f9..871cc56 100644 --- a/LXMF/Handlers.py +++ b/LXMF/Handlers.py @@ -13,17 +13,6 @@ class LXMFDeliveryAnnounceHandler: self.lxmrouter = lxmrouter def received_announce(self, destination_hash, announced_identity, app_data): - for lxmessage in self.lxmrouter.pending_outbound: - if destination_hash == lxmessage.destination_hash: - if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC: - lxmessage.next_delivery_attempt = time.time() - - def outbound_trigger(): - while self.lxmrouter.processing_outbound: time.sleep(0.1) - self.lxmrouter.process_outbound() - - threading.Thread(target=outbound_trigger, daemon=True).start() - try: stamp_cost = stamp_cost_from_app_data(app_data) self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost) @@ -31,6 +20,17 @@ class LXMFDeliveryAnnounceHandler: except Exception as e: RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR) + for lxmessage in self.lxmrouter.pending_outbound: + if destination_hash == lxmessage.destination_hash: + if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC: + lxmessage.next_delivery_attempt = time.time() + + def outbound_trigger(): + while self.lxmrouter.outbound_processing_lock.locked(): time.sleep(0.1) + self.lxmrouter.process_outbound() + + threading.Thread(target=outbound_trigger, daemon=True).start() + class LXMFPropagationAnnounceHandler: def __init__(self, lxmrouter): diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 08c11f8..8506769 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -109,7 +109,6 @@ class LXMRouter: self.retain_synced_on_node = False self.default_sync_strategy = sync_strategy - self.processing_outbound = False self.processing_inbound = False self.processing_count = 0 self.name = name @@ -160,6 +159,7 @@ class LXMRouter: self.outbound_stamp_costs = {} self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}} + self.outbound_processing_lock = threading.Lock() self.cost_file_lock = threading.Lock() self.ticket_file_lock = threading.Lock() self.stamp_gen_lock = threading.Lock() @@ -219,7 +219,7 @@ class LXMRouter: self.locally_delivered_transient_ids = {} except Exception as e: - RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.log(f"Could not load locally delivered message ID cache from storage. The contained exception was: {e}", RNS.LOG_ERROR) self.locally_delivered_transient_ids = {} try: @@ -576,13 +576,20 @@ class LXMRouter: RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE) st = time.time() - if os.path.isfile(self.storagepath+"/peers"): - peers_file = open(self.storagepath+"/peers", "rb") + peers_storage_path = self.storagepath+"/peers" + if os.path.isfile(peers_storage_path): + peers_file = open(peers_storage_path, "rb") peers_data = peers_file.read() peers_file.close() if len(peers_data) > 0: - serialised_peers = msgpack.unpackb(peers_data) + try: serialised_peers = msgpack.unpackb(peers_data) + except Exception as e: + RNS.log(f"Could not load propagation node peering data from storage. The contained exception was: {e}", RNS.LOG_ERROR) + RNS.log(f"The peering data file located at {peers_storage_path} is likely corrupt.", RNS.LOG_ERROR) + RNS.log(f"You can delete this file and attempt starting the propagation node again, but peer synchronization states will be lost.", RNS.LOG_ERROR) + raise e + del peers_data while len(serialised_peers) > 0: @@ -1360,14 +1367,16 @@ class LXMRouter: def sigint_handler(self, signal, frame): if not self.exit_handler_running: RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING) - sys.exit(0) + self.exit_handler() + RNS.exit(0) else: RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) def sigterm_handler(self, signal, frame): if not self.exit_handler_running: RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING) - sys.exit(0) + self.exit_handler() + RNS.exit(0) else: RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) @@ -1662,10 +1671,10 @@ class LXMRouter: lxmessage.defer_stamp = False if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp): - while not unknown_path_requested and self.processing_outbound: time.sleep(0.05) + while not unknown_path_requested and self.outbound_processing_lock.locked(): time.sleep(0.05) self.pending_outbound.append(lxmessage) - if not unknown_path_requested: self.process_outbound() + if not unknown_path_requested: threading.Thread(target=self.process_outbound, daemon=True).start() else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage @@ -2198,14 +2207,15 @@ class LXMRouter: # 4: Limit for incoming propagation node syncs # 5: Propagation stamp costs for this node # 6: Node metadata - if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: - remote_timebase = remote_app_data[1] - remote_transfer_limit = remote_app_data[3] - remote_sync_limit = remote_app_data[4] - remote_stamp_cost = remote_app_data[5][0] - remote_stamp_flex = remote_app_data[5][1] - remote_peering_cost = remote_app_data[5][2] - remote_metadata = remote_app_data[6] + pn_config = msgpack.unpackb(remote_app_data) + if pn_config[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: + remote_timebase = pn_config[1] + remote_transfer_limit = pn_config[3] + remote_sync_limit = pn_config[4] + remote_stamp_cost = pn_config[5][0] + remote_stamp_flex = pn_config[5][1] + remote_peering_cost = pn_config[5][2] + remote_metadata = pn_config[6] RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata) @@ -2370,6 +2380,7 @@ class LXMRouter: def fail_message(self, lxmessage): RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG) + lxmessage.progress = 0.0 if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage) @@ -2424,7 +2435,7 @@ class LXMRouter: selected_lxm.stamp = generated_stamp selected_lxm.defer_stamp = False selected_lxm.packed = None - selected_lxm.pack() + selected_lxm.pack(payload_updated=True) stamp_generation_success = True RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) else: @@ -2491,198 +2502,141 @@ class LXMRouter: RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR) def process_outbound(self, sender = None): - if self.processing_outbound: - return + if self.outbound_processing_lock.locked(): return + with self.outbound_processing_lock: + for lxmessage in self.pending_outbound: + if lxmessage.state == LXMessage.DELIVERED: + RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + self.pending_outbound.remove(lxmessage) - for lxmessage in self.pending_outbound: - if lxmessage.state == LXMessage.DELIVERED: - RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - self.pending_outbound.remove(lxmessage) + # Udate ticket delivery stats + if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields: + RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG) + self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time() + self.save_available_tickets() - # Udate ticket delivery stats - if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields: - RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG) - self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time() - self.save_available_tickets() + # Prepare link for backchannel communications + delivery_destination_hash = lxmessage.get_destination().hash + if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links: + direct_link = self.direct_links[delivery_destination_hash] + if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False: + if direct_link.initiator == True: + source_destination_hash = lxmessage.get_source().hash + if source_destination_hash in self.delivery_destinations: + backchannel_identity = self.delivery_destinations[source_destination_hash].identity + backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity) + direct_link.identify(backchannel_identity) + direct_link.backchannel_identified = True + self.delivery_link_established(direct_link) + RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG) - # Prepare link for backchannel communications - delivery_destination_hash = lxmessage.get_destination().hash - if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links: - direct_link = self.direct_links[delivery_destination_hash] - if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False: - if direct_link.initiator == True: - source_destination_hash = lxmessage.get_source().hash - if source_destination_hash in self.delivery_destinations: - backchannel_identity = self.delivery_destinations[source_destination_hash].identity - backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity) - direct_link.identify(backchannel_identity) - direct_link.backchannel_identified = True - self.delivery_link_established(direct_link) - RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG) + elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT: + RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + self.pending_outbound.remove(lxmessage) - elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT: - RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - self.pending_outbound.remove(lxmessage) + elif lxmessage.state == LXMessage.CANCELLED: + RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + self.pending_outbound.remove(lxmessage) + if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): + lxmessage.failed_callback(lxmessage) - elif lxmessage.state == LXMessage.CANCELLED: - RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - self.pending_outbound.remove(lxmessage) - if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): - lxmessage.failed_callback(lxmessage) + elif lxmessage.state == LXMessage.REJECTED: + RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage) + if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): + lxmessage.failed_callback(lxmessage) - elif lxmessage.state == LXMessage.REJECTED: - RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage) - if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): - lxmessage.failed_callback(lxmessage) + else: + RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - else: - RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 - if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 - - # Outbound handling for opportunistic messages - if lxmessage.method == LXMessage.OPPORTUNISTIC: - if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: - if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash): - RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG) - lxmessage.delivery_attempts += 1 - RNS.Transport.request_path(lxmessage.get_destination().hash) - lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - lxmessage.progress = 0.01 - elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash): - RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG) - lxmessage.delivery_attempts += 1 - RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash) - def rediscover_job(): - time.sleep(0.5) - RNS.Transport.request_path(lxmessage.get_destination().hash) - threading.Thread(target=rediscover_job, daemon=True).start() - lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - lxmessage.progress = 0.01 - else: - if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: - lxmessage.delivery_attempts += 1 - lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT - RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - lxmessage.send() - else: - RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - self.fail_message(lxmessage) - - # Outbound handling for messages transferred - # over a direct link to the final recipient - elif lxmessage.method == LXMessage.DIRECT: - if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: - delivery_destination_hash = lxmessage.get_destination().hash - direct_link = None - - if delivery_destination_hash in self.direct_links: - # An established direct link already exists to - # the destination, so we'll try to use it for - # delivering the message - direct_link = self.direct_links[delivery_destination_hash] - RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) - - elif delivery_destination_hash in self.backchannel_links: - # An established backchannel link exists to - # the destination, so we'll try to use it for - # delivering the message - direct_link = self.backchannel_links[delivery_destination_hash] - RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) - - if direct_link != None: - if direct_link.status == RNS.Link.ACTIVE: - if lxmessage.progress == None or lxmessage.progress < 0.05: - lxmessage.progress = 0.05 - if lxmessage.state != LXMessage.SENDING: - RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG) - lxmessage.set_delivery_destination(direct_link) - lxmessage.send() - else: - if lxmessage.representation == LXMessage.RESOURCE: - RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) - else: - RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) - elif direct_link.status == RNS.Link.CLOSED: - if direct_link.activated_at != None: - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG) - RNS.Transport.request_path(lxmessage.get_destination().hash) - else: - if not hasattr(lxmessage, "path_request_retried"): - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG) - RNS.Transport.request_path(lxmessage.get_destination().hash) - lxmessage.path_request_retried = True - else: - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG) - - lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - - lxmessage.set_delivery_destination(None) - if delivery_destination_hash in self.direct_links: - self.direct_links.pop(delivery_destination_hash) - if delivery_destination_hash in self.backchannel_links: - self.backchannel_links.pop(delivery_destination_hash) - lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT - else: - # Simply wait for the link to become active or close - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) - else: - # No link exists, so we'll try to establish one, but - # only if we've never tried before, or the retry wait - # period has elapsed. - if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: - lxmessage.delivery_attempts += 1 - lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT - - if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: - if RNS.Transport.has_path(lxmessage.get_destination().hash): - RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - delivery_link = RNS.Link(lxmessage.get_destination()) - delivery_link.set_link_established_callback(self.process_outbound) - self.direct_links[delivery_destination_hash] = delivery_link - lxmessage.progress = 0.03 - else: - RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG) - RNS.Transport.request_path(lxmessage.get_destination().hash) - lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - lxmessage.progress = 0.01 - else: - RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - self.fail_message(lxmessage) - - # Outbound handling for messages transported via - # propagation to a LXMF router network. - elif lxmessage.method == LXMessage.PROPAGATED: - RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - - if self.outbound_propagation_node == None: - RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR) - self.fail_message(lxmessage) - else: + # Outbound handling for opportunistic messages + if lxmessage.method == LXMessage.OPPORTUNISTIC: if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: + if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash): + RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG) + lxmessage.delivery_attempts += 1 + RNS.Transport.request_path(lxmessage.get_destination().hash) + lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT + lxmessage.progress = 0.01 + elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash): + RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG) + lxmessage.delivery_attempts += 1 + RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash) + def rediscover_job(): + time.sleep(0.5) + RNS.Transport.request_path(lxmessage.get_destination().hash) + threading.Thread(target=rediscover_job, daemon=True).start() + lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT + lxmessage.progress = 0.01 + else: + if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: + lxmessage.delivery_attempts += 1 + lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT + RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + lxmessage.send() + else: + RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + self.fail_message(lxmessage) - if self.outbound_propagation_link != None: - # A link already exists, so we'll try to use it - # to deliver the message - if self.outbound_propagation_link.status == RNS.Link.ACTIVE: + # Outbound handling for messages transferred + # over a direct link to the final recipient + elif lxmessage.method == LXMessage.DIRECT: + if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: + delivery_destination_hash = lxmessage.get_destination().hash + direct_link = None + + if delivery_destination_hash in self.direct_links: + # An established direct link already exists to + # the destination, so we'll try to use it for + # delivering the message + direct_link = self.direct_links[delivery_destination_hash] + RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) + + elif delivery_destination_hash in self.backchannel_links: + # An established backchannel link exists to + # the destination, so we'll try to use it for + # delivering the message + direct_link = self.backchannel_links[delivery_destination_hash] + RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) + + if direct_link != None: + if direct_link.status == RNS.Link.ACTIVE: + if lxmessage.progress == None or lxmessage.progress < 0.05: + lxmessage.progress = 0.05 if lxmessage.state != LXMessage.SENDING: - RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG) - lxmessage.set_delivery_destination(self.outbound_propagation_link) + RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG) + lxmessage.set_delivery_destination(direct_link) lxmessage.send() else: if lxmessage.representation == LXMessage.RESOURCE: RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) else: RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) - elif self.outbound_propagation_link.status == RNS.Link.CLOSED: - RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG) - self.outbound_propagation_link = None + elif direct_link.status == RNS.Link.CLOSED: + if direct_link.activated_at != None: + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG) + RNS.Transport.request_path(lxmessage.get_destination().hash) + else: + if not hasattr(lxmessage, "path_request_retried"): + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG) + RNS.Transport.request_path(lxmessage.get_destination().hash) + lxmessage.path_request_retried = True + else: + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG) + + lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT + + lxmessage.set_delivery_destination(None) + if delivery_destination_hash in self.direct_links: + self.direct_links.pop(delivery_destination_hash) + if delivery_destination_hash in self.backchannel_links: + self.backchannel_links.pop(delivery_destination_hash) lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT else: - # Simply wait for the link to become - # active or close - RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) + # Simply wait for the link to become active or close + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) else: # No link exists, so we'll try to establish one, but # only if we've never tried before, or the retry wait @@ -2692,18 +2646,74 @@ class LXMRouter: lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: - if RNS.Transport.has_path(self.outbound_propagation_node): - RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) - propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) - self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet) - self.outbound_propagation_link.for_lxmessage = lxmessage + if RNS.Transport.has_path(lxmessage.get_destination().hash): + RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + delivery_link = RNS.Link(lxmessage.get_destination()) + delivery_link.set_link_established_callback(self.process_outbound) + self.direct_links[delivery_destination_hash] = delivery_link + lxmessage.progress = 0.03 else: - RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) - RNS.Transport.request_path(self.outbound_propagation_node) + RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG) + RNS.Transport.request_path(lxmessage.get_destination().hash) lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT - + lxmessage.progress = 0.01 else: - RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) self.fail_message(lxmessage) + + # Outbound handling for messages transported via + # propagation to a LXMF router network. + elif lxmessage.method == LXMessage.PROPAGATED: + RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + + if self.outbound_propagation_node == None: + RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR) + self.fail_message(lxmessage) + else: + if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: + + if self.outbound_propagation_link != None: + # A link already exists, so we'll try to use it + # to deliver the message + if self.outbound_propagation_link.status == RNS.Link.ACTIVE: + if lxmessage.state != LXMessage.SENDING: + RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG) + lxmessage.set_delivery_destination(self.outbound_propagation_link) + lxmessage.send() + else: + if lxmessage.representation == LXMessage.RESOURCE: + RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) + else: + RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) + elif self.outbound_propagation_link.status == RNS.Link.CLOSED: + RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG) + self.outbound_propagation_link = None + lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT + else: + # Simply wait for the link to become + # active or close + RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) + else: + # No link exists, so we'll try to establish one, but + # only if we've never tried before, or the retry wait + # period has elapsed. + if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: + lxmessage.delivery_attempts += 1 + lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT + + if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: + if RNS.Transport.has_path(self.outbound_propagation_node): + RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) + propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) + self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet) + self.outbound_propagation_link.for_lxmessage = lxmessage + else: + RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) + RNS.Transport.request_path(self.outbound_propagation_node) + lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT + + else: + RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + self.fail_message(lxmessage) diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 0533f07..baf951a 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -357,7 +357,7 @@ class LXMessage: else: return None - def pack(self): + def pack(self, payload_updated=False): if not self.packed: if self.timestamp == None: self.timestamp = time.time() @@ -431,7 +431,7 @@ class LXMessage: elif self.desired_method == LXMessage.PROPAGATED: single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT - if self.__pn_encrypted_data == None: + if self.__pn_encrypted_data == None or payload_updated: self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:]) self.ratchet_id = self.__destination.latest_ratchet_id diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index 56e2500..39b541b 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -15,6 +15,8 @@ PN_VALIDATION_POOL_MIN_SIZE = 256 active_jobs = {} +if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork") + def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): wb_st = time.time() workblock = b"" diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index b09e7ae..ab8e30f 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 -# MIT License +# Reticulum License # -# Copyright (c) 2016-2022 Mark Qvist / unsigned.io +# Copyright (c) 2020-2025 Mark Qvist # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -11,8 +11,16 @@ # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. +# - The Software shall not be used in any kind of system which includes amongst +# its functions the ability to purposefully do harm to human beings. +# +# - The Software shall not be used, directly or indirectly, in the creation of +# an artificial intelligence, machine learning or language model training +# dataset, including but not limited to any use that contributes to the +# training or development of such a model or algorithm. +# +# - The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, diff --git a/LXMF/_version.py b/LXMF/_version.py index 3e2f46a..c598173 100644 --- a/LXMF/_version.py +++ b/LXMF/_version.py @@ -1 +1 @@ -__version__ = "0.9.0" +__version__ = "0.9.3" diff --git a/Makefile b/Makefile index c0b53da..c00cde9 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,12 @@ create_symlinks: -ln -s ../../LXMF ./LXMF/Utilities/LXMF build_wheel: - python3 setup.py sdist bdist_wheel + python3 setup.py bdist_wheel + +build_sdist: + python3 setup.py sdist + +build_spkg: remove_symlinks build_sdist create_symlinks release: remove_symlinks build_wheel create_symlinks diff --git a/README.md b/README.md index ed7e4f0..cd1a6c5 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,26 @@ options: Or run `lxmd --exampleconfig` to generate a commented example configuration documenting all the available configuration directives. +## Support LXMF Development +You can help support the continued development of open, free and private communications systems by donating via one of the following channels: + +- Monero: + ``` + 84FpY1QbxHcgdseePYNmhTHcrgMX4nFfBYtz2GKYToqHVVhJp8Eaw1Z1EedRnKD19b3B8NiLCGVxzKV17UMmmeEsCrPyA5w + ``` +- Bitcoin + ``` + bc1pgqgu8h8xvj4jtafslq396v7ju7hkgymyrzyqft4llfslz5vp99psqfk3a6 + ``` +- Ethereum + ``` + 0x91C421DdfB8a30a49A71d63447ddb54cEBe3465E + ``` +- Liberapay: https://liberapay.com/Reticulum/ + +- Ko-Fi: https://ko-fi.com/markqvist + + ## Caveat Emptor LXMF is beta software, and should be considered experimental. While it has been built with cryptography best practices very foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch.