From 704b37dc167abe6dcc59d3afd36ed62b1882278f Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 31 Oct 2025 21:45:40 +0100 Subject: [PATCH] Implemented client-side propagation stamp generation and inclusion in outbound propagation messages --- LXMF/Handlers.py | 4 +- LXMF/LXMF.py | 2 +- LXMF/LXMPeer.py | 3 +- LXMF/LXMRouter.py | 191 +++++++++++++++++++++++++++++++---------- LXMF/LXMessage.py | 90 +++++++++++++------ LXMF/LXStamper.py | 16 ++-- LXMF/Utilities/lxmd.py | 12 ++- 7 files changed, 228 insertions(+), 90 deletions(-) diff --git a/LXMF/Handlers.py b/LXMF/Handlers.py index a65978c..aa39ea2 100644 --- a/LXMF/Handlers.py +++ b/LXMF/Handlers.py @@ -42,8 +42,8 @@ class LXMFPropagationAnnounceHandler: try: if type(app_data) == bytes: if self.lxmrouter.propagation_node: - data = msgpack.unpackb(app_data) - if pn_announce_data_is_valid(data): + if pn_announce_data_is_valid(app_data): + data = msgpack.unpackb(app_data) node_timebase = int(data[1]) propagation_enabled = data[2] propagation_transfer_limit = int(data[3]) diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index 3a20b0e..fd2abf0 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -160,7 +160,7 @@ def pn_announce_data_is_valid(data): except: raise ValueError("Invalid announce data: Could not decode propagation transfer limit") try: int(data[4]) except: raise ValueError("Invalid announce data: Could not decode propagation sync limit") - if type(data[4]) != list: raise ValueError("Invalid announce data: Could not decode stamp costs") + if type(data[5]) != list: raise ValueError("Invalid announce data: Could not decode stamp costs") try: int(data[5][0]) except: raise ValueError("Invalid announce data: Could not decode target stamp cost") try: int(data[5][1]) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index 7d851f2..199ee2d 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -24,7 +24,8 @@ class LXMPeer: ERROR_NO_ACCESS = 0xf1 ERROR_INVALID_KEY = 0xf3 ERROR_INVALID_DATA = 0xf4 - ERROR_THROTTLED = 0xf5 + ERROR_INVALID_STAMP = 0xf5 + ERROR_THROTTLED = 0xf6 ERROR_TIMEOUT = 0xfe STRATEGY_LAZY = 0x01 diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 798cd5a..39ee7ec 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -159,6 +159,7 @@ class LXMRouter: self.identity = identity self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation") + self.propagation_destination.set_default_app_data(self.get_propagation_node_app_data) self.control_destination = None self.client_propagation_messages_received = 0 self.client_propagation_messages_served = 0 @@ -286,22 +287,24 @@ class LXMRouter: if destination_hash in self.delivery_destinations: self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash), attached_interface=attached_interface) + def get_propagation_node_app_data(self): + node_state = self.propagation_node and not self.from_static_only + stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility, self.peering_cost] + metadata = {} + announce_data = [ False, # 0: Legacy LXMF PN support + int(time.time()), # 1: Current node timebase + node_state, # 2: Boolean flag signalling propagation node state + self.propagation_per_transfer_limit, # 3: Per-transfer limit for message propagation in kilobytes + self.propagation_per_sync_limit, # 4: Limit for incoming propagation node syncs + stamp_cost, # 5: Propagation stamp cost for this node + metadata ] # 6: Node metadata + + return msgpack.packb(announce_data) + def announce_propagation_node(self): def delayed_announce(): time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) - node_state = self.propagation_node and not self.from_static_only - stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility, self.peering_cost] - metadata = {} - announce_data = [ False, # 0: Legacy LXMF PN support - int(time.time()), # 1: Current node timebase - node_state, # 2: Boolean flag signalling propagation node state - self.propagation_per_transfer_limit, # 3: Per-transfer limit for message propagation in kilobytes - self.propagation_per_sync_limit, # 4: Limit for incoming propagation node syncs - stamp_cost, # 5: Propagation stamp cost for this node - metadata ] # 6: Node metadata - - data = msgpack.packb(announce_data) - self.propagation_destination.announce(app_data=data) + self.propagation_destination.announce(app_data=self.get_propagation_node_app_data()) da_thread = threading.Thread(target=delayed_announce) da_thread.setDaemon(True) @@ -380,6 +383,29 @@ class LXMRouter: def get_outbound_propagation_node(self): return self.outbound_propagation_node + def get_outbound_propagation_cost(self): + target_propagation_cost = None + pn_destination_hash = self.get_outbound_propagation_node() + pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash) + if pn_announce_data_is_valid(pn_app_data): + pn_config = msgpack.unpackb(pn_app_data) + target_propagation_cost = pn_config[5][0] + + if not target_propagation_cost: + RNS.log(f"Could not retrieve cached propagation node config. Requesting path to propagation node to get target propagation cost...", RNS.LOG_DEBUG) + RNS.Transport.request_path(pn_destination_hash) + timeout = time.time() + LXMRouter.PATH_REQUEST_WAIT + while not RNS.Identity.recall_app_data(pn_destination_hash) and time.time() < timeout: + time.sleep(0.5) + + pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash) + if pn_announce_data_is_valid(pn_app_data): + pn_config = msgpack.unpackb(pn_app_data) + target_propagation_cost = pn_config[5][0] + + if not target_propagation_cost: RNS.log("Propagation node stamp cost still unavailable after path request", RNS.LOG_ERROR) + return target_propagation_cost + def set_inbound_propagation_node(self, destination_hash): # TODO: Implement raise NotImplementedError("Inbound/outbound propagation node differentiation is currently not implemented") @@ -1525,12 +1551,12 @@ class LXMRouter: else: return False - def cancel_outbound(self, message_id): + def cancel_outbound(self, message_id, cancel_state=LXMessage.CANCELLED): try: if message_id in self.pending_deferred_stamps: lxm = self.pending_deferred_stamps[message_id] RNS.log(f"Cancelling deferred stamp generation for {lxm}", RNS.LOG_DEBUG) - lxm.state = LXMessage.CANCELLED + lxm.state = cancel_state LXStamper.cancel_work(message_id) lxmessage = None @@ -1539,7 +1565,7 @@ class LXMRouter: lxmessage = lxm if lxmessage != None: - lxmessage.state = LXMessage.CANCELLED + lxmessage.state = cancel_state if lxmessage in self.pending_outbound: RNS.log(f"Cancelling {lxmessage} in outbound queue", RNS.LOG_DEBUG) if lxmessage.representation == LXMessage.RESOURCE: @@ -1574,11 +1600,9 @@ class LXMRouter: # destination to reply without generating a stamp. if lxmessage.include_ticket: ticket = self.generate_ticket(lxmessage.destination_hash) - if ticket: - lxmessage.fields[FIELD_TICKET] = ticket + if ticket: lxmessage.fields[FIELD_TICKET] = ticket - if not lxmessage.packed: - lxmessage.pack() + if not lxmessage.packed: lxmessage.pack() unknown_path_requested = False if not RNS.Transport.has_path(destination_hash) and lxmessage.method == LXMessage.OPPORTUNISTIC: @@ -1593,16 +1617,13 @@ class LXMRouter: RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but no stamp is required, processing immediately", RNS.LOG_DEBUG) lxmessage.defer_stamp = False - if not lxmessage.defer_stamp: - while not unknown_path_requested and self.processing_outbound: - time.sleep(0.05) + if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp): + while not unknown_path_requested and self.processing_outbound: time.sleep(0.05) self.pending_outbound.append(lxmessage) - if not unknown_path_requested: - self.process_outbound() + if not unknown_path_requested: self.process_outbound() - else: - self.pending_deferred_stamps[lxmessage.message_id] = lxmessage + else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage def get_outbound_progress(self, lxm_hash): for lxm in self.pending_outbound: @@ -1626,6 +1647,17 @@ class LXMRouter: return None + def get_outbound_lxm_propagation_stamp_cost(self, lxm_hash): + for lxm in self.pending_outbound: + if lxm.hash == lxm_hash: + return lxm.propagation_target_cost + + for lxm_id in self.pending_deferred_stamps: + if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: + return self.pending_deferred_stamps[lxm_id].stamp_cost + + return None + ### Message Routing & Delivery ######################## ####################################################### @@ -2022,7 +2054,12 @@ class LXMRouter: self.lxmf_propagation(lxmf_data, stamp_value=stamp_value) self.client_propagation_messages_received += 1 - if stamps_valid: packet.prove() + if len(validated_messages) == len(messages): packet.prove() + else: + RNS.log("Propagation transfer from client contained messages with invalid stamps", RNS.LOG_NOTICE) + reject_data = msgpack.packb([LXMPeer.ERROR_INVALID_STAMP]) + RNS.Packet(packet.link, reject_data).send() + packet.link.teardown() except Exception as e: RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR) @@ -2281,29 +2318,87 @@ class LXMRouter: return - RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) - generated_stamp = selected_lxm.get_stamp() - if generated_stamp: - selected_lxm.stamp = generated_stamp - selected_lxm.defer_stamp = False - selected_lxm.packed = None - selected_lxm.pack() - self.pending_deferred_stamps.pop(selected_message_id) - self.pending_outbound.append(selected_lxm) - RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) - else: - if selected_lxm.state == LXMessage.CANCELLED: - RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) - selected_lxm.stamp_generation_failed = True - self.pending_deferred_stamps.pop(selected_message_id) - if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): - selected_lxm.failed_callback(lxmessage) + if selected_lxm.stamp == None: stamp_generation_success = False + else: stamp_generation_success = True + + if selected_lxm.desired_method == LXMessage.PROPAGATED: + if selected_lxm.propagation_stamp == None: propagation_stamp_generation_success = False + else: propagation_stamp_generation_success = True + else: propagation_stamp_generation_success = True + + if stamp_generation_success == False: + RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) + generated_stamp = selected_lxm.get_stamp() + if generated_stamp: + selected_lxm.stamp = generated_stamp + selected_lxm.defer_stamp = False + selected_lxm.packed = None + selected_lxm.pack() + stamp_generation_success = True + RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) else: - RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR) + if selected_lxm.state == LXMessage.CANCELLED: + RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): + selected_lxm.failed_callback(lxmessage) + else: + RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + self.fail_message(selected_lxm) + + if propagation_stamp_generation_success == False: + RNS.log(f"Starting propagation stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) + pn_target_cost = self.get_outbound_propagation_cost() + if pn_target_cost == None: + RNS.log("Failed to get propagation node stamp cost, cannot generate propagation stamp", RNS.LOG_ERROR) selected_lxm.stamp_generation_failed = True self.pending_deferred_stamps.pop(selected_message_id) self.fail_message(selected_lxm) + else: + propagation_stamp = selected_lxm.get_propagation_stamp(target_cost=pn_target_cost) + RNS.log(f"Generated propagation stamp: {RNS.hexrep(propagation_stamp)}") + if propagation_stamp: + selected_lxm.propagation_stamp = propagation_stamp + selected_lxm.defer_propagation_stamp = False + selected_lxm.packed = None + selected_lxm.pack() + propagation_stamp_generation_success = True + RNS.log(f"Propagation stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) + else: + if selected_lxm.state == LXMessage.CANCELLED: + RNS.log(f"Message cancelled during deferred propagation stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): + selected_lxm.failed_callback(lxmessage) + else: + RNS.log(f"Deferred propagation stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + self.fail_message(selected_lxm) + + if stamp_generation_success and propagation_stamp_generation_success: + self.pending_deferred_stamps.pop(selected_message_id) + self.pending_outbound.append(selected_lxm) + + def propagation_transfer_signalling_packet(self, data, packet): + try: + unpacked = msgpack.unpackb(data) + if type(unpacked) == list and len(unpacked) >= 1: + signal = unpacked[0] + if signal == LXMPeer.ERROR_INVALID_STAMP: + RNS.log("Message rejected by propagation node", RNS.LOG_ERROR) + if hasattr(packet, "link") and hasattr(packet.link, "for_lxmessage"): + lxm = packet.link.for_lxmessage + RNS.log(f"Invalid propagation stamp on {lxm}", RNS.LOG_ERROR) + self.cancel_outbound(lxm.message_id, cancel_state=LXMessage.REJECTED) + + except Exception as e: + RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR) def process_outbound(self, sender = None): if self.processing_outbound: @@ -2347,7 +2442,7 @@ class LXMRouter: elif lxmessage.state == LXMessage.REJECTED: RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - self.pending_outbound.remove(lxmessage) + if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage) if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): lxmessage.failed_callback(lxmessage) @@ -2512,6 +2607,8 @@ class LXMRouter: propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) + self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet) + self.outbound_propagation_link.for_lxmessage = lxmessage else: RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) RNS.Transport.request_path(self.outbound_propagation_node) diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 4739f30..0533f07 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -145,26 +145,32 @@ class LXMessage: self.set_fields(fields) - self.payload = None - self.timestamp = None - self.signature = None - self.hash = None - self.packed = None - self.state = LXMessage.GENERATING - self.method = LXMessage.UNKNOWN - self.progress = 0.0 - self.rssi = None - self.snr = None - self.q = None + self.payload = None + self.timestamp = None + self.signature = None + self.hash = None + self.transient_id = None + self.packed = None + self.state = LXMessage.GENERATING + self.method = LXMessage.UNKNOWN + self.progress = 0.0 + self.rssi = None + self.snr = None + self.q = None - self.stamp = None - self.stamp_cost = stamp_cost - self.stamp_value = None - self.stamp_valid = False - self.stamp_checked = False - self.defer_stamp = True - self.outbound_ticket = None - self.include_ticket = include_ticket + self.stamp = None + self.stamp_cost = stamp_cost + self.stamp_value = None + self.stamp_valid = False + self.stamp_checked = False + self.propagation_stamp = None + self.propagation_stamp_value = None + self.propagation_stamp_valid = False + self.propagation_target_cost = None + self.defer_stamp = True + self.defer_propagation_stamp = True + self.outbound_ticket = None + self.include_ticket = include_ticket self.propagation_packed = None self.paper_packed = None @@ -184,6 +190,7 @@ class LXMessage: self.resource_representation = None self.__delivery_destination = None self.__delivery_callback = None + self.__pn_encrypted_data = None self.failed_callback = None self.deferred_stamp_generating = False @@ -324,10 +331,35 @@ class LXMessage: else: return None + def get_propagation_stamp(self, target_cost, timeout=None): + # If a stamp was already generated, return + # it immediately. + if self.propagation_stamp != None: + return self.propagation_stamp + + # Otherwise, we will need to generate a + # valid stamp according to the cost that + # the propagation node has specified. + else: + self.propagation_target_cost = target_cost + if self.propagation_target_cost == None: + raise ValueError("Cannot generate propagation stamp without configured target propagation cost") + + + if not self.transient_id: self.pack() + generated_stamp, value = LXStamper.generate_stamp(self.transient_id, target_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN) + if generated_stamp: + self.propagation_stamp = generated_stamp + self.propagation_stamp_value = value + self.propagation_stamp_valid = True + return generated_stamp + + else: + return None + def pack(self): if not self.packed: - if self.timestamp == None: - self.timestamp = time.time() + if self.timestamp == None: self.timestamp = time.time() self.propagation_packed = None self.paper_packed = None @@ -343,9 +375,8 @@ class LXMessage: if not self.defer_stamp: self.stamp = self.get_stamp() - if self.stamp != None: - self.payload.append(self.stamp) - + if self.stamp != None: self.payload.append(self.stamp) + signed_part = b"" signed_part += hashed_part signed_part += self.hash @@ -400,9 +431,14 @@ class LXMessage: elif self.desired_method == LXMessage.PROPAGATED: single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT - encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:]) - self.ratchet_id = self.__destination.latest_ratchet_id - self.propagation_packed = msgpack.packb([time.time(), [self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data]]) + if self.__pn_encrypted_data == None: + self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:]) + self.ratchet_id = self.__destination.latest_ratchet_id + + lxmf_data = self.packed[:LXMessage.DESTINATION_LENGTH]+self.__pn_encrypted_data + self.transient_id = RNS.Identity.full_hash(lxmf_data) + if self.propagation_stamp != None: lxmf_data += self.propagation_stamp + self.propagation_packed = msgpack.packb([time.time(), [lxmf_data]]) content_size = len(self.propagation_packed) if content_size <= single_packet_content_limit: diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index 2a74295..de71101 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -10,7 +10,7 @@ import multiprocessing WORKBLOCK_EXPAND_ROUNDS = 3000 WORKBLOCK_EXPAND_ROUNDS_PN = 1000 WORKBLOCK_EXPAND_ROUNDS_PEERING = 25 -STAMP_SIZE = RNS.Identity.HASHLENGTH +STAMP_SIZE = RNS.Identity.HASHLENGTH//8 PN_VALIDATION_POOL_MIN_SIZE = 256 active_jobs = {} @@ -24,7 +24,7 @@ def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): salt=RNS.Identity.full_hash(material+msgpack.packb(n)), context=None) wb_time = time.time() - wb_st - RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG) + # RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG) return workblock @@ -52,23 +52,23 @@ def validate_peering_key(peering_id, peering_key, target_cost): def validate_pn_stamp(transient_data, target_cost): from .LXMessage import LXMessage - if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return False, None, None + if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return None, None, None else: lxm_data = transient_data[:-STAMP_SIZE] stamp = transient_data[-STAMP_SIZE:] transient_id = RNS.Identity.full_hash(lxm_data) workblock = stamp_workblock(transient_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN) - if not stamp_valid(stamp, target_cost, workblock): return False, None, None + if not stamp_valid(stamp, target_cost, workblock): return None, None, None else: value = stamp_value(workblock, stamp) - return True, transient_id, value + return transient_id, lxm_data, value def validate_pn_stamps_job_simple(transient_list, target_cost): validated_messages = [] for transient_data in transient_list: - stamp_valid, transient_id, value = validate_pn_stamp(transient_data, target_cost) - if stamp_valid: validated_messages.append([transient_id, transient_data, value]) + transient_id, lxm_data, value = validate_pn_stamp(transient_data, target_cost) + if transient_id: validated_messages.append([transient_id, lxm_data, value]) return validated_messages @@ -80,7 +80,7 @@ def validate_pn_stamps_job_multip(transient_list, target_cost): with multiprocessing.Pool(pool_count) as p: validated_entries = p.starmap(validate_pn_stamp, zip(transient_list, itertools.repeat(target_cost))) - return [e for e in validated_entries if e[0] == True] + return [e for e in validated_entries if e[0] != None] def validate_pn_stamps(transient_list, target_cost): non_mp_platform = RNS.vendor.platformutils.is_android() diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index b2bc302..69bb26e 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -588,16 +588,18 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = if show_status: msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"]) - ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"]) + ptl = RNS.prettysize(s["propagation_limit"]*1000); psl = RNS.prettysize(s["sync_limit"]*1000); + uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"]) mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"] ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"] cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"] psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"] pc = s["peering_cost"]; pcm = s["max_peering_cost"] print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})") - print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit") print(f"Required propagation stamp cost is {psc}, flexibility is {scf}") print(f"Peering cost is {pc}, max remote peering cost is {pcm}") + print(f"Accepting propagated messages from {who_str}") + print(f"{ptl} message limit, {psl} sync limit") print(f"") print(f"Peers : {stp} total (peer limit is {smp})") print(f" {sdp} discovered, {ssp} static") @@ -642,14 +644,16 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = else: ls = "never synced" - sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000) + sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]) + stl = RNS.prettysize(p["transfer_limit"]*1000); ssl = RNS.prettysize(p["sync_limit"]*1000) srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"] pmi = pm["incoming"]; pmuh = pm["unhandled"] print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}") print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago") print(f"{ind*2}Costs : Propagation {psc} (flex {psf}), peering {pc}") print(f"{ind*2}Sync key : {pk}") - print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit") + print(f"{ind*2}Speeds : {sstr} STR, {sler} LER") + print(f"{ind*2}Limits : {stl} message limit, {ssl} sync limit") print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming") print(f"{ind*2}Traffic : {srxb} received, {stxb} sent") ms = "" if pm["unhandled"] == 1 else "s"