Propagation node state save and restore

This commit is contained in:
Mark Qvist 2021-10-03 21:00:28 +02:00
parent 52dc086a26
commit f5cb49b46b

View File

@ -1,6 +1,7 @@
import os
import math
import time
import atexit
import threading
import RNS
import RNS.vendor.umsgpack as msgpack
@ -291,8 +292,11 @@ class LXMessage:
if self.representation == LXMessage.PACKET:
receipt = self.__as_packet().send()
receipt.set_delivery_callback(self.__mark_propagated)
receipt.set_timeout_callback(self.__link_packet_timed_out)
if receipt:
receipt.set_delivery_callback(self.__mark_propagated)
receipt.set_timeout_callback(self.__link_packet_timed_out)
else:
self.__delivery_destination.teardown()
elif self.representation == LXMessage.RESOURCE:
self.resource_representation = self.__as_resource()
@ -355,7 +359,9 @@ class LXMessage:
self.state = LXMessage.OUTBOUND
def __link_packet_timed_out(self, packet_receipt):
packet_receipt.destination.teardown()
if packet_receipt:
packet_receipt.destination.teardown()
self.state = LXMessage.OUTBOUND
@ -559,11 +565,43 @@ class LXMPeer:
ERROR_NO_IDENTITY = 0xf0
@staticmethod
def from_bytes(bytes):
pass
def from_bytes(peer_bytes, router):
dictionary = msgpack.unpackb(peer_bytes)
peer = LXMPeer(router, dictionary["destination_hash"])
peer.peering_timebase = dictionary["peering_timebase"]
peer.alive = dictionary["alive"]
peer.last_heard = dictionary["last_heard"]
for transient_id in dictionary["handled_ids"]:
if transient_id in router.propagation_entries:
peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
for transient_id in dictionary["unhandled_ids"]:
if transient_id in router.propagation_entries:
peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
return peer
def to_bytes(self):
pass
dictionary = {}
dictionary["peering_timebase"] = self.peering_timebase
dictionary["alive"] = self.alive
dictionary["last_heard"] = self.last_heard
dictionary["destination_hash"] = self.destination_hash
handled_ids = []
for transient_id in self.handled_messages:
handled_ids.append(transient_id)
unhandled_ids = []
for transient_id in self.unhandled_messages:
unhandled_ids.append(transient_id)
dictionary["handled_ids"] = handled_ids
dictionary["unhandled_ids"] = unhandled_ids
return msgpack.packb(dictionary)
def __init__(self, router, destination_hash):
self.alive = False
@ -639,7 +677,7 @@ class LXMPeer:
elif response == True:
# Peer wants all advertised messages
for transient_id in self.unhandled_messages:
wanted_messages.append(self.unhandled_messages[transient_id][2])
wanted_messages.append(self.unhandled_messages[transient_id])
wanted_message_ids.append(transient_id)
else:
@ -652,14 +690,24 @@ class LXMPeer:
self.handled_messages[transient_id] = message_entry
for transient_id in response:
wanted_messages.append(self.unhandled_messages[transient_id][2])
wanted_messages.append(self.unhandled_messages[transient_id])
wanted_message_ids.append(transient_id)
if len(wanted_messages) > 0:
# TODO: Remove
RNS.log("Peer wanted: "+str(wanted_messages), RNS.LOG_DEBUG)
data = msgpack.packb([time.time(), wanted_messages])
lxm_list = []
for file_path in wanted_messages:
# TODO: Remove
RNS.log("Loading "+str(file_path)+" for transfer")
file = open(file_path, "rb")
lxmf_data = file.read()
file.close()
lxm_list.append(lxmf_data)
data = msgpack.packb([time.time(), lxm_list])
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
resource.transferred_messages = wanted_message_ids
self.state = LXMPeer.RESOURCE_TRANSFERRING
@ -738,10 +786,33 @@ class LXMRouter:
self.__delivery_callback = None
atexit.register(self.exit_handler)
job_thread = threading.Thread(target=self.jobloop)
job_thread.setDaemon(True)
job_thread.start()
def exit_handler(self):
try:
serialised_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
serialised_peers.append(peer.to_bytes())
# TODO: Remove
RNS.log("Saving peer "+str(peer))
peers_file = open(self.storagepath+"/peers", "wb")
peers_file.write(msgpack.packb(serialised_peers))
peers_file.close()
# TODO: Remove
RNS.log("Saved peers")
except Exception as e:
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def register_delivery_identity(self, identity, display_name = None):
delivery_destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
delivery_destination.set_packet_callback(self.delivery_packet)
@ -853,11 +924,43 @@ class LXMRouter:
def enable_propagation(self, storagepath):
try:
self.storagepath = storagepath+"/lxmf"
self.messagepath = self.storagepath+"/messagestore"
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
# TODO: Load peers and data
if not os.path.isdir(self.messagepath):
os.makedirs(self.messagepath)
self.propagation_entries = {}
for filename in os.listdir(self.messagepath):
components = filename.split("_")
if len(components) == 2:
if float(components[1]) > 0:
if len(components[0]) == RNS.Identity.HASHLENGTH//8*2:
transient_id = bytes.fromhex(components[0])
received = components[1]
filepath = self.messagepath+"/"+filename
file = open(filepath, "rb")
destination_hash = file.read(LXMessage.DESTINATION_LENGTH)
file.close()
self.propagation_entries[transient_id] = [destination_hash, filepath]
# TODO: Remove
RNS.log("Registered msg "+RNS.prettyhexrep(transient_id)+" at "+filepath+" for "+RNS.prettyhexrep(destination_hash))
if os.path.isfile(self.storagepath+"/peers"):
peers_file = open(self.storagepath+"/peers", "rb")
serialised_peers = msgpack.unpackb(peers_file.read())
for serialised_peer in serialised_peers:
peer = LXMPeer.from_bytes(serialised_peer, self)
self.peers[peer.destination_hash] = peer
# TODO: Remove
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash))
self.propagation_node = True
self.propagation_destination.set_link_established_callback(self.propagation_link_established)
@ -991,7 +1094,12 @@ class LXMRouter:
else:
if self.propagation_node:
self.propagation_entries[transient_id] = propagation_entry
file_path = self.messagepath+"/"+RNS.hexrep(transient_id, delimit=False)+"_"+str(received)
msg_file = open(file_path, "wb")
msg_file.write(lxmf_data)
msg_file.close()
self.propagation_entries[transient_id] = [destination_hash, file_path]
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG)
for peer_id in self.peers:
@ -1003,7 +1111,7 @@ class LXMRouter:
return False
except Exception as e:
RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_NOTICE)
RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
return False