Improved outbound message processing speed

This commit is contained in:
Mark Qvist 2025-12-02 20:17:46 +01:00
parent f4c805ea35
commit a6f5a56a38
2 changed files with 198 additions and 198 deletions

View file

@ -13,17 +13,6 @@ class LXMFDeliveryAnnounceHandler:
self.lxmrouter = lxmrouter self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data): def received_announce(self, destination_hash, announced_identity, app_data):
for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash:
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
lxmessage.next_delivery_attempt = time.time()
def outbound_trigger():
while self.lxmrouter.processing_outbound: time.sleep(0.1)
self.lxmrouter.process_outbound()
threading.Thread(target=outbound_trigger, daemon=True).start()
try: try:
stamp_cost = stamp_cost_from_app_data(app_data) stamp_cost = stamp_cost_from_app_data(app_data)
self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost) self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost)
@ -31,6 +20,17 @@ class LXMFDeliveryAnnounceHandler:
except Exception as e: except Exception as e:
RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR) RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR)
for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash:
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
lxmessage.next_delivery_attempt = time.time()
def outbound_trigger():
while self.lxmrouter.outbound_processing_lock.locked(): time.sleep(0.1)
self.lxmrouter.process_outbound()
threading.Thread(target=outbound_trigger, daemon=True).start()
class LXMFPropagationAnnounceHandler: class LXMFPropagationAnnounceHandler:
def __init__(self, lxmrouter): def __init__(self, lxmrouter):

View file

@ -109,7 +109,6 @@ class LXMRouter:
self.retain_synced_on_node = False self.retain_synced_on_node = False
self.default_sync_strategy = sync_strategy self.default_sync_strategy = sync_strategy
self.processing_outbound = False
self.processing_inbound = False self.processing_inbound = False
self.processing_count = 0 self.processing_count = 0
self.name = name self.name = name
@ -160,6 +159,7 @@ class LXMRouter:
self.outbound_stamp_costs = {} self.outbound_stamp_costs = {}
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}} self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
self.outbound_processing_lock = threading.Lock()
self.cost_file_lock = threading.Lock() self.cost_file_lock = threading.Lock()
self.ticket_file_lock = threading.Lock() self.ticket_file_lock = threading.Lock()
self.stamp_gen_lock = threading.Lock() self.stamp_gen_lock = threading.Lock()
@ -1664,10 +1664,10 @@ class LXMRouter:
lxmessage.defer_stamp = False lxmessage.defer_stamp = False
if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp): if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp):
while not unknown_path_requested and self.processing_outbound: time.sleep(0.05) while not unknown_path_requested and self.outbound_processing_lock.locked(): time.sleep(0.05)
self.pending_outbound.append(lxmessage) self.pending_outbound.append(lxmessage)
if not unknown_path_requested: self.process_outbound() if not unknown_path_requested: threading.Thread(target=self.process_outbound, daemon=True).start()
else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage
@ -2373,6 +2373,7 @@ class LXMRouter:
def fail_message(self, lxmessage): def fail_message(self, lxmessage):
RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG) RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)
lxmessage.progress = 0.0
if lxmessage in self.pending_outbound: if lxmessage in self.pending_outbound:
self.pending_outbound.remove(lxmessage) self.pending_outbound.remove(lxmessage)
@ -2494,198 +2495,141 @@ class LXMRouter:
RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR) RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR)
def process_outbound(self, sender = None): def process_outbound(self, sender = None):
if self.processing_outbound: if self.outbound_processing_lock.locked(): return
return with self.outbound_processing_lock:
for lxmessage in self.pending_outbound:
if lxmessage.state == LXMessage.DELIVERED:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
for lxmessage in self.pending_outbound: # Udate ticket delivery stats
if lxmessage.state == LXMessage.DELIVERED: if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage) self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
self.save_available_tickets()
# Udate ticket delivery stats # Prepare link for backchannel communications
if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields: delivery_destination_hash = lxmessage.get_destination().hash
RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG) if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links:
self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time() direct_link = self.direct_links[delivery_destination_hash]
self.save_available_tickets() if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
if direct_link.initiator == True:
source_destination_hash = lxmessage.get_source().hash
if source_destination_hash in self.delivery_destinations:
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
direct_link.identify(backchannel_identity)
direct_link.backchannel_identified = True
self.delivery_link_established(direct_link)
RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
# Prepare link for backchannel communications elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
delivery_destination_hash = lxmessage.get_destination().hash RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links: self.pending_outbound.remove(lxmessage)
direct_link = self.direct_links[delivery_destination_hash]
if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
if direct_link.initiator == True:
source_destination_hash = lxmessage.get_source().hash
if source_destination_hash in self.delivery_destinations:
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
direct_link.identify(backchannel_identity)
direct_link.backchannel_identified = True
self.delivery_link_established(direct_link)
RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT: elif lxmessage.state == LXMessage.CANCELLED:
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage) self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
elif lxmessage.state == LXMessage.CANCELLED: elif lxmessage.state == LXMessage.REJECTED:
RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage) if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage) lxmessage.failed_callback(lxmessage)
elif lxmessage.state == LXMessage.REJECTED: else:
RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
else: if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01
RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 # Outbound handling for opportunistic messages
if lxmessage.method == LXMessage.OPPORTUNISTIC:
# Outbound handling for opportunistic messages
if lxmessage.method == LXMessage.OPPORTUNISTIC:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
def rediscover_job():
time.sleep(0.5)
RNS.Transport.request_path(lxmessage.get_destination().hash)
threading.Thread(target=rediscover_job, daemon=True).start()
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.send()
else:
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
# Outbound handling for messages transferred
# over a direct link to the final recipient
elif lxmessage.method == LXMessage.DIRECT:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None
if delivery_destination_hash in self.direct_links:
# An established direct link already exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.direct_links[delivery_destination_hash]
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
elif delivery_destination_hash in self.backchannel_links:
# An established backchannel link exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.backchannel_links[delivery_destination_hash]
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
if direct_link != None:
if direct_link.status == RNS.Link.ACTIVE:
if lxmessage.progress == None or lxmessage.progress < 0.05:
lxmessage.progress = 0.05
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(direct_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif direct_link.status == RNS.Link.CLOSED:
if direct_link.activated_at != None:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
else:
if not hasattr(lxmessage, "path_request_retried"):
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.path_request_retried = True
else:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.set_delivery_destination(None)
if delivery_destination_hash in self.direct_links:
self.direct_links.pop(delivery_destination_hash)
if delivery_destination_hash in self.backchannel_links:
self.backchannel_links.pop(delivery_destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become active or close
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
# period has elapsed.
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
delivery_link = RNS.Link(lxmessage.get_destination())
delivery_link.set_link_established_callback(self.process_outbound)
self.direct_links[delivery_destination_hash] = delivery_link
lxmessage.progress = 0.03
else:
RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
# Outbound handling for messages transported via
# propagation to a LXMF router network.
elif lxmessage.method == LXMessage.PROPAGATED:
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if self.outbound_propagation_node == None:
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
self.fail_message(lxmessage)
else:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
def rediscover_job():
time.sleep(0.5)
RNS.Transport.request_path(lxmessage.get_destination().hash)
threading.Thread(target=rediscover_job, daemon=True).start()
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.send()
else:
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
if self.outbound_propagation_link != None: # Outbound handling for messages transferred
# A link already exists, so we'll try to use it # over a direct link to the final recipient
# to deliver the message elif lxmessage.method == LXMessage.DIRECT:
if self.outbound_propagation_link.status == RNS.Link.ACTIVE: if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None
if delivery_destination_hash in self.direct_links:
# An established direct link already exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.direct_links[delivery_destination_hash]
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
elif delivery_destination_hash in self.backchannel_links:
# An established backchannel link exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.backchannel_links[delivery_destination_hash]
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
if direct_link != None:
if direct_link.status == RNS.Link.ACTIVE:
if lxmessage.progress == None or lxmessage.progress < 0.05:
lxmessage.progress = 0.05
if lxmessage.state != LXMessage.SENDING: if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG) RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(self.outbound_propagation_link) lxmessage.set_delivery_destination(direct_link)
lxmessage.send() lxmessage.send()
else: else:
if lxmessage.representation == LXMessage.RESOURCE: if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else: else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif self.outbound_propagation_link.status == RNS.Link.CLOSED: elif direct_link.status == RNS.Link.CLOSED:
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG) if direct_link.activated_at != None:
self.outbound_propagation_link = None RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
else:
if not hasattr(lxmessage, "path_request_retried"):
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.path_request_retried = True
else:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.set_delivery_destination(None)
if delivery_destination_hash in self.direct_links:
self.direct_links.pop(delivery_destination_hash)
if delivery_destination_hash in self.backchannel_links:
self.backchannel_links.pop(delivery_destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else: else:
# Simply wait for the link to become # Simply wait for the link to become active or close
# active or close RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else: else:
# No link exists, so we'll try to establish one, but # No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait # only if we've never tried before, or the retry wait
@ -2695,18 +2639,74 @@ class LXMRouter:
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(self.outbound_propagation_node): if RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) delivery_link = RNS.Link(lxmessage.get_destination())
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") delivery_link.set_link_established_callback(self.process_outbound)
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) self.direct_links[delivery_destination_hash] = delivery_link
self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet) lxmessage.progress = 0.03
self.outbound_propagation_link.for_lxmessage = lxmessage
else: else:
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node) RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else: else:
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage) self.fail_message(lxmessage)
# Outbound handling for messages transported via
# propagation to a LXMF router network.
elif lxmessage.method == LXMessage.PROPAGATED:
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if self.outbound_propagation_node == None:
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
self.fail_message(lxmessage)
else:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if self.outbound_propagation_link != None:
# A link already exists, so we'll try to use it
# to deliver the message
if self.outbound_propagation_link.status == RNS.Link.ACTIVE:
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(self.outbound_propagation_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif self.outbound_propagation_link.status == RNS.Link.CLOSED:
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG)
self.outbound_propagation_link = None
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become
# active or close
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
# period has elapsed.
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(self.outbound_propagation_node):
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet)
self.outbound_propagation_link.for_lxmessage = lxmessage
else:
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
else:
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)