Added backchannel delivery

This commit is contained in:
Mark Qvist 2024-09-12 17:57:26 +02:00
parent d8e2e2a45f
commit bbf1eda3b0

View File

@ -66,6 +66,7 @@ class LXMRouter:
self.pending_outbound = []
self.failed_outbound = []
self.direct_links = {}
self.backchannel_links = {}
self.delivery_destinations = {}
self.prioritised_list = []
@ -1380,6 +1381,7 @@ class LXMRouter:
link.set_resource_callback(self.delivery_resource_advertised)
link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.delivery_resource_concluded)
link.set_remote_identified_callback(self.delivery_remote_identified)
def delivery_link_closed(self, link):
pass
@ -1406,6 +1408,11 @@ class LXMRouter:
phy_stats = {"rssi": resource.link.rssi, "snr": resource.link.snr, "q": resource.link.q}
self.lxmf_delivery(resource.data.read(), resource.link.type, phy_stats=phy_stats, ratchet_id=ratchet_id, method=LXMessage.DIRECT)
def delivery_remote_identified(self, link, identity):
destination_hash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", identity)
self.backchannel_links[destination_hash] = link
RNS.log(f"Backchannel became available for {RNS.prettyhexrep(destination_hash)} on delivery link {link}", RNS.LOG_DEBUG)
### Peer Sync & Propagation ###########################
#######################################################
@ -1733,11 +1740,26 @@ class LXMRouter:
if lxmessage.state == LXMessage.DELIVERED:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
# Udate ticket delivery stats
if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
self.save_available_tickets()
# Prepare link for backchannel communications
delivery_destination_hash = lxmessage.get_destination().hash
if delivery_destination_hash in self.direct_links:
direct_link = self.direct_links[delivery_destination_hash]
if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
if direct_link.initiator == True:
source_destination_hash = lxmessage.get_source().hash
if source_destination_hash in self.delivery_destinations:
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
direct_link.identify(backchannel_identity)
self.delivery_link_established(direct_link)
RNS.log(f"Performed backchannel identification as {backchannel_identity} on {direct_link}", RNS.LOG_DEBUG)
elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
@ -1764,16 +1786,28 @@ class LXMRouter:
elif lxmessage.method == LXMessage.DIRECT:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None
if delivery_destination_hash in self.direct_links:
# A link already exists, so we'll try to use it
# to deliver the message
# An established direct link already exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.direct_links[delivery_destination_hash]
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
elif delivery_destination_hash in self.backchannel_links:
# An established backchannel link exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.backchannel_links[delivery_destination_hash]
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
if direct_link != None:
if direct_link.status == RNS.Link.ACTIVE:
if lxmessage.progress == None or lxmessage.progress < 0.05:
lxmessage.progress = 0.05
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(direct_link)
lxmessage.send()
else:
@ -1783,7 +1817,7 @@ class LXMRouter:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif direct_link.status == RNS.Link.CLOSED:
if direct_link.activated_at != None:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed", RNS.LOG_DEBUG)
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed, reason: "+str(direct_link.teardown_reason), RNS.LOG_DEBUG)
else:
if not hasattr(lxmessage, "path_request_retried"):
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
@ -1795,7 +1829,10 @@ class LXMRouter:
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.set_delivery_destination(None)
self.direct_links.pop(delivery_destination_hash)
if delivery_destination_hash in self.direct_links:
self.direct_links.pop(delivery_destination_hash)
if delivery_destination_hash in self.backchannel_links:
self.backchannel_links.pop(delivery_destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become active or close