Improved opportunistic delivery logic and performance

This commit is contained in:
Mark Qvist 2024-09-16 17:49:54 +02:00
parent 0e2f0fb090
commit 7789e0bc26
2 changed files with 45 additions and 21 deletions

View File

@ -21,6 +21,7 @@ class LXMRouter:
PROCESSING_INTERVAL = 4
DELIVERY_RETRY_WAIT = 10
PATH_REQUEST_WAIT = 7
MAX_PATHLESS_TRIES = 2
LINK_MAX_INACTIVITY = 10*60
P_LINK_MAX_INACTIVITY = 3*60
@ -367,7 +368,6 @@ class LXMRouter:
self.propagation_transfer_state = LXMRouter.PR_PATH_REQUESTED
self.request_messages_path_job()
else:
# TODO: Remove at some point
RNS.log("Waiting for propagation node link to become active", RNS.LOG_EXTREME)
else:
RNS.log("Cannot request LXMF propagation node sync, no default propagation node configured", RNS.LOG_WARNING)
@ -893,7 +893,6 @@ class LXMRouter:
expired_outbound.append(destination_hash)
for destination_hash in expired_outbound:
RNS.log(f"Cleaning expired outbound ticket for {destination_hash}") # TODO: Remove
self.available_tickets["outbound"].pop(destination_hash)
# Clean inbound tickets
@ -906,7 +905,6 @@ class LXMRouter:
expired_inbound.append(inbound_ticket)
for inbound_ticket in expired_inbound:
RNS.log(f"Cleaning expired inbound ticket for {destination_hash}") # TODO: Remove
self.available_tickets["inbound"][destination_hash].pop(destination_hash)
except Exception as e:
@ -916,7 +914,6 @@ class LXMRouter:
def save_available_tickets(self):
with self.ticket_file_lock:
try:
RNS.log("Saving available tickets...", RNS.LOG_DEBUG) # TODO: Remove
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
@ -1774,11 +1771,24 @@ class LXMRouter:
# Outbound handling for opportunistic messages
if lxmessage.method == LXMessage.OPPORTUNISTIC:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.send()
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.00
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+2 and RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to update path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.00
else:
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.send()
else:
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)

View File

@ -49,18 +49,20 @@ class LXMessage:
TICKET_INTERVAL = 1*24*60*60
COST_TICKET = 0x100
# LXMF overhead is 111 bytes per message:
# LXMF overhead is 112 bytes per message:
# 16 bytes for destination hash
# 16 bytes for source hash
# 64 bytes for Ed25519 signature
# 8 bytes for timestamp
# 7 bytes for msgpack structure
LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + 8 + 7
# 8 bytes for msgpack structure
TIMESTAMP_SIZE = 8
STRUCT_OVERHEAD = 8
LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + TIMESTAMP_SIZE + STRUCT_OVERHEAD
# With an MTU of 500, the maximum amount of data
# we can send in a single encrypted packet is
# 383 bytes.
ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU
# 391 bytes.
ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU + TIMESTAMP_SIZE
# The max content length we can fit in LXMF message
# inside a single RNS packet is the encrypted MDU, minus
@ -69,7 +71,7 @@ class LXMessage:
# field of the packet, therefore we also add the length
# of a destination hash to the calculation. With default
# RNS and LXMF parameters, the largest single-packet
# LXMF message we can send is 288 bytes. If a message
# LXMF message we can send is 295 bytes. If a message
# is larger than that, a Reticulum link will be used.
ENCRYPTED_PACKET_MAX_CONTENT = ENCRYPTED_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
@ -79,13 +81,13 @@ class LXMessage:
LINK_PACKET_MDU = RNS.Link.MDU
# Which means that we can deliver single-packet LXMF
# messages with content of up to 320 bytes over a link.
# messages with content of up to 319 bytes over a link.
# If a message is larger than that, LXMF will sequence
# and transfer it as a RNS resource over the link instead.
LINK_PACKET_MAX_CONTENT = LINK_PACKET_MDU - LXMF_OVERHEAD
# For plain packets without encryption, we can
# fit up to 369 bytes of content.
# fit up to 368 bytes of content.
PLAIN_PACKET_MDU = RNS.Packet.PLAIN_MDU
PLAIN_PACKET_MAX_CONTENT = PLAIN_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
@ -129,8 +131,16 @@ class LXMessage:
if title == None:
title = ""
self.set_title_from_string(title)
self.set_content_from_string(content)
if type(title) == bytes:
self.set_title_from_bytes(title)
else:
self.set_title_from_string(title)
if type(content) == bytes:
self.set_content_from_bytes(content)
else:
self.set_content_from_string(content)
self.set_fields(fields)
self.payload = None
@ -192,7 +202,11 @@ class LXMessage:
self.content = content_bytes
def content_as_string(self):
return self.content.decode("utf-8")
try:
return self.content.decode("utf-8")
except Exception as e:
RNS.log(f"{self} could not decode message content as string: {e}")
return None
def set_fields(self, fields):
if isinstance(fields, dict) or fields == None:
@ -352,7 +366,7 @@ class LXMessage:
self.packed += self.signature
self.packed += packed_payload
self.packed_size = len(self.packed)
content_size = len(packed_payload)
content_size = len(packed_payload)-LXMessage.TIMESTAMP_SIZE-LXMessage.STRUCT_OVERHEAD
# If no desired delivery method has been defined,
# one will be chosen according to these rules:
@ -367,7 +381,7 @@ class LXMessage:
single_packet_content_limit = LXMessage.PLAIN_PACKET_MAX_CONTENT
if content_size > single_packet_content_limit:
raise TypeError("LXMessage desired opportunistic delivery method, but content exceeds single-packet size.")
raise TypeError(f"LXMessage desired opportunistic delivery method, but content of length {content_size} exceeds single-packet content limit of {single_packet_content_limit}.")
else:
self.method = LXMessage.OPPORTUNISTIC
self.representation = LXMessage.PACKET