diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index 0fe4a2c..3618912 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -85,6 +85,8 @@ import RNS.vendor.umsgpack as msgpack def display_name_from_app_data(app_data=None): if app_data == None: return None + elif len(app_data) == 0: + return None else: # Version 0.5.0+ announce format if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc: @@ -93,7 +95,16 @@ def display_name_from_app_data(app_data=None): if len(peer_data) < 1: return None else: - return peer_data[0].decode("utf-8") + dn = peer_data[0] + if dn == None: + return None + else: + try: + decoded = dn.decode("utf-8") + return decoded + except: + RNS.log("Could not decode display name in included announce data. The contained exception was: {e}", RNS.LOG_ERROR) + return None # Original announce format else: diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 5e3bf36..e0d61fe 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -173,9 +173,9 @@ class LXMRouter: job_thread.setDaemon(True) job_thread.start() - def announce(self, destination_hash): + def announce(self, destination_hash, attached_interface=None): if destination_hash in self.delivery_destinations: - self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash)) + self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash), attached_interface=attached_interface) def announce_propagation_node(self): def delayed_announce(): @@ -202,7 +202,6 @@ class LXMRouter: delivery_destination.set_packet_callback(self.delivery_packet) delivery_destination.set_link_established_callback(self.delivery_link_established) delivery_destination.display_name = display_name - delivery_destination.stamp_cost = stamp_cost if self.enforce_ratchets: delivery_destination.enforce_ratchets() @@ -214,11 +213,38 @@ class LXMRouter: delivery_destination.set_default_app_data(get_app_data) self.delivery_destinations[delivery_destination.hash] = delivery_destination + self.set_inbound_stamp_cost(delivery_destination.hash, stamp_cost) + return delivery_destination def register_delivery_callback(self, callback): self.__delivery_callback = callback + def set_inbound_stamp_cost(self, destination_hash, stamp_cost): + if destination_hash in self.delivery_destinations: + delivery_destination = self.delivery_destinations[destination_hash] + if stamp_cost == None: + delivery_destination.stamp_cost = None + return True + elif type(stamp_cost) == int: + if stamp_cost < 1: + delivery_destination.stamp_cost = None + elif stamp_cost < 255: + delivery_destination.stamp_cost = stamp_cost + else: + return False + + return True + + return False + + def get_outbound_stamp_cost(self, destination_hash): + if destination_hash in self.outbound_stamp_costs: + stamp_cost = self.outbound_stamp_costs[destination_hash][1] + return stamp_cost + else: + return None + def set_outbound_propagation_node(self, destination_hash): if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8 or type(destination_hash) != bytes: raise ValueError("Invalid destination hash for outbound propagation node") @@ -1020,7 +1046,13 @@ class LXMRouter: time.sleep(0.1) self.pending_outbound.append(lxmessage) - self.process_outbound() + + if lxmessage.defer_stamp and lxmessage.stamp_cost == None: + RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but no stamp is required, processing immediately", RNS.LOG_DEBUG) + lxmessage.defer_stamp = False + + if not lxmessage.defer_stamp: + self.process_outbound() def get_outbound_progress(self, lxm_hash): for lxm in self.pending_outbound: @@ -1029,6 +1061,13 @@ class LXMRouter: return None + def get_outbound_lxm_stamp_cost(self, lxm_hash): + for lxm in self.pending_outbound: + if lxm.hash == lxm_hash: + return lxm.stamp_cost + + return None + ### Message Routing & Delivery ######################## ####################################################### @@ -1440,6 +1479,18 @@ class LXMRouter: self.pending_outbound.remove(lxmessage) else: RNS.log("Starting outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + + # Handle potentially deferred stamp generation + if lxmessage.defer_stamp and lxmessage.stamp == None: + RNS.log(f"Generating deferred stamp for {lxmessage} now", RNS.LOG_DEBUG) + lxmessage.stamp = lxmessage.get_stamp() + lxmessage.defer_stamp = False + lxmessage.packed = None + lxmessage.pack() + + if lxmessage.progress == None or lxmessage.progress < 0.01: + lxmessage.progress = 0.01 + # Outbound handling for opportunistic messages if lxmessage.method == LXMessage.OPPORTUNISTIC: if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 76a8661..3b4956d 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -10,13 +10,13 @@ import multiprocessing from .LXMF import APP_NAME class LXMessage: - DRAFT = 0x00 + GENERATING = 0x00 OUTBOUND = 0x01 SENDING = 0x02 SENT = 0x04 DELIVERED = 0x08 FAILED = 0xFF - states = [DRAFT, OUTBOUND, SENDING, SENT, DELIVERED, FAILED] + states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, FAILED] UNKNOWN = 0x00 PACKET = 0x01 @@ -126,7 +126,8 @@ class LXMessage: self.stamp = None self.stamp_cost = stamp_cost self.stamp_valid = False - self.state = LXMessage.DRAFT + self.defer_stamp = False + self.state = LXMessage.GENERATING self.method = LXMessage.UNKNOWN self.progress = 0.0 self.rssi = None @@ -277,53 +278,128 @@ class LXMessage: start_time = time.time() total_rounds = 0 - stop_event = multiprocessing.Event() - result_queue = multiprocessing.Queue(maxsize=1) - rounds_queue = multiprocessing.Queue() - def job(stop_event): - terminated = False - rounds = 0 - - stamp = os.urandom(256//8) - while not LXMessage.stamp_valid(stamp, self.stamp_cost, workblock): - if stop_event.is_set(): - break - - if timeout != None and rounds % 10000 == 0: - if time.time() > start_time + timeout: - RNS.log(f"Stamp generation for {self} timed out", RNS.LOG_ERROR) - return None + if not RNS.vendor.platformutils.is_android(): + stop_event = multiprocessing.Event() + result_queue = multiprocessing.Queue(maxsize=1) + rounds_queue = multiprocessing.Queue() + def job(stop_event): + terminated = False + rounds = 0 stamp = os.urandom(256//8) - rounds += 1 + while not LXMessage.stamp_valid(stamp, self.stamp_cost, workblock): + if stop_event.is_set(): + break - rounds_queue.put(rounds) - if not stop_event.is_set(): - result_queue.put(stamp) + if timeout != None and rounds % 10000 == 0: + if time.time() > start_time + timeout: + RNS.log(f"Stamp generation for {self} timed out", RNS.LOG_ERROR) + return None - job_procs = [] - jobs = multiprocessing.cpu_count() - for _ in range(jobs): - process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event},) - job_procs.append(process) - process.start() + stamp = os.urandom(256//8) + rounds += 1 - stamp = result_queue.get() - stop_event.set() + rounds_queue.put(rounds) + if not stop_event.is_set(): + result_queue.put(stamp) - for j in range(jobs): - process = job_procs[j] - process.join() - total_rounds += rounds_queue.get() + job_procs = [] + jobs = multiprocessing.cpu_count() + for _ in range(jobs): + process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event},) + job_procs.append(process) + process.start() - duration = time.time() - start_time - rounds = total_rounds + stamp = result_queue.get() + stop_event.set() + + for j in range(jobs): + process = job_procs[j] + process.join() + total_rounds += rounds_queue.get() + + duration = time.time() - start_time + rounds = total_rounds + + else: + # Semaphore support is flaky to non-existent on + # Android, so we need to manually dispatch and + # manage workloads here, while periodically + # checking in on the progress. + + use_nacl = False + try: + import nacl.encoding + import nacl.hash + use_nacl = True + except: + pass + + def full_hash(m): + if use_nacl: + return nacl.hash.sha256(m, encoder=nacl.encoding.RawEncoder) + else: + return RNS.Identity.full_hash(m) + + def sv(s, c, w): + target = 0b1<<256-c + m = w+s + result = full_hash(m) + if int.from_bytes(result, byteorder="big") > target: + return False + else: + return True + + stamp = None + wm = multiprocessing.Manager() + jobs = multiprocessing.cpu_count() + + RNS.log(f"Dispatching {jobs} workers for stamp generation...") # TODO: Remove + + results_dict = wm.dict() + while stamp == None: + job_procs = [] + + def job(procnum=None, results_dict=None, wb=None): + RNS.log(f"Worker {procnum} starting...") # TODO: Remove + rounds = 0 + + stamp = os.urandom(256//8) + while not sv(stamp, self.stamp_cost, wb): + if rounds >= 500: + stamp = None + RNS.log(f"Worker {procnum} found no result in {rounds} rounds") # TODO: Remove + break + + stamp = os.urandom(256//8) + rounds += 1 + + results_dict[procnum] = [stamp, rounds] + + for pnum in range(jobs): + process = multiprocessing.Process(target=job, kwargs={"procnum":pnum, "results_dict": results_dict, "wb": workblock},) + job_procs.append(process) + process.start() + + for process in job_procs: + process.join() + + for j in results_dict: + r = results_dict[j] + RNS.log(f"Result from {r}: {r[1]} rounds, stamp: {r[0]}") # TODO: Remove + total_rounds += r[1] + if r[0] != None: + stamp = r[0] + RNS.log(f"Found stamp: {stamp}") # TODO: Remove + + duration = time.time() - start_time + rounds = total_rounds # TODO: Remove stats output RNS.log(f"Stamp generated in {RNS.prettytime(duration)} / {rounds} rounds", RNS.LOG_DEBUG) - RNS.log(f"Rounds per second {int(rounds/duration)}", RNS.LOG_DEBUG) - RNS.log(f"Stamp: {RNS.hexrep(stamp)}", RNS.LOG_DEBUG) - RNS.log(f"Resulting hash: {RNS.hexrep(RNS.Identity.full_hash(workblock+stamp))}", RNS.LOG_DEBUG) + # RNS.log(f"Rounds per second {int(rounds/duration)}", RNS.LOG_DEBUG) + # RNS.log(f"Stamp: {RNS.hexrep(stamp)}", RNS.LOG_DEBUG) + # RNS.log(f"Resulting hash: {RNS.hexrep(RNS.Identity.full_hash(workblock+stamp))}", RNS.LOG_DEBUG) ########################### return stamp @@ -344,9 +420,11 @@ class LXMessage: hashed_part += msgpack.packb(self.payload) self.hash = RNS.Identity.full_hash(hashed_part) self.message_id = self.hash - self.stamp = self.get_stamp() - if self.stamp != None: - self.payload.append(self.stamp) + + if not self.defer_stamp: + self.stamp = self.get_stamp() + if self.stamp != None: + self.payload.append(self.stamp) signed_part = b"" signed_part += hashed_part diff --git a/docs/example_receiver.py b/docs/example_receiver.py index 8effd3e..64f914a 100644 --- a/docs/example_receiver.py +++ b/docs/example_receiver.py @@ -39,7 +39,7 @@ r = RNS.Reticulum() router = LXMF.LXMRouter(storagepath="./tmp1", enforce_stamps=enforce_stamps) identity = RNS.Identity() -my_lxmf_destination = router.register_delivery_identity(identity, stamp_cost=required_stamp_cost) +my_lxmf_destination = router.register_delivery_identity(identity, display_name="Anonymous Peer", stamp_cost=required_stamp_cost) router.register_delivery_callback(delivery_callback) RNS.log("Ready to receive on: "+RNS.prettyhexrep(my_lxmf_destination.hash))