Implemented LXMF propagation client request handlers

This commit is contained in:
Mark Qvist 2021-10-07 18:12:54 +02:00
parent f5cb49b46b
commit b675e1c47e

View File

@ -1,6 +1,7 @@
import os import os
import math import math
import time import time
import random
import atexit import atexit
import threading import threading
import RNS import RNS
@ -554,6 +555,7 @@ class LXMFPropagationAnnounceHandler:
class LXMPeer: class LXMPeer:
OFFER_REQUEST_PATH = "/offer" OFFER_REQUEST_PATH = "/offer"
MESSAGE_GET_PATH = "/get"
IDLE = 0x00 IDLE = 0x00
LINK_ESTABLISHING = 0x01 LINK_ESTABLISHING = 0x01
@ -699,7 +701,8 @@ class LXMPeer:
lxm_list = [] lxm_list = []
for file_path in wanted_messages: for message_entry in wanted_messages:
file_path = message_entry[1]
# TODO: Remove # TODO: Remove
RNS.log("Loading "+str(file_path)+" for transfer") RNS.log("Loading "+str(file_path)+" for transfer")
file = open(file_path, "rb") file = open(file_path, "rb")
@ -737,8 +740,13 @@ class LXMPeer:
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
def handle_message(self, transient_id): def handle_message(self, transient_id):
if not transient_id in self.handled_messages and not self.unhandled_messages: if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
# TODO: Remove
RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was added to distribution queue for "+RNS.prettyhexrep(self.destination_hash))
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id] self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
else:
# TODO: Remove
RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was NOT added to distribution queue for "+RNS.prettyhexrep(self.destination_hash))
class LXMRouter: class LXMRouter:
@ -751,7 +759,21 @@ class LXMRouter:
AUTOPEER = True AUTOPEER = True
AUTOPEER_MAXDEPTH = 4 AUTOPEER_MAXDEPTH = 4
PR_PATH_TIMEOUT = 10
PR_IDLE = 0x00
PR_PATH_REQUESTED = 0x01
PR_LINK_ESTABLISHING = 0x02
PR_LINK_ESTABLISHED = 0x03
PR_REQUEST_SENT = 0x04
PR_RESPONSE_RECEIVED = 0x05
PR_COMPLETE = 0x06
PR_ALL_MESSAGES = 0x00
def __init__(self, identity = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH): def __init__(self, identity = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH):
random.seed(os.urandom(10))
self.pending_inbound = [] self.pending_inbound = []
self.pending_outbound = [] self.pending_outbound = []
self.failed_outbound = [] self.failed_outbound = []
@ -767,6 +789,12 @@ class LXMRouter:
self.outbound_propagation_node = None self.outbound_propagation_node = None
self.outbound_propagation_link = None self.outbound_propagation_link = None
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
self.propagation_transfer_state = LXMRouter.PR_IDLE
self.propagation_transfer_progress = 0.0
self.propagation_transfer_last_result = None
if identity == None: if identity == None:
identity = RNS.Identity() identity = RNS.Identity()
@ -841,6 +869,140 @@ class LXMRouter:
def get_outbound_propagation_node(self): def get_outbound_propagation_node(self):
return self.outbound_propagation_node return self.outbound_propagation_node
def cancel_propagation_node_requests(self):
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
self.outbound_propagation_link = None
self.acknowledge_sync_completion()
def acknowledge_sync_completion(self):
self.propagation_transfer_state = LXMRouter.PR_IDLE
self.propagation_transfer_progress = 0.0
self.propagation_transfer_last_result = None
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
def request_messages_from_propagation_node(self, identity, max_messages = PR_ALL_MESSAGES):
self.propagation_transfer_progress = 0.0
if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.ACTIVE:
self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHED
self.outbound_propagation_link.identify(identity)
self.outbound_propagation_link.request(
LXMPeer.MESSAGE_GET_PATH,
[None, None],
response_callback=self.message_list_response,
failed_callback=self.message_get_failed
)
else:
if self.outbound_propagation_link == None:
if RNS.Transport.has_path(self.outbound_propagation_node):
self.wants_download_on_path_available_from = None
self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHING
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for message download", RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
def msg_request_established_callback(link):
self.request_messages_from_propagation_node(identity)
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=msg_request_established_callback)
else:
RNS.log("No path known for message download from propagation node "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node)
self.wants_download_on_path_available_from = self.outbound_propagation_node
self.wants_download_on_path_available_to = identity
self.wants_download_on_path_available_timeout = time.time() + LXMRouter.PR_PATH_TIMEOUT
self.propagation_transfer_state = LXMRouter.PR_PATH_REQUESTED
self.request_messages_path_job()
else:
# TODO: Remove
RNS.log("Waiting for propagation node link to become active", RNS.LOG_DEBUG)
def request_messages_path_job(self):
job_thread = threading.Thread(target=self.__request_messages_path_job)
job_thread.setDaemon(True)
job_thread.start()
def __request_messages_path_job(self):
while not RNS.Transport.has_path(self.wants_download_on_path_available_from) and time.time() < self.wants_download_on_path_available_timeout:
time.sleep(0.1)
if RNS.Transport.has_path(self.wants_download_on_path_available_from):
self.request_messages_from_propagation_node(self.wants_download_on_path_available_to)
else:
RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG)
self.acknowledge_sync_completion()
def has_message(self, transient_id):
return False
def message_get_failed(self, request_receipt):
# TODO: Remove or change
RNS.log("Message list/get request failed", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
def message_list_response(self, request_receipt):
# TODO: Remove
RNS.log("Got message list response from propagation node")
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Propagation node indicated missing identification on reuquest", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
else:
# TODO: Remove
if request_receipt.response != None:
RNS.log("Received message list from node:", RNS.LOG_DEBUG)
haves = []
wants = []
if len(request_receipt.response) > 0:
for transient_id in request_receipt.response:
RNS.log(" "+RNS.prettyhexrep(transient_id), RNS.LOG_DEBUG)
if self.has_message(transient_id):
haves.append(transient_id)
else:
wants.append(transient_id)
request_receipt.link.request(
LXMPeer.MESSAGE_GET_PATH,
[wants, haves],
response_callback=self.message_get_response,
failed_callback=self.message_get_failed,
progress_callback=self.message_get_progress
)
else:
# TODO: Remove
RNS.log("No messages on node", RNS.LOG_DEBUG)
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = 0
def message_get_progress(self, request_receipt):
self.propagation_transfer_progress = request_receipt.get_progress()
RNS.log("Progress: "+str(self.propagation_transfer_progress))
def message_get_response(self, request_receipt):
# TODO: Remove
RNS.log("Got message download response from propagation node")
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Propagation node indicated missing identification on reuquest", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
else:
if request_receipt.response != None and len(request_receipt.response) > 0:
for lxmf_data in request_receipt.response:
self.lxmf_propagation(lxmf_data)
else:
RNS.log("No LXMF data received in message download response", RNS.LOG_DEBUG)
# TODO: Remove
RNS.log("No messages on node", RNS.LOG_DEBUG)
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = len(request_receipt.response)
def announce(self, destination_hash): def announce(self, destination_hash):
if destination_hash in self.delivery_destinations: if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash] delivery_destination = self.delivery_destinations[destination_hash]
@ -957,9 +1119,14 @@ class LXMRouter:
for serialised_peer in serialised_peers: for serialised_peer in serialised_peers:
peer = LXMPeer.from_bytes(serialised_peer, self) peer = LXMPeer.from_bytes(serialised_peer, self)
self.peers[peer.destination_hash] = peer if peer.identity != None:
# TODO: Remove self.peers[peer.destination_hash] = peer
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash))
# TODO: Remove
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages")
else:
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.")
self.propagation_node = True self.propagation_node = True
@ -967,6 +1134,7 @@ class LXMRouter:
self.propagation_destination.set_packet_callback(self.propagation_packet) self.propagation_destination.set_packet_callback(self.propagation_packet)
self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)
self.announce_propagation_node() self.announce_propagation_node()
@ -1013,6 +1181,82 @@ class LXMRouter:
RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG) RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None return None
def message_get_request(self, path, data, request_id, remote_identity, requested_at):
# TODO: Remove
RNS.log("Message get request")
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
else:
try:
# TODO: Remove
RNS.log(str(data))
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery")
# If both want and have fields are empty, send a list of
# available messages.
if data[0] == None and data[1] == None:
available_messages = []
for transient_id in self.propagation_entries:
message_entry = self.propagation_entries[transient_id]
if message_entry[0] == remote_destination.hash:
message_size = os.path.getsize(message_entry[1])
available_entry = [transient_id, message_size]
available_messages.append(available_entry)
available_messages.sort(key=lambda e: e[1], reverse=False)
# TODO: Remove
for am in available_messages:
RNS.log("Msg size: "+str(am[1]))
##############
transient_ids = []
for available_entry in available_messages:
transient_ids.append(available_entry[0])
return transient_ids
else:
# Process messages the client already have
if data[1] != None and len(data[1]) > 0:
for transient_id in data[1]:
if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash:
try:
filepath = self.propagation_entries[transient_id][1]
self.propagation_entries.remove(transient_id)
os.unlink(filepath)
RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error while processing message purge request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
# Process wanted messages
response_messages = []
if data[0] != None and len(data[0]) > 0:
for transient_id in data[0]:
if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash:
try:
filepath = self.propagation_entries[transient_id][1]
RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" requested message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)
message_file = open(filepath, "rb")
lxmf_data = message_file.read()
response_messages.append(lxmf_data)
message_file.close()
except Exception as e:
RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
return response_messages
except Exception as e:
RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
def propagation_link_established(self, link): def propagation_link_established(self, link):
link.set_packet_callback(self.propagation_packet) link.set_packet_callback(self.propagation_packet)
link.set_resource_strategy(RNS.Link.ACCEPT_ALL) link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
@ -1040,7 +1284,7 @@ class LXMRouter:
def propagation_resource_concluded(self, resource): def propagation_resource_concluded(self, resource):
RNS.log("Transfer concluded for propagation resource "+str(resource), RNS.LOG_DEBUG) RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG)
if resource.status == RNS.Resource.COMPLETE: if resource.status == RNS.Resource.COMPLETE:
# TODO: The peer this was received from should # TODO: The peer this was received from should
# have the transient id added to it's list of # have the transient id added to it's list of
@ -1124,8 +1368,10 @@ class LXMRouter:
peer.last_heard = time.time() peer.last_heard = time.time()
else: else:
peer = LXMPeer(self, destination_hash) peer = LXMPeer(self, destination_hash)
peer.alive = True
peer.last_heard = time.time()
self.peers[destination_hash] = peer self.peers[destination_hash] = peer
RNS.log(str(self.propagation_destination)+" peered with "+str(peer.destination)) RNS.log("Peered with "+str(peer.destination))
def unpeer(self, destination_hash, timestamp = None): def unpeer(self, destination_hash, timestamp = None):
if timestamp == None: if timestamp == None:
@ -1148,6 +1394,7 @@ class LXMRouter:
JOB_OUTBOUND_INTERVAL = 1 JOB_OUTBOUND_INTERVAL = 1
JOB_LINKS_INTERVAL = 1 JOB_LINKS_INTERVAL = 1
JOB_PEERSYNC_INTERVAL = 12
def jobs(self): def jobs(self):
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0: if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
self.process_outbound() self.process_outbound()
@ -1155,6 +1402,9 @@ class LXMRouter:
if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0: if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
self.clean_links() self.clean_links()
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
self.sync_peers()
self.processing_count += 1 self.processing_count += 1
@ -1172,6 +1422,29 @@ class LXMRouter:
cleaned_link = self.direct_links.pop(link_hash) cleaned_link = self.direct_links.pop(link_hash)
RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG) RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG)
if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED:
self.outbound_propagation_link = None
self.acknowledge_sync_completion()
RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG)
def sync_peers(self):
waiting_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
waiting_peers.append(peer)
if len(waiting_peers) > 0:
# TODO: Remove
RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)
selected_index = random.randint(0,len(waiting_peers)-1)
selected_peer = waiting_peers[selected_index]
# TODO: Remove
RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG)
selected_peer.sync()
def fail_message(self, lxmessage): def fail_message(self, lxmessage):
RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG) RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)