From 6b2b66aa259045f5ba27bb0bd5f72ac2d14f83dd Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 12 Aug 2020 20:48:16 +0200 Subject: [PATCH] Moving large transfers to recursive resource segmentation --- Examples/Filetransfer.py | 10 +- RNS/Bundle.py | 227 --------------------------------------- RNS/Resource.py | 159 ++++++++++++++++++++++----- RNS/Reticulum.py | 12 +-- RNS/Transport.py | 52 --------- RNS/__init__.py | 1 - 6 files changed, 141 insertions(+), 320 deletions(-) delete mode 100644 RNS/Bundle.py diff --git a/Examples/Filetransfer.py b/Examples/Filetransfer.py index 257f510..655730b 100644 --- a/Examples/Filetransfer.py +++ b/Examples/Filetransfer.py @@ -135,16 +135,14 @@ def client_request(message, packet): # read it and pack it as a resource RNS.log("Client requested \""+filename+"\"") file = open(os.path.join(serve_path, filename), "rb") - file_data = file.read() - file.close() - - file_resource = RNS.Resource(file_data, packet.link, callback=resource_sending_concluded) + file_resource = RNS.Resource(file, packet.link, callback=resource_sending_concluded) file_resource.filename = filename - except: + except Exception as e: # If somethign went wrong, we close # the link RNS.log("Error while reading file \""+filename+"\"", RNS.LOG_ERROR) packet.link.teardown() + raise e else: # If we don't have it, we close the link RNS.log("Client requested an unknown file") @@ -484,7 +482,7 @@ def download_concluded(resource): try: file = open(saved_filename, "wb") - file.write(resource.data) + file.write(resource.data.read()) file.close() menu_mode = "download_concluded" except: diff --git a/RNS/Bundle.py b/RNS/Bundle.py deleted file mode 100644 index 0b7e17a..0000000 --- a/RNS/Bundle.py +++ /dev/null @@ -1,227 +0,0 @@ -import RNS -import time -import os -import os.path -from .vendor import umsgpack as umsgpack - -class Bundle: - APP_NAME = "rnsbundle" - - NO_CUSTODY = 0x00; - TAKING_CUSTODY = 0x01; - FULL_CUSTODY = 0x02; - REMOVED = 0xFF; - - CHUNK_SIZE = RNS.Resource.MAX_EFFICIENT_SIZE / 4 - - def __init__(self, destination_hash = None, data = None, filepath = None, advertisement_data = None): - self.destination_hash = None - self.is_originator = False - self.state = None - self.data_file = None - self.meta_file = None - self.data_hash = None - self.id = None - self.storagepath = None - self.size = None - self.chunks = 0 - self.created = time.time() - self.heartbeat = created - self.transferring = False - - self.chunk_request_destination = None - - try: - if data != None or filepath != None: - self.destination_hash = destination_hash - self.is_originator = True - self.id = RNS.Identity.getRandomHash() - - if filepath == None and data != None: - try: - self.data_hash = RNS.Identity.fullHash(data) - self.storagepath = Reticulum.bundlepath+"/"+self.id.hex() - self.datapath = self.storagepath+"/data" - self.metadatapath = self.storagepath+"/metadata" - - if not os.path.isdir(self.storagepath): - os.makedirs(self.storagepath) - else: - RNS.log("Warning, bundle already exists in storage location, recreating", RNS.LOG_DEBUG) - - self.data_file = open(self.datapath, "wb") - self.data_file.write(data) - self.data_file.close() - except Exception as e: - RNS.log("Error while initialising bundle from data, the contained exception was:", RNS.LOG_ERROR) - RNS.log(str(e)) - - self.state = Bundle.FULL_CUSTODY - - elif data == None and filepath != None: - try: - input_file = open(filepath, "rb") - self.data_hash = RNS.Identity.fullHash(input_file.read()) - input_file.seek(0) - - self.storagepath = RNS.Reticulum.bundlepath+"/"+self.id.hex() - self.datapath = self.storagepath+"/data" - self.metadatapath = self.storagepath+"/metadata" - - if not os.path.isdir(self.storagepath): - os.makedirs(self.storagepath) - else: - RNS.log("Warning, bundle already exists in storage location, recreating", RNS.LOG_DEBUG) - - self.data_file = open(self.datapath, "wb") - self.data_file.write(input_file.read()) - self.data_file.close() - input_file.close() - - except Exception as e: - RNS.log("Error while reading input file for bundle, the contained exception was:", RNS.LOG_ERROR) - RNS.log(str(e)) - - self.state = Bundle.FULL_CUSTODY - - else: - raise ValueError("Bundle cannot be created from data and file path at the same time") - - # Prepare file handles and metadata - self.size = os.stat(self.datapath).st_size - if self.size < 1: - raise IOError("Bundle data is empty") - self.data_file = open(self.datapath, "rb") - - elif advertisement_data != None: - # Incoming bundle transfer - self.id = advertisement_data[1] - self.destination_hash = advertisement_data[0] - self.state = Bundle.TAKING_CUSTODY - - self.storagepath = Reticulum.bundlepath+"/"+self.id.hex() - self.datapath = self.storagepath+"/data" - self.metadatapath = self.storagepath+"/metadata" - - if not os.path.isdir(self.storagepath): - os.makedirs(self.storagepath) - else: - RNS.log("Warning, bundle already exists in storage location, recreating", RNS.LOG_DEBUG) - - self.data_file = open(self.datapath, "wb") - self.data_file.close() - - self.size = advertisement_data[2] - self.data_file = open(self.datapath, "wb") - - else: - raise ValueError("No source of data specified for bundle initialisation") - - self.chunks = ((self.size-1)//Bundle.CHUNK_SIZE)+1 - self.flush_metadata() - - RNS.Transport.register_bundle(self) - - except Exception as e: - RNS.log("Error while initialising bundle. The contained exception was:", RNS.LOG_ERROR) - RNS.log(str(e), RNS.LOG_ERROR) - # TODO: Remove - raise e - - def get_packed_metadata(self): - metadata = { - "destination": self.destination, - "heartbeat": self.heartbeat, - "size": self.size, - "is_originator": self.is_originator - "state": self.state} - - return umsgpack.packb(metadata) - - def flush_metadata(self): - try: - self.meta_file = open(self.metadatapath, "wb") - self.meta_file.write(self.get_packed_metadata()) - self.meta_file.close() - - except Exception as e: - RNS.log("Error while flushing metadata for bundle "+RNS.prettyhexrep(self.id), RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - - def register_destinations(self, destination): - self.chunk_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "chunk", "request") - self.chunk_request_destination.link_established_callback(requester_connected) - - def advertise(self, advertise_to): - advertisement = [ - self.destination, - self.id, - self.size, - self.chunks] - - advertisement_data = umsgpack.packb(advertisement) - advertisement_packet = RNS.Packet(advertise_to, advertisement_data) - advertisement.packet.send() - - def requester_connected(self, link): - RNS.log("Requester connected to bundle "+RNS.prettyhexrep(self.id), RNS.LOG_DEBUG) - link.packet_callback(chunk_request) - - def chunk_request(self, data, packet): - chunk_index = data[0] - RNS.log("Request for chunk "+str(chunk_index)+"/"+str(self.chunks)+" of bundle "+RNS.prettyhexrep(self.id), RNS.LOG_DEBUG) - if chunk_index < self.chunks: - self.emit_resource(packet.link, chunk_index) - else: - RNS.log("Bundle transfer client requested chunk index out of range, tearing down link.", RNS.LOG_ERROR) - packet.link.teardown() - - def emit_resource(self, link, chunk_index): - if not self.transferring: - chunk_max = self.size-1 - chunk_start = chunk_index*CHUNK_SIZE - chunk_end = (chunk_index+1)*CHUNK_SIZE-1 - if chunk_end > chunk_max: - chunk_end = chunk_max - read_size = chunk_end - chunk_start - - try: - file = open(self.datapath, "rb") - file.seek(chunk_start) - data = file.read(read_size) - chunk_resource = RNS.Resource(data, link, callback=resource_concluded) - chunk_resource.chunk_index = chunk_index - except Exception as e: - RNS.log("Could not read bundle data from storage, the contained exception was:", RNS.LOG_ERROR) - RNS.log(str(e)) - link.teardown() - else: - RNS.log("Bundle chunk "+str(chunk_index)+" for "+RNS.prettyhexrep(self.id)+" was requested while a transfer was already in progress", RNS.LOG_ERROR) - - def resource_concluded(self, resource): - RNS.log("Concluded transferring chunk "+str(resource.chunk_index)+"/"+str(self.chunks)+" of bundle "+RNS.prettyhexrep(self.id), RNS.LOG_DEBUG) - self.transferring = False - - def resign_custody(self): - self.state = Bundle.NO_CUSTODY - self.heartbeat = time.time() - - def custody_proof(self, proof): - pass - - def remove(self): - try: - self.state = Bundle.REMOVED - RNS.Transport.deregister_destination(self.chunk_request_destination) - os.unlink(self.datapath) - os.unlink(self.metadatapath) - os.rmdir(self.storagepath) - except Exception as e: - RNS.log("Error while removing bundle from storage, the contained exception was:", RNS.LOG_ERROR) - RNS.log(str(e), RNS.LOG_ERROR) - - - - -class BundleAdvertisement: - pass \ No newline at end of file diff --git a/RNS/Resource.py b/RNS/Resource.py index 3455005..2149724 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -1,4 +1,5 @@ import RNS +import os import bz2 import math import time @@ -63,6 +64,7 @@ class Resource: resource.size = adv.t resource.uncompressed_size = adv.d resource.hash = adv.h + resource.original_hash = adv.o resource.random_hash = adv.r resource.hashmap_raw = adv.m resource.encrypted = True if resource.flags & 0x01 else False @@ -80,6 +82,14 @@ class Resource: resource.window_flexibility = Resource.WINDOW_FLEXIBILITY resource.last_activity = time.time() + resource.storagepath = RNS.Reticulum.resourcepath+"/"+resource.original_hash.hex() + resource.segment_index = adv.i + resource.total_segments = adv.l + if adv.l > 1: + resource.split = True + else: + resource.split = False + resource.hashmap = [None] * resource.total_parts resource.hashmap_height = 0 resource.waiting_for_hmu = False @@ -98,11 +108,53 @@ class Resource: resource.watchdog_job() return resource - except: + except Exception as e: RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG) + # TODO: Remove + raise e return None - def __init__(self, data, link, advertise=True, auto_compress=True, must_compress=False, callback=None, progress_callback=None): + # Create a resource for transmission to a remote destination + # The data passed can be either a bytes-array or a file opened + # in binary read mode. + def __init__(self, data, link, advertise=True, auto_compress=True, must_compress=False, callback=None, progress_callback=None, segment_index = 1, original_hash = None): + data_size = None + resource_data = None + if hasattr(data, "read"): + data_size = os.stat(data.name).st_size + + if data_size <= Resource.MAX_EFFICIENT_SIZE: + self.total_segments = 1 + self.segment_index = 1 + self.split = False + resource_data = data.read() + data.close() + else: + self.total_segments = ((data_size-1)//Resource.MAX_EFFICIENT_SIZE)+1 + self.segment_index = segment_index + self.split = True + seek_index = segment_index-1 + seek_position = seek_index*Resource.MAX_EFFICIENT_SIZE + + data.seek(seek_position) + resource_data = data.read(Resource.MAX_EFFICIENT_SIZE) + self.input_file = data + + elif isinstance(data, bytes): + data_size = len(data) + resource_data = data + self.total_segments = 1 + self.segment_index = 1 + self.split = False + + elif data == None: + pass + + else: + raise TypeError("Invalid data instance type passed to resource initialisation") + + data = resource_data + self.status = Resource.NONE self.link = link self.max_retries = Resource.MAX_RETRIES @@ -115,6 +167,7 @@ class Resource: self.__watchdog_job_id = 0 self.__progress_callback = progress_callback self.rtt = None + self.grand_total_parts = math.ceil(data_size/Resource.SDU) self.receiver_min_consecutive_height = 0 @@ -179,6 +232,11 @@ class Resource: self.hash = RNS.Identity.fullHash(data+self.random_hash) self.expected_proof = RNS.Identity.fullHash(data+self.hash) + if original_hash == None: + self.original_hash = self.hash + else: + self.original_hash = original_hash + self.parts = [] self.hashmap = b"" collision_guard_list = [] @@ -379,6 +437,9 @@ class Resource: calculated_hash = RNS.Identity.fullHash(self.data+self.random_hash) if calculated_hash == self.hash: + self.file = open(self.storagepath, "ab") + self.file.write(self.data) + self.file.close() self.status = Resource.COMPLETE self.prove() else: @@ -390,9 +451,21 @@ class Resource: RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) self.status = Resource.CORRUPT - if self.callback != None: - self.link.resource_concluded(self) - self.callback(self) + self.link.resource_concluded(self) + + if self.segment_index == self.total_segments: + if self.callback != None: + self.data = open(self.storagepath, "rb") + self.callback(self) + + try: + self.data.close() + os.unlink(self.storagepath) + except Exception as e: + RNS.log("Error while cleaning up resource files, the contained exception was:", RNS.LOG_ERROR) + RNS.log(str(e)) + else: + RNS.log("Resource segment "+str(self.segment_index)+" of "+str(self.total_segments)+" received, waiting for next segment to be announced", RNS.LOG_VERBOSE) def prove(self): @@ -412,9 +485,16 @@ class Resource: if len(proof_data) == RNS.Identity.HASHLENGTH//8*2: if proof_data[RNS.Identity.HASHLENGTH//8:] == self.expected_proof: self.status = Resource.COMPLETE - if self.callback != None: - self.link.resource_concluded(self) - self.callback(self) + self.link.resource_concluded(self) + if self.segment_index == self.total_segments: + # If all segments were processed, we'll + # signal that the resource sending concluded + if self.callback != None: + self.callback(self) + else: + # Otherwise we'll recursively create the + # next segment of the resource + Resource(self.input_file, self.link, callback = self.callback, segment_index = self.segment_index+1, original_hash=self.original_hash) else: pass else: @@ -637,9 +717,21 @@ class Resource: def progress(self): if self.initiator: - progress = self.sent_parts / len(self.parts) - else: - progress = self.received_count / float(self.total_parts) + # TODO: Remove + # progress = self.sent_parts / len(self.parts) + self.processed_parts = (self.segment_index-1)*math.ceil(Resource.MAX_EFFICIENT_SIZE/Resource.SDU) + self.processed_parts += self.sent_parts + self.progress_total_parts = float(self.grand_total_parts) + else: + self.processed_parts = (self.segment_index-1)*math.ceil(Resource.MAX_EFFICIENT_SIZE/Resource.SDU) + self.processed_parts += self.received_count + if self.split: + self.progress_total_parts = float((self.total_segments-1)*math.ceil(Resource.MAX_EFFICIENT_SIZE/Resource.SDU)+self.total_parts) + else: + self.progress_total_parts = float(self.total_parts) + + + progress = self.processed_parts / self.progress_total_parts return progress def __str__(self): @@ -647,36 +739,43 @@ class Resource: class ResourceAdvertisement: - HASHMAP_MAX_LEN = 84 + HASHMAP_MAX_LEN = 75 COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN def __init__(self, resource=None): if resource != None: - self.t = resource.size # Transfer size - self.d = resource.uncompressed_size # Data size - self.n = len(resource.parts) # Number of parts - self.h = resource.hash # Resource hash - self.r = resource.random_hash # Resource random hash - self.m = resource.hashmap # Resource hashmap - self.c = resource.compressed # Compression flag - self.e = resource.encrypted # Encryption flag - self.f = 0x00 | self.c << 1 | self.e # Flags + self.t = resource.size # Transfer size + self.d = resource.uncompressed_size # Data size + self.n = len(resource.parts) # Number of parts + self.h = resource.hash # Resource hash + self.r = resource.random_hash # Resource random hash + self.o = resource.original_hash # First-segment hash + self.m = resource.hashmap # Resource hashmap + self.c = resource.compressed # Compression flag + self.e = resource.encrypted # Encryption flag + self.s = resource.split # Split flag + self.i = resource.segment_index # Segment index + self.l = resource.total_segments # Total segments + self.f = 0x00 | self.s << 2 | self.c << 1 | self.e # Flags def pack(self, segment=0): hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN - hashmap_end = min((segment+1)*ResourceAdvertisement.HASHMAP_MAX_LEN, self.n) + hashmap_end = min((segment+1)*(ResourceAdvertisement.HASHMAP_MAX_LEN), self.n) hashmap = b"" for i in range(hashmap_start,hashmap_end): hashmap += self.m[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] dictionary = { - "t": self.t, - "d": self.d, - "n": self.n, - "h": self.h, - "r": self.r, - "f": self.f, + "t": self.t, # Transfer size + "d": self.d, # Data size + "n": self.n, # Number of parts + "h": self.h, # Resource hash + "r": self.r, # Resource random hash + "o": self.o, # Original hash + "i": self.i, # Segment index + "l": self.l, # Total segments + "f": self.f, # Resource flags "m": hashmap } @@ -692,9 +791,13 @@ class ResourceAdvertisement: adv.n = dictionary["n"] adv.h = dictionary["h"] adv.r = dictionary["r"] + adv.o = dictionary["o"] adv.m = dictionary["m"] adv.f = dictionary["f"] + adv.i = dictionary["i"] + adv.l = dictionary["l"] adv.e = True if (adv.f & 0x01) == 0x01 else False adv.c = True if ((adv.f >> 1) & 0x01) == 0x01 else False + adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False return adv \ No newline at end of file diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index e64dfa9..445f670 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -31,10 +31,10 @@ class Reticulum: if configdir != None: Reticulum.configdir = configdir - Reticulum.configpath = Reticulum.configdir+"/config" - Reticulum.storagepath = Reticulum.configdir+"/storage" - Reticulum.cachepath = Reticulum.configdir+"/storage/cache" - Reticulum.bundlepath = Reticulum.configdir+"/storage/bundles" + Reticulum.configpath = Reticulum.configdir+"/config" + Reticulum.storagepath = Reticulum.configdir+"/storage" + Reticulum.cachepath = Reticulum.configdir+"/storage/cache" + Reticulum.resourcepath = Reticulum.configdir+"/storage/resources" Reticulum.__allow_unencrypted = False Reticulum.__transport_enabled = False @@ -53,8 +53,8 @@ class Reticulum: if not os.path.isdir(Reticulum.cachepath): os.makedirs(Reticulum.cachepath) - if not os.path.isdir(Reticulum.bundlepath): - os.makedirs(Reticulum.bundlepath) + if not os.path.isdir(Reticulum.resourcepath): + os.makedirs(Reticulum.resourcepath) if os.path.isfile(self.configpath): try: diff --git a/RNS/Transport.py b/RNS/Transport.py index 8a6bc0c..a1e5459 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -43,9 +43,6 @@ class Transport: DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of - BUNDLE_TIMEOUT = 60*60*24*7 # Bundles time out after 7 days - BUNDLE_INTERVAL = 180 # How often we should attempt to transfer bundles to their next hop - interfaces = [] # All active interfaces destinations = [] # All active destinations pending_links = [] # Links that are being established @@ -58,7 +55,6 @@ class Transport: destination_table = {} # A lookup table containing the next hop to a given destination reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies link_table = {} # A lookup table containing hops for links - bundle_table = {} # A table for holding references to bundles in transport held_announces = {} # A table containing temporarily held announce-table entries # Transport control destinations are used @@ -112,16 +108,9 @@ class Transport: # Create transport-specific destinations Transport.path_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request") Transport.path_request_destination.packet_callback(Transport.path_request_handler) - - Transport.bundle_advertisement_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "bundle", "advertisement", ) - Transport.bundle_advertisement_destination.packet_callback(Transport.bundle_advertisement_handler) - Transport.control_destinations.append(Transport.path_request_destination) Transport.control_hashes.append(Transport.path_request_destination.hash) - Transport.control_destinations.append(Transport.bundle_advertisement_destination) - Transport.control_hashes.append(Transport.bundle_advertisement_destination.hash) - thread = threading.Thread(target=Transport.jobloop) thread.setDaemon(True) thread.start() @@ -311,8 +300,6 @@ class Transport: Transport.tables_last_culled = time.time() - Transport.bundle_jobs() - except Exception as e: RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -323,35 +310,6 @@ class Transport: for packet in outgoing: packet.send() - @staticmethod - def bundle_jobs(): - removed_bundles = [] - for bundle in Transport.bundle_table: - # The bundle could not be passed on within the allowed - # time, and should be removed from storage - if bundle.heartbeat+Transport.BUNDLE_TIMEOUT < time.time(): - RNS.log("Removing stale bundle "+RNS.prettyhexrep(bundle.id)+" from storage", RNS.LOG_VERBOSE) - removed_bundles.append(bundle) - bundle.remove() - - # Custody was transferred to another node, we'll remove the bundle - if bundle.state == RNS.Bundle.NO_CUSTODY: - RNS.log("Removing bundle "+RNS.prettyhexrep(bundle.id)+" from storage since custody was transferred", RNS.LOG_VERBOSE) - removed_bundles.append(bundle) - bundle.remove() - - # This is an incoming bundle, attempt to retrieve it - if bundle.state == RNS.Bundle.TAKING_CUSTODY: - pass - - # We have custody over this bundle, and we should attempt - # to deliver it to it's next hop. - if bundle.state == RNS.Bundle.FULL_CUSTODY: - pass - - for bundle in removed_bundles: - Transport.bundle_table.remove(bundle) - @staticmethod def outbound(packet): while (Transport.jobs_running): @@ -458,7 +416,6 @@ class Transport: Transport.jobs_locked = False return sent - @staticmethod def packet_filter(packet): # TODO: Think long and hard about this. @@ -917,11 +874,6 @@ class Transport: else: RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR) - @staticmethod - def register_bundle(bundle): - RNS.log("Transport instance registered bundle "+RNS.prettyhexrep(bundle.id), RNS.LOG_DEBUG) - self.bundle_table.append(bundle) - @staticmethod def find_interface_from_hash(interface_hash): for interface in Transport.interfaces: @@ -1043,10 +995,6 @@ class Transport: packet.receiving_interface ) - @staticmethod - def bundle_advertisement_handler(data, packet): - pass - @staticmethod def pathRequest(destination_hash, is_from_local_client, attached_interface): RNS.log("Path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG) diff --git a/RNS/__init__.py b/RNS/__init__.py index 6534da5..033e20d 100755 --- a/RNS/__init__.py +++ b/RNS/__init__.py @@ -12,7 +12,6 @@ from .Destination import Destination from .Packet import Packet from .Packet import PacketReceipt from .Resource import Resource -from .Bundle import Bundle modules = glob.glob(os.path.dirname(__file__)+"/*.py") __all__ = [ os.path.basename(f)[:-3] for f in modules if not f.endswith('__init__.py')]