diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index 8d43e31..5af55e1 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -1,6 +1,7 @@ import os import math import time +import random import atexit import threading import RNS @@ -554,6 +555,7 @@ class LXMFPropagationAnnounceHandler: class LXMPeer: OFFER_REQUEST_PATH = "/offer" + MESSAGE_GET_PATH = "/get" IDLE = 0x00 LINK_ESTABLISHING = 0x01 @@ -699,7 +701,8 @@ class LXMPeer: lxm_list = [] - for file_path in wanted_messages: + for message_entry in wanted_messages: + file_path = message_entry[1] # TODO: Remove RNS.log("Loading "+str(file_path)+" for transfer") file = open(file_path, "rb") @@ -737,8 +740,13 @@ class LXMPeer: self.state = LXMPeer.IDLE def handle_message(self, transient_id): - if not transient_id in self.handled_messages and not self.unhandled_messages: + if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages: + # TODO: Remove + RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was added to distribution queue for "+RNS.prettyhexrep(self.destination_hash)) self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id] + else: + # TODO: Remove + RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was NOT added to distribution queue for "+RNS.prettyhexrep(self.destination_hash)) class LXMRouter: @@ -751,7 +759,21 @@ class LXMRouter: AUTOPEER = True AUTOPEER_MAXDEPTH = 4 + PR_PATH_TIMEOUT = 10 + + PR_IDLE = 0x00 + PR_PATH_REQUESTED = 0x01 + PR_LINK_ESTABLISHING = 0x02 + PR_LINK_ESTABLISHED = 0x03 + PR_REQUEST_SENT = 0x04 + PR_RESPONSE_RECEIVED = 0x05 + PR_COMPLETE = 0x06 + + PR_ALL_MESSAGES = 0x00 + def __init__(self, identity = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH): + random.seed(os.urandom(10)) + self.pending_inbound = [] self.pending_outbound = [] self.failed_outbound = [] @@ -767,6 +789,12 @@ class LXMRouter: self.outbound_propagation_node = None self.outbound_propagation_link = None + self.wants_download_on_path_available_from = None + self.wants_download_on_path_available_to = None + self.propagation_transfer_state = LXMRouter.PR_IDLE + self.propagation_transfer_progress = 0.0 + self.propagation_transfer_last_result = None + if identity == None: identity = RNS.Identity() @@ -841,6 +869,140 @@ class LXMRouter: def get_outbound_propagation_node(self): return self.outbound_propagation_node + def cancel_propagation_node_requests(self): + if self.outbound_propagation_link != None: + self.outbound_propagation_link.teardown() + self.outbound_propagation_link = None + + self.acknowledge_sync_completion() + + def acknowledge_sync_completion(self): + self.propagation_transfer_state = LXMRouter.PR_IDLE + self.propagation_transfer_progress = 0.0 + self.propagation_transfer_last_result = None + self.wants_download_on_path_available_from = None + self.wants_download_on_path_available_to = None + + def request_messages_from_propagation_node(self, identity, max_messages = PR_ALL_MESSAGES): + self.propagation_transfer_progress = 0.0 + if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.ACTIVE: + self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHED + self.outbound_propagation_link.identify(identity) + self.outbound_propagation_link.request( + LXMPeer.MESSAGE_GET_PATH, + [None, None], + response_callback=self.message_list_response, + failed_callback=self.message_get_failed + ) + else: + if self.outbound_propagation_link == None: + if RNS.Transport.has_path(self.outbound_propagation_node): + self.wants_download_on_path_available_from = None + self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHING + RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for message download", RNS.LOG_DEBUG) + propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) + propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + def msg_request_established_callback(link): + self.request_messages_from_propagation_node(identity) + + self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=msg_request_established_callback) + else: + RNS.log("No path known for message download from propagation node "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) + RNS.Transport.request_path(self.outbound_propagation_node) + self.wants_download_on_path_available_from = self.outbound_propagation_node + self.wants_download_on_path_available_to = identity + self.wants_download_on_path_available_timeout = time.time() + LXMRouter.PR_PATH_TIMEOUT + self.propagation_transfer_state = LXMRouter.PR_PATH_REQUESTED + self.request_messages_path_job() + else: + # TODO: Remove + RNS.log("Waiting for propagation node link to become active", RNS.LOG_DEBUG) + + + def request_messages_path_job(self): + job_thread = threading.Thread(target=self.__request_messages_path_job) + job_thread.setDaemon(True) + job_thread.start() + + def __request_messages_path_job(self): + while not RNS.Transport.has_path(self.wants_download_on_path_available_from) and time.time() < self.wants_download_on_path_available_timeout: + time.sleep(0.1) + + if RNS.Transport.has_path(self.wants_download_on_path_available_from): + self.request_messages_from_propagation_node(self.wants_download_on_path_available_to) + else: + RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG) + self.acknowledge_sync_completion() + + + def has_message(self, transient_id): + return False + + def message_get_failed(self, request_receipt): + # TODO: Remove or change + RNS.log("Message list/get request failed", RNS.LOG_DEBUG) + if self.outbound_propagation_link != None: + self.outbound_propagation_link.teardown() + + def message_list_response(self, request_receipt): + # TODO: Remove + RNS.log("Got message list response from propagation node") + if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: + RNS.log("Propagation node indicated missing identification on reuquest", RNS.LOG_DEBUG) + if self.outbound_propagation_link != None: + self.outbound_propagation_link.teardown() + else: + # TODO: Remove + if request_receipt.response != None: + RNS.log("Received message list from node:", RNS.LOG_DEBUG) + haves = [] + wants = [] + if len(request_receipt.response) > 0: + for transient_id in request_receipt.response: + RNS.log(" "+RNS.prettyhexrep(transient_id), RNS.LOG_DEBUG) + if self.has_message(transient_id): + haves.append(transient_id) + else: + wants.append(transient_id) + + request_receipt.link.request( + LXMPeer.MESSAGE_GET_PATH, + [wants, haves], + response_callback=self.message_get_response, + failed_callback=self.message_get_failed, + progress_callback=self.message_get_progress + ) + else: + # TODO: Remove + RNS.log("No messages on node", RNS.LOG_DEBUG) + self.propagation_transfer_state = LXMRouter.PR_COMPLETE + self.propagation_transfer_progress = 1.0 + self.propagation_transfer_last_result = 0 + + def message_get_progress(self, request_receipt): + self.propagation_transfer_progress = request_receipt.get_progress() + RNS.log("Progress: "+str(self.propagation_transfer_progress)) + + def message_get_response(self, request_receipt): + # TODO: Remove + RNS.log("Got message download response from propagation node") + if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: + RNS.log("Propagation node indicated missing identification on reuquest", RNS.LOG_DEBUG) + if self.outbound_propagation_link != None: + self.outbound_propagation_link.teardown() + else: + if request_receipt.response != None and len(request_receipt.response) > 0: + for lxmf_data in request_receipt.response: + self.lxmf_propagation(lxmf_data) + else: + RNS.log("No LXMF data received in message download response", RNS.LOG_DEBUG) + + # TODO: Remove + RNS.log("No messages on node", RNS.LOG_DEBUG) + self.propagation_transfer_state = LXMRouter.PR_COMPLETE + self.propagation_transfer_progress = 1.0 + self.propagation_transfer_last_result = len(request_receipt.response) + def announce(self, destination_hash): if destination_hash in self.delivery_destinations: delivery_destination = self.delivery_destinations[destination_hash] @@ -957,9 +1119,14 @@ class LXMRouter: for serialised_peer in serialised_peers: peer = LXMPeer.from_bytes(serialised_peer, self) - self.peers[peer.destination_hash] = peer - # TODO: Remove - RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)) + if peer.identity != None: + self.peers[peer.destination_hash] = peer + + # TODO: Remove + RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages") + else: + RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.") + self.propagation_node = True @@ -967,6 +1134,7 @@ class LXMRouter: self.propagation_destination.set_packet_callback(self.propagation_packet) self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) + self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL) self.announce_propagation_node() @@ -1013,6 +1181,82 @@ class LXMRouter: RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG) return None + def message_get_request(self, path, data, request_id, remote_identity, requested_at): + # TODO: Remove + RNS.log("Message get request") + + if remote_identity == None: + return LXMPeer.ERROR_NO_IDENTITY + else: + try: + # TODO: Remove + RNS.log(str(data)) + + remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery") + + # If both want and have fields are empty, send a list of + # available messages. + if data[0] == None and data[1] == None: + available_messages = [] + for transient_id in self.propagation_entries: + message_entry = self.propagation_entries[transient_id] + if message_entry[0] == remote_destination.hash: + message_size = os.path.getsize(message_entry[1]) + available_entry = [transient_id, message_size] + available_messages.append(available_entry) + + available_messages.sort(key=lambda e: e[1], reverse=False) + + # TODO: Remove + for am in available_messages: + RNS.log("Msg size: "+str(am[1])) + ############## + + transient_ids = [] + for available_entry in available_messages: + transient_ids.append(available_entry[0]) + + return transient_ids + + else: + # Process messages the client already have + if data[1] != None and len(data[1]) > 0: + for transient_id in data[1]: + if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash: + try: + filepath = self.propagation_entries[transient_id][1] + self.propagation_entries.remove(transient_id) + os.unlink(filepath) + RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG) + + except Exception as e: + RNS.log("Error while processing message purge request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + + + # Process wanted messages + response_messages = [] + if data[0] != None and len(data[0]) > 0: + for transient_id in data[0]: + if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash: + try: + filepath = self.propagation_entries[transient_id][1] + RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" requested message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG) + + message_file = open(filepath, "rb") + lxmf_data = message_file.read() + response_messages.append(lxmf_data) + message_file.close() + + except Exception as e: + RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + + return response_messages + + + except Exception as e: + RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG) + return None + def propagation_link_established(self, link): link.set_packet_callback(self.propagation_packet) link.set_resource_strategy(RNS.Link.ACCEPT_ALL) @@ -1040,7 +1284,7 @@ class LXMRouter: def propagation_resource_concluded(self, resource): - RNS.log("Transfer concluded for propagation resource "+str(resource), RNS.LOG_DEBUG) + RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG) if resource.status == RNS.Resource.COMPLETE: # TODO: The peer this was received from should # have the transient id added to it's list of @@ -1124,8 +1368,10 @@ class LXMRouter: peer.last_heard = time.time() else: peer = LXMPeer(self, destination_hash) + peer.alive = True + peer.last_heard = time.time() self.peers[destination_hash] = peer - RNS.log(str(self.propagation_destination)+" peered with "+str(peer.destination)) + RNS.log("Peered with "+str(peer.destination)) def unpeer(self, destination_hash, timestamp = None): if timestamp == None: @@ -1148,6 +1394,7 @@ class LXMRouter: JOB_OUTBOUND_INTERVAL = 1 JOB_LINKS_INTERVAL = 1 + JOB_PEERSYNC_INTERVAL = 12 def jobs(self): if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0: self.process_outbound() @@ -1155,6 +1402,9 @@ class LXMRouter: if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0: self.clean_links() + if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: + self.sync_peers() + self.processing_count += 1 @@ -1172,6 +1422,29 @@ class LXMRouter: cleaned_link = self.direct_links.pop(link_hash) RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG) + if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED: + self.outbound_propagation_link = None + self.acknowledge_sync_completion() + RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG) + + + def sync_peers(self): + waiting_peers = [] + for peer_id in self.peers: + peer = self.peers[peer_id] + if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: + waiting_peers.append(peer) + + if len(waiting_peers) > 0: + # TODO: Remove + RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG) + selected_index = random.randint(0,len(waiting_peers)-1) + selected_peer = waiting_peers[selected_index] + # TODO: Remove + RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG) + selected_peer.sync() + + def fail_message(self, lxmessage): RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)