From df6271a02637087c78e0ab227d5e48ebd858258d Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 1 Nov 2025 00:10:30 +0100 Subject: [PATCH] Handle client message download for stamped propagation messages --- LXMF/LXMPeer.py | 1 + LXMF/LXMRouter.py | 62 +++++++++++++++++++++++++++++------------------ LXMF/LXStamper.py | 10 ++++---- 3 files changed, 44 insertions(+), 29 deletions(-) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index a67d3b2..786514d 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -27,6 +27,7 @@ class LXMPeer: ERROR_INVALID_DATA = 0xf4 ERROR_INVALID_STAMP = 0xf5 ERROR_THROTTLED = 0xf6 + ERROR_NOT_FOUND = 0xfd ERROR_TIMEOUT = 0xfe STRATEGY_LAZY = 0x01 diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 3ab85e8..c7c6051 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -75,6 +75,7 @@ class LXMRouter: PR_ALL_MESSAGES = 0x00 STATS_GET_PATH = "/pn/get/stats" + SYNC_REQUEST_PATH = "/pn/peer/sync" ### Developer-facing API ############################## @@ -627,6 +628,7 @@ class LXMRouter: self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash]) + self.control_destination.register_request_handler(LXMRouter.SYNC_REQUEST_PATH, self.peer_sync_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash]) if self.message_storage_limit != None: limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit) @@ -807,6 +809,18 @@ class LXMRouter: elif remote_identity.hash != self.identity.hash: return LXMPeer.ERROR_NO_ACCESS else: return self.compile_stats() + def peer_sync_request(self, path, data, request_id, remote_identity, requested_at): + if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY + elif remote_identity.hash != self.identity.hash: return LXMPeer.ERROR_NO_ACCESS + else: + if type(data) != bytes: return LXMPeer.ERROR_INVALID_DATA + elif len(data) != RNS.Identity.TRUNCATED_HASHLENGTH//8: return LXMPeer.ERROR_INVALID_DATA + else: + if not data in self.peers: return LXMPeer.ERROR_NOT_FOUND + else: + self.peers[data].sync() + return True + ### Utility & Maintenance ############################# ####################################################### @@ -1364,12 +1378,8 @@ class LXMRouter: return True def message_get_request(self, path, data, request_id, remote_identity, requested_at): - if remote_identity == None: - return LXMPeer.ERROR_NO_IDENTITY - - elif not self.identity_allowed(remote_identity): - return LXMPeer.ERROR_NO_ACCESS - + if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY + elif not self.identity_allowed(remote_identity): return LXMPeer.ERROR_NO_ACCESS else: try: remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery") @@ -1388,9 +1398,7 @@ class LXMRouter: available_messages.sort(key=lambda e: e[1], reverse=False) transient_ids = [] - for available_entry in available_messages: - transient_ids.append(available_entry[0]) - + for available_entry in available_messages: transient_ids.append(available_entry[0]) return transient_ids else: @@ -1416,8 +1424,7 @@ class LXMRouter: try: client_transfer_limit = float(data[2])*1000 RNS.log("Client indicates transfer limit of "+RNS.prettysize(client_transfer_limit), RNS.LOG_DEBUG) - except: - pass + except: pass per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now cumulative_size = 24 # Initialised to highest reasonable binary structure overhead @@ -1434,10 +1441,9 @@ class LXMRouter: lxm_size = len(lxmf_data) next_size = cumulative_size + (lxm_size+per_message_overhead) - if client_transfer_limit != None and next_size > client_transfer_limit: - pass + if client_transfer_limit != None and next_size > client_transfer_limit: pass else: - response_messages.append(lxmf_data) + response_messages.append(lxmf_data[:-LXStamper.STAMP_SIZE]) cumulative_size += (lxm_size+per_message_overhead) except Exception as e: @@ -1446,7 +1452,6 @@ class LXMRouter: self.client_propagation_messages_served += len(response_messages) return response_messages - except Exception as e: RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG) return None @@ -2061,7 +2066,8 @@ class LXMRouter: for validated_entry in validated_messages: lxmf_data = validated_entry[1] stamp_value = validated_entry[2] - self.lxmf_propagation(lxmf_data, stamp_value=stamp_value) + stamp_data = validated_entry[3] + self.lxmf_propagation(lxmf_data, stamp_value=stamp_value, stamp_data=stamp_data) self.client_propagation_messages_received += 1 if len(validated_messages) == len(messages): packet.prove() @@ -2090,7 +2096,7 @@ class LXMRouter: try: if type(data) != list and len(data) < 2: return LXMPeer.ERROR_INVALID_DATA - peering_id = self.identity.hash+remote_identity + peering_id = self.identity.hash+remote_identity.hash target_cost = self.peering_cost peering_key = data[0] transient_ids = data[1] @@ -2115,6 +2121,7 @@ class LXMRouter: except Exception as e: RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG) + RNS.trace_exception(e) return None def propagation_resource_concluded(self, resource): @@ -2158,15 +2165,18 @@ class LXMRouter: self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata) ms = "" if len(messages) == 1 else "s" - RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE) + RNS.log(f"Received {len(messages)} message{ms} from {remote_str}, validating stamps...", RNS.LOG_VERBOSE) min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost) + if len(validated_messages) == len(messages): RNS.log(f"All message stamps validated from {remote_str}", RNS.LOG_VERBOSE) + else: RNS.log(f"Transfer from {remote_str} contained {len(messages)-len(validated_messages)} invalid stamps", RNS.LOG_WARNING) for validated_entry in validated_messages: transient_id = validated_entry[0] lxmf_data = validated_entry[1] stamp_value = validated_entry[2] + stamp_data = validated_entry[3] peer = None if remote_hash != None and remote_hash in self.peers: @@ -2180,7 +2190,7 @@ class LXMRouter: else: self.client_propagation_messages_received += 1 - self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value) + self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value, stamp_data=stamp_data) if peer != None: peer.queue_handled_message(transient_id) else: @@ -2208,7 +2218,8 @@ class LXMRouter: if peer != from_peer: peer.queue_unhandled_message(transient_id) - def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None, stamp_value=None): + def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, + from_peer=None, stamp_value=None, stamp_data=None): if is_paper_message: no_stamp_enforcement = True else: no_stamp_enforcement = False @@ -2236,13 +2247,14 @@ class LXMRouter: else: if self.propagation_node: + stamped_data = lxmf_data+stamp_data value_component = f"_{stamp_value}" if stamp_value and stamp_value > 0 else "" file_path = f"{self.messagepath}/{RNS.hexrep(transient_id, delimit=False)}_{received}{value_component}" msg_file = open(file_path, "wb") - msg_file.write(lxmf_data); msg_file.close() + msg_file.write(stamped_data); msg_file.close() RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)} with stamp value {stamp_value}, adding to peer distribution queues...", RNS.LOG_EXTREME) - self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], [], stamp_value] + self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(stamped_data), [], [], stamp_value] self.enqueue_peer_distribution(transient_id, from_peer) else: @@ -2328,8 +2340,10 @@ class LXMRouter: return - if selected_lxm.stamp == None: stamp_generation_success = False - else: stamp_generation_success = True + if selected_lxm.defer_stamp: + if selected_lxm.stamp == None: stamp_generation_success = False + else: stamp_generation_success = True + else: stamp_generation_success = True if selected_lxm.desired_method == LXMessage.PROPAGATED: if selected_lxm.propagation_stamp == None: propagation_stamp_generation_success = False diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index de71101..8ebefd7 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -52,23 +52,23 @@ def validate_peering_key(peering_id, peering_key, target_cost): def validate_pn_stamp(transient_data, target_cost): from .LXMessage import LXMessage - if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return None, None, None + if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return None, None, None, None else: lxm_data = transient_data[:-STAMP_SIZE] stamp = transient_data[-STAMP_SIZE:] transient_id = RNS.Identity.full_hash(lxm_data) workblock = stamp_workblock(transient_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN) - if not stamp_valid(stamp, target_cost, workblock): return None, None, None + if not stamp_valid(stamp, target_cost, workblock): return None, None, None, None else: value = stamp_value(workblock, stamp) - return transient_id, lxm_data, value + return transient_id, lxm_data, value, stamp def validate_pn_stamps_job_simple(transient_list, target_cost): validated_messages = [] for transient_data in transient_list: - transient_id, lxm_data, value = validate_pn_stamp(transient_data, target_cost) - if transient_id: validated_messages.append([transient_id, lxm_data, value]) + transient_id, lxm_data, value, stamp_data = validate_pn_stamp(transient_data, target_cost) + if transient_id: validated_messages.append([transient_id, lxm_data, value, stamp_data]) return validated_messages