Handle client message download for stamped propagation messages

This commit is contained in:
Mark Qvist 2025-11-01 00:10:30 +01:00
parent 4afb92bf3e
commit df6271a026
3 changed files with 44 additions and 29 deletions

View file

@ -27,6 +27,7 @@ class LXMPeer:
ERROR_INVALID_DATA = 0xf4
ERROR_INVALID_STAMP = 0xf5
ERROR_THROTTLED = 0xf6
ERROR_NOT_FOUND = 0xfd
ERROR_TIMEOUT = 0xfe
STRATEGY_LAZY = 0x01

View file

@ -75,6 +75,7 @@ class LXMRouter:
PR_ALL_MESSAGES = 0x00
STATS_GET_PATH = "/pn/get/stats"
SYNC_REQUEST_PATH = "/pn/peer/sync"
### Developer-facing API ##############################
@ -627,6 +628,7 @@ class LXMRouter:
self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash])
self.control_destination.register_request_handler(LXMRouter.SYNC_REQUEST_PATH, self.peer_sync_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash])
if self.message_storage_limit != None:
limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit)
@ -807,6 +809,18 @@ class LXMRouter:
elif remote_identity.hash != self.identity.hash: return LXMPeer.ERROR_NO_ACCESS
else: return self.compile_stats()
def peer_sync_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY
elif remote_identity.hash != self.identity.hash: return LXMPeer.ERROR_NO_ACCESS
else:
if type(data) != bytes: return LXMPeer.ERROR_INVALID_DATA
elif len(data) != RNS.Identity.TRUNCATED_HASHLENGTH//8: return LXMPeer.ERROR_INVALID_DATA
else:
if not data in self.peers: return LXMPeer.ERROR_NOT_FOUND
else:
self.peers[data].sync()
return True
### Utility & Maintenance #############################
#######################################################
@ -1364,12 +1378,8 @@ class LXMRouter:
return True
def message_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
elif not self.identity_allowed(remote_identity):
return LXMPeer.ERROR_NO_ACCESS
if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY
elif not self.identity_allowed(remote_identity): return LXMPeer.ERROR_NO_ACCESS
else:
try:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery")
@ -1388,9 +1398,7 @@ class LXMRouter:
available_messages.sort(key=lambda e: e[1], reverse=False)
transient_ids = []
for available_entry in available_messages:
transient_ids.append(available_entry[0])
for available_entry in available_messages: transient_ids.append(available_entry[0])
return transient_ids
else:
@ -1416,8 +1424,7 @@ class LXMRouter:
try:
client_transfer_limit = float(data[2])*1000
RNS.log("Client indicates transfer limit of "+RNS.prettysize(client_transfer_limit), RNS.LOG_DEBUG)
except:
pass
except: pass
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
@ -1434,10 +1441,9 @@ class LXMRouter:
lxm_size = len(lxmf_data)
next_size = cumulative_size + (lxm_size+per_message_overhead)
if client_transfer_limit != None and next_size > client_transfer_limit:
pass
if client_transfer_limit != None and next_size > client_transfer_limit: pass
else:
response_messages.append(lxmf_data)
response_messages.append(lxmf_data[:-LXStamper.STAMP_SIZE])
cumulative_size += (lxm_size+per_message_overhead)
except Exception as e:
@ -1446,7 +1452,6 @@ class LXMRouter:
self.client_propagation_messages_served += len(response_messages)
return response_messages
except Exception as e:
RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
@ -2061,7 +2066,8 @@ class LXMRouter:
for validated_entry in validated_messages:
lxmf_data = validated_entry[1]
stamp_value = validated_entry[2]
self.lxmf_propagation(lxmf_data, stamp_value=stamp_value)
stamp_data = validated_entry[3]
self.lxmf_propagation(lxmf_data, stamp_value=stamp_value, stamp_data=stamp_data)
self.client_propagation_messages_received += 1
if len(validated_messages) == len(messages): packet.prove()
@ -2090,7 +2096,7 @@ class LXMRouter:
try:
if type(data) != list and len(data) < 2: return LXMPeer.ERROR_INVALID_DATA
peering_id = self.identity.hash+remote_identity
peering_id = self.identity.hash+remote_identity.hash
target_cost = self.peering_cost
peering_key = data[0]
transient_ids = data[1]
@ -2115,6 +2121,7 @@ class LXMRouter:
except Exception as e:
RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
RNS.trace_exception(e)
return None
def propagation_resource_concluded(self, resource):
@ -2158,15 +2165,18 @@ class LXMRouter:
self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata)
ms = "" if len(messages) == 1 else "s"
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE)
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}, validating stamps...", RNS.LOG_VERBOSE)
min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost)
if len(validated_messages) == len(messages): RNS.log(f"All message stamps validated from {remote_str}", RNS.LOG_VERBOSE)
else: RNS.log(f"Transfer from {remote_str} contained {len(messages)-len(validated_messages)} invalid stamps", RNS.LOG_WARNING)
for validated_entry in validated_messages:
transient_id = validated_entry[0]
lxmf_data = validated_entry[1]
stamp_value = validated_entry[2]
stamp_data = validated_entry[3]
peer = None
if remote_hash != None and remote_hash in self.peers:
@ -2180,7 +2190,7 @@ class LXMRouter:
else:
self.client_propagation_messages_received += 1
self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value)
self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value, stamp_data=stamp_data)
if peer != None: peer.queue_handled_message(transient_id)
else:
@ -2208,7 +2218,8 @@ class LXMRouter:
if peer != from_peer:
peer.queue_unhandled_message(transient_id)
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None, stamp_value=None):
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False,
from_peer=None, stamp_value=None, stamp_data=None):
if is_paper_message: no_stamp_enforcement = True
else: no_stamp_enforcement = False
@ -2236,13 +2247,14 @@ class LXMRouter:
else:
if self.propagation_node:
stamped_data = lxmf_data+stamp_data
value_component = f"_{stamp_value}" if stamp_value and stamp_value > 0 else ""
file_path = f"{self.messagepath}/{RNS.hexrep(transient_id, delimit=False)}_{received}{value_component}"
msg_file = open(file_path, "wb")
msg_file.write(lxmf_data); msg_file.close()
msg_file.write(stamped_data); msg_file.close()
RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)} with stamp value {stamp_value}, adding to peer distribution queues...", RNS.LOG_EXTREME)
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], [], stamp_value]
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(stamped_data), [], [], stamp_value]
self.enqueue_peer_distribution(transient_id, from_peer)
else:
@ -2328,8 +2340,10 @@ class LXMRouter:
return
if selected_lxm.defer_stamp:
if selected_lxm.stamp == None: stamp_generation_success = False
else: stamp_generation_success = True
else: stamp_generation_success = True
if selected_lxm.desired_method == LXMessage.PROPAGATED:
if selected_lxm.propagation_stamp == None: propagation_stamp_generation_success = False

View file

@ -52,23 +52,23 @@ def validate_peering_key(peering_id, peering_key, target_cost):
def validate_pn_stamp(transient_data, target_cost):
from .LXMessage import LXMessage
if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return None, None, None
if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return None, None, None, None
else:
lxm_data = transient_data[:-STAMP_SIZE]
stamp = transient_data[-STAMP_SIZE:]
transient_id = RNS.Identity.full_hash(lxm_data)
workblock = stamp_workblock(transient_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN)
if not stamp_valid(stamp, target_cost, workblock): return None, None, None
if not stamp_valid(stamp, target_cost, workblock): return None, None, None, None
else:
value = stamp_value(workblock, stamp)
return transient_id, lxm_data, value
return transient_id, lxm_data, value, stamp
def validate_pn_stamps_job_simple(transient_list, target_cost):
validated_messages = []
for transient_data in transient_list:
transient_id, lxm_data, value = validate_pn_stamp(transient_data, target_cost)
if transient_id: validated_messages.append([transient_id, lxm_data, value])
transient_id, lxm_data, value, stamp_data = validate_pn_stamp(transient_data, target_cost)
if transient_id: validated_messages.append([transient_id, lxm_data, value, stamp_data])
return validated_messages