Added outbound transfer progress monitoring

This commit is contained in:
Mark Qvist 2024-03-19 11:03:12 +01:00
parent 3fbe2e94da
commit d2b2ef54e8
2 changed files with 25 additions and 10 deletions

View File

@ -910,10 +910,6 @@ class LXMRouter:
else: else:
return False return False
### Message Routing & Delivery ########################
#######################################################
def handle_outbound(self, lxmessage): def handle_outbound(self, lxmessage):
lxmessage.state = LXMessage.OUTBOUND lxmessage.state = LXMessage.OUTBOUND
if not lxmessage.packed: if not lxmessage.packed:
@ -927,6 +923,17 @@ class LXMRouter:
self.pending_outbound.append(lxmessage) self.pending_outbound.append(lxmessage)
self.process_outbound() self.process_outbound()
def get_outbound_progress(self, lxm_hash):
for lxm in self.pending_outbound:
if lxm.hash == lxm_hash:
return lxm.progress
return None
### Message Routing & Delivery ########################
#######################################################
def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None): def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None):
try: try:
message = LXMessage.unpack_from_bytes(lxmf_data) message = LXMessage.unpack_from_bytes(lxmf_data)
@ -1340,6 +1347,8 @@ class LXMRouter:
# to deliver the message # to deliver the message
direct_link = self.direct_links[delivery_destination_hash] direct_link = self.direct_links[delivery_destination_hash]
if direct_link.status == RNS.Link.ACTIVE: if direct_link.status == RNS.Link.ACTIVE:
if lxmessage.progress == None or lxmessage.progress < 0.05:
lxmessage.progress = 0.05
if lxmessage.state != LXMessage.SENDING: if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(direct_link) lxmessage.set_delivery_destination(direct_link)
@ -1366,8 +1375,7 @@ class LXMRouter:
self.direct_links.pop(delivery_destination_hash) self.direct_links.pop(delivery_destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else: else:
# Simply wait for the link to become # Simply wait for the link to become active or close
# active or close
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else: else:
# No link exists, so we'll try to establish one, but # No link exists, so we'll try to establish one, but
@ -1383,10 +1391,12 @@ class LXMRouter:
delivery_link = RNS.Link(lxmessage.get_destination()) delivery_link = RNS.Link(lxmessage.get_destination())
delivery_link.set_link_established_callback(self.process_outbound) delivery_link.set_link_established_callback(self.process_outbound)
self.direct_links[delivery_destination_hash] = delivery_link self.direct_links[delivery_destination_hash] = delivery_link
lxmessage.progress = 0.03
else: else:
RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG) RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash) RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else: else:
RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage) self.fail_message(lxmessage)

View File

@ -118,7 +118,7 @@ class LXMessage:
self.signature = None self.signature = None
self.hash = None self.hash = None
self.packed = None self.packed = None
self.progress = None self.progress = 0.0
self.state = LXMessage.DRAFT self.state = LXMessage.DRAFT
self.method = LXMessage.UNKNOWN self.method = LXMessage.UNKNOWN
self.rssi = None self.rssi = None
@ -324,12 +324,14 @@ class LXMessage:
if receipt: if receipt:
receipt.set_delivery_callback(self.__mark_delivered) receipt.set_delivery_callback(self.__mark_delivered)
receipt.set_timeout_callback(self.__link_packet_timed_out) receipt.set_timeout_callback(self.__link_packet_timed_out)
self.progress = 0.50
else: else:
if self.__delivery_destination: if self.__delivery_destination:
self.__delivery_destination.teardown() self.__delivery_destination.teardown()
elif self.representation == LXMessage.RESOURCE: elif self.representation == LXMessage.RESOURCE:
self.resource_representation = self.__as_resource() self.resource_representation = self.__as_resource()
self.progress = 0.10
elif self.method == LXMessage.PROPAGATED: elif self.method == LXMessage.PROPAGATED:
self.state = LXMessage.SENDING self.state = LXMessage.SENDING
@ -339,11 +341,13 @@ class LXMessage:
if receipt: if receipt:
receipt.set_delivery_callback(self.__mark_propagated) receipt.set_delivery_callback(self.__mark_propagated)
receipt.set_timeout_callback(self.__link_packet_timed_out) receipt.set_timeout_callback(self.__link_packet_timed_out)
self.progress = 0.50
else: else:
self.__delivery_destination.teardown() self.__delivery_destination.teardown()
elif self.representation == LXMessage.RESOURCE: elif self.representation == LXMessage.RESOURCE:
self.resource_representation = self.__as_resource() self.resource_representation = self.__as_resource()
self.progress = 0.10
def determine_transport_encryption(self): def determine_transport_encryption(self):
@ -387,6 +391,7 @@ class LXMessage:
def __mark_delivered(self, receipt = None): def __mark_delivered(self, receipt = None):
RNS.log("Received delivery notification for "+str(self), RNS.LOG_DEBUG) RNS.log("Received delivery notification for "+str(self), RNS.LOG_DEBUG)
self.state = LXMessage.DELIVERED self.state = LXMessage.DELIVERED
self.progress = 1.0
if self.__delivery_callback != None and callable(self.__delivery_callback): if self.__delivery_callback != None and callable(self.__delivery_callback):
try: try:
@ -397,6 +402,7 @@ class LXMessage:
def __mark_propagated(self, receipt = None): def __mark_propagated(self, receipt = None):
RNS.log("Received propagation success notification for "+str(self), RNS.LOG_DEBUG) RNS.log("Received propagation success notification for "+str(self), RNS.LOG_DEBUG)
self.state = LXMessage.SENT self.state = LXMessage.SENT
self.progress = 1.0
if self.__delivery_callback != None and callable(self.__delivery_callback): if self.__delivery_callback != None and callable(self.__delivery_callback):
try: try:
@ -407,6 +413,7 @@ class LXMessage:
def __mark_paper_generated(self, receipt = None): def __mark_paper_generated(self, receipt = None):
RNS.log("Paper message generation succeeded for "+str(self), RNS.LOG_DEBUG) RNS.log("Paper message generation succeeded for "+str(self), RNS.LOG_DEBUG)
self.state = LXMessage.PAPER self.state = LXMessage.PAPER
self.progress = 1.0
if self.__delivery_callback != None and callable(self.__delivery_callback): if self.__delivery_callback != None and callable(self.__delivery_callback):
try: try:
@ -436,7 +443,7 @@ class LXMessage:
def __update_transfer_progress(self, resource): def __update_transfer_progress(self, resource):
self.progress = resource.get_progress() self.progress = 0.10 + (resource.get_progress()*0.90)
def __as_packet(self): def __as_packet(self):
if not self.packed: if not self.packed:
@ -465,8 +472,6 @@ class LXMessage:
if not self.__delivery_destination.status == RNS.Link.ACTIVE: if not self.__delivery_destination.status == RNS.Link.ACTIVE:
raise ConnectionError("Tried to synthesize resource for LXMF message on a link that was not active") raise ConnectionError("Tried to synthesize resource for LXMF message on a link that was not active")
self.progress = 0.0
if self.method == LXMessage.DIRECT: if self.method == LXMessage.DIRECT:
return RNS.Resource(self.packed, self.__delivery_destination, callback = self.__resource_concluded, progress_callback = self.__update_transfer_progress) return RNS.Resource(self.packed, self.__delivery_destination, callback = self.__resource_concluded, progress_callback = self.__update_transfer_progress)
elif self.method == LXMessage.PROPAGATED: elif self.method == LXMessage.PROPAGATED: