diff --git a/RNS/Resource.py b/RNS/Resource.py index e8e84f2..a0e867f 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -26,6 +26,7 @@ import bz2 import math import time import threading +from threading import Lock from .vendor import umsgpack as umsgpack from time import sleep @@ -47,7 +48,7 @@ class Resource: WINDOW = 4 # Absolute minimum window size during transfer - WINDOW_MIN = 1 + WINDOW_MIN = 2 # The maximum window size for transfers on slow links WINDOW_MAX_SLOW = 10 @@ -103,7 +104,7 @@ class Resource: PART_TIMEOUT_FACTOR = 4 PART_TIMEOUT_FACTOR_AFTER_RTT = 2 - MAX_RETRIES = 8 + MAX_RETRIES = 16 MAX_ADV_RETRIES = 4 SENDER_GRACE_TIME = 10 RETRY_GRACE_TIME = 0.25 @@ -170,7 +171,8 @@ class Resource: resource.receiving_part = False - resource.consecutive_completed_height = 0 + # TODO: Recheck + resource.consecutive_completed_height = -1 if not resource.link.has_incoming_resource(resource): resource.link.register_incoming_resource(resource) @@ -366,7 +368,8 @@ class Resource: if advertise: self.advertise() else: - pass + self.receive_lock = Lock() + def hashmap_update_packet(self, plaintext): if not self.status == Resource.FAILED: @@ -623,99 +626,99 @@ class Resource: def receive_part(self, packet): - while self.receiving_part: - sleep(0.001) + with self.receive_lock: - self.receiving_part = True - self.last_activity = time.time() - self.retries_left = self.max_retries + self.receiving_part = True + self.last_activity = time.time() + self.retries_left = self.max_retries - if self.req_resp == None: - self.req_resp = self.last_activity - rtt = self.req_resp-self.req_sent - - self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT - if self.rtt == None: - self.rtt = self.link.rtt - self.watchdog_job() - elif rtt < self.rtt: - self.rtt = max(self.rtt - self.rtt*0.05, rtt) - elif rtt > self.rtt: - self.rtt = min(self.rtt + self.rtt*0.05, rtt) + if self.req_resp == None: + self.req_resp = self.last_activity + rtt = self.req_resp-self.req_sent + + self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT + if self.rtt == None: + self.rtt = self.link.rtt + self.watchdog_job() + elif rtt < self.rtt: + self.rtt = max(self.rtt - self.rtt*0.05, rtt) + elif rtt > self.rtt: + self.rtt = min(self.rtt + self.rtt*0.05, rtt) - if rtt > 0: - req_resp_cost = len(packet.raw)+self.req_sent_bytes - self.req_resp_rtt_rate = req_resp_cost / rtt + if rtt > 0: + req_resp_cost = len(packet.raw)+self.req_sent_bytes + self.req_resp_rtt_rate = req_resp_cost / rtt - if self.req_resp_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD: - self.fast_rate_rounds += 1 + if self.req_resp_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD: + self.fast_rate_rounds += 1 - if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD: - self.window_max = Resource.WINDOW_MAX_FAST + if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD: + self.window_max = Resource.WINDOW_MAX_FAST - if not self.status == Resource.FAILED: - self.status = Resource.TRANSFERRING - part_data = packet.data - part_hash = self.get_map_hash(part_data) + if not self.status == Resource.FAILED: + self.status = Resource.TRANSFERRING + part_data = packet.data + part_hash = self.get_map_hash(part_data) - i = self.consecutive_completed_height - for map_hash in self.hashmap[self.consecutive_completed_height:self.consecutive_completed_height+self.window]: - if map_hash == part_hash: - if self.parts[i] == None: + consecutive_index = self.consecutive_completed_height if self.consecutive_completed_height >= 0 else 0 + i = consecutive_index + for map_hash in self.hashmap[consecutive_index:consecutive_index+self.window]: + if map_hash == part_hash: + if self.parts[i] == None: - # Insert data into parts list - self.parts[i] = part_data - self.rtt_rxd_bytes += len(part_data) - self.received_count += 1 - self.outstanding_parts -= 1 + # Insert data into parts list + self.parts[i] = part_data + self.rtt_rxd_bytes += len(part_data) + self.received_count += 1 + self.outstanding_parts -= 1 - # Update consecutive completed pointer - if i == self.consecutive_completed_height + 1: - self.consecutive_completed_height = i - - cp = self.consecutive_completed_height + 1 - while cp < len(self.parts) and self.parts[cp] != None: - self.consecutive_completed_height = cp - cp += 1 + # Update consecutive completed pointer + if i == self.consecutive_completed_height + 1: + self.consecutive_completed_height = i + + cp = self.consecutive_completed_height + 1 + while cp < len(self.parts) and self.parts[cp] != None: + self.consecutive_completed_height = cp + cp += 1 - if self.__progress_callback != None: - try: - self.__progress_callback(self) - except Exception as e: - RNS.log("Error while executing progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + if self.__progress_callback != None: + try: + self.__progress_callback(self) + except Exception as e: + RNS.log("Error while executing progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) - i += 1 + i += 1 - self.receiving_part = False + self.receiving_part = False - if self.received_count == self.total_parts and not self.assembly_lock: - self.assembly_lock = True - self.assemble() - elif self.outstanding_parts == 0: - # TODO: Figure out if there is a mathematically - # optimal way to adjust windows - if self.window < self.window_max: - self.window += 1 - if (self.window - self.window_min) > (self.window_flexibility-1): - self.window_min += 1 + if self.received_count == self.total_parts and not self.assembly_lock: + self.assembly_lock = True + self.assemble() + elif self.outstanding_parts == 0: + # TODO: Figure out if there is a mathematically + # optimal way to adjust windows + if self.window < self.window_max: + self.window += 1 + if (self.window - self.window_min) > (self.window_flexibility-1): + self.window_min += 1 - if self.req_sent != 0: - rtt = time.time()-self.req_sent - req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req + if self.req_sent != 0: + rtt = time.time()-self.req_sent + req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req - if rtt != 0: - self.req_data_rtt_rate = req_transferred/rtt - self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes + if rtt != 0: + self.req_data_rtt_rate = req_transferred/rtt + self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes - if self.req_data_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD: - self.fast_rate_rounds += 1 + if self.req_data_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD: + self.fast_rate_rounds += 1 - if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD: - self.window_max = Resource.WINDOW_MAX_FAST + if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD: + self.window_max = Resource.WINDOW_MAX_FAST - self.request_next() - else: - self.receiving_part = False + self.request_next() + else: + self.receiving_part = False # Called on incoming resource to send a request for more data def request_next(self): @@ -728,11 +731,25 @@ class Resource: hashmap_exhausted = Resource.HASHMAP_IS_NOT_EXHAUSTED requested_hashes = b"" - offset = (1 if self.consecutive_completed_height > 0 else 0) - i = 0; pn = self.consecutive_completed_height+offset + i = 0; pn = self.consecutive_completed_height+1 search_start = pn + search_size = self.window - for part in self.parts[search_start:search_start+self.window]: + # TODO: Remove + # tpm = [] + # tpi = 0 + # try: + # for p in self.parts: + # if p == None: + # tpm.append(None) + # else: + # tpm.append(tpi) + # tpi+=1 + # except Exception as e: + # print(str(e)) + # RNS.log(f"Partmap: "+str(tpm)) + + for part in self.parts[search_start:search_start+search_size]: if part == None: part_hash = self.hashmap[pn] if part_hash != None: @@ -752,7 +769,6 @@ class Resource: hmu_part += last_map_hash self.waiting_for_hmu = True - requested_data = b"" request_data = hmu_part + self.hash + requested_hashes request_packet = RNS.Packet(self.link, request_data, context = RNS.Packet.RESOURCE_REQ) @@ -908,8 +924,7 @@ class Resource: else: self.progress_total_parts = float(self.total_parts) - - progress = self.processed_parts / self.progress_total_parts + progress = min(1.0, self.processed_parts / self.progress_total_parts) return progress def get_transfer_size(self):