Implemented message state in disk format. Implemented automated link cleaning.

This commit is contained in:
Mark Qvist 2021-05-13 16:55:32 +02:00
parent 3d53ca0124
commit 892fa7fc9d

View File

@ -121,6 +121,7 @@ class LXMessage:
self.packet_representation = None
self.resource_representation = None
self.__delivery_destination = None
self.__delivery_callback = None
def set_title_from_string(self, title_string):
self.title = title_string.encode("utf-8")
@ -176,9 +177,14 @@ class LXMessage:
def set_delivery_destination(self, delivery_destination):
self.__delivery_destination = delivery_destination
def register_delivery_callback(self, callback):
self.__delivery_callback = callback
def pack(self):
if not self.packed:
self.timestamp = time.time()
if self.timestamp == None:
self.timestamp = time.time()
self.payload = [self.timestamp, self.title, self.content, self.fields]
hashed_part = b""
@ -248,6 +254,9 @@ class LXMessage:
RNS.log("Received delivery notification for "+str(self), RNS.LOG_DEBUG)
self.state = LXMessage.DELIVERED
if self.__delivery_callback != None:
self.__delivery_callback(self)
def __resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
self.__mark_delivered()
@ -295,12 +304,18 @@ class LXMessage:
if not self.packed:
self.pack()
container = {"state": self.state, "lxmf_bytes": self.packed}
packed_container = msgpack.packb(container)
file = open(file_path, "wb")
file.write(self.packed)
file.write(packed_container)
file.close()
return file_path
except Exception as e:
RNS.log("Error while writing LXMF message to file \""+str(file_path)+"\". The contained exception was: "+str(e), RNS.LOG_ERROR)
return None
@staticmethod
@ -368,7 +383,10 @@ class LXMessage:
@staticmethod
def unpack_from_file(lxmf_file_handle):
try:
return LXMessage.unpack_from_bytes(lxmf_file_handle.read())
container = msgpack.unpackb(lxmf_file_handle.read())
lxm = LXMessage.unpack_from_bytes(container["lxmf_bytes"])
lxm.state = container["state"]
return lxm
except Exception as e:
RNS.log("Could not unpack LXMessage from file. The contained exception was: "+str(e), RNS.LOG_ERROR)
return None
@ -378,6 +396,7 @@ class LXMRouter:
MAX_DELIVERY_ATTEMPTS = 3
PROCESSING_INTERVAL = 5
DELIVERY_RETRY_WAIT = 15
LINK_MAX_INACTIVITY = 30
def __init__(self):
self.pending_inbound = []
@ -411,6 +430,11 @@ class LXMRouter:
def register_delivery_callback(self, callback):
self.__delivery_callback = callback
def announce(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
delivery_destination.announce(delivery_destination.display_name.encode("utf-8"))
def handle_outbound(self, lxmessage):
if not lxmessage.packed:
lxmessage.pack()
@ -493,6 +517,23 @@ class LXMRouter:
def jobs(self):
self.process_outbound()
self.clean_links()
def clean_links(self):
closed_links = []
for link_hash in self.direct_links:
#TODO: Fix
link = self.direct_links[link_hash]
inactive_time = link.inactive_for()
if inactive_time > LXMRouter.LINK_MAX_INACTIVITY:
link.teardown()
closed_links.append(link_hash)
RNS.log(str(link)+" was inactive for "+str(inactive_time)+" seconds and closed")
for link_hash in closed_links:
self.direct_links.pop(link_hash)
RNS.log("Removed "+RNS.hexrep(link_hash, delimit=False)+" from direct link list, since it was closed")
def process_outbound(self, sender = None):
if self.processing_outbound: