From d6b1b9c94dc245b3255f3d5b5830ede1b5f97ef7 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 18 Jan 2025 20:11:31 +0100 Subject: [PATCH] Added ability to cancel stamp generation --- LXMF/LXMRouter.py | 24 ++++++++++++--- LXMF/LXStamper.py | 66 ++++++++++++++++++++++++++++++++++------ docs/example_receiver.py | 2 +- 3 files changed, 78 insertions(+), 14 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 5c71f93..2ec4e8e 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -16,6 +16,8 @@ from .LXMessage import LXMessage from .Handlers import LXMFDeliveryAnnounceHandler from .Handlers import LXMFPropagationAnnounceHandler +import LXMF.LXStamper as LXStamper + class LXMRouter: MAX_DELIVERY_ATTEMPTS = 5 PROCESSING_INTERVAL = 4 @@ -1236,14 +1238,17 @@ class LXMRouter: def cancel_outbound(self, message_id): try: + if message_id in self.pending_deferred_stamps: + lxm = self.pending_deferred_stamps[message_id] + RNS.log(f"Cancelling deferred stamp generation for {lxm}", RNS.LOG_DEBUG) + lxm.state = LXMessage.CANCELLED + LXStamper.cancel_work(message_id) + lxmessage = None for lxm in self.pending_outbound: if lxm.message_id == message_id: lxmessage = lxm - if message_id in self.pending_deferred_stamps: - RNS.log(f"Cancelling deferred stamp generation for {lxmessage}", RNS.LOG_DEBUG) - if lxmessage != None: lxmessage.state = LXMessage.CANCELLED if lxmessage in self.pending_outbound: @@ -1793,6 +1798,15 @@ class LXMRouter: selected_message_id = message_id if selected_lxm != None: + if selected_lxm.state == LXMessage.CANCELLED: + RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): + selected_lxm.failed_callback(lxmessage) + + return + RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) generated_stamp = selected_lxm.get_stamp() if generated_stamp: @@ -1805,9 +1819,11 @@ class LXMRouter: RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) else: if selected_lxm.state == LXMessage.CANCELLED: - RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_ERROR) + RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) selected_lxm.stamp_generation_failed = True self.pending_deferred_stamps.pop(selected_message_id) + if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): + selected_lxm.failed_callback(lxmessage) else: RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR) selected_lxm.stamp_generation_failed = True diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index 2023ec0..bcfa95b 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -7,6 +7,8 @@ import multiprocessing WORKBLOCK_EXPAND_ROUNDS = 3000 +active_jobs = {} + def stamp_workblock(message_id): wb_st = time.time() expand_rounds = WORKBLOCK_EXPAND_ROUNDS @@ -44,23 +46,56 @@ def generate_stamp(message_id, stamp_cost): value = 0 if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin(): - stamp, rounds = job_simple(stamp_cost, workblock) + stamp, rounds = job_simple(stamp_cost, workblock, message_id) elif RNS.vendor.platformutils.is_android(): - stamp, rounds = job_android(stamp_cost, workblock) + stamp, rounds = job_android(stamp_cost, workblock, message_id) else: - stamp, rounds = job_linux(stamp_cost, workblock) + stamp, rounds = job_linux(stamp_cost, workblock, message_id) duration = time.time() - start_time speed = rounds/duration - value = stamp_value(workblock, stamp) + if stamp != None: + value = stamp_value(workblock, stamp) RNS.log(f"Stamp with value {value} generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG) return stamp, value -def job_simple(stamp_cost, workblock): +def cancel_work(message_id): + if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin(): + try: + if message_id in active_jobs: + active_jobs[message_id] = True + + except Exception as e: + RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + + elif RNS.vendor.platformutils.is_android(): + try: + if message_id in active_jobs: + active_jobs[message_id] = True + + except Exception as e: + RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + + else: + try: + if message_id in active_jobs: + stop_event = active_jobs[message_id][0] + result_queue = active_jobs[message_id][1] + stop_event.set() + result_queue.put(None) + active_jobs.pop(message_id) + + except Exception as e: + RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + +def job_simple(stamp_cost, workblock, message_id): # A simple, single-process stamp generator. # should work on any platform, and is used # as a fall-back, in case of limited multi- @@ -73,6 +108,8 @@ def job_simple(stamp_cost, workblock): pstamp = os.urandom(256//8) st = time.time() + active_jobs[message_id] = False; + def sv(s, c, w): target = 0b1<<256-c; m = w+s result = RNS.Identity.full_hash(m) @@ -81,15 +118,20 @@ def job_simple(stamp_cost, workblock): else: return True - while not sv(pstamp, stamp_cost, workblock): + while not sv(pstamp, stamp_cost, workblock) and not active_jobs[message_id]: pstamp = os.urandom(256//8); rounds += 1 if rounds % 2500 == 0: speed = rounds / (time.time()-st) RNS.log(f"Stamp generation running. {rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG) + if active_jobs[message_id] == True: + pstamp = None + + active_jobs.pop(message_id) + return pstamp, rounds -def job_linux(stamp_cost, workblock): +def job_linux(stamp_cost, workblock, message_id): allow_kill = True stamp = None total_rounds = 0 @@ -126,6 +168,8 @@ def job_linux(stamp_cost, workblock): job_procs.append(process) process.start() + active_jobs[message_id] = [stop_event, result_queue] + stamp = result_queue.get() RNS.log("Got stamp result from worker", RNS.LOG_DEBUG) # TODO: Remove @@ -170,7 +214,7 @@ def job_linux(stamp_cost, workblock): return stamp, total_rounds -def job_android(stamp_cost, workblock): +def job_android(stamp_cost, workblock, message_id): # Semaphore support is flaky to non-existent on # Android, so we need to manually dispatch and # manage workloads here, while periodically @@ -230,10 +274,12 @@ def job_android(stamp_cost, workblock): RNS.log(f"Stamp generation worker error: {e}", RNS.LOG_ERROR) RNS.trace_exception(e) + active_jobs[message_id] = False; + RNS.log(f"Dispatching {jobs} workers for stamp generation...", RNS.LOG_DEBUG) # TODO: Remove results_dict = wm.dict() - while stamp == None: + while stamp == None and active_jobs[message_id] == False: job_procs = [] try: for pnum in range(jobs): @@ -260,6 +306,8 @@ def job_android(stamp_cost, workblock): RNS.log(f"Stamp generation job error: {e}") RNS.trace_exception(e) + active_jobs.pop(message_id) + return stamp, total_rounds if __name__ == "__main__": diff --git a/docs/example_receiver.py b/docs/example_receiver.py index 9bf1c61..20c2efe 100644 --- a/docs/example_receiver.py +++ b/docs/example_receiver.py @@ -69,4 +69,4 @@ while True: # input() # RNS.log("Requesting messages from propagation node...") - # router.request_messages_from_propagation_node(identity) \ No newline at end of file + # router.request_messages_from_propagation_node(identity)