Compare commits

...

13 commits

8 changed files with 266 additions and 221 deletions

View file

@ -13,17 +13,6 @@ class LXMFDeliveryAnnounceHandler:
self.lxmrouter = lxmrouter self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data): def received_announce(self, destination_hash, announced_identity, app_data):
for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash:
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
lxmessage.next_delivery_attempt = time.time()
def outbound_trigger():
while self.lxmrouter.processing_outbound: time.sleep(0.1)
self.lxmrouter.process_outbound()
threading.Thread(target=outbound_trigger, daemon=True).start()
try: try:
stamp_cost = stamp_cost_from_app_data(app_data) stamp_cost = stamp_cost_from_app_data(app_data)
self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost) self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost)
@ -31,6 +20,17 @@ class LXMFDeliveryAnnounceHandler:
except Exception as e: except Exception as e:
RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR) RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR)
for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash:
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
lxmessage.next_delivery_attempt = time.time()
def outbound_trigger():
while self.lxmrouter.outbound_processing_lock.locked(): time.sleep(0.1)
self.lxmrouter.process_outbound()
threading.Thread(target=outbound_trigger, daemon=True).start()
class LXMFPropagationAnnounceHandler: class LXMFPropagationAnnounceHandler:
def __init__(self, lxmrouter): def __init__(self, lxmrouter):

View file

@ -109,7 +109,6 @@ class LXMRouter:
self.retain_synced_on_node = False self.retain_synced_on_node = False
self.default_sync_strategy = sync_strategy self.default_sync_strategy = sync_strategy
self.processing_outbound = False
self.processing_inbound = False self.processing_inbound = False
self.processing_count = 0 self.processing_count = 0
self.name = name self.name = name
@ -160,6 +159,7 @@ class LXMRouter:
self.outbound_stamp_costs = {} self.outbound_stamp_costs = {}
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}} self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
self.outbound_processing_lock = threading.Lock()
self.cost_file_lock = threading.Lock() self.cost_file_lock = threading.Lock()
self.ticket_file_lock = threading.Lock() self.ticket_file_lock = threading.Lock()
self.stamp_gen_lock = threading.Lock() self.stamp_gen_lock = threading.Lock()
@ -219,7 +219,7 @@ class LXMRouter:
self.locally_delivered_transient_ids = {} self.locally_delivered_transient_ids = {}
except Exception as e: except Exception as e:
RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log(f"Could not load locally delivered message ID cache from storage. The contained exception was: {e}", RNS.LOG_ERROR)
self.locally_delivered_transient_ids = {} self.locally_delivered_transient_ids = {}
try: try:
@ -576,13 +576,20 @@ class LXMRouter:
RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE) RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
st = time.time() st = time.time()
if os.path.isfile(self.storagepath+"/peers"): peers_storage_path = self.storagepath+"/peers"
peers_file = open(self.storagepath+"/peers", "rb") if os.path.isfile(peers_storage_path):
peers_file = open(peers_storage_path, "rb")
peers_data = peers_file.read() peers_data = peers_file.read()
peers_file.close() peers_file.close()
if len(peers_data) > 0: if len(peers_data) > 0:
serialised_peers = msgpack.unpackb(peers_data) try: serialised_peers = msgpack.unpackb(peers_data)
except Exception as e:
RNS.log(f"Could not load propagation node peering data from storage. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.log(f"The peering data file located at {peers_storage_path} is likely corrupt.", RNS.LOG_ERROR)
RNS.log(f"You can delete this file and attempt starting the propagation node again, but peer synchronization states will be lost.", RNS.LOG_ERROR)
raise e
del peers_data del peers_data
while len(serialised_peers) > 0: while len(serialised_peers) > 0:
@ -1360,14 +1367,16 @@ class LXMRouter:
def sigint_handler(self, signal, frame): def sigint_handler(self, signal, frame):
if not self.exit_handler_running: if not self.exit_handler_running:
RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING) RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING)
sys.exit(0) self.exit_handler()
RNS.exit(0)
else: else:
RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
def sigterm_handler(self, signal, frame): def sigterm_handler(self, signal, frame):
if not self.exit_handler_running: if not self.exit_handler_running:
RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING) RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING)
sys.exit(0) self.exit_handler()
RNS.exit(0)
else: else:
RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
@ -1662,10 +1671,10 @@ class LXMRouter:
lxmessage.defer_stamp = False lxmessage.defer_stamp = False
if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp): if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp):
while not unknown_path_requested and self.processing_outbound: time.sleep(0.05) while not unknown_path_requested and self.outbound_processing_lock.locked(): time.sleep(0.05)
self.pending_outbound.append(lxmessage) self.pending_outbound.append(lxmessage)
if not unknown_path_requested: self.process_outbound() if not unknown_path_requested: threading.Thread(target=self.process_outbound, daemon=True).start()
else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage
@ -2198,14 +2207,15 @@ class LXMRouter:
# 4: Limit for incoming propagation node syncs # 4: Limit for incoming propagation node syncs
# 5: Propagation stamp costs for this node # 5: Propagation stamp costs for this node
# 6: Node metadata # 6: Node metadata
if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: pn_config = msgpack.unpackb(remote_app_data)
remote_timebase = remote_app_data[1] if pn_config[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
remote_transfer_limit = remote_app_data[3] remote_timebase = pn_config[1]
remote_sync_limit = remote_app_data[4] remote_transfer_limit = pn_config[3]
remote_stamp_cost = remote_app_data[5][0] remote_sync_limit = pn_config[4]
remote_stamp_flex = remote_app_data[5][1] remote_stamp_cost = pn_config[5][0]
remote_peering_cost = remote_app_data[5][2] remote_stamp_flex = pn_config[5][1]
remote_metadata = remote_app_data[6] remote_peering_cost = pn_config[5][2]
remote_metadata = pn_config[6]
RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug
self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata) self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata)
@ -2370,6 +2380,7 @@ class LXMRouter:
def fail_message(self, lxmessage): def fail_message(self, lxmessage):
RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG) RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)
lxmessage.progress = 0.0
if lxmessage in self.pending_outbound: if lxmessage in self.pending_outbound:
self.pending_outbound.remove(lxmessage) self.pending_outbound.remove(lxmessage)
@ -2424,7 +2435,7 @@ class LXMRouter:
selected_lxm.stamp = generated_stamp selected_lxm.stamp = generated_stamp
selected_lxm.defer_stamp = False selected_lxm.defer_stamp = False
selected_lxm.packed = None selected_lxm.packed = None
selected_lxm.pack() selected_lxm.pack(payload_updated=True)
stamp_generation_success = True stamp_generation_success = True
RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
else: else:
@ -2491,198 +2502,141 @@ class LXMRouter:
RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR) RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR)
def process_outbound(self, sender = None): def process_outbound(self, sender = None):
if self.processing_outbound: if self.outbound_processing_lock.locked(): return
return with self.outbound_processing_lock:
for lxmessage in self.pending_outbound:
if lxmessage.state == LXMessage.DELIVERED:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
for lxmessage in self.pending_outbound: # Udate ticket delivery stats
if lxmessage.state == LXMessage.DELIVERED: if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage) self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
self.save_available_tickets()
# Udate ticket delivery stats # Prepare link for backchannel communications
if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields: delivery_destination_hash = lxmessage.get_destination().hash
RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG) if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links:
self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time() direct_link = self.direct_links[delivery_destination_hash]
self.save_available_tickets() if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
if direct_link.initiator == True:
source_destination_hash = lxmessage.get_source().hash
if source_destination_hash in self.delivery_destinations:
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
direct_link.identify(backchannel_identity)
direct_link.backchannel_identified = True
self.delivery_link_established(direct_link)
RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
# Prepare link for backchannel communications elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
delivery_destination_hash = lxmessage.get_destination().hash RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links: self.pending_outbound.remove(lxmessage)
direct_link = self.direct_links[delivery_destination_hash]
if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
if direct_link.initiator == True:
source_destination_hash = lxmessage.get_source().hash
if source_destination_hash in self.delivery_destinations:
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
direct_link.identify(backchannel_identity)
direct_link.backchannel_identified = True
self.delivery_link_established(direct_link)
RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT: elif lxmessage.state == LXMessage.CANCELLED:
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage) self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
elif lxmessage.state == LXMessage.CANCELLED: elif lxmessage.state == LXMessage.REJECTED:
RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage) if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage) lxmessage.failed_callback(lxmessage)
elif lxmessage.state == LXMessage.REJECTED: else:
RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
else: if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01
RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 # Outbound handling for opportunistic messages
if lxmessage.method == LXMessage.OPPORTUNISTIC:
# Outbound handling for opportunistic messages
if lxmessage.method == LXMessage.OPPORTUNISTIC:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
def rediscover_job():
time.sleep(0.5)
RNS.Transport.request_path(lxmessage.get_destination().hash)
threading.Thread(target=rediscover_job, daemon=True).start()
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.send()
else:
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
# Outbound handling for messages transferred
# over a direct link to the final recipient
elif lxmessage.method == LXMessage.DIRECT:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None
if delivery_destination_hash in self.direct_links:
# An established direct link already exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.direct_links[delivery_destination_hash]
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
elif delivery_destination_hash in self.backchannel_links:
# An established backchannel link exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.backchannel_links[delivery_destination_hash]
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
if direct_link != None:
if direct_link.status == RNS.Link.ACTIVE:
if lxmessage.progress == None or lxmessage.progress < 0.05:
lxmessage.progress = 0.05
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(direct_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif direct_link.status == RNS.Link.CLOSED:
if direct_link.activated_at != None:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
else:
if not hasattr(lxmessage, "path_request_retried"):
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.path_request_retried = True
else:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.set_delivery_destination(None)
if delivery_destination_hash in self.direct_links:
self.direct_links.pop(delivery_destination_hash)
if delivery_destination_hash in self.backchannel_links:
self.backchannel_links.pop(delivery_destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become active or close
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
# period has elapsed.
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
delivery_link = RNS.Link(lxmessage.get_destination())
delivery_link.set_link_established_callback(self.process_outbound)
self.direct_links[delivery_destination_hash] = delivery_link
lxmessage.progress = 0.03
else:
RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
# Outbound handling for messages transported via
# propagation to a LXMF router network.
elif lxmessage.method == LXMessage.PROPAGATED:
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if self.outbound_propagation_node == None:
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
self.fail_message(lxmessage)
else:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
def rediscover_job():
time.sleep(0.5)
RNS.Transport.request_path(lxmessage.get_destination().hash)
threading.Thread(target=rediscover_job, daemon=True).start()
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.send()
else:
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
if self.outbound_propagation_link != None: # Outbound handling for messages transferred
# A link already exists, so we'll try to use it # over a direct link to the final recipient
# to deliver the message elif lxmessage.method == LXMessage.DIRECT:
if self.outbound_propagation_link.status == RNS.Link.ACTIVE: if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None
if delivery_destination_hash in self.direct_links:
# An established direct link already exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.direct_links[delivery_destination_hash]
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
elif delivery_destination_hash in self.backchannel_links:
# An established backchannel link exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.backchannel_links[delivery_destination_hash]
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
if direct_link != None:
if direct_link.status == RNS.Link.ACTIVE:
if lxmessage.progress == None or lxmessage.progress < 0.05:
lxmessage.progress = 0.05
if lxmessage.state != LXMessage.SENDING: if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG) RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(self.outbound_propagation_link) lxmessage.set_delivery_destination(direct_link)
lxmessage.send() lxmessage.send()
else: else:
if lxmessage.representation == LXMessage.RESOURCE: if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else: else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif self.outbound_propagation_link.status == RNS.Link.CLOSED: elif direct_link.status == RNS.Link.CLOSED:
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG) if direct_link.activated_at != None:
self.outbound_propagation_link = None RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
else:
if not hasattr(lxmessage, "path_request_retried"):
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.path_request_retried = True
else:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.set_delivery_destination(None)
if delivery_destination_hash in self.direct_links:
self.direct_links.pop(delivery_destination_hash)
if delivery_destination_hash in self.backchannel_links:
self.backchannel_links.pop(delivery_destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else: else:
# Simply wait for the link to become # Simply wait for the link to become active or close
# active or close RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else: else:
# No link exists, so we'll try to establish one, but # No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait # only if we've never tried before, or the retry wait
@ -2692,18 +2646,74 @@ class LXMRouter:
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(self.outbound_propagation_node): if RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) delivery_link = RNS.Link(lxmessage.get_destination())
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") delivery_link.set_link_established_callback(self.process_outbound)
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) self.direct_links[delivery_destination_hash] = delivery_link
self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet) lxmessage.progress = 0.03
self.outbound_propagation_link.for_lxmessage = lxmessage
else: else:
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node) RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else: else:
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage) self.fail_message(lxmessage)
# Outbound handling for messages transported via
# propagation to a LXMF router network.
elif lxmessage.method == LXMessage.PROPAGATED:
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if self.outbound_propagation_node == None:
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
self.fail_message(lxmessage)
else:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if self.outbound_propagation_link != None:
# A link already exists, so we'll try to use it
# to deliver the message
if self.outbound_propagation_link.status == RNS.Link.ACTIVE:
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(self.outbound_propagation_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif self.outbound_propagation_link.status == RNS.Link.CLOSED:
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG)
self.outbound_propagation_link = None
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become
# active or close
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
# period has elapsed.
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(self.outbound_propagation_node):
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet)
self.outbound_propagation_link.for_lxmessage = lxmessage
else:
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
else:
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)

View file

@ -357,7 +357,7 @@ class LXMessage:
else: else:
return None return None
def pack(self): def pack(self, payload_updated=False):
if not self.packed: if not self.packed:
if self.timestamp == None: self.timestamp = time.time() if self.timestamp == None: self.timestamp = time.time()
@ -431,7 +431,7 @@ class LXMessage:
elif self.desired_method == LXMessage.PROPAGATED: elif self.desired_method == LXMessage.PROPAGATED:
single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT
if self.__pn_encrypted_data == None: if self.__pn_encrypted_data == None or payload_updated:
self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:]) self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
self.ratchet_id = self.__destination.latest_ratchet_id self.ratchet_id = self.__destination.latest_ratchet_id

View file

@ -15,6 +15,8 @@ PN_VALIDATION_POOL_MIN_SIZE = 256
active_jobs = {} active_jobs = {}
if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork")
def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
wb_st = time.time() wb_st = time.time()
workblock = b"" workblock = b""

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# MIT License # Reticulum License
# #
# Copyright (c) 2016-2022 Mark Qvist / unsigned.io # Copyright (c) 2020-2025 Mark Qvist
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy # Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal # of this software and associated documentation files (the "Software"), to deal
@ -11,8 +11,16 @@
# copies of the Software, and to permit persons to whom the Software is # copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions: # furnished to do so, subject to the following conditions:
# #
# The above copyright notice and this permission notice shall be included in all # - The Software shall not be used in any kind of system which includes amongst
# copies or substantial portions of the Software. # its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
# an artificial intelligence, machine learning or language model training
# dataset, including but not limited to any use that contributes to the
# training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# #
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

View file

@ -1 +1 @@
__version__ = "0.9.0" __version__ = "0.9.3"

View file

@ -16,7 +16,12 @@ create_symlinks:
-ln -s ../../LXMF ./LXMF/Utilities/LXMF -ln -s ../../LXMF ./LXMF/Utilities/LXMF
build_wheel: build_wheel:
python3 setup.py sdist bdist_wheel python3 setup.py bdist_wheel
build_sdist:
python3 setup.py sdist
build_spkg: remove_symlinks build_sdist create_symlinks
release: remove_symlinks build_wheel create_symlinks release: remove_symlinks build_wheel create_symlinks

View file

@ -183,6 +183,26 @@ options:
Or run `lxmd --exampleconfig` to generate a commented example configuration documenting all the available configuration directives. Or run `lxmd --exampleconfig` to generate a commented example configuration documenting all the available configuration directives.
## Support LXMF Development
You can help support the continued development of open, free and private communications systems by donating via one of the following channels:
- Monero:
```
84FpY1QbxHcgdseePYNmhTHcrgMX4nFfBYtz2GKYToqHVVhJp8Eaw1Z1EedRnKD19b3B8NiLCGVxzKV17UMmmeEsCrPyA5w
```
- Bitcoin
```
bc1pgqgu8h8xvj4jtafslq396v7ju7hkgymyrzyqft4llfslz5vp99psqfk3a6
```
- Ethereum
```
0x91C421DdfB8a30a49A71d63447ddb54cEBe3465E
```
- Liberapay: https://liberapay.com/Reticulum/
- Ko-Fi: https://ko-fi.com/markqvist
## Caveat Emptor ## Caveat Emptor
LXMF is beta software, and should be considered experimental. While it has been built with cryptography best practices very foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch. LXMF is beta software, and should be considered experimental. While it has been built with cryptography best practices very foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch.