Added local node stats request handler

This commit is contained in:
Mark Qvist 2025-01-23 14:13:08 +01:00
parent 61b1ecce27
commit 2c71cea7a0

View File

@ -64,6 +64,8 @@ class LXMRouter:
PR_ALL_MESSAGES = 0x00
STATS_GET_PATH = "/pn/get/stats"
### Developer-facing API ##############################
#######################################################
@ -92,6 +94,7 @@ class LXMRouter:
self.processing_count = 0
self.propagation_node = False
self.propagation_node_start_time = None
if storagepath == None:
raise ValueError("LXMF cannot be initialised without a storage path")
@ -135,6 +138,11 @@ class LXMRouter:
self.identity = identity
self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.control_destination = None
self.client_propagation_messages_received = 0
self.client_propagation_messages_served = 0
self.unpeered_propagation_incoming = 0
self.unpeered_propagation_rx_bytes = 0
if autopeer != None:
self.autopeer = autopeer
@ -541,13 +549,35 @@ class LXMRouter:
RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
try:
if os.path.isfile(self.storagepath+"/node_stats"):
node_stats_file = open(self.storagepath+"/node_stats", "rb")
data = node_stats_file.read()
node_stats_file.close()
node_stats = msgpack.unpackb(data)
if not type(node_stats) == dict:
RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR)
else:
self.client_propagation_messages_received = node_stats["client_propagation_messages_received"]
self.client_propagation_messages_served = node_stats["client_propagation_messages_served"]
self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"]
self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"]
except Exception as e:
RNS.log("Could not load local node stats. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.propagation_node = True
self.propagation_node_start_time = time.time()
self.propagation_destination.set_link_established_callback(self.propagation_link_established)
self.propagation_destination.set_packet_callback(self.propagation_packet)
self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)
self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash])
if self.message_storage_limit != None:
limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit)
else:
@ -650,6 +680,76 @@ class LXMRouter:
return False
### Propagation Node Control ##########################
#######################################################
def compile_stats(self):
if not self.propagation_node:
return None
else:
peer_stats = {}
for peer_id in self.peers.copy():
peer = self.peers[peer_id]
peer_stats[peer_id] = {
"type": "static" if peer_id in self.static_peers else "discovered",
"state": peer.state,
"alive": peer.alive,
"last_heard": int(peer.last_heard),
"next_sync_attempt": peer.next_sync_attempt,
"last_sync_attempt": peer.last_sync_attempt,
"sync_backoff": peer.sync_backoff,
"peering_timebase": peer.peering_timebase,
"ler": int(peer.link_establishment_rate),
"str": int(peer.sync_transfer_rate),
"transfer_limit": peer.propagation_transfer_limit,
"network_distance": RNS.Transport.hops_to(peer_id),
"rx_bytes": peer.rx_bytes,
"tx_bytes": peer.tx_bytes,
"messages": {
"offered": peer.offered,
"outgoing": peer.outgoing,
"incoming": peer.incoming,
},
}
node_stats = {
"identity_hash": self.identity.hash,
"destination_hash": self.propagation_destination.hash,
"uptime": time.time()-self.propagation_node_start_time,
"delivery_limit": self.delivery_per_transfer_limit,
"propagation_limit": self.propagation_per_transfer_limit,
"autopeer_maxdepth": self.autopeer_maxdepth,
"from_static_only": self.from_static_only,
"messagestore": {
"count": len(self.propagation_entries),
"bytes": self.message_storage_size(),
"limit": self.message_storage_limit,
},
"clients" : {
"client_propagation_messages_received": self.client_propagation_messages_received,
"client_propagation_messages_served": self.client_propagation_messages_served,
},
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
"static_peers": len(self.static_peers),
"discovered_peers": len(self.peers)-len(self.static_peers),
"total_peers": len(self.peers),
"max_peers": self.max_peers,
"peers": peer_stats,
}
return node_stats
def stats_get_request(self, path, data, request_id, remote_identity, requested_at):
RNS.log("Stats request", RNS.LOG_DEBUG) # TODO: Remove debug
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
elif remote_identity.hash != self.identity.hash:
return LXMPeer.ERROR_NO_ACCESS
else:
return self.compile_stats()
### Utility & Maintenance #############################
#######################################################
@ -970,7 +1070,7 @@ class LXMRouter:
try:
if len(self.locally_delivered_transient_ids) > 0:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
os.makedirs(self.storagepath)
with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file:
locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))
@ -982,7 +1082,7 @@ class LXMRouter:
try:
if len(self.locally_processed_transient_ids) > 0:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
os.makedirs(self.storagepath)
with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file:
locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids))
@ -990,6 +1090,24 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def save_node_stats(self):
try:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
with open(self.storagepath+"/node_stats", "wb") as stats_file:
node_stats = {
"client_propagation_messages_received": self.client_propagation_messages_received,
"client_propagation_messages_served": self.client_propagation_messages_served,
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
}
stats_file.write(msgpack.packb(node_stats))
except Exception as e:
RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def clean_outbound_stamp_costs(self):
try:
expired = []
@ -1106,6 +1224,7 @@ class LXMRouter:
self.propagation_destination.set_packet_callback(None)
self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH)
self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH)
self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH)
for link in self.active_propagation_links:
try:
if link.status == RNS.Link.ACTIVE:
@ -1135,6 +1254,7 @@ class LXMRouter:
self.save_locally_delivered_transient_ids()
self.save_locally_processed_transient_ids()
self.save_node_stats()
def sigint_handler(self, signal, frame):
if not self.exit_handler_running:
@ -1263,6 +1383,7 @@ class LXMRouter:
except Exception as e:
RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
self.client_propagation_messages_served += len(response_messages)
return response_messages
@ -1777,6 +1898,7 @@ class LXMRouter:
messages = data[1]
for lxmf_data in messages:
self.lxmf_propagation(lxmf_data)
self.client_propagation_messages_received += 1
packet.prove()
@ -1849,7 +1971,7 @@ class LXMRouter:
remote_str = f"peer {remote_str}"
messages = data[1]
RNS.log(f"Received {len(messages)} messages from {remote_str}", RNS.LOG_VERBOSE)
RNS.log(f"Received {len(messages)} message{"" if len(messages) == 1 else "s"} from {remote_str}", RNS.LOG_VERBOSE)
for lxmf_data in messages:
peer = None
transient_id = RNS.Identity.full_hash(lxmf_data)
@ -1857,6 +1979,12 @@ class LXMRouter:
peer = self.peers[remote_hash]
peer.incoming += 1
peer.rx_bytes += len(lxmf_data)
else:
if remote_identity != None:
self.unpeered_propagation_incoming += 1
self.unpeered_propagation_rx_bytes += len(lxmf_data)
else:
self.client_propagation_messages_received += 1
self.lxmf_propagation(lxmf_data, from_peer=peer)
if peer != None: