From 763078a1aeccc03f9ffe24396ef92ce0251ba06a Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 10 May 2025 15:38:06 +0200 Subject: [PATCH] Added ability to include metadata on resource transfers --- RNS/Resource.py | 142 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 98 insertions(+), 44 deletions(-) diff --git a/RNS/Resource.py b/RNS/Resource.py index aa97ad9..af75e71 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -33,6 +33,7 @@ import os import bz2 import math import time +import struct import tempfile import threading from threading import Lock @@ -119,6 +120,9 @@ class Resource: # fit in 3 bytes in resource advertisements. MAX_EFFICIENT_SIZE = 16 * 1024 * 1024 - 1 RESPONSE_MAX_GRACE_TIME = 10 + + # Max metadata size is 16777215 (0xFFFFFF) bytes + METADATA_MAX_SIZE = 16 * 1024 * 1024 - 1 # The maximum size to auto-compress with # bz2 before sending. @@ -196,12 +200,15 @@ class Resource: resource.started_transferring = resource.last_activity resource.storagepath = RNS.Reticulum.resourcepath+"/"+resource.original_hash.hex() + resource.meta_storagepath = resource.storagepath+".meta" resource.segment_index = adv.i resource.total_segments = adv.l - if adv.l > 1: - resource.split = True - else: - resource.split = False + + if adv.l > 1: resource.split = True + else: resource.split = False + + if adv.x: resource.has_metadata = True + else: resource.has_metadata = False resource.hashmap = [None] * resource.total_parts resource.hashmap_height = 0 @@ -227,9 +234,7 @@ class Resource: RNS.log("Error while executing resource started callback from "+str(resource)+". The contained exception was: "+str(e), RNS.LOG_ERROR) resource.hashmap_update(0, resource.hashmap_raw) - resource.watchdog_job() - return resource else: @@ -243,15 +248,32 @@ class Resource: # Create a resource for transmission to a remote destination # The data passed can be either a bytes-array or a file opened # in binary read mode. - def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, timeout = None, segment_index = 1, original_hash = None, request_id = None, is_response = False): + def __init__(self, data, link, metadata=None, advertise=True, auto_compress=True, callback=None, progress_callback=None, + timeout = None, segment_index = 1, original_hash = None, request_id = None, is_response = False, sent_metadata_size=0): + data_size = None resource_data = None self.assembly_lock = False self.preparing_next_segment = False self.next_segment = None + self.metadata = None + self.has_metadata = False + self.metadata_size = sent_metadata_size + + if metadata != None: + packed_metadata = umsgpack.packb(metadata) + metadata_size = len(packed_metadata) + if metadata_size > Resource.METADATA_MAX_SIZE: + raise SystemError("Resource metadata size exceeded") + else: + self.metadata = struct.pack(">I", metadata_size)[1:] + packed_metadata + self.metadata_size = len(self.metadata) + self.has_metadata = True + else: + self.metadata = b"" if data != None: - if not hasattr(data, "read") and len(data) > Resource.MAX_EFFICIENT_SIZE: + if not hasattr(data, "read") and self.metadata_size + len(data) > Resource.MAX_EFFICIENT_SIZE: original_data = data data_size = len(original_data) data = tempfile.TemporaryFile() @@ -259,31 +281,43 @@ class Resource: del original_data if hasattr(data, "read"): - if data_size == None: - data_size = os.stat(data.name).st_size + if data_size == None: data_size = os.stat(data.name).st_size + self.total_size = data_size + self.metadata_size - self.total_size = data_size - - if data_size <= Resource.MAX_EFFICIENT_SIZE: + if self.total_size <= Resource.MAX_EFFICIENT_SIZE: self.total_segments = 1 self.segment_index = 1 self.split = False resource_data = data.read() data.close() + else: - self.total_segments = ((data_size-1)//Resource.MAX_EFFICIENT_SIZE)+1 + # self.total_segments = ((data_size-1)//Resource.MAX_EFFICIENT_SIZE)+1 + # self.segment_index = segment_index + # self.split = True + # seek_index = segment_index-1 + # seek_position = seek_index*Resource.MAX_EFFICIENT_SIZE + + self.total_segments = ((self.total_size-1)//Resource.MAX_EFFICIENT_SIZE)+1 self.segment_index = segment_index self.split = True seek_index = segment_index-1 - seek_position = seek_index*Resource.MAX_EFFICIENT_SIZE + first_read_size = Resource.MAX_EFFICIENT_SIZE - self.metadata_size + + if segment_index == 1: + seek_position = 0 + segment_read_size = first_read_size + else: + seek_position = first_read_size + ((seek_index-1)*Resource.MAX_EFFICIENT_SIZE) + segment_read_size = Resource.MAX_EFFICIENT_SIZE data.seek(seek_position) - resource_data = data.read(Resource.MAX_EFFICIENT_SIZE) + resource_data = data.read(segment_read_size) self.input_file = data elif isinstance(data, bytes): data_size = len(data) - self.total_size = data_size + self.total_size = data_size + self.metadata_size resource_data = data self.total_segments = 1 @@ -296,7 +330,9 @@ class Resource: else: raise TypeError("Invalid data instance type passed to resource initialisation") - data = resource_data + if resource_data: + if self.has_metadata: data = self.metadata + resource_data + else: data = resource_data self.status = Resource.NONE self.link = link @@ -559,6 +595,7 @@ class Resource: else: sleep_time = self.last_activity + self.part_timeout_factor*((3*self.sdu)/self.eifr) + Resource.RETRY_GRACE_TIME + extra_wait - time.time() + # TODO: Remove debug at some point # RNS.log(f"EIFR {RNS.prettyspeed(self.eifr)}, ETOF {RNS.prettyshorttime(expected_tof_remaining)} ", RNS.LOG_DEBUG, pt=True) # RNS.log(f"Resource ST {RNS.prettyshorttime(sleep_time)}, RTT {RNS.prettyshorttime(self.rtt or self.link.rtt)}, {self.outstanding_parts} left", RNS.LOG_DEBUG, pt=True) @@ -628,22 +665,27 @@ class Resource: self.status = Resource.ASSEMBLING stream = b"".join(self.parts) - if self.encrypted: - data = self.link.decrypt(stream) - else: - data = stream + if self.encrypted: data = self.link.decrypt(stream) + else: data = stream # Strip off random hash data = data[Resource.RANDOM_HASH_SIZE:] - if self.compressed: - self.data = bz2.decompress(data) - else: - self.data = data + if self.compressed: self.data = bz2.decompress(data) + else: self.data = data calculated_hash = RNS.Identity.full_hash(self.data+self.random_hash) - if calculated_hash == self.hash: + if self.has_metadata and self.segment_index == 1: + # TODO: Add early metadata_ready callback + metadata_size = data[0] << 16 | data[1] << 8 | data[2] + packed_metadata = data[3:3+metadata_size] + metadata_file = open(self.meta_storagepath, "wb") + metadata_file.write(packed_metadata) + metadata_file.close() + del packed_metadata + data = data[3+metadata_size:] + self.file = open(self.storagepath, "ab") self.file.write(self.data) self.file.close() @@ -662,21 +704,27 @@ class Resource: if self.segment_index == self.total_segments: if self.callback != None: + if not os.path.isfile(self.meta_storagepath): + self.metadata = None + else: + metadata_file = open(self.meta_storagepath, "rb") + self.metadata = umsgpack.unpackb(metadata_file.read()) + metadata_file.close() + try: os.unlink(self.meta_storagepath) + except Exception as e: + RNS.log(f"Error while cleaning up resource metadata file, the contained exception was: {e}", RNS.LOG_ERROR) + self.data = open(self.storagepath, "rb") - try: - self.callback(self) + try: self.callback(self) except Exception as e: RNS.log("Error while executing resource assembled callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) try: - if hasattr(self.data, "close") and callable(self.data.close): - self.data.close() - + if hasattr(self.data, "close") and callable(self.data.close): self.data.close() os.unlink(self.storagepath) except Exception as e: - RNS.log("Error while cleaning up resource files, the contained exception was:", RNS.LOG_ERROR) - RNS.log(str(e)) + RNS.log(f"Error while cleaning up resource files, the contained exception was: {e}", RNS.LOG_ERROR) else: RNS.log("Resource segment "+str(self.segment_index)+" of "+str(self.total_segments)+" received, waiting for next segment to be announced", RNS.LOG_DEBUG) @@ -708,6 +756,7 @@ class Resource: is_response = self.is_response, advertise = False, auto_compress = self.auto_compress, + sent_metadata_size = self.metadata_size, ) def validate_proof(self, proof_data): @@ -720,18 +769,18 @@ class Resource: # If all segments were processed, we'll # signal that the resource sending concluded if self.callback != None: - try: - self.callback(self) - except Exception as e: - RNS.log("Error while executing resource concluded callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + try: self.callback(self) + except Exception as e: RNS.log("Error while executing resource concluded callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) finally: try: if hasattr(self, "input_file"): - if hasattr(self.input_file, "close") and callable(self.input_file.close): - self.input_file.close() - - except Exception as e: - RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR) + if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close() + except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR) + else: + try: + if hasattr(self, "input_file"): + if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close() + except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR) else: # Otherwise we'll recursively create the # next segment of the resource @@ -1202,6 +1251,7 @@ class ResourceAdvertisement: self.c = resource.compressed # Compression flag self.e = resource.encrypted # Encryption flag self.s = resource.split # Split flag + self.x = resource.has_metadata # Metadata flag self.i = resource.segment_index # Segment index self.l = resource.total_segments # Total segments self.q = resource.request_id # ID of associated request @@ -1217,7 +1267,7 @@ class ResourceAdvertisement: self.p = True # Flags - self.f = 0x00 | self.p << 4 | self.u << 3 | self.s << 2 | self.c << 1 | self.e + self.f = 0x00 | self.x << 5 | self.p << 4 | self.u << 3 | self.s << 2 | self.c << 1 | self.e def get_transfer_size(self): return self.t @@ -1237,6 +1287,9 @@ class ResourceAdvertisement: def is_compressed(self): return self.c + def has_metadata(self): + return self.x + def get_link(self): return self.link @@ -1286,5 +1339,6 @@ class ResourceAdvertisement: adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False adv.u = True if ((adv.f >> 3) & 0x01) == 0x01 else False adv.p = True if ((adv.f >> 4) & 0x01) == 0x01 else False + adv.x = True if ((adv.f >> 5) & 0x01) == 0x01 else False return adv \ No newline at end of file