Added ability to cancel stamp generation

This commit is contained in:
Mark Qvist 2025-01-18 20:11:31 +01:00
parent a676954116
commit d6b1b9c94d
3 changed files with 78 additions and 14 deletions

View File

@ -16,6 +16,8 @@ from .LXMessage import LXMessage
from .Handlers import LXMFDeliveryAnnounceHandler
from .Handlers import LXMFPropagationAnnounceHandler
import LXMF.LXStamper as LXStamper
class LXMRouter:
MAX_DELIVERY_ATTEMPTS = 5
PROCESSING_INTERVAL = 4
@ -1236,14 +1238,17 @@ class LXMRouter:
def cancel_outbound(self, message_id):
try:
if message_id in self.pending_deferred_stamps:
lxm = self.pending_deferred_stamps[message_id]
RNS.log(f"Cancelling deferred stamp generation for {lxm}", RNS.LOG_DEBUG)
lxm.state = LXMessage.CANCELLED
LXStamper.cancel_work(message_id)
lxmessage = None
for lxm in self.pending_outbound:
if lxm.message_id == message_id:
lxmessage = lxm
if message_id in self.pending_deferred_stamps:
RNS.log(f"Cancelling deferred stamp generation for {lxmessage}", RNS.LOG_DEBUG)
if lxmessage != None:
lxmessage.state = LXMessage.CANCELLED
if lxmessage in self.pending_outbound:
@ -1793,6 +1798,15 @@ class LXMRouter:
selected_message_id = message_id
if selected_lxm != None:
if selected_lxm.state == LXMessage.CANCELLED:
RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
selected_lxm.stamp_generation_failed = True
self.pending_deferred_stamps.pop(selected_message_id)
if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
selected_lxm.failed_callback(lxmessage)
return
RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
generated_stamp = selected_lxm.get_stamp()
if generated_stamp:
@ -1805,9 +1819,11 @@ class LXMRouter:
RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
else:
if selected_lxm.state == LXMessage.CANCELLED:
RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_ERROR)
RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
selected_lxm.stamp_generation_failed = True
self.pending_deferred_stamps.pop(selected_message_id)
if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
selected_lxm.failed_callback(lxmessage)
else:
RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
selected_lxm.stamp_generation_failed = True

View File

@ -7,6 +7,8 @@ import multiprocessing
WORKBLOCK_EXPAND_ROUNDS = 3000
active_jobs = {}
def stamp_workblock(message_id):
wb_st = time.time()
expand_rounds = WORKBLOCK_EXPAND_ROUNDS
@ -44,23 +46,56 @@ def generate_stamp(message_id, stamp_cost):
value = 0
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
stamp, rounds = job_simple(stamp_cost, workblock)
stamp, rounds = job_simple(stamp_cost, workblock, message_id)
elif RNS.vendor.platformutils.is_android():
stamp, rounds = job_android(stamp_cost, workblock)
stamp, rounds = job_android(stamp_cost, workblock, message_id)
else:
stamp, rounds = job_linux(stamp_cost, workblock)
stamp, rounds = job_linux(stamp_cost, workblock, message_id)
duration = time.time() - start_time
speed = rounds/duration
if stamp != None:
value = stamp_value(workblock, stamp)
RNS.log(f"Stamp with value {value} generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG)
return stamp, value
def job_simple(stamp_cost, workblock):
def cancel_work(message_id):
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
try:
if message_id in active_jobs:
active_jobs[message_id] = True
except Exception as e:
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
elif RNS.vendor.platformutils.is_android():
try:
if message_id in active_jobs:
active_jobs[message_id] = True
except Exception as e:
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
else:
try:
if message_id in active_jobs:
stop_event = active_jobs[message_id][0]
result_queue = active_jobs[message_id][1]
stop_event.set()
result_queue.put(None)
active_jobs.pop(message_id)
except Exception as e:
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def job_simple(stamp_cost, workblock, message_id):
# A simple, single-process stamp generator.
# should work on any platform, and is used
# as a fall-back, in case of limited multi-
@ -73,6 +108,8 @@ def job_simple(stamp_cost, workblock):
pstamp = os.urandom(256//8)
st = time.time()
active_jobs[message_id] = False;
def sv(s, c, w):
target = 0b1<<256-c; m = w+s
result = RNS.Identity.full_hash(m)
@ -81,15 +118,20 @@ def job_simple(stamp_cost, workblock):
else:
return True
while not sv(pstamp, stamp_cost, workblock):
while not sv(pstamp, stamp_cost, workblock) and not active_jobs[message_id]:
pstamp = os.urandom(256//8); rounds += 1
if rounds % 2500 == 0:
speed = rounds / (time.time()-st)
RNS.log(f"Stamp generation running. {rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG)
if active_jobs[message_id] == True:
pstamp = None
active_jobs.pop(message_id)
return pstamp, rounds
def job_linux(stamp_cost, workblock):
def job_linux(stamp_cost, workblock, message_id):
allow_kill = True
stamp = None
total_rounds = 0
@ -126,6 +168,8 @@ def job_linux(stamp_cost, workblock):
job_procs.append(process)
process.start()
active_jobs[message_id] = [stop_event, result_queue]
stamp = result_queue.get()
RNS.log("Got stamp result from worker", RNS.LOG_DEBUG) # TODO: Remove
@ -170,7 +214,7 @@ def job_linux(stamp_cost, workblock):
return stamp, total_rounds
def job_android(stamp_cost, workblock):
def job_android(stamp_cost, workblock, message_id):
# Semaphore support is flaky to non-existent on
# Android, so we need to manually dispatch and
# manage workloads here, while periodically
@ -230,10 +274,12 @@ def job_android(stamp_cost, workblock):
RNS.log(f"Stamp generation worker error: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
active_jobs[message_id] = False;
RNS.log(f"Dispatching {jobs} workers for stamp generation...", RNS.LOG_DEBUG) # TODO: Remove
results_dict = wm.dict()
while stamp == None:
while stamp == None and active_jobs[message_id] == False:
job_procs = []
try:
for pnum in range(jobs):
@ -260,6 +306,8 @@ def job_android(stamp_cost, workblock):
RNS.log(f"Stamp generation job error: {e}")
RNS.trace_exception(e)
active_jobs.pop(message_id)
return stamp, total_rounds
if __name__ == "__main__":