diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index 2b10987..f4c522c 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -38,11 +38,16 @@ class LXMPeer: @staticmethod def from_bytes(peer_bytes, router): dictionary = msgpack.unpackb(peer_bytes) + peer_destination_hash = dictionary["destination_hash"] + peer_peering_timebase = dictionary["peering_timebase"] + peer_alive = dictionary["alive"] + peer_last_heard = dictionary["last_heard"] + + peer = LXMPeer(router, peer_destination_hash) + peer.peering_timebase = peer_peering_timebase + peer.alive = peer_alive + peer.last_heard = peer_last_heard - peer = LXMPeer(router, dictionary["destination_hash"]) - peer.peering_timebase = dictionary["peering_timebase"] - peer.alive = dictionary["alive"] - peer.last_heard = dictionary["last_heard"] if "link_establishment_rate" in dictionary: peer.link_establishment_rate = dictionary["link_establishment_rate"] else: @@ -61,13 +66,22 @@ class LXMPeer: else: peer.propagation_transfer_limit = None + hm_count = 0 for transient_id in dictionary["handled_ids"]: if transient_id in router.propagation_entries: - peer.handled_messages.append(transient_id) + peer.add_handled_message(transient_id) + hm_count += 1 + um_count = 0 for transient_id in dictionary["unhandled_ids"]: if transient_id in router.propagation_entries: - peer.unhandled_messages.append(transient_id) + peer.add_unhandled_message(transient_id) + um_count += 1 + + peer._hm_count = hm_count + peer._um_count = um_count + peer._hm_counts_synced = True + peer._um_counts_synced = True del dictionary return peer @@ -93,7 +107,10 @@ class LXMPeer: dictionary["handled_ids"] = handled_ids dictionary["unhandled_ids"] = unhandled_ids - return msgpack.packb(dictionary) + peer_bytes = msgpack.packb(dictionary) + del dictionary + + return peer_bytes def __init__(self, router, destination_hash): self.alive = False @@ -106,11 +123,14 @@ class LXMPeer: self.sync_transfer_rate = 0 self.propagation_transfer_limit = None + self._hm_count = 0 + 
self._um_count = 0 + self._hm_counts_synced = False + self._um_counts_synced = False + self.link = None self.state = LXMPeer.IDLE - self.unhandled_messages = [] - self.handled_messages = [] self.last_offer = [] self.router = router @@ -173,7 +193,7 @@ class LXMPeer: for transient_id in purged_ids: RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) - self.unhandled_messages.remove(transient_id) + self.remove_unhandled_message(transient_id) unhandled_entries.sort(key=lambda e: e[1], reverse=False) per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now @@ -228,8 +248,8 @@ class LXMPeer: # Peer already has all advertised messages for transient_id in self.last_offer: if transient_id in self.unhandled_messages: - self.handled_messages.append(transient_id) - self.unhandled_messages.remove(transient_id) + self.add_handled_message(transient_id) + self.remove_unhandled_message(transient_id) elif response == True: @@ -244,9 +264,8 @@ class LXMPeer: # If the peer did not want the message, it has # already received it from another peer. 
if not transient_id in response: - if transient_id in self.unhandled_messages: - self.handled_messages.append(transient_id) - self.unhandled_messages.remove(transient_id) + self.add_handled_message(transient_id) + self.remove_unhandled_message(transient_id) for transient_id in response: wanted_messages.append(self.router.propagation_entries[transient_id]) @@ -292,8 +311,8 @@ class LXMPeer: def resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: for transient_id in resource.transferred_messages: - self.handled_messages.append(transient_id) - self.unhandled_messages.remove(transient_id) + self.add_handled_message(transient_id) + self.remove_unhandled_message(transient_id) if self.link != None: self.link.teardown() @@ -332,9 +351,72 @@ class LXMPeer: self.link = None self.state = LXMPeer.IDLE - def handle_message(self, transient_id): + def new_propagation_message(self, transient_id): if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages: - self.unhandled_messages.append(transient_id) + self.add_unhandled_message(transient_id) + + @property + def handled_messages(self): + pes = self.router.propagation_entries.copy() + hm = list(filter(lambda tid: self.destination_hash in self.router.propagation_entries[tid][4], pes)) + self._hm_count = len(hm); del pes + return hm + + @property + def unhandled_messages(self): + pes = self.router.propagation_entries.copy() + um = list(filter(lambda tid: self.destination_hash in self.router.propagation_entries[tid][5], pes)) + self._um_count = len(um); del pes + return um + + @property + def handled_message_count(self): + if not self._hm_counts_synced: + self._update_counts() + + return self._hm_count + + @property + def unhandled_message_count(self): + if not self._um_counts_synced: + self._update_counts() + + return self._um_count + + def _update_counts(self): + if not self._hm_counts_synced: + RNS.log("UPDATE HM COUNTS") + hm = self.handled_messages; del hm + 
self._hm_counts_synced = True + + if not self._um_counts_synced: + RNS.log("UPDATE UM COUNTS") + um = self.unhandled_messages; del um + self._um_counts_synced = True + + def add_handled_message(self, transient_id): + if transient_id in self.router.propagation_entries: + if not self.destination_hash in self.router.propagation_entries[transient_id][4]: + self.router.propagation_entries[transient_id][4].append(self.destination_hash) + self._hm_counts_synced = False + + def add_unhandled_message(self, transient_id): + if transient_id in self.router.propagation_entries: + if not self.destination_hash in self.router.propagation_entries[transient_id][5]: + self.router.propagation_entries[transient_id][5].append(self.destination_hash) + self._um_count += 1 + + def remove_handled_message(self, transient_id): + if transient_id in self.router.propagation_entries: + if self.destination_hash in self.router.propagation_entries[transient_id][4]: + self.router.propagation_entries[transient_id][4].remove(self.destination_hash) + self._hm_counts_synced = False + + def remove_unhandled_message(self, transient_id): + if transient_id in self.router.propagation_entries: + if self.destination_hash in self.router.propagation_entries[transient_id][5]: + self.router.propagation_entries[transient_id][5].remove(self.destination_hash) + self._um_counts_synced = False def __str__(self): if self.destination_hash: diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index a19f401..9163824 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -1,9 +1,11 @@ import os +import sys import time import math import random import base64 import atexit +import signal import threading import RNS @@ -94,6 +96,9 @@ class LXMRouter: self.outbound_propagation_node = None self.outbound_propagation_link = None + if delivery_limit == None: + delivery_limit = LXMRouter.DELIVERY_LIMIT + self.message_storage_limit = None self.information_storage_limit = None self.propagation_per_transfer_limit = propagation_limit @@ 
-117,6 +122,7 @@ class LXMRouter: self.cost_file_lock = threading.Lock() self.ticket_file_lock = threading.Lock() self.stamp_gen_lock = threading.Lock() + self.exit_handler_running = False if identity == None: identity = RNS.Identity() @@ -221,6 +227,8 @@ class LXMRouter: RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) atexit.register(self.exit_handler) + signal.signal(signal.SIGINT, self.sigint_handler) + signal.signal(signal.SIGTERM, self.sigterm_handler) job_thread = threading.Thread(target=self.jobloop) job_thread.setDaemon(True) @@ -446,17 +454,19 @@ class LXMRouter: file.close() self.propagation_entries[transient_id] = [ - destination_hash, - filepath, - received, - msg_size, + destination_hash, # 0: Destination hash + filepath, # 1: Storage location + received, # 2: Receive timestamp + msg_size, # 3: Message size + [], # 4: Handled peers + [], # 5: Unhandled peers ] except Exception as e: RNS.log("Could not read LXM from message store. 
The contained exception was: "+str(e), RNS.LOG_ERROR) et = time.time(); RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {math.floor(len(self.propagation_entries)/(et-st))} msgs/s", RNS.LOG_NOTICE) - st = time.time(); RNS.log("Loading propagation node peers...", RNS.LOG_NOTICE) + st = time.time(); RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE) if os.path.isfile(self.storagepath+"/peers"): peers_file = open(self.storagepath+"/peers", "rb") @@ -465,23 +475,25 @@ class LXMRouter: if len(peers_data) > 0: serialised_peers = msgpack.unpackb(peers_data) + del peers_data - for serialised_peer in serialised_peers: + while len(serialised_peers) > 0: + serialised_peer = serialised_peers.pop() peer = LXMPeer.from_bytes(serialised_peer, self) + del serialised_peer if peer.identity != None: self.peers[peer.destination_hash] = peer lim_str = ", no transfer limit" if peer.propagation_transfer_limit != None: lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit" - RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages"+lim_str, RNS.LOG_DEBUG) + RNS.log("Rebuilt peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(peer.unhandled_message_count)+" unhandled messages"+lim_str, RNS.LOG_DEBUG) else: - del peer RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. 
def jobs(self):
    # Periodic job dispatcher, invoked by jobloop() once per processing
    # interval. Suspended entirely while the exit handler is persisting
    # state, so no new work is started during shutdown.
    if self.exit_handler_running:
        return

    self.processing_count += 1
    tick = self.processing_count

    if tick % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
        self.process_outbound()

    if tick % LXMRouter.JOB_STAMPS_INTERVAL == 0:
        # Stamp generation can be slow, so it runs off-thread.
        threading.Thread(target=self.process_deferred_stamps, daemon=True).start()

    if tick % LXMRouter.JOB_LINKS_INTERVAL == 0:
        self.clean_links()

    if tick % LXMRouter.JOB_TRANSIENT_INTERVAL == 0:
        self.clean_transient_id_caches()

    if tick % LXMRouter.JOB_STORE_INTERVAL == 0:
        self.clean_message_store()

    if tick % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
        self.sync_peers()
def save_locally_delivered_transient_ids(self):
    # Persist the cache of transient IDs for messages already delivered
    # locally. Nothing is written when the cache is empty, so startup
    # before any deliveries never touches storage.
    # NOTE(review): an emptied cache is also skipped, leaving any stale
    # file on disk until the next non-empty save — confirm intended.
    try:
        if not self.locally_delivered_transient_ids:
            return

        if not os.path.isdir(self.storagepath):
            os.makedirs(self.storagepath)

        with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file:
            locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))

    except Exception as e:
        RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -1001,10 +1016,43 @@ class LXMRouter: RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR) def exit_handler(self): + if self.exit_handler_running: + return + + self.exit_handler_running = True + + RNS.log("Tearing down delivery destinations...", RNS.LOG_NOTICE) + for destination_hash in self.delivery_destinations: + delivery_destination = self.delivery_destinations[destination_hash] + delivery_destination.set_packet_callback(None) + delivery_destination.set_link_established_callback(None) + for link in delivery_destination.links: + try: + if link.status == RNS.Link.ACTIVE: + link.teardown() + except Exception as e: + RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR) + + if self.propagation_node: + RNS.log("Tearing down propagation node destination...", RNS.LOG_NOTICE) + self.propagation_destination.set_link_established_callback(None) + self.propagation_destination.set_packet_callback(None) + self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH) + self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH) + for link in self.active_propagation_links: + try: + if link.status == RNS.Link.ACTIVE: + link.teardown() + except Exception as e: + RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR) + + RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE) if self.propagation_node: try: + st = time.time(); RNS.log("Saving peer synchronisation states to storage...", RNS.LOG_NOTICE) serialised_peers = [] - for peer_id in self.peers: + peer_dict = self.peers.copy() + for peer_id in peer_dict: peer = self.peers[peer_id] serialised_peers.append(peer.to_bytes()) @@ -1012,7 +1060,7 @@ class LXMRouter: peers_file.write(msgpack.packb(serialised_peers)) peers_file.close() - RNS.log("Saved "+str(len(serialised_peers))+" peers to storage", RNS.LOG_DEBUG) + RNS.log(f"Saved 
def sigint_handler(self, signal, frame):
    # SIGINT handler. The parameter name shadows the signal module, but
    # the module is not needed inside the handler; the name is kept for
    # positional-call compatibility with the signal dispatch machinery.
    self._handle_shutdown_signal("SIGINT")

def sigterm_handler(self, signal, frame):
    # SIGTERM handler; same behaviour as SIGINT.
    self._handle_shutdown_signal("SIGTERM")

def _handle_shutdown_signal(self, signame):
    # Shared shutdown logic for termination signals: exit immediately
    # unless the exit handler is currently persisting state, in which
    # case the process must stay alive until persistence completes.
    if not self.exit_handler_running:
        RNS.log("Received "+signame+", shutting down now!", RNS.LOG_WARNING)
        sys.exit(0)
    else:
        RNS.log("Received "+signame+", but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
no_stamp_enforcement = True @@ -1708,7 +1774,6 @@ class LXMRouter: if not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids: received = time.time() - propagation_entry = [transient_id, received, lxmf_data] destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH] self.locally_processed_transient_ids[transient_id] = received @@ -1732,12 +1797,13 @@ class LXMRouter: msg_file.write(lxmf_data) msg_file.close() - self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data)] + self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []] RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG) for peer_id in self.peers: peer = self.peers[peer_id] - peer.handle_message(transient_id) + if peer != from_peer: + peer.new_propagation_message(transient_id) else: # TODO: Add message to sneakernet queues when implemented @@ -1757,6 +1823,7 @@ class LXMRouter: except Exception as e: RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG) RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + RNS.trace_exception(e) return False def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None):