From 2b7ba9558b1e8a39e522248c498e473452fe24ba Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 6 Jan 2026 17:03:09 +0100 Subject: [PATCH] Cleanup --- LXMF/LXMRouter.py | 8 ++++++-- LXMF/LXStamper.py | 24 ++++++++++++++---------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index c1903cd..4b593a9 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -1365,7 +1365,9 @@ class LXMRouter: self.save_node_stats() def sigint_handler(self, signal, frame): - if threading.current_thread() != threading.main_thread(): os._exit(0) + if threading.current_thread() != threading.main_thread(): + RNS.log(f"SIGINT on non-main thread {threading.current_thread()}, exiting immediately", RNS.LOG_WARNING) + os._exit(0) else: if not self.exit_handler_running: RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING) @@ -1375,7 +1377,9 @@ class LXMRouter: RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) def sigterm_handler(self, signal, frame): - if threading.current_thread() != threading.main_thread(): os._exit(0) + if threading.current_thread() != threading.main_thread(): + RNS.log(f"SIGTERM on non-main thread {threading.current_thread()}, exiting immediately", RNS.LOG_WARNING) + os._exit(0) else: if not self.exit_handler_running: RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING) diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index 39b541b..3d7a1c2 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -15,8 +15,6 @@ PN_VALIDATION_POOL_MIN_SIZE = 256 active_jobs = {} -if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork") - def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): wb_st = time.time() workblock = b"" @@ -79,8 +77,10 @@ def validate_pn_stamps_job_multip(transient_list, target_cost): pool_count = min(cores, math.ceil(len(transient_list) / PN_VALIDATION_POOL_MIN_SIZE)) RNS.log(f"Validating {len(transient_list)} stamps using {pool_count} processes...", RNS.LOG_VERBOSE) - with multiprocessing.Pool(pool_count) as p: + with multiprocessing.get_context("spawn").Pool(pool_count) as p: validated_entries = p.starmap(validate_pn_stamp, zip(transient_list, itertools.repeat(target_cost))) + + RNS.log(f"Validation pool completed for {len(transient_list)} stamps", RNS.LOG_VERBOSE) return [e for e in validated_entries if e[0] != None] @@ -210,22 +210,19 @@ def job_linux(stamp_cost, workblock, message_id): job_procs = [] RNS.log(f"Starting {jobs} stamp generation workers", RNS.LOG_DEBUG) for jpn in range(jobs): - process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event, "pn": jpn, "sc": stamp_cost, "wb": workblock}, daemon=True) + process = multiprocessing.get_context("fork").Process(target=job, kwargs={"stop_event": stop_event, "pn": jpn, "sc": stamp_cost, "wb": workblock}, daemon=True) job_procs.append(process) process.start() active_jobs[message_id] = [stop_event, result_queue] stamp = result_queue.get() - RNS.log("Got stamp result from worker", RNS.LOG_DEBUG) # TODO: Remove # Collect any potential spurious # results from worker queue. try: - while True: - result_queue.get_nowait() - except: - pass + while True: result_queue.get_nowait() + except: pass for j in range(jobs): nrounds = 0 @@ -388,4 +385,11 @@ if __name__ == "__main__": RNS.log("", RNS.LOG_DEBUG) RNS.log("Testing peering key generation", RNS.LOG_DEBUG) message_id = os.urandom(32) - generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING) \ No newline at end of file + generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING) + + transient_list = [] + st = time.time(); count = 10000 + for i in range(count): transient_list.append(os.urandom(256)) + validate_pn_stamps(transient_list, 5) + dt = time.time()-st; mps = count/dt + RNS.log(f"Validated {count} PN stamps in {RNS.prettytime(dt)}, {round(mps,1)} m/s", RNS.LOG_DEBUG)