import os
import RNS
import LXMF
import shutil
import nomadnet
from nomadnet.Directory import DirectoryEntry


class Conversation:
    """An LXMF conversation with a single peer, backed by a directory on disk.

    Each conversation lives in ``app.conversationpath/<hex source hash>`` and
    holds one file per message. Instances register themselves in
    ``cached_conversations`` (keyed by the hex source hash) so that
    ``ingest`` can refresh them when new messages are written.
    """

    # Hex source hash -> Conversation instance, filled by cache_conversation().
    cached_conversations = {}
    # Optional zero-argument callable invoked when a conversation directory
    # is created or an announce arrives for a listed conversation.
    created_callback = None

    aspect_filter = "lxmf.delivery"

    @staticmethod
    def received_announce(destination_hash, announced_identity, app_data):
        """Handle an announce for an LXMF delivery destination.

        If the announced destination is one of our stored conversations, the
        ``created_callback`` (when set) is invoked. The announce is always
        forwarded to the application's directory afterwards.
        """
        app = nomadnet.NomadNetworkApp.get_shared_instance()
        destination_hash_text = RNS.hexrep(destination_hash, delimit=False)

        # Check if the announced destination is in
        # our list of conversations
        in_conversation_list = any(
            entry[0] == destination_hash_text
            for entry in Conversation.conversation_list(app)
        )
        if in_conversation_list:
            RNS.log("Announced LXMF destination is in our conversation list")
            RNS.log("app_data = "+str(app_data))
            if app.directory.find(destination_hash):
                RNS.log("It is also in the directory")
            else:
                RNS.log("But it is not in the directory")

            # The callback fires regardless of directory membership.
            if Conversation.created_callback is not None:
                Conversation.created_callback()

        # Add the announce to the directory announce
        # stream logger
        app.directory.announce_received(destination_hash, app_data)

    @staticmethod
    def query_for_peer(source_hash):
        """Request a network path to the peer given by hex ``source_hash``.

        Errors are logged rather than raised.
        """
        try:
            RNS.Transport.requestPath(bytes.fromhex(source_hash))
        except Exception as e:
            RNS.log("Error while querying network for peer identity. The contained exception was: "+str(e), RNS.LOG_ERROR)

    @staticmethod
    def ingest(lxmessage, app, originator=False, delegate=None):
        """Write ``lxmessage`` into its conversation directory.

        The conversation is keyed by the message's destination hash when
        ``originator`` is true, otherwise by its source hash. The directory
        is created on demand (triggering ``created_callback``), and a cached
        ``Conversation`` for the same peer is asked to rescan its storage.

        ``delegate`` is accepted for interface compatibility and is unused.

        Returns whatever ``lxmessage.write_to_directory`` returns.
        """
        if originator:
            source_hash = lxmessage.destination_hash
        else:
            source_hash = lxmessage.source_hash

        source_hash_path = RNS.hexrep(source_hash, delimit=False)
        conversation_path = app.conversationpath + "/" + source_hash_path

        if not os.path.isdir(conversation_path):
            os.makedirs(conversation_path)
            if Conversation.created_callback is not None:
                Conversation.created_callback()

        ingested_path = lxmessage.write_to_directory(conversation_path)

        # The cache key is the same hex string used for the directory name.
        cached = Conversation.cached_conversations.get(source_hash_path)
        if cached is not None:
            cached.scan_storage()

        return ingested_path

    @staticmethod
    def conversation_list(app):
        """Return the stored conversations as a sorted list of tuples.

        Each entry is ``(source_hash_text, display_name, trust_level,
        sort_name)``. Entries are ordered by descending trust level, then by
        sort name, then by hash text. Directories that fail to load are
        logged and skipped.
        """
        conversations = []
        expected_name_length = RNS.Identity.TRUNCATED_HASHLENGTH//8*2

        for dirname in os.listdir(app.conversationpath):
            if len(dirname) != expected_name_length:
                continue
            if not os.path.isdir(app.conversationpath + "/" + dirname):
                continue

            try:
                source_hash_text = dirname
                source_hash = bytes.fromhex(dirname)
                app_data = RNS.Identity.recall_app_data(source_hash)
                display_name = app.directory.display_name(source_hash)

                # Fall back to the announced app data when the directory
                # has no display name for this peer.
                if display_name is None and app_data:
                    display_name = app_data.decode("utf-8")

                sort_name = "" if display_name is None else display_name

                trust_level = app.directory.trust_level(source_hash, display_name)

                conversations.append((source_hash_text, display_name, trust_level, sort_name))

            except Exception as e:
                RNS.log("Error while loading conversation "+str(dirname)+", skipping it. The contained exception was: "+str(e), RNS.LOG_ERROR)

        conversations.sort(key=lambda e: (-e[2], e[3], e[0]), reverse=False)

        return conversations

    @staticmethod
    def cache_conversation(conversation):
        """Register ``conversation`` in the class-level cache by source hash."""
        Conversation.cached_conversations[conversation.source_hash] = conversation

    @staticmethod
    def delete_conversation(source_hash_path, app):
        """Remove the conversation directory for ``source_hash_path``.

        Failures are logged rather than raised.
        """
        conversation_path = app.conversationpath + "/" + source_hash_path

        try:
            if os.path.isdir(conversation_path):
                shutil.rmtree(conversation_path)
        except Exception as e:
            RNS.log("Could not remove conversation at "+str(conversation_path)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

    def __init__(self, source_hash, app, initiator=False):
        """Open the conversation with the peer given by hex ``source_hash``.

        When ``initiator`` is true the message directory is created if it
        does not exist yet. The instance scans its storage and adds itself
        to the class-level cache before returning.
        """
        self.app = app
        self.source_hash = source_hash
        self.send_destination = None
        self.messages = []
        self.messages_path = app.conversationpath + "/" + source_hash
        self.messages_load_time = None
        self.source_known = False
        self.source_trusted = False
        self.source_blocked = False

        self.__changed_callback = None

        self.source_identity = RNS.Identity.recall(bytes.fromhex(self.source_hash))

        # An outbound destination can only be built once the peer's
        # identity has been recalled.
        if self.source_identity:
            self.source_known = True
            self.send_destination = RNS.Destination(self.source_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")

        if initiator:
            if not os.path.isdir(self.messages_path):
                os.makedirs(self.messages_path)
                if Conversation.created_callback is not None:
                    Conversation.created_callback()

        self.scan_storage()

        self.trust_level = app.directory.trust_level(bytes.fromhex(self.source_hash))

        Conversation.cache_conversation(self)

    def scan_storage(self):
        """Rebuild ``self.messages`` from the files in the message directory.

        Only file names whose length matches a full hex-encoded hash are
        picked up. The registered changed-callback, if any, is invoked with
        this conversation afterwards.
        """
        expected_name_length = RNS.Identity.HASHLENGTH//8*2

        self.messages = []
        for filename in os.listdir(self.messages_path):
            if len(filename) == expected_name_length:
                message_path = self.messages_path + "/" + filename
                self.messages.append(ConversationMessage(message_path))

        if self.__changed_callback is not None:
            self.__changed_callback(self)

    def purge_failed(self):
        """Delete every message in the FAILED state from disk and memory."""
        purged_messages = []
        for conversation_message in self.messages:
            if conversation_message.get_state() == LXMF.LXMessage.FAILED:
                purged_messages.append(conversation_message)
                conversation_message.purge()

        # Removal happens in a second pass so the list is not mutated
        # while it is being iterated.
        for purged_message in purged_messages:
            self.messages.remove(purged_message)

    def register_changed_callback(self, callback):
        """Set the callable invoked (with this conversation) after a rescan."""
        self.__changed_callback = callback

    def send(self, content="", title=""):
        """Create, dispatch and store an outbound LXMF message.

        Returns ``True`` when the message was handed to the message router,
        or ``False`` when no send destination is known for the peer.
        """
        if not self.send_destination:
            RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE)
            return False

        dest = self.send_destination
        source = self.app.lxmf_destination
        lxm = LXMF.LXMessage(dest, source, content, title=title, desired_method=LXMF.LXMessage.DIRECT)
        lxm.register_delivery_callback(self.message_notification)
        lxm.register_failed_callback(self.message_notification)
        self.app.message_router.handle_outbound(lxm)

        message_path = Conversation.ingest(lxm, self.app, originator=True)
        self.messages.append(ConversationMessage(message_path))

        return True

    def message_notification(self, message):
        """Re-ingest ``message`` after a delivery or failure notification."""
        Conversation.ingest(message, self.app, originator=True)

    def __str__(self):
        """Return the source hash, plus the peer's app data when available."""
        string = self.source_hash

        if self.source_identity:
            if self.source_identity.app_data:
                # TODO: Sanitise for viewing
                string += " | "+self.source_identity.app_data.decode("utf-8")

        return string


class ConversationMessage:
    """Lazy wrapper around a single LXMF message file on disk.

    The message is only unpacked from ``file_path`` when one of the
    accessors is first called.
    """

    def __init__(self, file_path):
        """Remember ``file_path``; nothing is read until ``load`` runs."""
        self.file_path = file_path
        self.loaded = False
        self.timestamp = None
        self.lxm = None

    def load(self):
        """Unpack the message from disk and reconcile its outbound state.

        A message whose stored state lies strictly between DRAFT and SENT
        but which is not in the router's pending outbound queue is marked
        FAILED. Errors are logged rather than raised, in which case
        ``loaded`` stays ``False``.
        """
        try:
            # Close the handle deterministically instead of leaking it.
            with open(self.file_path, "rb") as message_file:
                self.lxm = LXMF.LXMessage.unpack_from_file(message_file)
            self.loaded = True
            self.timestamp = self.lxm.timestamp

            if self.lxm.state > LXMF.LXMessage.DRAFT and self.lxm.state < LXMF.LXMessage.SENT:
                pending_outbound = nomadnet.NomadNetworkApp.get_shared_instance().message_router.pending_outbound
                still_pending = any(pending.hash == self.lxm.hash for pending in pending_outbound)

                if not still_pending:
                    self.lxm.state = LXMF.LXMessage.FAILED

        except Exception as e:
            RNS.log("Error while loading LXMF message "+str(self.file_path)+" from disk. The contained exception was: "+str(e), RNS.LOG_ERROR)

    def unload(self):
        """Drop the in-memory message so it is re-read on next access."""
        self.loaded = False
        self.lxm = None

    def purge(self):
        """Unload the message and delete its file if it exists."""
        self.unload()
        if os.path.isfile(self.file_path):
            os.unlink(self.file_path)

    def get_timestamp(self):
        """Return the message timestamp, loading the message if needed."""
        if not self.loaded:
            self.load()

        return self.timestamp

    def get_title(self):
        """Return the message title as a string, loading if needed."""
        if not self.loaded:
            self.load()

        return self.lxm.title_as_string()

    def get_content(self):
        """Return the message content as a string, loading if needed."""
        if not self.loaded:
            self.load()

        return self.lxm.content_as_string()

    def get_hash(self):
        """Return the message hash, loading the message if needed."""
        if not self.loaded:
            self.load()

        return self.lxm.hash

    def get_state(self):
        """Return the message state, loading the message if needed."""
        if not self.loaded:
            self.load()

        return self.lxm.state

    def get_transport_encryption(self):
        """Return the message's ``transport_encryption`` attribute."""
        if not self.loaded:
            self.load()

        return self.lxm.transport_encryption

    def get_transport_encrypted(self):
        """Return the message's ``transport_encrypted`` attribute."""
        if not self.loaded:
            self.load()

        return self.lxm.transport_encrypted

    def signature_validated(self):
        """Return the message's ``signature_validated`` attribute."""
        if not self.loaded:
            self.load()

        return self.lxm.signature_validated

    def get_signature_description(self):
        """Return a human-readable description of the signature status."""
        if self.signature_validated():
            return "Signature Verified"

        if self.lxm.unverified_reason == LXMF.LXMessage.SOURCE_UNKNOWN:
            return "Unknown Origin"
        elif self.lxm.unverified_reason == LXMF.LXMessage.SIGNATURE_INVALID:
            return "Invalid Signature"
        else:
            return "Unknown signature validation failure"