From bbf1eda3b0b42d8b0f4b48b16270d9ac10b4e7f3 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 12 Sep 2024 17:57:26 +0200 Subject: [PATCH] Added backchannel delivery --- LXMF/LXMRouter.py | 49 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index d26f155..7f529b1 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -66,6 +66,7 @@ class LXMRouter: self.pending_outbound = [] self.failed_outbound = [] self.direct_links = {} + self.backchannel_links = {} self.delivery_destinations = {} self.prioritised_list = [] @@ -1380,6 +1381,7 @@ class LXMRouter: link.set_resource_callback(self.delivery_resource_advertised) link.set_resource_started_callback(self.resource_transfer_began) link.set_resource_concluded_callback(self.delivery_resource_concluded) + link.set_remote_identified_callback(self.delivery_remote_identified) def delivery_link_closed(self, link): pass @@ -1406,6 +1408,11 @@ class LXMRouter: phy_stats = {"rssi": resource.link.rssi, "snr": resource.link.snr, "q": resource.link.q} self.lxmf_delivery(resource.data.read(), resource.link.type, phy_stats=phy_stats, ratchet_id=ratchet_id, method=LXMessage.DIRECT) + def delivery_remote_identified(self, link, identity): + destination_hash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", identity) + self.backchannel_links[destination_hash] = link + RNS.log(f"Backchannel became available for {RNS.prettyhexrep(destination_hash)} on delivery link {link}", RNS.LOG_DEBUG) + ### Peer Sync & Propagation ########################### ####################################################### @@ -1733,11 +1740,26 @@ class LXMRouter: if lxmessage.state == LXMessage.DELIVERED: RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) self.pending_outbound.remove(lxmessage) + + # Udate ticket delivery stats if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields: RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG) self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time() self.save_available_tickets() + # Prepare link for backchannel communications + delivery_destination_hash = lxmessage.get_destination().hash + if delivery_destination_hash in self.direct_links: + direct_link = self.direct_links[delivery_destination_hash] + if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False: + if direct_link.initiator == True: + source_destination_hash = lxmessage.get_source().hash + if source_destination_hash in self.delivery_destinations: + backchannel_identity = self.delivery_destinations[source_destination_hash].identity + direct_link.identify(backchannel_identity) + self.delivery_link_established(direct_link) + RNS.log(f"Performed backchannel identification as {backchannel_identity} on {direct_link}", RNS.LOG_DEBUG) + elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT: RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) self.pending_outbound.remove(lxmessage) @@ -1764,16 +1786,28 @@ class LXMRouter: elif lxmessage.method == LXMessage.DIRECT: if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: delivery_destination_hash = lxmessage.get_destination().hash - + direct_link = None + if delivery_destination_hash in self.direct_links: - # A link already exists, so we'll try to use it - # to deliver the message + # An established direct link already exists to + # the destination, so we'll try to use it for + # delivering the message direct_link = self.direct_links[delivery_destination_hash] + RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) + + elif delivery_destination_hash in self.backchannel_links: + # An established backchannel link exists to + # the destination, so we'll try to use it for + # delivering the message + direct_link = self.backchannel_links[delivery_destination_hash] + RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) + + if direct_link != None: if direct_link.status == RNS.Link.ACTIVE: if lxmessage.progress == None or lxmessage.progress < 0.05: lxmessage.progress = 0.05 if lxmessage.state != LXMessage.SENDING: - RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG) lxmessage.set_delivery_destination(direct_link) lxmessage.send() else: @@ -1783,7 +1817,7 @@ class LXMRouter: RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) elif direct_link.status == RNS.Link.CLOSED: if direct_link.activated_at != None: - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed", RNS.LOG_DEBUG) + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed, reason: "+str(direct_link.teardown_reason), RNS.LOG_DEBUG) else: if not hasattr(lxmessage, "path_request_retried"): RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG) @@ -1795,7 +1829,10 @@ class LXMRouter: lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT lxmessage.set_delivery_destination(None) - self.direct_links.pop(delivery_destination_hash) + if delivery_destination_hash in self.direct_links: + self.direct_links.pop(delivery_destination_hash) + if delivery_destination_hash in self.backchannel_links: + self.backchannel_links.pop(delivery_destination_hash) lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT else: # Simply wait for the link to become active or close