mirror of
https://github.com/markqvist/LXMF.git
synced 2024-10-01 01:35:36 -04:00
Stamp cost API functions and multi-process stamp generation on Android
This commit is contained in:
parent
4b5e27a5e2
commit
0d76eee6cd
13
LXMF/LXMF.py
13
LXMF/LXMF.py
@ -85,6 +85,8 @@ import RNS.vendor.umsgpack as msgpack
|
||||
def display_name_from_app_data(app_data=None):
|
||||
if app_data == None:
|
||||
return None
|
||||
elif len(app_data) == 0:
|
||||
return None
|
||||
else:
|
||||
# Version 0.5.0+ announce format
|
||||
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
|
||||
@ -93,7 +95,16 @@ def display_name_from_app_data(app_data=None):
|
||||
if len(peer_data) < 1:
|
||||
return None
|
||||
else:
|
||||
return peer_data[0].decode("utf-8")
|
||||
dn = peer_data[0]
|
||||
if dn == None:
|
||||
return None
|
||||
else:
|
||||
try:
|
||||
decoded = dn.decode("utf-8")
|
||||
return decoded
|
||||
except:
|
||||
RNS.log("Could not decode display name in included announce data. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
return None
|
||||
|
||||
# Original announce format
|
||||
else:
|
||||
|
@ -173,9 +173,9 @@ class LXMRouter:
|
||||
job_thread.setDaemon(True)
|
||||
job_thread.start()
|
||||
|
||||
def announce(self, destination_hash):
|
||||
def announce(self, destination_hash, attached_interface=None):
|
||||
if destination_hash in self.delivery_destinations:
|
||||
self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash))
|
||||
self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash), attached_interface=attached_interface)
|
||||
|
||||
def announce_propagation_node(self):
|
||||
def delayed_announce():
|
||||
@ -202,7 +202,6 @@ class LXMRouter:
|
||||
delivery_destination.set_packet_callback(self.delivery_packet)
|
||||
delivery_destination.set_link_established_callback(self.delivery_link_established)
|
||||
delivery_destination.display_name = display_name
|
||||
delivery_destination.stamp_cost = stamp_cost
|
||||
|
||||
if self.enforce_ratchets:
|
||||
delivery_destination.enforce_ratchets()
|
||||
@ -214,11 +213,38 @@ class LXMRouter:
|
||||
delivery_destination.set_default_app_data(get_app_data)
|
||||
|
||||
self.delivery_destinations[delivery_destination.hash] = delivery_destination
|
||||
self.set_inbound_stamp_cost(delivery_destination.hash, stamp_cost)
|
||||
|
||||
return delivery_destination
|
||||
|
||||
def register_delivery_callback(self, callback):
|
||||
self.__delivery_callback = callback
|
||||
|
||||
def set_inbound_stamp_cost(self, destination_hash, stamp_cost):
|
||||
if destination_hash in self.delivery_destinations:
|
||||
delivery_destination = self.delivery_destinations[destination_hash]
|
||||
if stamp_cost == None:
|
||||
delivery_destination.stamp_cost = None
|
||||
return True
|
||||
elif type(stamp_cost) == int:
|
||||
if stamp_cost < 1:
|
||||
delivery_destination.stamp_cost = None
|
||||
elif stamp_cost < 255:
|
||||
delivery_destination.stamp_cost = stamp_cost
|
||||
else:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_outbound_stamp_cost(self, destination_hash):
|
||||
if destination_hash in self.outbound_stamp_costs:
|
||||
stamp_cost = self.outbound_stamp_costs[destination_hash][1]
|
||||
return stamp_cost
|
||||
else:
|
||||
return None
|
||||
|
||||
def set_outbound_propagation_node(self, destination_hash):
|
||||
if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8 or type(destination_hash) != bytes:
|
||||
raise ValueError("Invalid destination hash for outbound propagation node")
|
||||
@ -1020,7 +1046,13 @@ class LXMRouter:
|
||||
time.sleep(0.1)
|
||||
|
||||
self.pending_outbound.append(lxmessage)
|
||||
self.process_outbound()
|
||||
|
||||
if lxmessage.defer_stamp and lxmessage.stamp_cost == None:
|
||||
RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but no stamp is required, processing immediately", RNS.LOG_DEBUG)
|
||||
lxmessage.defer_stamp = False
|
||||
|
||||
if not lxmessage.defer_stamp:
|
||||
self.process_outbound()
|
||||
|
||||
def get_outbound_progress(self, lxm_hash):
|
||||
for lxm in self.pending_outbound:
|
||||
@ -1029,6 +1061,13 @@ class LXMRouter:
|
||||
|
||||
return None
|
||||
|
||||
def get_outbound_lxm_stamp_cost(self, lxm_hash):
|
||||
for lxm in self.pending_outbound:
|
||||
if lxm.hash == lxm_hash:
|
||||
return lxm.stamp_cost
|
||||
|
||||
return None
|
||||
|
||||
|
||||
### Message Routing & Delivery ########################
|
||||
#######################################################
|
||||
@ -1440,6 +1479,18 @@ class LXMRouter:
|
||||
self.pending_outbound.remove(lxmessage)
|
||||
else:
|
||||
RNS.log("Starting outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
|
||||
|
||||
# Handle potentially deferred stamp generation
|
||||
if lxmessage.defer_stamp and lxmessage.stamp == None:
|
||||
RNS.log(f"Generating deferred stamp for {lxmessage} now", RNS.LOG_DEBUG)
|
||||
lxmessage.stamp = lxmessage.get_stamp()
|
||||
lxmessage.defer_stamp = False
|
||||
lxmessage.packed = None
|
||||
lxmessage.pack()
|
||||
|
||||
if lxmessage.progress == None or lxmessage.progress < 0.01:
|
||||
lxmessage.progress = 0.01
|
||||
|
||||
# Outbound handling for opportunistic messages
|
||||
if lxmessage.method == LXMessage.OPPORTUNISTIC:
|
||||
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
|
||||
|
@ -10,13 +10,13 @@ import multiprocessing
|
||||
from .LXMF import APP_NAME
|
||||
|
||||
class LXMessage:
|
||||
DRAFT = 0x00
|
||||
GENERATING = 0x00
|
||||
OUTBOUND = 0x01
|
||||
SENDING = 0x02
|
||||
SENT = 0x04
|
||||
DELIVERED = 0x08
|
||||
FAILED = 0xFF
|
||||
states = [DRAFT, OUTBOUND, SENDING, SENT, DELIVERED, FAILED]
|
||||
states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, FAILED]
|
||||
|
||||
UNKNOWN = 0x00
|
||||
PACKET = 0x01
|
||||
@ -126,7 +126,8 @@ class LXMessage:
|
||||
self.stamp = None
|
||||
self.stamp_cost = stamp_cost
|
||||
self.stamp_valid = False
|
||||
self.state = LXMessage.DRAFT
|
||||
self.defer_stamp = False
|
||||
self.state = LXMessage.GENERATING
|
||||
self.method = LXMessage.UNKNOWN
|
||||
self.progress = 0.0
|
||||
self.rssi = None
|
||||
@ -277,53 +278,128 @@ class LXMessage:
|
||||
start_time = time.time()
|
||||
total_rounds = 0
|
||||
|
||||
stop_event = multiprocessing.Event()
|
||||
result_queue = multiprocessing.Queue(maxsize=1)
|
||||
rounds_queue = multiprocessing.Queue()
|
||||
def job(stop_event):
|
||||
terminated = False
|
||||
rounds = 0
|
||||
|
||||
stamp = os.urandom(256//8)
|
||||
while not LXMessage.stamp_valid(stamp, self.stamp_cost, workblock):
|
||||
if stop_event.is_set():
|
||||
break
|
||||
|
||||
if timeout != None and rounds % 10000 == 0:
|
||||
if time.time() > start_time + timeout:
|
||||
RNS.log(f"Stamp generation for {self} timed out", RNS.LOG_ERROR)
|
||||
return None
|
||||
if not RNS.vendor.platformutils.is_android():
|
||||
stop_event = multiprocessing.Event()
|
||||
result_queue = multiprocessing.Queue(maxsize=1)
|
||||
rounds_queue = multiprocessing.Queue()
|
||||
def job(stop_event):
|
||||
terminated = False
|
||||
rounds = 0
|
||||
|
||||
stamp = os.urandom(256//8)
|
||||
rounds += 1
|
||||
while not LXMessage.stamp_valid(stamp, self.stamp_cost, workblock):
|
||||
if stop_event.is_set():
|
||||
break
|
||||
|
||||
rounds_queue.put(rounds)
|
||||
if not stop_event.is_set():
|
||||
result_queue.put(stamp)
|
||||
if timeout != None and rounds % 10000 == 0:
|
||||
if time.time() > start_time + timeout:
|
||||
RNS.log(f"Stamp generation for {self} timed out", RNS.LOG_ERROR)
|
||||
return None
|
||||
|
||||
job_procs = []
|
||||
jobs = multiprocessing.cpu_count()
|
||||
for _ in range(jobs):
|
||||
process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event},)
|
||||
job_procs.append(process)
|
||||
process.start()
|
||||
stamp = os.urandom(256//8)
|
||||
rounds += 1
|
||||
|
||||
stamp = result_queue.get()
|
||||
stop_event.set()
|
||||
rounds_queue.put(rounds)
|
||||
if not stop_event.is_set():
|
||||
result_queue.put(stamp)
|
||||
|
||||
for j in range(jobs):
|
||||
process = job_procs[j]
|
||||
process.join()
|
||||
total_rounds += rounds_queue.get()
|
||||
job_procs = []
|
||||
jobs = multiprocessing.cpu_count()
|
||||
for _ in range(jobs):
|
||||
process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event},)
|
||||
job_procs.append(process)
|
||||
process.start()
|
||||
|
||||
duration = time.time() - start_time
|
||||
rounds = total_rounds
|
||||
stamp = result_queue.get()
|
||||
stop_event.set()
|
||||
|
||||
for j in range(jobs):
|
||||
process = job_procs[j]
|
||||
process.join()
|
||||
total_rounds += rounds_queue.get()
|
||||
|
||||
duration = time.time() - start_time
|
||||
rounds = total_rounds
|
||||
|
||||
else:
|
||||
# Semaphore support is flaky to non-existent on
|
||||
# Android, so we need to manually dispatch and
|
||||
# manage workloads here, while periodically
|
||||
# checking in on the progress.
|
||||
|
||||
use_nacl = False
|
||||
try:
|
||||
import nacl.encoding
|
||||
import nacl.hash
|
||||
use_nacl = True
|
||||
except:
|
||||
pass
|
||||
|
||||
def full_hash(m):
|
||||
if use_nacl:
|
||||
return nacl.hash.sha256(m, encoder=nacl.encoding.RawEncoder)
|
||||
else:
|
||||
return RNS.Identity.full_hash(m)
|
||||
|
||||
def sv(s, c, w):
|
||||
target = 0b1<<256-c
|
||||
m = w+s
|
||||
result = full_hash(m)
|
||||
if int.from_bytes(result, byteorder="big") > target:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
stamp = None
|
||||
wm = multiprocessing.Manager()
|
||||
jobs = multiprocessing.cpu_count()
|
||||
|
||||
RNS.log(f"Dispatching {jobs} workers for stamp generation...") # TODO: Remove
|
||||
|
||||
results_dict = wm.dict()
|
||||
while stamp == None:
|
||||
job_procs = []
|
||||
|
||||
def job(procnum=None, results_dict=None, wb=None):
|
||||
RNS.log(f"Worker {procnum} starting...") # TODO: Remove
|
||||
rounds = 0
|
||||
|
||||
stamp = os.urandom(256//8)
|
||||
while not sv(stamp, self.stamp_cost, wb):
|
||||
if rounds >= 500:
|
||||
stamp = None
|
||||
RNS.log(f"Worker {procnum} found no result in {rounds} rounds") # TODO: Remove
|
||||
break
|
||||
|
||||
stamp = os.urandom(256//8)
|
||||
rounds += 1
|
||||
|
||||
results_dict[procnum] = [stamp, rounds]
|
||||
|
||||
for pnum in range(jobs):
|
||||
process = multiprocessing.Process(target=job, kwargs={"procnum":pnum, "results_dict": results_dict, "wb": workblock},)
|
||||
job_procs.append(process)
|
||||
process.start()
|
||||
|
||||
for process in job_procs:
|
||||
process.join()
|
||||
|
||||
for j in results_dict:
|
||||
r = results_dict[j]
|
||||
RNS.log(f"Result from {r}: {r[1]} rounds, stamp: {r[0]}") # TODO: Remove
|
||||
total_rounds += r[1]
|
||||
if r[0] != None:
|
||||
stamp = r[0]
|
||||
RNS.log(f"Found stamp: {stamp}") # TODO: Remove
|
||||
|
||||
duration = time.time() - start_time
|
||||
rounds = total_rounds
|
||||
|
||||
# TODO: Remove stats output
|
||||
RNS.log(f"Stamp generated in {RNS.prettytime(duration)} / {rounds} rounds", RNS.LOG_DEBUG)
|
||||
RNS.log(f"Rounds per second {int(rounds/duration)}", RNS.LOG_DEBUG)
|
||||
RNS.log(f"Stamp: {RNS.hexrep(stamp)}", RNS.LOG_DEBUG)
|
||||
RNS.log(f"Resulting hash: {RNS.hexrep(RNS.Identity.full_hash(workblock+stamp))}", RNS.LOG_DEBUG)
|
||||
# RNS.log(f"Rounds per second {int(rounds/duration)}", RNS.LOG_DEBUG)
|
||||
# RNS.log(f"Stamp: {RNS.hexrep(stamp)}", RNS.LOG_DEBUG)
|
||||
# RNS.log(f"Resulting hash: {RNS.hexrep(RNS.Identity.full_hash(workblock+stamp))}", RNS.LOG_DEBUG)
|
||||
###########################
|
||||
|
||||
return stamp
|
||||
@ -344,9 +420,11 @@ class LXMessage:
|
||||
hashed_part += msgpack.packb(self.payload)
|
||||
self.hash = RNS.Identity.full_hash(hashed_part)
|
||||
self.message_id = self.hash
|
||||
self.stamp = self.get_stamp()
|
||||
if self.stamp != None:
|
||||
self.payload.append(self.stamp)
|
||||
|
||||
if not self.defer_stamp:
|
||||
self.stamp = self.get_stamp()
|
||||
if self.stamp != None:
|
||||
self.payload.append(self.stamp)
|
||||
|
||||
signed_part = b""
|
||||
signed_part += hashed_part
|
||||
|
@ -39,7 +39,7 @@ r = RNS.Reticulum()
|
||||
|
||||
router = LXMF.LXMRouter(storagepath="./tmp1", enforce_stamps=enforce_stamps)
|
||||
identity = RNS.Identity()
|
||||
my_lxmf_destination = router.register_delivery_identity(identity, stamp_cost=required_stamp_cost)
|
||||
my_lxmf_destination = router.register_delivery_identity(identity, display_name="Anonymous Peer", stamp_cost=required_stamp_cost)
|
||||
router.register_delivery_callback(delivery_callback)
|
||||
|
||||
RNS.log("Ready to receive on: "+RNS.prettyhexrep(my_lxmf_destination.hash))
|
||||
|
Loading…
Reference in New Issue
Block a user