Improved timeout calculation and handling.

This commit is contained in:
Mark Qvist 2021-08-28 20:01:01 +02:00
parent 6d441dac02
commit 2678aeb6a1
5 changed files with 112 additions and 41 deletions

View file

@ -46,14 +46,15 @@ class Link:
MDU = math.floor((RNS.Reticulum.MDU-RNS.Identity.AES_HMAC_OVERHEAD)/RNS.Identity.AES128_BLOCKSIZE)*RNS.Identity.AES128_BLOCKSIZE - 1
# TODO: This should not be hardcoded,
# but calculated from something like
# first-hop RTT latency and distance
DEFAULT_TIMEOUT = 60.0
# This value is set at a reasonable
# level for a 1 Kb/s channel.
ESTABLISHMENT_TIMEOUT_PER_HOP = 3
"""
Default timeout for link establishment in seconds.
Default timeout for link establishment in seconds per hop to destination.
"""
TIMEOUT_FACTOR = 3
TRAFFIC_TIMEOUT_FACTOR = 20
KEEPALIVE_TIMEOUT_FACTOR = 4
STALE_GRACE = 2
KEEPALIVE = 360
"""
@ -119,9 +120,10 @@ class Link:
self.rx = 0
self.txbytes = 0
self.rxbytes = 0
self.default_timeout = Link.DEFAULT_TIMEOUT
self.proof_timeout = self.default_timeout
self.timeout_factor = Link.TIMEOUT_FACTOR
self.establishment_timeout = Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, RNS.Transport.hops_to(destination.hash))
RNS.log("Establishment timeout set to: "+str(self.establishment_timeout))
self.traffic_timeout_factor = Link.TRAFFIC_TIMEOUT_FACTOR
self.keepalive_timeout_factor = Link.KEEPALIVE_TIMEOUT_FACTOR
self.keepalive = Link.KEEPALIVE
self.watchdog_lock = False
self.status = Link.PENDING
@ -273,31 +275,35 @@ class Link:
self.had_outbound()
def request(self, path, data = None, response_callback = None, failed_callback = None, timeout = None):
def request(self, path, data = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None):
"""
Sends a request to the remote peer.
:param path: The request path.
:param response_callback: An optional function or method with the signature *response_callback(request_receipt)* to be called when a response is received. See the :ref:`Request Example<example-request>` for more info.
:param failed_callback: An optional function or method with the signature *failed_callback(request_receipt)* to be called when a request fails. See the :ref:`Request Example<example-request>` for more info.
:param timeout: An optional timeout in seconds for the request. If *None* is supplied, this defaults to ``RNS.Packet.TIMEOUT``.
:param progress_callback: An optional function or method with the signature *progress_callback(request_receipt)* to be called when progress is made receiving the response. Progress can be accessed as a float between 0.0 and 1.0 by the *request_receipt.progress* property.
:param timeout: An optional timeout in seconds for the request. If *None* is supplied it will be calculated based on link RTT.
"""
request_path_hash = RNS.Identity.truncated_hash(path.encode("utf-8"))
unpacked_request = [time.time(), request_path_hash, data]
packed_request = umsgpack.packb(unpacked_request)
if timeout == None:
timeout = self.rtt * self.traffic_timeout_factor
if len(packed_request) <= Link.MDU:
request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST)
packet_receipt = request_packet.send()
if timeout != None:
packet_receipt.set_timeout(timeout)
packet_receipt.set_timeout(timeout)
return RequestReceipt(
self,
packet_receipt = packet_receipt,
response_callback = response_callback,
failed_callback = failed_callback,
progress_callback = progress_callback,
timeout = timeout
)
@ -311,6 +317,7 @@ class Link:
resource = request_resource,
response_callback = response_callback,
failed_callback = failed_callback,
progress_callback = progress_callback,
timeout = timeout
)
@ -425,9 +432,9 @@ class Link:
# Link was initiated, but no response
# from destination yet
if self.status == Link.PENDING:
next_check = self.request_time + self.proof_timeout
next_check = self.request_time + self.establishment_timeout
sleep_time = next_check - time.time()
if time.time() >= self.request_time + self.proof_timeout:
if time.time() >= self.request_time + self.establishment_timeout:
RNS.log("Link establishment timed out", RNS.LOG_VERBOSE)
self.status = Link.CLOSED
self.teardown_reason = Link.TIMEOUT
@ -435,9 +442,9 @@ class Link:
sleep_time = 0.001
elif self.status == Link.HANDSHAKE:
next_check = self.request_time + self.proof_timeout
next_check = self.request_time + self.establishment_timeout
sleep_time = next_check - time.time()
if time.time() >= self.request_time + self.proof_timeout:
if time.time() >= self.request_time + self.establishment_timeout:
RNS.log("Timeout waiting for RTT packet from link initiator", RNS.LOG_DEBUG)
self.status = Link.CLOSED
self.teardown_reason = Link.TIMEOUT
@ -446,7 +453,7 @@ class Link:
elif self.status == Link.ACTIVE:
if time.time() >= self.last_inbound + self.keepalive:
sleep_time = self.rtt * self.timeout_factor + Link.STALE_GRACE
sleep_time = self.rtt * self.keepalive_timeout_factor + Link.STALE_GRACE
self.status = Link.STALE
if self.initiator:
self.send_keepalive()
@ -620,7 +627,13 @@ class Link:
if RNS.ResourceAdvertisement.is_request(packet):
RNS.Resource.accept(packet, callback=self.request_resource_concluded)
elif RNS.ResourceAdvertisement.is_response(packet):
RNS.Resource.accept(packet, callback=self.response_resource_concluded)
request_id = RNS.ResourceAdvertisement.get_request_id(packet)
for pending_request in self.pending_requests:
if pending_request.request_id == request_id:
RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress)
pending_request.response_size = RNS.ResourceAdvertisement.get_size(packet)
pending_request.response_transfer_size = RNS.ResourceAdvertisement.get_transfer_size(packet)
pending_request.started_at = time.time()
elif self.resource_strategy == Link.ACCEPT_NONE:
pass
elif self.resource_strategy == Link.ACCEPT_APP:
@ -846,34 +859,41 @@ class RequestReceipt():
DELIVERED = 0x02
READY = 0x03
def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, timeout = None):
def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None):
self.packet_receipt = packet_receipt
self.resource = resource
self.started_at = None
if self.packet_receipt != None:
self.hash = packet_receipt.truncated_hash
self.packet_receipt.set_timeout_callback(self.request_timed_out)
self.started_at = time.time()
elif self.resource != None:
self.hash = resource.request_id
resource.set_callback(self.request_resource_concluded)
self.link = link
self.request_id = self.hash
self.link = link
self.request_id = self.hash
self.response = None
self.status = RequestReceipt.SENT
self.sent_at = time.time()
self.concluded_at = None
self.response = None
self.response_transfer_size = None
self.response_size = None
self.status = RequestReceipt.SENT
self.sent_at = time.time()
self.progress = 0
self.concluded_at = None
self.response_concluded_at = None
if timeout != None:
self.timeout = timeout
else:
self.timeout = RNS.Packet.TIMEOUT
raise ValueError("No timeout specified for request receipt")
self.callbacks = RequestReceiptCallbacks()
self.callbacks.response = response_callback
self.callbacks.failed = failed_callback
self.callbacks.progress = progress_callback
self.link.pending_requests.append(self)
@ -881,6 +901,7 @@ class RequestReceipt():
def request_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG)
self.started_at = time.time()
self.status = RequestReceipt.DELIVERED
self.__resource_response_timeout = time.time()+self.timeout
load_thread = threading.Thread(target=self.__resource_response_timeout_job)
@ -904,7 +925,13 @@ class RequestReceipt():
if self.callbacks.failed != None:
self.callbacks.failed(self)
def response_resource_progress(self, resource):
self.progress = resource.progress()
self.__resource_response_timeout = time.time()+self.timeout
if self.callbacks.progress != None:
self.callbacks.progress(self)
def __resource_response_timeout_job(self):
while self.status == RequestReceipt.DELIVERED:
if time.time() > self.__resource_response_timeout:
@ -914,8 +941,14 @@ class RequestReceipt():
def response_received(self, response):
self.progress = 1.0
self.response = response
self.status = RequestReceipt.READY
self.response_concluded_at = time.time()
if len(response) <= Link.MDU:
self.response_size = len(response)
self.response_transfer_size = len(response)
if self.packet_receipt != None:
self.packet_receipt.status = RNS.PacketReceipt.DELIVERED
@ -924,11 +957,20 @@ class RequestReceipt():
if self.packet_receipt.callbacks.delivery != None:
self.packet_receipt.callbacks.delivery(self)
if self.callbacks.progress != None:
self.callbacks.progress(self)
if self.callbacks.response != None:
self.callbacks.response(self)
def response_time(self):
if self.status == RequestReceipt.READY:
return self.response_concluded_at - self.started_at
class RequestReceiptCallbacks:
def __init__(self):
self.response = None
self.failed = None
self.failed = None
self.progress = None