Resource timing, retries

This commit is contained in:
Mark Qvist 2018-04-23 23:42:16 +02:00
parent 23ff873c63
commit 5c94324230
6 changed files with 218 additions and 54 deletions

View file

@ -6,7 +6,6 @@ import threading
import vendor.umsgpack as umsgpack
from time import sleep
class Resource:
WINDOW_MIN = 1
WINDOW_MAX = 10
@ -15,9 +14,10 @@ class Resource:
SDU = RNS.Reticulum.MTU - RNS.Packet.HEADER_MAXSIZE
RANDOM_HASH_SIZE = 4
DEFAULT_TIMEOUT = RNS.Packet.TIMEOUT
MAX_RETRIES = 3
ROUNDTRIP_FACTOR = 1.5
# TODO: Should be allocated more
# intelligently
MAX_RETRIES = 5
SENDER_GRACE_TIME = 10
HASHMAP_IS_NOT_EXHAUSTED = 0x00
HASHMAP_IS_EXHAUSTED = 0xFF
@ -27,9 +27,11 @@ class Resource:
QUEUED = 0x01
ADVERTISED = 0x02
TRANSFERRING = 0x03
COMPLETE = 0x04
FAILED = 0x05
CORRUPT = 0x06
AWAITING_PROOF = 0x04
ASSEMBLING = 0x05
COMPLETE = 0x06
FAILED = 0x07
CORRUPT = 0x08
@staticmethod
def accept(advertisement_packet, callback=None, progress_callback = None):
@ -68,15 +70,24 @@ class Resource:
resource.hashmap_update(0, resource.hashmap_raw)
resource.watchdog_job()
return resource
except Exception as e:
RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_VERBOSE)
traceback.print_exc()
RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG)
return None
def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None):
self.status = Resource.NONE
self.link = link
self.max_retries = Resource.MAX_RETRIES
self.retries_left = self.max_retries
self.default_timeout = self.link.default_timeout
self.timeout_factor = self.link.timeout_factor
self.sender_grace_time = Resource.SENDER_GRACE_TIME
self.hmu_retry_ok = False
self.watchdog_lock = False
self.__watchdog_job_id = 0
self.rtt = None
if data != None:
@ -112,6 +123,7 @@ class Resource:
self.size = len(self.data)
self.hashmap = ""
self.sent_parts = 0
self.parts = []
for i in range(0,int(math.ceil(self.size/float(Resource.SDU)))):
data = self.data[i*Resource.SDU:(i+1)*Resource.SDU]
@ -142,17 +154,22 @@ class Resource:
def hashmap_update_packet(self, plaintext):
if not self.status == Resource.FAILED:
self.last_activity = time.time()
self.retries_left = self.max_retries
update = umsgpack.unpackb(plaintext[RNS.Identity.HASHLENGTH/8:])
self.hashmap_update(update[0], update[1])
def hashmap_update(self, segment, hashmap):
if not self.status == Resource.FAILED:
self.status = Resource.TRANSFERRING
seg_len = ResourceAdvertisement.HASHMAP_MAX_LEN
hashes = len(hashmap)/Resource.MAPHASH_LEN
for i in range(0,hashes):
if self.hashmap[i+segment*seg_len] == None:
self.hashmap_height += 1
self.hashmap[i+segment*seg_len] = hashmap[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN]
self.hashmap_height += 1
self.waiting_for_hmu = False
self.request_next()
@ -167,22 +184,105 @@ class Resource:
def __advertise_job(self):
data = ResourceAdvertisement(self).pack()
packet = RNS.Packet(self.link, data, context=RNS.Packet.RESOURCE_ADV)
self.advertisement_packet = RNS.Packet(self.link, data, context=RNS.Packet.RESOURCE_ADV)
while not self.link.ready_for_new_resource():
self.status = Resource.QUEUED
sleep(0.25)
packet.send()
self.advertisement_packet.send()
self.last_activity = time.time()
self.adv_sent = self.last_activity
self.rtt = None
self.status = Resource.ADVERTISED
self.link.register_outgoing_resource(self)
self.watchdog_job()
def watchdog_job(self):
thread = threading.Thread(target=self.__watchdog_job)
thread.setDaemon(True)
thread.start()
def __watchdog_job(self):
self.__watchdog_job_id += 1
this_job_id = self.__watchdog_job_id
while self.status < Resource.ASSEMBLING and this_job_id == self.__watchdog_job_id:
while self.watchdog_lock:
sleep(0.025)
sleep_time = None
if self.status == Resource.ADVERTISED:
sleep_time = (self.adv_sent+self.default_timeout)-time.time()
if sleep_time < 0:
if self.retries_left <= 0:
RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG)
self.cancel()
sleep_time = 0.001
else:
RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG)
self.retries_left -= 1
self.advertisement_packet.resend()
self.last_activity = time.time()
self.adv_sent = self.last_activity
sleep_time = 0.001
elif self.status == Resource.TRANSFERRING:
if not self.initiator:
rtt = self.link.rtt if self.rtt == None else self.rtt
sleep_time = self.last_activity + (rtt*self.timeout_factor) - time.time()
if sleep_time < 0:
if self.retries_left > 0:
RNS.log("Timeout waiting for parts, requesting retry", RNS.LOG_DEBUG)
sleep_time = 0.001
self.retries_left -= 1
self.waiting_for_hmu = False
self.request_next()
else:
self.cancel()
sleep_time = 0.001
else:
max_wait = self.rtt * self.timeout_factor * self.max_retries + self.sender_grace_time
sleep_time = self.last_activity + max_wait - time.time()
if sleep_time < 0:
RNS.log("Resource timed out waiting for part requests", RNS.LOG_DEBUG)
self.cancel()
sleep_time = 0.001
elif self.status == Resource.AWAITING_PROOF:
sleep_time = self.last_part_sent + (self.rtt*self.timeout_factor+self.sender_grace_time) - time.time()
if sleep_time < 0:
if self.retries_left <= 0:
RNS.log("Resource timed out waiting for proof", RNS.LOG_DEBUG)
self.cancel()
sleep_time = 0.001
else:
RNS.log("All parts sent, but no resource proof received, querying network cache...", RNS.LOG_DEBUG)
self.retries_left -= 1
expected_data = self.hash + self.expected_proof
expected_proof_packet = RNS.Packet(self.link, expected_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF)
expected_proof_packet.pack()
expected_proof_packet.updateHash()
RNS.Transport.cache_request(expected_proof_packet.packet_hash)
self.last_part_sent = time.time()
sleep_time = 0.001
if sleep_time == 0:
RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_WARNING)
if sleep_time == None or sleep_time < 0:
# TODO: This should probably not be here forever
RNS.log("Timing error! Closing Reticulum now.", RNS.LOG_CRITICAL)
RNS.panic()
sleep(sleep_time)
def assemble(self):
if not self.status == Resource.FAILED:
try:
RNS.log("Assembling parts...")
self.status = Resource.ASSEMBLING
stream = ""
for part in self.parts:
stream += part
@ -236,11 +336,14 @@ class Resource:
def receive_part(self, packet):
self.last_activity = time.time()
self.retries_left = self.max_retries
if self.req_resp == None:
self.req_resp = self.last_activity
rtt = self.req_resp-self.req_sent
if self.rtt == None:
self.rtt = rtt
self.watchdog_job()
elif self.rtt < rtt:
self.rtt = rtt
@ -313,7 +416,12 @@ class Resource:
if self.rtt == None:
self.rtt = rtt
self.status == Resource.TRANSFERRING
if self.status != Resource.TRANSFERRING:
self.status = Resource.TRANSFERRING
self.watchdog_job()
self.retries_left = self.max_retries
wants_more_hashmap = True if ord(request_data[0]) == Resource.HASHMAP_IS_EXHAUSTED else False
pad = 1+Resource.MAPHASH_LEN if wants_more_hashmap else 1
@ -322,16 +430,18 @@ class Resource:
for i in range(0,len(requested_hashes)/Resource.MAPHASH_LEN):
requested_hash = requested_hashes[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN]
i = 0
pi = 0
for part in self.parts:
if part.map_hash == requested_hash:
if not part.sent:
part.send()
self.sent_parts += 1
else:
part.resend()
self.last_activity = time.time()
self.last_part_sent = self.last_activity
break
i += 1
pi += 1
if wants_more_hashmap:
last_map_hash = request_data[1:Resource.MAPHASH_LEN+1]
@ -358,21 +468,26 @@ class Resource:
hmu = self.hash+umsgpack.packb([segment, hashmap])
hmu_packet = RNS.Packet(self.link, hmu, context = RNS.Packet.RESOURCE_HMU)
hmu_packet.send()
self.last_activity = time.time()
if self.sent_parts == len(self.parts):
self.status = Resource.AWAITING_PROOF
def cancel(self):
self.status = Resource.FAILED
if self.initiator:
if self.link.status == RNS.Link.ACTIVE:
cancel_packet = RNS.Packet(self.link, self.hash, context=RNS.Packet.RESOURCE_ICL)
cancel_packet.send()
self.link.cancel_outgoing_resource(self)
else:
self.link.cancel_incoming_resource(self)
if self.callback != None:
self.callback(self)
if self.status < Resource.COMPLETE:
self.status = Resource.FAILED
if self.initiator:
if self.link.status == RNS.Link.ACTIVE:
cancel_packet = RNS.Packet(self.link, self.hash, context=RNS.Packet.RESOURCE_ICL)
cancel_packet.send()
self.link.cancel_outgoing_resource(self)
else:
self.link.cancel_incoming_resource(self)
if self.callback != None:
self.callback(self)
def progress_callback(self, callback):
self.__progress_callback = callback
@ -382,7 +497,7 @@ class Resource:
return progress
def __str__(self):
return RNS.prettyHexRep(self.hash)
return RNS.prettyhexrep(self.hash)+str(self.link)
class ResourceAdvertisement: