Added ability to cancel outbound messages

This commit is contained in:
Mark Qvist 2025-01-18 19:13:43 +01:00
parent d97c4f292e
commit a676954116
2 changed files with 53 additions and 15 deletions

View File

@ -1234,6 +1234,30 @@ class LXMRouter:
else:
return False
def cancel_outbound(self, message_id):
try:
lxmessage = None
for lxm in self.pending_outbound:
if lxm.message_id == message_id:
lxmessage = lxm
if message_id in self.pending_deferred_stamps:
RNS.log(f"Cancelling deferred stamp generation for {lxmessage}", RNS.LOG_DEBUG)
if lxmessage != None:
lxmessage.state = LXMessage.CANCELLED
if lxmessage in self.pending_outbound:
RNS.log(f"Cancelling {lxmessage} in outbound queue", RNS.LOG_DEBUG)
if lxmessage.representation == LXMessage.RESOURCE:
if lxmessage.resource_representation != None:
lxmessage.resource_representation.cancel()
self.process_outbound()
except Exception as e:
RNS.log(f"An error occurred while cancelling {lxmessage}: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def handle_outbound(self, lxmessage):
destination_hash = lxmessage.get_destination().hash
@ -1780,10 +1804,15 @@ class LXMRouter:
self.pending_outbound.append(selected_lxm)
RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
else:
RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
selected_lxm.stamp_generation_failed = True
self.pending_deferred_stamps.pop(selected_message_id)
self.fail_message(selected_lxm)
if selected_lxm.state == LXMessage.CANCELLED:
RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_ERROR)
selected_lxm.stamp_generation_failed = True
self.pending_deferred_stamps.pop(selected_message_id)
else:
RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
selected_lxm.stamp_generation_failed = True
self.pending_deferred_stamps.pop(selected_message_id)
self.fail_message(selected_lxm)
def process_outbound(self, sender = None):
@ -1820,8 +1849,14 @@ class LXMRouter:
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
elif lxmessage.state == LXMessage.CANCELLED:
RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
else:
RNS.log("Starting outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if lxmessage.progress == None or lxmessage.progress < 0.01:
lxmessage.progress = 0.01

View File

@ -16,8 +16,9 @@ class LXMessage:
SENDING = 0x02
SENT = 0x04
DELIVERED = 0x08
CANCELLED = 0xFE
FAILED = 0xFF
states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, FAILED]
states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, CANCELLED, FAILED]
UNKNOWN = 0x00
PACKET = 0x01
@ -564,22 +565,24 @@ class LXMessage:
if resource.status == RNS.Resource.COMPLETE:
self.__mark_delivered()
else:
resource.link.teardown()
self.state = LXMessage.OUTBOUND
if self.state != LXMessage.CANCELLED:
resource.link.teardown()
self.state = LXMessage.OUTBOUND
def __propagation_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
self.__mark_propagated()
else:
resource.link.teardown()
self.state = LXMessage.OUTBOUND
if self.state != LXMessage.CANCELLED:
resource.link.teardown()
self.state = LXMessage.OUTBOUND
def __link_packet_timed_out(self, packet_receipt):
if packet_receipt:
packet_receipt.destination.teardown()
self.state = LXMessage.OUTBOUND
if self.state != LXMessage.CANCELLED:
if packet_receipt:
packet_receipt.destination.teardown()
self.state = LXMessage.OUTBOUND
def __update_transfer_progress(self, resource):
self.progress = 0.10 + (resource.get_progress()*0.90)