diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index b717252..d3d967e 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -121,6 +121,7 @@ class LXMessage: self.packet_representation = None self.resource_representation = None self.__delivery_destination = None + self.__delivery_callback = None def set_title_from_string(self, title_string): self.title = title_string.encode("utf-8") @@ -176,9 +177,14 @@ class LXMessage: def set_delivery_destination(self, delivery_destination): self.__delivery_destination = delivery_destination + def register_delivery_callback(self, callback): + self.__delivery_callback = callback + def pack(self): if not self.packed: - self.timestamp = time.time() + if self.timestamp == None: + self.timestamp = time.time() + self.payload = [self.timestamp, self.title, self.content, self.fields] hashed_part = b"" @@ -248,6 +254,9 @@ class LXMessage: RNS.log("Received delivery notification for "+str(self), RNS.LOG_DEBUG) self.state = LXMessage.DELIVERED + if self.__delivery_callback != None: + self.__delivery_callback(self) + def __resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: self.__mark_delivered() @@ -295,12 +304,18 @@ class LXMessage: if not self.packed: self.pack() + container = {"state": self.state, "lxmf_bytes": self.packed} + packed_container = msgpack.packb(container) + file = open(file_path, "wb") - file.write(self.packed) + file.write(packed_container) file.close() + return file_path + except Exception as e: RNS.log("Error while writing LXMF message to file \""+str(file_path)+"\". The contained exception was: "+str(e), RNS.LOG_ERROR) + return None @staticmethod @@ -368,7 +383,10 @@ class LXMessage: @staticmethod def unpack_from_file(lxmf_file_handle): try: - return LXMessage.unpack_from_bytes(lxmf_file_handle.read()) + container = msgpack.unpackb(lxmf_file_handle.read()) + lxm = LXMessage.unpack_from_bytes(container["lxmf_bytes"]) + lxm.state = container["state"] + return lxm except Exception as e: RNS.log("Could not unpack LXMessage from file. The contained exception was: "+str(e), RNS.LOG_ERROR) return None @@ -378,6 +396,7 @@ class LXMRouter: MAX_DELIVERY_ATTEMPTS = 3 PROCESSING_INTERVAL = 5 DELIVERY_RETRY_WAIT = 15 + LINK_MAX_INACTIVITY = 30 def __init__(self): self.pending_inbound = [] @@ -411,6 +430,11 @@ class LXMRouter: def register_delivery_callback(self, callback): self.__delivery_callback = callback + def announce(self, destination_hash): + if destination_hash in self.delivery_destinations: + delivery_destination = self.delivery_destinations[destination_hash] + delivery_destination.announce(delivery_destination.display_name.encode("utf-8")) + def handle_outbound(self, lxmessage): if not lxmessage.packed: lxmessage.pack() @@ -493,6 +517,23 @@ class LXMRouter: def jobs(self): self.process_outbound() + self.clean_links() + + def clean_links(self): + closed_links = [] + for link_hash in self.direct_links: + #TODO: Fix + link = self.direct_links[link_hash] + inactive_time = link.inactive_for() + + if inactive_time > LXMRouter.LINK_MAX_INACTIVITY: + link.teardown() + closed_links.append(link_hash) + RNS.log(str(link)+" was inactive for "+str(inactive_time)+" seconds and closed") + + for link_hash in closed_links: + self.direct_links.pop(link_hash) + RNS.log("Removed "+RNS.hexrep(link_hash, delimit=False)+" from direct link list, since it was closed") def process_outbound(self, sender = None): if self.processing_outbound: