This commit is contained in:
Mark Qvist 2026-01-06 17:03:09 +01:00
parent ef2e1234a5
commit 2b7ba9558b
2 changed files with 20 additions and 12 deletions

View file

@ -1365,7 +1365,9 @@ class LXMRouter:
self.save_node_stats()
def sigint_handler(self, signal, frame):
if threading.current_thread() != threading.main_thread(): os._exit(0)
if threading.current_thread() != threading.main_thread():
RNS.log(f"SIGINT on non-main thread {threading.current_thread()}, exiting immediately", RNS.LOG_WARNING)
os._exit(0)
else:
if not self.exit_handler_running:
RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING)
@ -1375,7 +1377,9 @@ class LXMRouter:
RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
def sigterm_handler(self, signal, frame):
if threading.current_thread() != threading.main_thread(): os._exit(0)
if threading.current_thread() != threading.main_thread():
RNS.log(f"SIGTERM on non-main thread {threading.current_thread()}, exiting immediately", RNS.LOG_WARNING)
os._exit(0)
else:
if not self.exit_handler_running:
RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING)

View file

@ -15,8 +15,6 @@ PN_VALIDATION_POOL_MIN_SIZE = 256
active_jobs = {}
if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork")
def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
wb_st = time.time()
workblock = b""
@ -79,8 +77,10 @@ def validate_pn_stamps_job_multip(transient_list, target_cost):
pool_count = min(cores, math.ceil(len(transient_list) / PN_VALIDATION_POOL_MIN_SIZE))
RNS.log(f"Validating {len(transient_list)} stamps using {pool_count} processes...", RNS.LOG_VERBOSE)
with multiprocessing.Pool(pool_count) as p:
with multiprocessing.get_context("spawn").Pool(pool_count) as p:
validated_entries = p.starmap(validate_pn_stamp, zip(transient_list, itertools.repeat(target_cost)))
RNS.log(f"Validation pool completed for {len(transient_list)} stamps", RNS.LOG_VERBOSE)
return [e for e in validated_entries if e[0] != None]
@ -210,22 +210,19 @@ def job_linux(stamp_cost, workblock, message_id):
job_procs = []
RNS.log(f"Starting {jobs} stamp generation workers", RNS.LOG_DEBUG)
for jpn in range(jobs):
process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event, "pn": jpn, "sc": stamp_cost, "wb": workblock}, daemon=True)
process = multiprocessing.get_context("fork").Process(target=job, kwargs={"stop_event": stop_event, "pn": jpn, "sc": stamp_cost, "wb": workblock}, daemon=True)
job_procs.append(process)
process.start()
active_jobs[message_id] = [stop_event, result_queue]
stamp = result_queue.get()
RNS.log("Got stamp result from worker", RNS.LOG_DEBUG) # TODO: Remove
# Collect any potential spurious
# results from worker queue.
try:
while True:
result_queue.get_nowait()
except:
pass
while True: result_queue.get_nowait()
except: pass
for j in range(jobs):
nrounds = 0
@ -388,4 +385,11 @@ if __name__ == "__main__":
RNS.log("", RNS.LOG_DEBUG)
RNS.log("Testing peering key generation", RNS.LOG_DEBUG)
message_id = os.urandom(32)
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING)
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING)
transient_list = []
st = time.time(); count = 10000
for i in range(count): transient_list.append(os.urandom(256))
validate_pn_stamps(transient_list, 5)
dt = time.time()-st; mps = count/dt
RNS.log(f"Validated {count} PN stamps in {RNS.prettytime(dt)}, {round(mps,1)} m/s", RNS.LOG_DEBUG)