Memory optimisations

Mark Qvist 2025-01-21 16:33:39 +01:00
parent 356cb6412f
commit 7701f326d9
2 changed files with 209 additions and 60 deletions
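
In short: instead of every LXMPeer keeping its own handled_messages/unhandled_messages lists, each entry in the router's shared propagation_entries index now carries two peer-membership lists (fields 4 and 5 below), and the per-peer views plus their counts are derived lazily. A minimal, self-contained sketch of that pattern, with simplified names that are not the actual LXMF classes:

    # Illustrative sketch only: Router/Peer here are stand-ins for
    # LXMRouter/LXMPeer, showing the shared-index membership pattern.

    class Router:
        def __init__(self):
            # transient_id -> [dest_hash, path, received, size, handled_by, unhandled_by]
            self.propagation_entries = {}

    class Peer:
        def __init__(self, router, destination_hash):
            self.router = router
            self.destination_hash = destination_hash

        @property
        def unhandled_messages(self):
            # Derived view: no duplicated per-peer list held in memory
            entries = self.router.propagation_entries.copy()
            return [tid for tid in entries if self.destination_hash in entries[tid][5]]

        def add_unhandled_message(self, transient_id):
            entry = self.router.propagation_entries.get(transient_id)
            if entry is not None and self.destination_hash not in entry[5]:
                entry[5].append(self.destination_hash)

    router = Router()
    router.propagation_entries[b"\x01"] = [b"dst", "/tmp/msg", 0.0, 128, [], []]
    peer = Peer(router, b"peer-a")
    peer.add_unhandled_message(b"\x01")
    assert peer.unhandled_messages == [b"\x01"]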

LXMF/LXMPeer.py

@@ -38,11 +38,16 @@ class LXMPeer:
     @staticmethod
     def from_bytes(peer_bytes, router):
         dictionary = msgpack.unpackb(peer_bytes)
-        peer_destination_hash = dictionary["destination_hash"]
-        peer_peering_timebase = dictionary["peering_timebase"]
-        peer_alive = dictionary["alive"]
-        peer_last_heard = dictionary["last_heard"]
-        peer = LXMPeer(router, peer_destination_hash)
-        peer.peering_timebase = peer_peering_timebase
-        peer.alive = peer_alive
-        peer.last_heard = peer_last_heard
+        peer = LXMPeer(router, dictionary["destination_hash"])
+        peer.peering_timebase = dictionary["peering_timebase"]
+        peer.alive = dictionary["alive"]
+        peer.last_heard = dictionary["last_heard"]

         if "link_establishment_rate" in dictionary:
             peer.link_establishment_rate = dictionary["link_establishment_rate"]
         else:
@@ -61,13 +66,22 @@ class LXMPeer:
         else:
             peer.propagation_transfer_limit = None

+        hm_count = 0
         for transient_id in dictionary["handled_ids"]:
             if transient_id in router.propagation_entries:
-                peer.handled_messages.append(transient_id)
+                peer.add_handled_message(transient_id)
+                hm_count += 1

+        um_count = 0
         for transient_id in dictionary["unhandled_ids"]:
             if transient_id in router.propagation_entries:
-                peer.unhandled_messages.append(transient_id)
+                peer.add_unhandled_message(transient_id)
+                um_count += 1

+        peer._hm_count = hm_count
+        peer._um_count = um_count
+        peer._hm_counts_synced = True
+        peer._um_counts_synced = True

         del dictionary
         return peer
@@ -93,7 +107,10 @@ class LXMPeer:
         dictionary["handled_ids"] = handled_ids
         dictionary["unhandled_ids"] = unhandled_ids

-        return msgpack.packb(dictionary)
+        peer_bytes = msgpack.packb(dictionary)
+        del dictionary
+        return peer_bytes

     def __init__(self, router, destination_hash):
         self.alive = False
@@ -106,11 +123,14 @@ class LXMPeer:
         self.sync_transfer_rate = 0
         self.propagation_transfer_limit = None

+        self._hm_count = 0
+        self._um_count = 0
+        self._hm_counts_synced = False
+        self._um_counts_synced = False

         self.link = None
         self.state = LXMPeer.IDLE
-        self.unhandled_messages = []
-        self.handled_messages = []

         self.last_offer = []
         self.router = router
@@ -173,7 +193,7 @@ class LXMPeer:
             for transient_id in purged_ids:
                 RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
-                self.unhandled_messages.remove(transient_id)
+                self.remove_unhandled_message(transient_id)

             unhandled_entries.sort(key=lambda e: e[1], reverse=False)
             per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
@@ -228,8 +248,8 @@ class LXMPeer:
                 # Peer already has all advertised messages
                 for transient_id in self.last_offer:
                     if transient_id in self.unhandled_messages:
-                        self.handled_messages.append(transient_id)
-                        self.unhandled_messages.remove(transient_id)
+                        self.add_handled_message(transient_id)
+                        self.remove_unhandled_message(transient_id)

             elif response == True:
@@ -244,9 +264,8 @@ class LXMPeer:
                     # If the peer did not want the message, it has
                     # already received it from another peer.
                     if not transient_id in response:
-                        if transient_id in self.unhandled_messages:
-                            self.handled_messages.append(transient_id)
-                            self.unhandled_messages.remove(transient_id)
+                        self.add_handled_message(transient_id)
+                        self.remove_unhandled_message(transient_id)

                 for transient_id in response:
                     wanted_messages.append(self.router.propagation_entries[transient_id])
@@ -292,8 +311,8 @@ class LXMPeer:
     def resource_concluded(self, resource):
         if resource.status == RNS.Resource.COMPLETE:
             for transient_id in resource.transferred_messages:
-                self.handled_messages.append(transient_id)
-                self.unhandled_messages.remove(transient_id)
+                self.add_handled_message(transient_id)
+                self.remove_unhandled_message(transient_id)

             if self.link != None:
                 self.link.teardown()
@@ -332,9 +351,72 @@ class LXMPeer:
         self.link = None
         self.state = LXMPeer.IDLE

-    def handle_message(self, transient_id):
+    def new_propagation_message(self, transient_id):
         if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
-            self.unhandled_messages.append(transient_id)
+            self.add_unhandled_message(transient_id)

+    @property
+    def handled_messages(self):
+        pes = self.router.propagation_entries.copy()
+        hm = list(filter(lambda tid: self.destination_hash in self.router.propagation_entries[tid][4], pes))
+        self._hm_count = len(hm); del pes
+        return hm
+
+    @property
+    def unhandled_messages(self):
+        pes = self.router.propagation_entries.copy()
+        um = list(filter(lambda tid: self.destination_hash in self.router.propagation_entries[tid][5], pes))
+        self._um_count = len(um); del pes
+        return um
+
+    @property
+    def handled_message_count(self):
+        if not self._hm_counts_synced:
+            self._update_counts()
+        return self._hm_count
+
+    @property
+    def unhandled_message_count(self):
+        if not self._um_counts_synced:
+            self._update_counts()
+        return self._um_count
+
+    def _update_counts(self):
+        if not self._hm_counts_synced:
+            RNS.log("UPDATE HM COUNTS")
+            hm = self.handled_messages; del hm
+            self._hm_counts_synced = True
+
+        if not self._um_counts_synced:
+            RNS.log("UPDATE UM COUNTS")
+            um = self.unhandled_messages; del um
+            self._um_counts_synced = True
+
+    def add_handled_message(self, transient_id):
+        if transient_id in self.router.propagation_entries:
+            if not self.destination_hash in self.router.propagation_entries[transient_id][4]:
+                self.router.propagation_entries[transient_id][4].append(self.destination_hash)
+                self._hm_counts_synced = False
+
+    def add_unhandled_message(self, transient_id):
+        if transient_id in self.router.propagation_entries:
+            if not self.destination_hash in self.router.propagation_entries[transient_id][5]:
+                self.router.propagation_entries[transient_id][5].append(self.destination_hash)
+                self._um_count += 1
+
+    def remove_handled_message(self, transient_id):
+        if transient_id in self.router.propagation_entries:
+            if self.destination_hash in self.router.propagation_entries[transient_id][4]:
+                self.router.propagation_entries[transient_id][4].remove(self.destination_hash)
+                self._hm_counts_synced = False
+
+    def remove_unhandled_message(self, transient_id):
+        if transient_id in self.router.propagation_entries:
+            if self.destination_hash in self.router.propagation_entries[transient_id][5]:
+                self.router.propagation_entries[transient_id][5].remove(self.destination_hash)
+                self._um_counts_synced = False

     def __str__(self):
         if self.destination_hash:
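
The count properties above avoid walking the shared index on every query: mutations merely clear a synced flag, and handled_message_count/unhandled_message_count recompute only when that flag is unset. A tiny runnable sketch of this lazy cache-invalidation pattern, using a hypothetical LazyCounter that is not part of LXMF:

    class LazyCounter:
        def __init__(self, items):
            self.items = items        # shared collection, e.g. a membership list
            self._count = 0
            self._synced = False

        def mutate(self, item):
            self.items.append(item)
            self._synced = False      # invalidate the cache instead of recounting now

        @property
        def count(self):
            if not self._synced:      # recompute at most once per invalidation
                self._count = len(self.items)
                self._synced = True
            return self._count

    c = LazyCounter([])
    c.mutate("a"); c.mutate("b")
    assert c.count == 2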

LXMF/LXMRouter.py

@@ -1,9 +1,11 @@
 import os
+import sys
 import time
 import math
 import random
 import base64
 import atexit
+import signal
 import threading

 import RNS
@@ -94,6 +96,9 @@ class LXMRouter:
         self.outbound_propagation_node = None
         self.outbound_propagation_link = None

+        if delivery_limit == None:
+            delivery_limit = LXMRouter.DELIVERY_LIMIT

         self.message_storage_limit = None
         self.information_storage_limit = None
         self.propagation_per_transfer_limit = propagation_limit
@@ -117,6 +122,7 @@ class LXMRouter:
         self.cost_file_lock = threading.Lock()
         self.ticket_file_lock = threading.Lock()
         self.stamp_gen_lock = threading.Lock()
+        self.exit_handler_running = False

         if identity == None:
             identity = RNS.Identity()
@@ -221,6 +227,8 @@ class LXMRouter:
             RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)

         atexit.register(self.exit_handler)
+        signal.signal(signal.SIGINT, self.sigint_handler)
+        signal.signal(signal.SIGTERM, self.sigterm_handler)

         job_thread = threading.Thread(target=self.jobloop)
         job_thread.setDaemon(True)
@@ -446,17 +454,19 @@ class LXMRouter:
                     file.close()

                     self.propagation_entries[transient_id] = [
-                        destination_hash,
-                        filepath,
-                        received,
-                        msg_size,
+                        destination_hash,   # 0: Destination hash
+                        filepath,           # 1: Storage location
+                        received,           # 2: Receive timestamp
+                        msg_size,           # 3: Message size
+                        [],                 # 4: Handled peers
+                        [],                 # 5: Unhandled peers
                     ]

                 except Exception as e:
                     RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)

             et = time.time(); RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {math.floor(len(self.propagation_entries)/(et-st))} msgs/s", RNS.LOG_NOTICE)
-            st = time.time(); RNS.log("Loading propagation node peers...", RNS.LOG_NOTICE)
+            st = time.time(); RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)

             if os.path.isfile(self.storagepath+"/peers"):
                 peers_file = open(self.storagepath+"/peers", "rb")
@@ -465,23 +475,25 @@ class LXMRouter:
                 if len(peers_data) > 0:
                     serialised_peers = msgpack.unpackb(peers_data)
+                    del peers_data

-                    for serialised_peer in serialised_peers:
+                    while len(serialised_peers) > 0:
+                        serialised_peer = serialised_peers.pop()
                         peer = LXMPeer.from_bytes(serialised_peer, self)
+                        del serialised_peer
                         if peer.identity != None:
                             self.peers[peer.destination_hash] = peer
                             lim_str = ", no transfer limit"
                             if peer.propagation_transfer_limit != None:
                                 lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit"
-                            RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
+                            RNS.log("Rebuilt peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(peer.unhandled_message_count)+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
                         else:
-                            del peer
                             RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
+                            del peer

                     del serialised_peers
-                    del peers_data

-            RNS.log(f"Loaded {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
+            RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)

         self.propagation_node = True
         self.propagation_destination.set_link_established_callback(self.propagation_link_established)
@@ -602,6 +614,7 @@ class LXMRouter:
     JOB_STORE_INTERVAL    = 120
     JOB_PEERSYNC_INTERVAL = 12

     def jobs(self):
+        if not self.exit_handler_running:
             self.processing_count += 1

             if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
@@ -626,12 +639,12 @@ class LXMRouter:
         while (True):
             # TODO: Improve this to scheduling, so manual
             # triggers can delay next run

             try:
                 self.jobs()
             except Exception as e:
                 RNS.log("An error ocurred while running LXMF Router jobs.", RNS.LOG_ERROR)
                 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
+                RNS.trace_exception(e)

             time.sleep(LXMRouter.PROCESSING_INTERVAL)

     def clean_links(self):
@@ -888,6 +901,7 @@ class LXMRouter:
     def save_locally_delivered_transient_ids(self):
         try:
+            if len(self.locally_delivered_transient_ids) > 0:
                 if not os.path.isdir(self.storagepath):
                     os.makedirs(self.storagepath)
@@ -899,6 +913,7 @@ class LXMRouter:
     def save_locally_processed_transient_ids(self):
         try:
+            if len(self.locally_processed_transient_ids) > 0:
                 if not os.path.isdir(self.storagepath):
                     os.makedirs(self.storagepath)
@@ -1001,10 +1016,43 @@ class LXMRouter:
             RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR)

     def exit_handler(self):
+        if self.exit_handler_running:
+            return
+        self.exit_handler_running = True
+
+        RNS.log("Tearing down delivery destinations...", RNS.LOG_NOTICE)
+        for destination_hash in self.delivery_destinations:
+            delivery_destination = self.delivery_destinations[destination_hash]
+            delivery_destination.set_packet_callback(None)
+            delivery_destination.set_link_established_callback(None)
+            for link in delivery_destination.links:
+                try:
+                    if link.status == RNS.Link.ACTIVE:
+                        link.teardown()
+                except Exception as e:
+                    RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
+
+        if self.propagation_node:
+            RNS.log("Tearing down propagation node destination...", RNS.LOG_NOTICE)
+            self.propagation_destination.set_link_established_callback(None)
+            self.propagation_destination.set_packet_callback(None)
+            self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH)
+            self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH)
+            for link in self.active_propagation_links:
+                try:
+                    if link.status == RNS.Link.ACTIVE:
+                        link.teardown()
+                except Exception as e:
+                    RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
+
+        RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE)
         if self.propagation_node:
             try:
+                st = time.time(); RNS.log("Saving peer synchronisation states to storage...", RNS.LOG_NOTICE)
                 serialised_peers = []
-                for peer_id in self.peers:
+                peer_dict = self.peers.copy()
+                for peer_id in peer_dict:
                     peer = self.peers[peer_id]
                     serialised_peers.append(peer.to_bytes())
@@ -1012,7 +1060,7 @@ class LXMRouter:
                 peers_file.write(msgpack.packb(serialised_peers))
                 peers_file.close()

-                RNS.log("Saved "+str(len(serialised_peers))+" peers to storage", RNS.LOG_DEBUG)
+                RNS.log(f"Saved {len(serialised_peers)} peers to storage in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)

             except Exception as e:
                 RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
@@ -1020,6 +1068,20 @@ class LXMRouter:
         self.save_locally_delivered_transient_ids()
         self.save_locally_processed_transient_ids()

+    def sigint_handler(self, signal, frame):
+        if not self.exit_handler_running:
+            RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING)
+            sys.exit(0)
+        else:
+            RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
+
+    def sigterm_handler(self, signal, frame):
+        if not self.exit_handler_running:
+            RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING)
+            sys.exit(0)
+        else:
+            RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)

     def __str__(self):
         return "<LXMRouter "+RNS.hexrep(self.identity.hash, delimit=False)+">"
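
The sigint_handler/sigterm_handler above implement a guarded-shutdown pattern: the first SIGINT or SIGTERM exits normally, which triggers the atexit persistence pass, while signals arriving during that pass are logged and swallowed so peer state finishes flushing to disk. A condensed runnable sketch, with a hypothetical Service class standing in for LXMRouter:

    import atexit
    import signal
    import sys
    import time

    class Service:
        def __init__(self):
            self.exit_handler_running = False
            atexit.register(self.exit_handler)
            signal.signal(signal.SIGINT, self.sig_handler)
            signal.signal(signal.SIGTERM, self.sig_handler)

        def exit_handler(self):
            if self.exit_handler_running:
                return                      # re-entry guard, as in the commit
            self.exit_handler_running = True
            time.sleep(2)                   # stands in for the storage persist

        def sig_handler(self, signum, frame):
            if not self.exit_handler_running:
                sys.exit(0)                 # normal path: exit, atexit persists state
            else:
                print("persist in progress, staying alive until it completes")

    service = Service()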
@@ -1685,19 +1747,23 @@ class LXMRouter:
                     messages = data[1]
                     for lxmf_data in messages:
-                        if remote_hash != None and remote_hash in self.peers:
-                            transient_id = RNS.Identity.full_hash(lxmf_data)
+                        peer = None
+                        transient_id = RNS.Identity.full_hash(lxmf_data)
+                        if remote_hash != None and remote_hash in self.peers:
                             peer = self.peers[remote_hash]
-                            peer.handled_messages.append(transient_id)

-                        self.lxmf_propagation(lxmf_data)
+                        self.lxmf_propagation(lxmf_data, from_peer=peer)
+                        if peer != None:
+                            peer.add_handled_message(transient_id)
                 else:
                     RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)

             except Exception as e:
                 RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
+                RNS.trace_exception(e)

-    def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False):
+    def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False, from_peer=None):
         no_stamp_enforcement = False
         if is_paper_message:
             no_stamp_enforcement = True
@@ -1708,7 +1774,6 @@ class LXMRouter:
         if not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids:
             received = time.time()
-            propagation_entry = [transient_id, received, lxmf_data]
             destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH]

             self.locally_processed_transient_ids[transient_id] = received
@@ -1732,12 +1797,13 @@ class LXMRouter:
                     msg_file.write(lxmf_data)
                     msg_file.close()

-                    self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data)]
+                    self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]

                     RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG)
                     for peer_id in self.peers:
                         peer = self.peers[peer_id]
-                        peer.handle_message(transient_id)
+                        if peer != from_peer:
+                            peer.new_propagation_message(transient_id)

                 else:
                     # TODO: Add message to sneakernet queues when implemented
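
The from_peer plumbing above keeps a propagated message from being queued straight back to the peer it arrived from. A small runnable sketch of that echo suppression, using a hypothetical StubPeer and distribute function in place of the LXMRouter internals:

    class StubPeer:
        def __init__(self, name):
            self.name = name
            self.queued = []

        def new_propagation_message(self, tid):
            self.queued.append(tid)

    def distribute(peers, transient_id, from_peer=None):
        # Exclude the originating peer so the message is not offered back to it
        for peer in peers:
            if peer is not from_peer:
                peer.new_propagation_message(transient_id)

    a, b = StubPeer("a"), StubPeer("b")
    distribute([a, b], b"tid", from_peer=a)
    assert a.queued == [] and b.queued == [b"tid"]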
@@ -1757,6 +1823,7 @@ class LXMRouter:
             except Exception as e:
                 RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG)
                 RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
+                RNS.trace_exception(e)

         return False

     def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None):