From f5cb49b46b757bbfd56b82f56b3f9e512ff44b9e Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sun, 3 Oct 2021 21:00:28 +0200 Subject: [PATCH] Propagation node state save and restore --- LXMF/LXMF.py | 132 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 120 insertions(+), 12 deletions(-) diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index 7502fb6..8d43e31 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -1,6 +1,7 @@ import os import math import time +import atexit import threading import RNS import RNS.vendor.umsgpack as msgpack @@ -291,8 +292,11 @@ class LXMessage: if self.representation == LXMessage.PACKET: receipt = self.__as_packet().send() - receipt.set_delivery_callback(self.__mark_propagated) - receipt.set_timeout_callback(self.__link_packet_timed_out) + if receipt: + receipt.set_delivery_callback(self.__mark_propagated) + receipt.set_timeout_callback(self.__link_packet_timed_out) + else: + self.__delivery_destination.teardown() elif self.representation == LXMessage.RESOURCE: self.resource_representation = self.__as_resource() @@ -355,7 +359,9 @@ class LXMessage: self.state = LXMessage.OUTBOUND def __link_packet_timed_out(self, packet_receipt): - packet_receipt.destination.teardown() + if packet_receipt: + packet_receipt.destination.teardown() + self.state = LXMessage.OUTBOUND @@ -559,11 +565,43 @@ class LXMPeer: ERROR_NO_IDENTITY = 0xf0 @staticmethod - def from_bytes(bytes): - pass + def from_bytes(peer_bytes, router): + dictionary = msgpack.unpackb(peer_bytes) + + peer = LXMPeer(router, dictionary["destination_hash"]) + peer.peering_timebase = dictionary["peering_timebase"] + peer.alive = dictionary["alive"] + peer.last_heard = dictionary["last_heard"] + + for transient_id in dictionary["handled_ids"]: + if transient_id in router.propagation_entries: + peer.handled_messages[transient_id] = router.propagation_entries[transient_id] + + for transient_id in dictionary["unhandled_ids"]: + if transient_id in router.propagation_entries: + peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id] + + return peer def to_bytes(self): - pass + dictionary = {} + dictionary["peering_timebase"] = self.peering_timebase + dictionary["alive"] = self.alive + dictionary["last_heard"] = self.last_heard + dictionary["destination_hash"] = self.destination_hash + + handled_ids = [] + for transient_id in self.handled_messages: + handled_ids.append(transient_id) + + unhandled_ids = [] + for transient_id in self.unhandled_messages: + unhandled_ids.append(transient_id) + + dictionary["handled_ids"] = handled_ids + dictionary["unhandled_ids"] = unhandled_ids + + return msgpack.packb(dictionary) def __init__(self, router, destination_hash): self.alive = False @@ -639,7 +677,7 @@ class LXMPeer: elif response == True: # Peer wants all advertised messages for transient_id in self.unhandled_messages: - wanted_messages.append(self.unhandled_messages[transient_id][2]) + wanted_messages.append(self.unhandled_messages[transient_id]) wanted_message_ids.append(transient_id) else: @@ -652,14 +690,24 @@ class LXMPeer: self.handled_messages[transient_id] = message_entry for transient_id in response: - wanted_messages.append(self.unhandled_messages[transient_id][2]) + wanted_messages.append(self.unhandled_messages[transient_id]) wanted_message_ids.append(transient_id) if len(wanted_messages) > 0: # TODO: Remove RNS.log("Peer wanted: "+str(wanted_messages), RNS.LOG_DEBUG) - data = msgpack.packb([time.time(), wanted_messages]) + lxm_list = [] + + for file_path in wanted_messages: + # TODO: Remove + RNS.log("Loading "+str(file_path)+" for transfer") + file = open(file_path, "rb") + lxmf_data = file.read() + file.close() + lxm_list.append(lxmf_data) + + data = msgpack.packb([time.time(), lxm_list]) resource = RNS.Resource(data, self.link, callback = self.resource_concluded) resource.transferred_messages = wanted_message_ids self.state = LXMPeer.RESOURCE_TRANSFERRING @@ -738,10 +786,33 @@ class LXMRouter: self.__delivery_callback = None + atexit.register(self.exit_handler) + job_thread = threading.Thread(target=self.jobloop) job_thread.setDaemon(True) job_thread.start() + def exit_handler(self): + try: + serialised_peers = [] + for peer_id in self.peers: + peer = self.peers[peer_id] + serialised_peers.append(peer.to_bytes()) + # TODO: Remove + RNS.log("Saving peer "+str(peer)) + + peers_file = open(self.storagepath+"/peers", "wb") + peers_file.write(msgpack.packb(serialised_peers)) + peers_file.close() + + # TODO: Remove + RNS.log("Saved peers") + + + except Exception as e: + RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + + def register_delivery_identity(self, identity, display_name = None): delivery_destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery") delivery_destination.set_packet_callback(self.delivery_packet) @@ -853,11 +924,43 @@ class LXMRouter: def enable_propagation(self, storagepath): try: self.storagepath = storagepath+"/lxmf" + self.messagepath = self.storagepath+"/messagestore" if not os.path.isdir(self.storagepath): os.makedirs(self.storagepath) - # TODO: Load peers and data + if not os.path.isdir(self.messagepath): + os.makedirs(self.messagepath) + + self.propagation_entries = {} + for filename in os.listdir(self.messagepath): + components = filename.split("_") + if len(components) == 2: + if float(components[1]) > 0: + if len(components[0]) == RNS.Identity.HASHLENGTH//8*2: + transient_id = bytes.fromhex(components[0]) + received = components[1] + + filepath = self.messagepath+"/"+filename + file = open(filepath, "rb") + destination_hash = file.read(LXMessage.DESTINATION_LENGTH) + file.close() + + self.propagation_entries[transient_id] = [destination_hash, filepath] + + # TODO: Remove + RNS.log("Registered msg "+RNS.prettyhexrep(transient_id)+" at "+filepath+" for "+RNS.prettyhexrep(destination_hash)) + + if os.path.isfile(self.storagepath+"/peers"): + peers_file = open(self.storagepath+"/peers", "rb") + serialised_peers = msgpack.unpackb(peers_file.read()) + + for serialised_peer in serialised_peers: + peer = LXMPeer.from_bytes(serialised_peer, self) + self.peers[peer.destination_hash] = peer + # TODO: Remove + RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)) + self.propagation_node = True self.propagation_destination.set_link_established_callback(self.propagation_link_established) @@ -991,7 +1094,12 @@ class LXMRouter: else: if self.propagation_node: - self.propagation_entries[transient_id] = propagation_entry + file_path = self.messagepath+"/"+RNS.hexrep(transient_id, delimit=False)+"_"+str(received) + msg_file = open(file_path, "wb") + msg_file.write(lxmf_data) + msg_file.close() + + self.propagation_entries[transient_id] = [destination_hash, file_path] RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG) for peer_id in self.peers: @@ -1003,7 +1111,7 @@ class LXMRouter: return False except Exception as e: - RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_NOTICE) + RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG) RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) return False