From 2c71cea7a0d2fc0a3ab5bbd26883befb5a0dd9fc Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 23 Jan 2025 14:13:08 +0100 Subject: [PATCH] Added local node stats request handler --- LXMF/LXMRouter.py | 134 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 131 insertions(+), 3 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 5465356..22ef3ac 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -64,6 +64,8 @@ class LXMRouter: PR_ALL_MESSAGES = 0x00 + STATS_GET_PATH = "/pn/get/stats" + ### Developer-facing API ############################## ####################################################### @@ -92,6 +94,7 @@ class LXMRouter: self.processing_count = 0 self.propagation_node = False + self.propagation_node_start_time = None if storagepath == None: raise ValueError("LXMF cannot be initialised without a storage path") @@ -135,6 +138,11 @@ class LXMRouter: self.identity = identity self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation") + self.control_destination = None + self.client_propagation_messages_received = 0 + self.client_propagation_messages_served = 0 + self.unpeered_propagation_incoming = 0 + self.unpeered_propagation_rx_bytes = 0 if autopeer != None: self.autopeer = autopeer @@ -541,13 +549,35 @@ class LXMRouter: RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE) + try: + if os.path.isfile(self.storagepath+"/node_stats"): + node_stats_file = open(self.storagepath+"/node_stats", "rb") + data = node_stats_file.read() + node_stats_file.close() + node_stats = msgpack.unpackb(data) + + if not type(node_stats) == dict: + RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR) + else: + self.client_propagation_messages_received = node_stats["client_propagation_messages_received"] + self.client_propagation_messages_served = node_stats["client_propagation_messages_served"] + self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"] + self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"] + + except Exception as e: + RNS.log("Could not load local node stats. The contained exception was: "+str(e), RNS.LOG_ERROR) + self.propagation_node = True + self.propagation_node_start_time = time.time() self.propagation_destination.set_link_established_callback(self.propagation_link_established) self.propagation_destination.set_packet_callback(self.propagation_packet) self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL) + self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") + self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash]) + if self.message_storage_limit != None: limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit) else: @@ -650,6 +680,76 @@ class LXMRouter: return False + ### Propagation Node Control ########################## + ####################################################### + + def compile_stats(self): + if not self.propagation_node: + return None + else: + peer_stats = {} + for peer_id in self.peers.copy(): + peer = self.peers[peer_id] + peer_stats[peer_id] = { + "type": "static" if peer_id in self.static_peers else "discovered", + "state": peer.state, + "alive": peer.alive, + "last_heard": int(peer.last_heard), + "next_sync_attempt": peer.next_sync_attempt, + "last_sync_attempt": peer.last_sync_attempt, + "sync_backoff": peer.sync_backoff, + "peering_timebase": peer.peering_timebase, + "ler": int(peer.link_establishment_rate), + "str": int(peer.sync_transfer_rate), + "transfer_limit": peer.propagation_transfer_limit, + "network_distance": RNS.Transport.hops_to(peer_id), + "rx_bytes": peer.rx_bytes, + "tx_bytes": peer.tx_bytes, + "messages": { + "offered": peer.offered, + "outgoing": peer.outgoing, + "incoming": peer.incoming, + }, + } + + node_stats = { + "identity_hash": self.identity.hash, + "destination_hash": self.propagation_destination.hash, + "uptime": time.time()-self.propagation_node_start_time, + "delivery_limit": self.delivery_per_transfer_limit, + "propagation_limit": self.propagation_per_transfer_limit, + "autopeer_maxdepth": self.autopeer_maxdepth, + "from_static_only": self.from_static_only, + "messagestore": { + "count": len(self.propagation_entries), + "bytes": self.message_storage_size(), + "limit": self.message_storage_limit, + }, + "clients" : { + "client_propagation_messages_received": self.client_propagation_messages_received, + "client_propagation_messages_served": self.client_propagation_messages_served, + }, + "unpeered_propagation_incoming": self.unpeered_propagation_incoming, + "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes, + "static_peers": len(self.static_peers), + "discovered_peers": len(self.peers)-len(self.static_peers), + "total_peers": len(self.peers), + "max_peers": self.max_peers, + "peers": peer_stats, + } + + return node_stats + + def stats_get_request(self, path, data, request_id, remote_identity, requested_at): + RNS.log("Stats request", RNS.LOG_DEBUG) # TODO: Remove debug + if remote_identity == None: + return LXMPeer.ERROR_NO_IDENTITY + elif remote_identity.hash != self.identity.hash: + return LXMPeer.ERROR_NO_ACCESS + else: + return self.compile_stats() + + ### Utility & Maintenance ############################# ####################################################### @@ -970,7 +1070,7 @@ class LXMRouter: try: if len(self.locally_delivered_transient_ids) > 0: if not os.path.isdir(self.storagepath): - os.makedirs(self.storagepath) + os.makedirs(self.storagepath) with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file: locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) @@ -982,7 +1082,7 @@ class LXMRouter: try: if len(self.locally_processed_transient_ids) > 0: if not os.path.isdir(self.storagepath): - os.makedirs(self.storagepath) + os.makedirs(self.storagepath) with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file: locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids)) @@ -990,6 +1090,24 @@ class LXMRouter: except Exception as e: RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + def save_node_stats(self): + try: + if not os.path.isdir(self.storagepath): + os.makedirs(self.storagepath) + + with open(self.storagepath+"/node_stats", "wb") as stats_file: + node_stats = { + "client_propagation_messages_received": self.client_propagation_messages_received, + "client_propagation_messages_served": self.client_propagation_messages_served, + "unpeered_propagation_incoming": self.unpeered_propagation_incoming, + "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes, + } + stats_file.write(msgpack.packb(node_stats)) + + except Exception as e: + RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + + def clean_outbound_stamp_costs(self): try: expired = [] @@ -1106,6 +1224,7 @@ class LXMRouter: self.propagation_destination.set_packet_callback(None) self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH) self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH) + self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH) for link in self.active_propagation_links: try: if link.status == RNS.Link.ACTIVE: @@ -1135,6 +1254,7 @@ class LXMRouter: self.save_locally_delivered_transient_ids() self.save_locally_processed_transient_ids() + self.save_node_stats() def sigint_handler(self, signal, frame): if not self.exit_handler_running: @@ -1263,6 +1383,7 @@ class LXMRouter: except Exception as e: RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + self.client_propagation_messages_served += len(response_messages) return response_messages @@ -1777,6 +1898,7 @@ class LXMRouter: messages = data[1] for lxmf_data in messages: self.lxmf_propagation(lxmf_data) + self.client_propagation_messages_received += 1 packet.prove() @@ -1849,7 +1971,7 @@ class LXMRouter: remote_str = f"peer {remote_str}" messages = data[1] - RNS.log(f"Received {len(messages)} messages from {remote_str}", RNS.LOG_VERBOSE) + RNS.log(f"Received {len(messages)} message{"" if len(messages) == 1 else "s"} from {remote_str}", RNS.LOG_VERBOSE) for lxmf_data in messages: peer = None transient_id = RNS.Identity.full_hash(lxmf_data) @@ -1857,6 +1979,12 @@ class LXMRouter: peer = self.peers[remote_hash] peer.incoming += 1 peer.rx_bytes += len(lxmf_data) + else: + if remote_identity != None: + self.unpeered_propagation_incoming += 1 + self.unpeered_propagation_rx_bytes += len(lxmf_data) + else: + self.client_propagation_messages_received += 1 self.lxmf_propagation(lxmf_data, from_peer=peer) if peer != None: