Implemented request and response API

This commit is contained in:
Mark Qvist 2021-08-20 23:29:06 +02:00
parent 69930e5652
commit 1dc6655017
19 changed files with 881 additions and 42 deletions

View file

@ -110,6 +110,7 @@ class Link:
self.resource_strategy = Link.ACCEPT_NONE
self.outgoing_resources = []
self.incoming_resources = []
self.pending_requests = []
self.last_inbound = 0
self.last_outbound = 0
self.tx = 0
@ -265,6 +266,26 @@ class Link:
self.had_outbound()
def request(self, path, data = None, response_callback = None, failed_callback = None):
"""
Sends a request to the remote peer.
:param path: The request path.
:param response_callback: A function or method with the signature *response_callback(request_receipt)* to be called when a response is received. See the :ref:`Request Example<example-request>` for more info.
:param failed_callback: A function or method with the signature *failed_callback(request_receipt)* to be called when a request fails. See the :ref:`Request Example<example-request>` for more info.
"""
request_path_hash = RNS.Identity.truncated_hash(path.encode("utf-8"))
unpacked_request = [time.time(), request_path_hash, data]
packed_request = umsgpack.packb(unpacked_request)
if len(packed_request) <= Link.MDU:
request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST)
return RequestReceipt(self, request_packet.send(), response_callback, failed_callback)
else:
# TODO: Implement sending requests as Resources
raise IOError("Request size of "+str(len(packed_request))+" exceeds MDU of "+str(Link.MDU)+" bytes")
def rtt_packet(self, packet):
try:
# TODO: This is crude, we should use the delta
@ -467,6 +488,70 @@ class Link:
if self.callbacks.remote_identified != None:
self.callbacks.remote_identified(self.__remote_identity)
elif packet.context == RNS.Packet.REQUEST:
try:
request_id = packet.getTruncatedHash()
packed_request = self.decrypt(packet.data)
unpacked_request = umsgpack.unpackb(packed_request)
requested_at = unpacked_request[0]
path_hash = unpacked_request[1]
request_data = unpacked_request[2]
if path_hash in self.destination.request_handlers:
request_handler = self.destination.request_handlers[path_hash]
path = request_handler[0]
response_generator = request_handler[1]
allow = request_handler[2]
allowed_list = request_handler[3]
allowed = False
if not allow == RNS.Destination.ALLOW_NONE:
if allow == RNS.Destination.ALLOW_LIST:
if self.__remote_identity in allowed_list:
allowed = True
elif allow == RNS.Destination.ALLOW_ALL:
allowed = True
if allowed:
response = response_generator(path, request_data, request_id, self.__remote_identity, requested_at)
if response != None:
packed_response = umsgpack.packb([request_id, True, response])
if len(packed_response) <= Link.MDU:
RNS.Packet(self, packed_response, RNS.Packet.DATA, context = RNS.Packet.RESPONSE).send()
else:
# TODO: Implement transfer as resource
packed_response = umsgpack.packb([request_id, False, None])
raise Exception("Response transfer as resource not implemented")
except Exception as e:
RNS.log("Error occurred while handling request. The contained exception was: "+str(e), RNS.LOG_ERROR)
elif packet.context == RNS.Packet.RESPONSE:
packed_response = self.decrypt(packet.data)
unpacked_response = umsgpack.unpackb(packed_response)
request_id = unpacked_response[0]
if unpacked_response[1] == True:
remove = None
for pending_request in self.pending_requests:
if pending_request.request_id == request_id:
response_data = unpacked_response[2]
remove = pending_request
try:
pending_request.response_received(response_data)
except Exception as e:
RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR)
break
if remove != None:
self.pending_requests.remove(remove)
else:
# TODO: Implement receiving responses as Resources
raise Exception("Response transfer as resource not implemented")
elif packet.context == RNS.Packet.LRRTT:
if not self.initiator:
self.rtt_packet(packet)
@ -691,4 +776,58 @@ class Link:
return self.__encryption_disabled
def __str__(self):
return RNS.prettyhexrep(self.link_id)
return RNS.prettyhexrep(self.link_id)
class RequestReceipt():
FAILED = 0x00
SENT = 0x01
DELIVERED = 0x02
READY = 0x03
def __init__(self, link, packet_receipt, response_callback = None, failed_callback = None):
self.hash = packet_receipt.truncated_hash
self.link = link
self.request_id = self.hash
self.response = None
self.status = RequestReceipt.SENT
self.sent_at = time.time()
self.timeout = RNS.Packet.TIMEOUT
self.concluded_at = None
self.callbacks = RequestReceiptCallbacks()
self.callbacks.response = response_callback
self.callbacks.failed = failed_callback
self.packet_receipt = packet_receipt
self.packet_receipt.set_timeout_callback(self.request_timed_out)
self.link.pending_requests.append(self)
def request_timed_out(self, packet_receipt):
self.status = RequestReceipt.FAILED
self.concluded_at = time.time()
self.link.pending_requests.remove(self)
if self.callbacks.failed != None:
self.callbacks.failed(self)
def response_received(self, response):
self.response = response
self.packet_receipt.status = RNS.PacketReceipt.DELIVERED
self.packet_receipt.proved = True
self.packet_receipt.concluded_at = time.time()
if self.packet_receipt.callbacks.delivery != None:
self.packet_receipt.callbacks.delivery(self)
if self.callbacks.response != None:
self.callbacks.response(self)
class RequestReceiptCallbacks:
def __init__(self):
self.response = None
self.failed = None