|
|
|
|
@ -109,7 +109,6 @@ class LXMRouter:
|
|
|
|
|
self.retain_synced_on_node = False
|
|
|
|
|
|
|
|
|
|
self.default_sync_strategy = sync_strategy
|
|
|
|
|
self.processing_outbound = False
|
|
|
|
|
self.processing_inbound = False
|
|
|
|
|
self.processing_count = 0
|
|
|
|
|
self.name = name
|
|
|
|
|
@ -160,6 +159,7 @@ class LXMRouter:
|
|
|
|
|
self.outbound_stamp_costs = {}
|
|
|
|
|
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
|
|
|
|
|
|
|
|
|
|
self.outbound_processing_lock = threading.Lock()
|
|
|
|
|
self.cost_file_lock = threading.Lock()
|
|
|
|
|
self.ticket_file_lock = threading.Lock()
|
|
|
|
|
self.stamp_gen_lock = threading.Lock()
|
|
|
|
|
@ -219,7 +219,7 @@ class LXMRouter:
|
|
|
|
|
self.locally_delivered_transient_ids = {}
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
|
|
|
|
RNS.log(f"Could not load locally delivered message ID cache from storage. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
|
self.locally_delivered_transient_ids = {}
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
@ -576,13 +576,20 @@ class LXMRouter:
|
|
|
|
|
RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
|
|
|
|
|
st = time.time()
|
|
|
|
|
|
|
|
|
|
if os.path.isfile(self.storagepath+"/peers"):
|
|
|
|
|
peers_file = open(self.storagepath+"/peers", "rb")
|
|
|
|
|
peers_storage_path = self.storagepath+"/peers"
|
|
|
|
|
if os.path.isfile(peers_storage_path):
|
|
|
|
|
peers_file = open(peers_storage_path, "rb")
|
|
|
|
|
peers_data = peers_file.read()
|
|
|
|
|
peers_file.close()
|
|
|
|
|
|
|
|
|
|
if len(peers_data) > 0:
|
|
|
|
|
serialised_peers = msgpack.unpackb(peers_data)
|
|
|
|
|
try: serialised_peers = msgpack.unpackb(peers_data)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
RNS.log(f"Could not load propagation node peering data from storage. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
|
RNS.log(f"The peering data file located at {peers_storage_path} is likely corrupt.", RNS.LOG_ERROR)
|
|
|
|
|
RNS.log(f"You can delete this file and attempt starting the propagation node again, but peer synchronization states will be lost.", RNS.LOG_ERROR)
|
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
del peers_data
|
|
|
|
|
|
|
|
|
|
while len(serialised_peers) > 0:
|
|
|
|
|
@ -1360,14 +1367,16 @@ class LXMRouter:
|
|
|
|
|
def sigint_handler(self, signal, frame):
|
|
|
|
|
if not self.exit_handler_running:
|
|
|
|
|
RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING)
|
|
|
|
|
sys.exit(0)
|
|
|
|
|
self.exit_handler()
|
|
|
|
|
RNS.exit(0)
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
|
|
|
|
|
|
|
|
|
|
def sigterm_handler(self, signal, frame):
|
|
|
|
|
if not self.exit_handler_running:
|
|
|
|
|
RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING)
|
|
|
|
|
sys.exit(0)
|
|
|
|
|
self.exit_handler()
|
|
|
|
|
RNS.exit(0)
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
|
|
|
|
|
|
|
|
|
|
@ -1662,10 +1671,10 @@ class LXMRouter:
|
|
|
|
|
lxmessage.defer_stamp = False
|
|
|
|
|
|
|
|
|
|
if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp):
|
|
|
|
|
while not unknown_path_requested and self.processing_outbound: time.sleep(0.05)
|
|
|
|
|
while not unknown_path_requested and self.outbound_processing_lock.locked(): time.sleep(0.05)
|
|
|
|
|
|
|
|
|
|
self.pending_outbound.append(lxmessage)
|
|
|
|
|
if not unknown_path_requested: self.process_outbound()
|
|
|
|
|
if not unknown_path_requested: threading.Thread(target=self.process_outbound, daemon=True).start()
|
|
|
|
|
|
|
|
|
|
else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage
|
|
|
|
|
|
|
|
|
|
@ -2198,14 +2207,15 @@ class LXMRouter:
|
|
|
|
|
# 4: Limit for incoming propagation node syncs
|
|
|
|
|
# 5: Propagation stamp costs for this node
|
|
|
|
|
# 6: Node metadata
|
|
|
|
|
if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
|
|
|
|
|
remote_timebase = remote_app_data[1]
|
|
|
|
|
remote_transfer_limit = remote_app_data[3]
|
|
|
|
|
remote_sync_limit = remote_app_data[4]
|
|
|
|
|
remote_stamp_cost = remote_app_data[5][0]
|
|
|
|
|
remote_stamp_flex = remote_app_data[5][1]
|
|
|
|
|
remote_peering_cost = remote_app_data[5][2]
|
|
|
|
|
remote_metadata = remote_app_data[6]
|
|
|
|
|
pn_config = msgpack.unpackb(remote_app_data)
|
|
|
|
|
if pn_config[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
|
|
|
|
|
remote_timebase = pn_config[1]
|
|
|
|
|
remote_transfer_limit = pn_config[3]
|
|
|
|
|
remote_sync_limit = pn_config[4]
|
|
|
|
|
remote_stamp_cost = pn_config[5][0]
|
|
|
|
|
remote_stamp_flex = pn_config[5][1]
|
|
|
|
|
remote_peering_cost = pn_config[5][2]
|
|
|
|
|
remote_metadata = pn_config[6]
|
|
|
|
|
|
|
|
|
|
RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug
|
|
|
|
|
self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata)
|
|
|
|
|
@ -2370,6 +2380,7 @@ class LXMRouter:
|
|
|
|
|
def fail_message(self, lxmessage):
|
|
|
|
|
RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
lxmessage.progress = 0.0
|
|
|
|
|
if lxmessage in self.pending_outbound:
|
|
|
|
|
self.pending_outbound.remove(lxmessage)
|
|
|
|
|
|
|
|
|
|
@ -2424,7 +2435,7 @@ class LXMRouter:
|
|
|
|
|
selected_lxm.stamp = generated_stamp
|
|
|
|
|
selected_lxm.defer_stamp = False
|
|
|
|
|
selected_lxm.packed = None
|
|
|
|
|
selected_lxm.pack()
|
|
|
|
|
selected_lxm.pack(payload_updated=True)
|
|
|
|
|
stamp_generation_success = True
|
|
|
|
|
RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
|
|
|
|
|
else:
|
|
|
|
|
@ -2491,198 +2502,141 @@ class LXMRouter:
|
|
|
|
|
RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
|
|
|
|
|
|
def process_outbound(self, sender = None):
|
|
|
|
|
if self.processing_outbound:
|
|
|
|
|
return
|
|
|
|
|
if self.outbound_processing_lock.locked(): return
|
|
|
|
|
with self.outbound_processing_lock:
|
|
|
|
|
for lxmessage in self.pending_outbound:
|
|
|
|
|
if lxmessage.state == LXMessage.DELIVERED:
|
|
|
|
|
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
|
|
|
|
self.pending_outbound.remove(lxmessage)
|
|
|
|
|
|
|
|
|
|
for lxmessage in self.pending_outbound:
|
|
|
|
|
if lxmessage.state == LXMessage.DELIVERED:
|
|
|
|
|
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
|
|
|
|
self.pending_outbound.remove(lxmessage)
|
|
|
|
|
# Udate ticket delivery stats
|
|
|
|
|
if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
|
|
|
|
|
RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
|
|
|
|
|
self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
|
|
|
|
|
self.save_available_tickets()
|
|
|
|
|
|
|
|
|
|
# Udate ticket delivery stats
|
|
|
|
|
if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
|
|
|
|
|
RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
|
|
|
|
|
self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
|
|
|
|
|
self.save_available_tickets()
|
|
|
|
|
# Prepare link for backchannel communications
|
|
|
|
|
delivery_destination_hash = lxmessage.get_destination().hash
|
|
|
|
|
if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links:
|
|
|
|
|
direct_link = self.direct_links[delivery_destination_hash]
|
|
|
|
|
if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
|
|
|
|
|
if direct_link.initiator == True:
|
|
|
|
|
source_destination_hash = lxmessage.get_source().hash
|
|
|
|
|
if source_destination_hash in self.delivery_destinations:
|
|
|
|
|
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
|
|
|
|
|
backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
|
|
|
|
|
direct_link.identify(backchannel_identity)
|
|
|
|
|
direct_link.backchannel_identified = True
|
|
|
|
|
self.delivery_link_established(direct_link)
|
|
|
|
|
RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
# Prepare link for backchannel communications
|
|
|
|
|
delivery_destination_hash = lxmessage.get_destination().hash
|
|
|
|
|
if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links:
|
|
|
|
|
direct_link = self.direct_links[delivery_destination_hash]
|
|
|
|
|
if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
|
|
|
|
|
if direct_link.initiator == True:
|
|
|
|
|
source_destination_hash = lxmessage.get_source().hash
|
|
|
|
|
if source_destination_hash in self.delivery_destinations:
|
|
|
|
|
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
|
|
|
|
|
backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
|
|
|
|
|
direct_link.identify(backchannel_identity)
|
|
|
|
|
direct_link.backchannel_identified = True
|
|
|
|
|
self.delivery_link_established(direct_link)
|
|
|
|
|
RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
|
|
|
|
|
elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
|
|
|
|
|
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
|
|
|
|
self.pending_outbound.remove(lxmessage)
|
|
|
|
|
|
|
|
|
|
elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
|
|
|
|
|
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
|
|
|
|
self.pending_outbound.remove(lxmessage)
|
|
|
|
|
elif lxmessage.state == LXMessage.CANCELLED:
|
|
|
|
|
RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
|
|
|
|
self.pending_outbound.remove(lxmessage)
|
|
|
|
|
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
|
|
|
|
|
lxmessage.failed_callback(lxmessage)
|
|
|
|
|
|
|
|
|
|
elif lxmessage.state == LXMessage.CANCELLED:
|
|
|
|
|
RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
|
|
|
|
self.pending_outbound.remove(lxmessage)
|
|
|
|
|
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
|
|
|
|
|
lxmessage.failed_callback(lxmessage)
|
|
|
|
|
elif lxmessage.state == LXMessage.REJECTED:
|
|
|
|
|
RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
|
|
|
|
if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
|
|
|
|
|
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
|
|
|
|
|
lxmessage.failed_callback(lxmessage)
|
|
|
|
|
|
|
|
|
|
elif lxmessage.state == LXMessage.REJECTED:
|
|
|
|
|
RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
|
|
|
|
if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
|
|
|
|
|
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
|
|
|
|
|
lxmessage.failed_callback(lxmessage)
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01
|
|
|
|
|
|
|
|
|
|
if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01
|
|
|
|
|
|
|
|
|
|
# Outbound handling for opportunistic messages
|
|
|
|
|
if lxmessage.method == LXMessage.OPPORTUNISTIC:
|
|
|
|
|
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
|
|
|
|
|
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
|
|
|
|
|
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.delivery_attempts += 1
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
|
|
|
|
lxmessage.progress = 0.01
|
|
|
|
|
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
|
|
|
|
|
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.delivery_attempts += 1
|
|
|
|
|
RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
|
|
|
|
|
def rediscover_job():
|
|
|
|
|
time.sleep(0.5)
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
threading.Thread(target=rediscover_job, daemon=True).start()
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
|
|
|
|
lxmessage.progress = 0.01
|
|
|
|
|
else:
|
|
|
|
|
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
|
|
|
|
|
lxmessage.delivery_attempts += 1
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
|
|
|
|
|
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.send()
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
self.fail_message(lxmessage)
|
|
|
|
|
|
|
|
|
|
# Outbound handling for messages transferred
|
|
|
|
|
# over a direct link to the final recipient
|
|
|
|
|
elif lxmessage.method == LXMessage.DIRECT:
|
|
|
|
|
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
|
|
|
|
|
delivery_destination_hash = lxmessage.get_destination().hash
|
|
|
|
|
direct_link = None
|
|
|
|
|
|
|
|
|
|
if delivery_destination_hash in self.direct_links:
|
|
|
|
|
# An established direct link already exists to
|
|
|
|
|
# the destination, so we'll try to use it for
|
|
|
|
|
# delivering the message
|
|
|
|
|
direct_link = self.direct_links[delivery_destination_hash]
|
|
|
|
|
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
elif delivery_destination_hash in self.backchannel_links:
|
|
|
|
|
# An established backchannel link exists to
|
|
|
|
|
# the destination, so we'll try to use it for
|
|
|
|
|
# delivering the message
|
|
|
|
|
direct_link = self.backchannel_links[delivery_destination_hash]
|
|
|
|
|
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
if direct_link != None:
|
|
|
|
|
if direct_link.status == RNS.Link.ACTIVE:
|
|
|
|
|
if lxmessage.progress == None or lxmessage.progress < 0.05:
|
|
|
|
|
lxmessage.progress = 0.05
|
|
|
|
|
if lxmessage.state != LXMessage.SENDING:
|
|
|
|
|
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.set_delivery_destination(direct_link)
|
|
|
|
|
lxmessage.send()
|
|
|
|
|
else:
|
|
|
|
|
if lxmessage.representation == LXMessage.RESOURCE:
|
|
|
|
|
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
|
|
|
|
|
elif direct_link.status == RNS.Link.CLOSED:
|
|
|
|
|
if direct_link.activated_at != None:
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
else:
|
|
|
|
|
if not hasattr(lxmessage, "path_request_retried"):
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
lxmessage.path_request_retried = True
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
|
|
|
|
|
|
|
|
|
lxmessage.set_delivery_destination(None)
|
|
|
|
|
if delivery_destination_hash in self.direct_links:
|
|
|
|
|
self.direct_links.pop(delivery_destination_hash)
|
|
|
|
|
if delivery_destination_hash in self.backchannel_links:
|
|
|
|
|
self.backchannel_links.pop(delivery_destination_hash)
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
|
|
|
|
|
else:
|
|
|
|
|
# Simply wait for the link to become active or close
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
|
|
|
|
|
else:
|
|
|
|
|
# No link exists, so we'll try to establish one, but
|
|
|
|
|
# only if we've never tried before, or the retry wait
|
|
|
|
|
# period has elapsed.
|
|
|
|
|
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
|
|
|
|
|
lxmessage.delivery_attempts += 1
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
|
|
|
|
|
|
|
|
|
|
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
|
|
|
|
|
if RNS.Transport.has_path(lxmessage.get_destination().hash):
|
|
|
|
|
RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
delivery_link = RNS.Link(lxmessage.get_destination())
|
|
|
|
|
delivery_link.set_link_established_callback(self.process_outbound)
|
|
|
|
|
self.direct_links[delivery_destination_hash] = delivery_link
|
|
|
|
|
lxmessage.progress = 0.03
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
|
|
|
|
lxmessage.progress = 0.01
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
self.fail_message(lxmessage)
|
|
|
|
|
|
|
|
|
|
# Outbound handling for messages transported via
|
|
|
|
|
# propagation to a LXMF router network.
|
|
|
|
|
elif lxmessage.method == LXMessage.PROPAGATED:
|
|
|
|
|
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
if self.outbound_propagation_node == None:
|
|
|
|
|
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
|
|
|
|
|
self.fail_message(lxmessage)
|
|
|
|
|
else:
|
|
|
|
|
# Outbound handling for opportunistic messages
|
|
|
|
|
if lxmessage.method == LXMessage.OPPORTUNISTIC:
|
|
|
|
|
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
|
|
|
|
|
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
|
|
|
|
|
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.delivery_attempts += 1
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
|
|
|
|
lxmessage.progress = 0.01
|
|
|
|
|
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
|
|
|
|
|
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.delivery_attempts += 1
|
|
|
|
|
RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
|
|
|
|
|
def rediscover_job():
|
|
|
|
|
time.sleep(0.5)
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
threading.Thread(target=rediscover_job, daemon=True).start()
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
|
|
|
|
lxmessage.progress = 0.01
|
|
|
|
|
else:
|
|
|
|
|
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
|
|
|
|
|
lxmessage.delivery_attempts += 1
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
|
|
|
|
|
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.send()
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
self.fail_message(lxmessage)
|
|
|
|
|
|
|
|
|
|
if self.outbound_propagation_link != None:
|
|
|
|
|
# A link already exists, so we'll try to use it
|
|
|
|
|
# to deliver the message
|
|
|
|
|
if self.outbound_propagation_link.status == RNS.Link.ACTIVE:
|
|
|
|
|
# Outbound handling for messages transferred
|
|
|
|
|
# over a direct link to the final recipient
|
|
|
|
|
elif lxmessage.method == LXMessage.DIRECT:
|
|
|
|
|
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
|
|
|
|
|
delivery_destination_hash = lxmessage.get_destination().hash
|
|
|
|
|
direct_link = None
|
|
|
|
|
|
|
|
|
|
if delivery_destination_hash in self.direct_links:
|
|
|
|
|
# An established direct link already exists to
|
|
|
|
|
# the destination, so we'll try to use it for
|
|
|
|
|
# delivering the message
|
|
|
|
|
direct_link = self.direct_links[delivery_destination_hash]
|
|
|
|
|
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
elif delivery_destination_hash in self.backchannel_links:
|
|
|
|
|
# An established backchannel link exists to
|
|
|
|
|
# the destination, so we'll try to use it for
|
|
|
|
|
# delivering the message
|
|
|
|
|
direct_link = self.backchannel_links[delivery_destination_hash]
|
|
|
|
|
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
if direct_link != None:
|
|
|
|
|
if direct_link.status == RNS.Link.ACTIVE:
|
|
|
|
|
if lxmessage.progress == None or lxmessage.progress < 0.05:
|
|
|
|
|
lxmessage.progress = 0.05
|
|
|
|
|
if lxmessage.state != LXMessage.SENDING:
|
|
|
|
|
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.set_delivery_destination(self.outbound_propagation_link)
|
|
|
|
|
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.set_delivery_destination(direct_link)
|
|
|
|
|
lxmessage.send()
|
|
|
|
|
else:
|
|
|
|
|
if lxmessage.representation == LXMessage.RESOURCE:
|
|
|
|
|
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
|
|
|
|
|
elif self.outbound_propagation_link.status == RNS.Link.CLOSED:
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG)
|
|
|
|
|
self.outbound_propagation_link = None
|
|
|
|
|
elif direct_link.status == RNS.Link.CLOSED:
|
|
|
|
|
if direct_link.activated_at != None:
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
else:
|
|
|
|
|
if not hasattr(lxmessage, "path_request_retried"):
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
lxmessage.path_request_retried = True
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
|
|
|
|
|
|
|
|
|
lxmessage.set_delivery_destination(None)
|
|
|
|
|
if delivery_destination_hash in self.direct_links:
|
|
|
|
|
self.direct_links.pop(delivery_destination_hash)
|
|
|
|
|
if delivery_destination_hash in self.backchannel_links:
|
|
|
|
|
self.backchannel_links.pop(delivery_destination_hash)
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
|
|
|
|
|
else:
|
|
|
|
|
# Simply wait for the link to become
|
|
|
|
|
# active or close
|
|
|
|
|
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
|
|
|
|
|
# Simply wait for the link to become active or close
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
|
|
|
|
|
else:
|
|
|
|
|
# No link exists, so we'll try to establish one, but
|
|
|
|
|
# only if we've never tried before, or the retry wait
|
|
|
|
|
@ -2692,18 +2646,74 @@ class LXMRouter:
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
|
|
|
|
|
|
|
|
|
|
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
|
|
|
|
|
if RNS.Transport.has_path(self.outbound_propagation_node):
|
|
|
|
|
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
|
|
|
|
|
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
|
|
|
|
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
|
|
|
|
|
self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet)
|
|
|
|
|
self.outbound_propagation_link.for_lxmessage = lxmessage
|
|
|
|
|
if RNS.Transport.has_path(lxmessage.get_destination().hash):
|
|
|
|
|
RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
delivery_link = RNS.Link(lxmessage.get_destination())
|
|
|
|
|
delivery_link.set_link_established_callback(self.process_outbound)
|
|
|
|
|
self.direct_links[delivery_destination_hash] = delivery_link
|
|
|
|
|
lxmessage.progress = 0.03
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
|
|
|
|
|
RNS.Transport.request_path(self.outbound_propagation_node)
|
|
|
|
|
RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
|
|
|
|
|
RNS.Transport.request_path(lxmessage.get_destination().hash)
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
|
|
|
|
|
|
|
|
|
lxmessage.progress = 0.01
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
self.fail_message(lxmessage)
|
|
|
|
|
|
|
|
|
|
# Outbound handling for messages transported via
|
|
|
|
|
# propagation to a LXMF router network.
|
|
|
|
|
elif lxmessage.method == LXMessage.PROPAGATED:
|
|
|
|
|
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
|
|
|
|
|
if self.outbound_propagation_node == None:
|
|
|
|
|
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
|
|
|
|
|
self.fail_message(lxmessage)
|
|
|
|
|
else:
|
|
|
|
|
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
|
|
|
|
|
|
|
|
|
|
if self.outbound_propagation_link != None:
|
|
|
|
|
# A link already exists, so we'll try to use it
|
|
|
|
|
# to deliver the message
|
|
|
|
|
if self.outbound_propagation_link.status == RNS.Link.ACTIVE:
|
|
|
|
|
if lxmessage.state != LXMessage.SENDING:
|
|
|
|
|
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG)
|
|
|
|
|
lxmessage.set_delivery_destination(self.outbound_propagation_link)
|
|
|
|
|
lxmessage.send()
|
|
|
|
|
else:
|
|
|
|
|
if lxmessage.representation == LXMessage.RESOURCE:
|
|
|
|
|
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
|
|
|
|
|
elif self.outbound_propagation_link.status == RNS.Link.CLOSED:
|
|
|
|
|
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG)
|
|
|
|
|
self.outbound_propagation_link = None
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
|
|
|
|
|
else:
|
|
|
|
|
# Simply wait for the link to become
|
|
|
|
|
# active or close
|
|
|
|
|
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
|
|
|
|
|
else:
|
|
|
|
|
# No link exists, so we'll try to establish one, but
|
|
|
|
|
# only if we've never tried before, or the retry wait
|
|
|
|
|
# period has elapsed.
|
|
|
|
|
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
|
|
|
|
|
lxmessage.delivery_attempts += 1
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
|
|
|
|
|
|
|
|
|
|
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
|
|
|
|
|
if RNS.Transport.has_path(self.outbound_propagation_node):
|
|
|
|
|
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
|
|
|
|
|
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
|
|
|
|
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
|
|
|
|
|
self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet)
|
|
|
|
|
self.outbound_propagation_link.for_lxmessage = lxmessage
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
|
|
|
|
|
RNS.Transport.request_path(self.outbound_propagation_node)
|
|
|
|
|
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
|
|
|
|
self.fail_message(lxmessage)
|
|
|
|
|
|