Compare commits

...

13 commits

8 changed files with 266 additions and 221 deletions

View file

@ -13,17 +13,6 @@ class LXMFDeliveryAnnounceHandler:
self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data):
for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash:
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
lxmessage.next_delivery_attempt = time.time()
def outbound_trigger():
while self.lxmrouter.processing_outbound: time.sleep(0.1)
self.lxmrouter.process_outbound()
threading.Thread(target=outbound_trigger, daemon=True).start()
try:
stamp_cost = stamp_cost_from_app_data(app_data)
self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost)
@ -31,6 +20,17 @@ class LXMFDeliveryAnnounceHandler:
except Exception as e:
RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR)
for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash:
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
lxmessage.next_delivery_attempt = time.time()
def outbound_trigger():
while self.lxmrouter.outbound_processing_lock.locked(): time.sleep(0.1)
self.lxmrouter.process_outbound()
threading.Thread(target=outbound_trigger, daemon=True).start()
class LXMFPropagationAnnounceHandler:
def __init__(self, lxmrouter):

View file

@ -109,7 +109,6 @@ class LXMRouter:
self.retain_synced_on_node = False
self.default_sync_strategy = sync_strategy
self.processing_outbound = False
self.processing_inbound = False
self.processing_count = 0
self.name = name
@ -160,6 +159,7 @@ class LXMRouter:
self.outbound_stamp_costs = {}
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
self.outbound_processing_lock = threading.Lock()
self.cost_file_lock = threading.Lock()
self.ticket_file_lock = threading.Lock()
self.stamp_gen_lock = threading.Lock()
@ -219,7 +219,7 @@ class LXMRouter:
self.locally_delivered_transient_ids = {}
except Exception as e:
RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.log(f"Could not load locally delivered message ID cache from storage. The contained exception was: {e}", RNS.LOG_ERROR)
self.locally_delivered_transient_ids = {}
try:
@ -576,13 +576,20 @@ class LXMRouter:
RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
st = time.time()
if os.path.isfile(self.storagepath+"/peers"):
peers_file = open(self.storagepath+"/peers", "rb")
peers_storage_path = self.storagepath+"/peers"
if os.path.isfile(peers_storage_path):
peers_file = open(peers_storage_path, "rb")
peers_data = peers_file.read()
peers_file.close()
if len(peers_data) > 0:
serialised_peers = msgpack.unpackb(peers_data)
try: serialised_peers = msgpack.unpackb(peers_data)
except Exception as e:
RNS.log(f"Could not load propagation node peering data from storage. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.log(f"The peering data file located at {peers_storage_path} is likely corrupt.", RNS.LOG_ERROR)
RNS.log(f"You can delete this file and attempt starting the propagation node again, but peer synchronization states will be lost.", RNS.LOG_ERROR)
raise e
del peers_data
while len(serialised_peers) > 0:
@ -1360,14 +1367,16 @@ class LXMRouter:
def sigint_handler(self, signal, frame):
if not self.exit_handler_running:
RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING)
sys.exit(0)
self.exit_handler()
RNS.exit(0)
else:
RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
def sigterm_handler(self, signal, frame):
if not self.exit_handler_running:
RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING)
sys.exit(0)
self.exit_handler()
RNS.exit(0)
else:
RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
@ -1662,10 +1671,10 @@ class LXMRouter:
lxmessage.defer_stamp = False
if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp):
while not unknown_path_requested and self.processing_outbound: time.sleep(0.05)
while not unknown_path_requested and self.outbound_processing_lock.locked(): time.sleep(0.05)
self.pending_outbound.append(lxmessage)
if not unknown_path_requested: self.process_outbound()
if not unknown_path_requested: threading.Thread(target=self.process_outbound, daemon=True).start()
else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage
@ -2198,14 +2207,15 @@ class LXMRouter:
# 4: Limit for incoming propagation node syncs
# 5: Propagation stamp costs for this node
# 6: Node metadata
if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
remote_timebase = remote_app_data[1]
remote_transfer_limit = remote_app_data[3]
remote_sync_limit = remote_app_data[4]
remote_stamp_cost = remote_app_data[5][0]
remote_stamp_flex = remote_app_data[5][1]
remote_peering_cost = remote_app_data[5][2]
remote_metadata = remote_app_data[6]
pn_config = msgpack.unpackb(remote_app_data)
if pn_config[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
remote_timebase = pn_config[1]
remote_transfer_limit = pn_config[3]
remote_sync_limit = pn_config[4]
remote_stamp_cost = pn_config[5][0]
remote_stamp_flex = pn_config[5][1]
remote_peering_cost = pn_config[5][2]
remote_metadata = pn_config[6]
RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug
self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata)
@ -2370,6 +2380,7 @@ class LXMRouter:
def fail_message(self, lxmessage):
RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)
lxmessage.progress = 0.0
if lxmessage in self.pending_outbound:
self.pending_outbound.remove(lxmessage)
@ -2424,7 +2435,7 @@ class LXMRouter:
selected_lxm.stamp = generated_stamp
selected_lxm.defer_stamp = False
selected_lxm.packed = None
selected_lxm.pack()
selected_lxm.pack(payload_updated=True)
stamp_generation_success = True
RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
else:
@ -2491,198 +2502,141 @@ class LXMRouter:
RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR)
def process_outbound(self, sender = None):
if self.processing_outbound:
return
if self.outbound_processing_lock.locked(): return
with self.outbound_processing_lock:
for lxmessage in self.pending_outbound:
if lxmessage.state == LXMessage.DELIVERED:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
for lxmessage in self.pending_outbound:
if lxmessage.state == LXMessage.DELIVERED:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
# Udate ticket delivery stats
if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
self.save_available_tickets()
# Udate ticket delivery stats
if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
self.save_available_tickets()
# Prepare link for backchannel communications
delivery_destination_hash = lxmessage.get_destination().hash
if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links:
direct_link = self.direct_links[delivery_destination_hash]
if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
if direct_link.initiator == True:
source_destination_hash = lxmessage.get_source().hash
if source_destination_hash in self.delivery_destinations:
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
direct_link.identify(backchannel_identity)
direct_link.backchannel_identified = True
self.delivery_link_established(direct_link)
RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
# Prepare link for backchannel communications
delivery_destination_hash = lxmessage.get_destination().hash
if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links:
direct_link = self.direct_links[delivery_destination_hash]
if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
if direct_link.initiator == True:
source_destination_hash = lxmessage.get_source().hash
if source_destination_hash in self.delivery_destinations:
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
direct_link.identify(backchannel_identity)
direct_link.backchannel_identified = True
self.delivery_link_established(direct_link)
RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
elif lxmessage.state == LXMessage.CANCELLED:
RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
elif lxmessage.state == LXMessage.CANCELLED:
RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
elif lxmessage.state == LXMessage.REJECTED:
RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
elif lxmessage.state == LXMessage.REJECTED:
RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
else:
RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
else:
RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01
if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01
# Outbound handling for opportunistic messages
if lxmessage.method == LXMessage.OPPORTUNISTIC:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
def rediscover_job():
time.sleep(0.5)
RNS.Transport.request_path(lxmessage.get_destination().hash)
threading.Thread(target=rediscover_job, daemon=True).start()
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.send()
else:
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
# Outbound handling for messages transferred
# over a direct link to the final recipient
elif lxmessage.method == LXMessage.DIRECT:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None
if delivery_destination_hash in self.direct_links:
# An established direct link already exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.direct_links[delivery_destination_hash]
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
elif delivery_destination_hash in self.backchannel_links:
# An established backchannel link exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.backchannel_links[delivery_destination_hash]
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
if direct_link != None:
if direct_link.status == RNS.Link.ACTIVE:
if lxmessage.progress == None or lxmessage.progress < 0.05:
lxmessage.progress = 0.05
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(direct_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif direct_link.status == RNS.Link.CLOSED:
if direct_link.activated_at != None:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
else:
if not hasattr(lxmessage, "path_request_retried"):
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.path_request_retried = True
else:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.set_delivery_destination(None)
if delivery_destination_hash in self.direct_links:
self.direct_links.pop(delivery_destination_hash)
if delivery_destination_hash in self.backchannel_links:
self.backchannel_links.pop(delivery_destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become active or close
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
# period has elapsed.
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
delivery_link = RNS.Link(lxmessage.get_destination())
delivery_link.set_link_established_callback(self.process_outbound)
self.direct_links[delivery_destination_hash] = delivery_link
lxmessage.progress = 0.03
else:
RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
# Outbound handling for messages transported via
# propagation to a LXMF router network.
elif lxmessage.method == LXMessage.PROPAGATED:
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if self.outbound_propagation_node == None:
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
self.fail_message(lxmessage)
else:
# Outbound handling for opportunistic messages
if lxmessage.method == LXMessage.OPPORTUNISTIC:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
def rediscover_job():
time.sleep(0.5)
RNS.Transport.request_path(lxmessage.get_destination().hash)
threading.Thread(target=rediscover_job, daemon=True).start()
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.send()
else:
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
if self.outbound_propagation_link != None:
# A link already exists, so we'll try to use it
# to deliver the message
if self.outbound_propagation_link.status == RNS.Link.ACTIVE:
# Outbound handling for messages transferred
# over a direct link to the final recipient
elif lxmessage.method == LXMessage.DIRECT:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None
if delivery_destination_hash in self.direct_links:
# An established direct link already exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.direct_links[delivery_destination_hash]
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
elif delivery_destination_hash in self.backchannel_links:
# An established backchannel link exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.backchannel_links[delivery_destination_hash]
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
if direct_link != None:
if direct_link.status == RNS.Link.ACTIVE:
if lxmessage.progress == None or lxmessage.progress < 0.05:
lxmessage.progress = 0.05
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(self.outbound_propagation_link)
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(direct_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif self.outbound_propagation_link.status == RNS.Link.CLOSED:
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG)
self.outbound_propagation_link = None
elif direct_link.status == RNS.Link.CLOSED:
if direct_link.activated_at != None:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
else:
if not hasattr(lxmessage, "path_request_retried"):
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.path_request_retried = True
else:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.set_delivery_destination(None)
if delivery_destination_hash in self.direct_links:
self.direct_links.pop(delivery_destination_hash)
if delivery_destination_hash in self.backchannel_links:
self.backchannel_links.pop(delivery_destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become
# active or close
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
# Simply wait for the link to become active or close
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
@ -2692,18 +2646,74 @@ class LXMRouter:
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(self.outbound_propagation_node):
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet)
self.outbound_propagation_link.for_lxmessage = lxmessage
if RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
delivery_link = RNS.Link(lxmessage.get_destination())
delivery_link.set_link_established_callback(self.process_outbound)
self.direct_links[delivery_destination_hash] = delivery_link
lxmessage.progress = 0.03
else:
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node)
RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
# Outbound handling for messages transported via
# propagation to a LXMF router network.
elif lxmessage.method == LXMessage.PROPAGATED:
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if self.outbound_propagation_node == None:
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
self.fail_message(lxmessage)
else:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if self.outbound_propagation_link != None:
# A link already exists, so we'll try to use it
# to deliver the message
if self.outbound_propagation_link.status == RNS.Link.ACTIVE:
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(self.outbound_propagation_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif self.outbound_propagation_link.status == RNS.Link.CLOSED:
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG)
self.outbound_propagation_link = None
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become
# active or close
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
# period has elapsed.
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(self.outbound_propagation_node):
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet)
self.outbound_propagation_link.for_lxmessage = lxmessage
else:
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
else:
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)

View file

@ -357,7 +357,7 @@ class LXMessage:
else:
return None
def pack(self):
def pack(self, payload_updated=False):
if not self.packed:
if self.timestamp == None: self.timestamp = time.time()
@ -431,7 +431,7 @@ class LXMessage:
elif self.desired_method == LXMessage.PROPAGATED:
single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT
if self.__pn_encrypted_data == None:
if self.__pn_encrypted_data == None or payload_updated:
self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
self.ratchet_id = self.__destination.latest_ratchet_id

View file

@ -15,6 +15,8 @@ PN_VALIDATION_POOL_MIN_SIZE = 256
active_jobs = {}
if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork")
def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
wb_st = time.time()
workblock = b""

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python3
# MIT License
# Reticulum License
#
# Copyright (c) 2016-2022 Mark Qvist / unsigned.io
# Copyright (c) 2020-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@ -11,8 +11,16 @@
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# - The Software shall not be used in any kind of system which includes amongst
# its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
# an artificial intelligence, machine learning or language model training
# dataset, including but not limited to any use that contributes to the
# training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

View file

@ -1 +1 @@
__version__ = "0.9.0"
__version__ = "0.9.3"

View file

@ -16,7 +16,12 @@ create_symlinks:
-ln -s ../../LXMF ./LXMF/Utilities/LXMF
build_wheel:
python3 setup.py sdist bdist_wheel
python3 setup.py bdist_wheel
build_sdist:
python3 setup.py sdist
build_spkg: remove_symlinks build_sdist create_symlinks
release: remove_symlinks build_wheel create_symlinks

View file

@ -183,6 +183,26 @@ options:
Or run `lxmd --exampleconfig` to generate a commented example configuration documenting all the available configuration directives.
## Support LXMF Development
You can help support the continued development of open, free and private communications systems by donating via one of the following channels:
- Monero:
```
84FpY1QbxHcgdseePYNmhTHcrgMX4nFfBYtz2GKYToqHVVhJp8Eaw1Z1EedRnKD19b3B8NiLCGVxzKV17UMmmeEsCrPyA5w
```
- Bitcoin
```
bc1pgqgu8h8xvj4jtafslq396v7ju7hkgymyrzyqft4llfslz5vp99psqfk3a6
```
- Ethereum
```
0x91C421DdfB8a30a49A71d63447ddb54cEBe3465E
```
- Liberapay: https://liberapay.com/Reticulum/
- Ko-Fi: https://ko-fi.com/markqvist
## Caveat Emptor
LXMF is beta software, and should be considered experimental. While it has been built with cryptography best practices very foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch.