NomadNet/nomadnet/Conversation.py
2022-05-17 13:11:04 +02:00

338 lines
12 KiB
Python

import os
import RNS
import LXMF
import shutil
import nomadnet
from nomadnet.Directory import DirectoryEntry
class Conversation:
cached_conversations = {}
unread_conversations = {}
created_callback = None
aspect_filter = "lxmf.delivery"
@staticmethod
def received_announce(destination_hash, announced_identity, app_data):
app = nomadnet.NomadNetworkApp.get_shared_instance()
destination_hash_text = RNS.hexrep(destination_hash, delimit=False)
# Check if the announced destination is in
# our list of conversations
if destination_hash_text in [e[0] for e in Conversation.conversation_list(app)]:
if app.directory.find(destination_hash):
if Conversation.created_callback != None:
Conversation.created_callback()
else:
if Conversation.created_callback != None:
Conversation.created_callback()
# Add the announce to the directory announce
# stream logger
app.directory.lxmf_announce_received(destination_hash, app_data)
@staticmethod
def query_for_peer(source_hash):
try:
RNS.Transport.request_path(bytes.fromhex(source_hash))
except Exception as e:
RNS.log("Error while querying network for peer identity. The contained exception was: "+str(e), RNS.LOG_ERROR)
@staticmethod
def ingest(lxmessage, app, originator = False, delegate = None):
if originator:
source_hash = lxmessage.destination_hash
else:
source_hash = lxmessage.source_hash
source_hash_path = RNS.hexrep(source_hash, delimit=False)
conversation_path = app.conversationpath + "/" + source_hash_path
if not os.path.isdir(conversation_path):
os.makedirs(conversation_path)
if Conversation.created_callback != None:
Conversation.created_callback()
ingested_path = lxmessage.write_to_directory(conversation_path)
if RNS.hexrep(source_hash, delimit=False) in Conversation.cached_conversations:
conversation = Conversation.cached_conversations[RNS.hexrep(source_hash, delimit=False)]
conversation.scan_storage()
if not source_hash in Conversation.unread_conversations:
Conversation.unread_conversations[source_hash] = True
try:
dirname = RNS.hexrep(source_hash, delimit=False)
open(app.conversationpath + "/" + dirname + "/unread", 'a').close()
except Exception as e:
pass
if Conversation.created_callback != None:
Conversation.created_callback()
return ingested_path
@staticmethod
def conversation_list(app):
conversations = []
for dirname in os.listdir(app.conversationpath):
if len(dirname) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2 and os.path.isdir(app.conversationpath + "/" + dirname):
try:
source_hash_text = dirname
source_hash = bytes.fromhex(dirname)
app_data = RNS.Identity.recall_app_data(source_hash)
display_name = app.directory.display_name(source_hash)
unread = False
if source_hash in Conversation.unread_conversations:
unread = True
elif os.path.isfile(app.conversationpath + "/" + dirname + "/unread"):
Conversation.unread_conversations[source_hash] = True
unread = True
if display_name == None and app_data:
display_name = app_data.decode("utf-8")
if display_name == None:
sort_name = ""
else:
sort_name = display_name
trust_level = app.directory.trust_level(source_hash, display_name)
entry = (source_hash_text, display_name, trust_level, sort_name, unread)
conversations.append(entry)
except Exception as e:
RNS.log("Error while loading conversation "+str(dirname)+", skipping it. The contained exception was: "+str(e), RNS.LOG_ERROR)
conversations.sort(key=lambda e: (-e[2], e[3], e[0]), reverse=False)
return conversations
@staticmethod
def cache_conversation(conversation):
Conversation.cached_conversations[conversation.source_hash] = conversation
@staticmethod
def delete_conversation(source_hash_path, app):
conversation_path = app.conversationpath + "/" + source_hash_path
try:
if os.path.isdir(conversation_path):
shutil.rmtree(conversation_path)
except Exception as e:
RNS.log("Could not remove conversation at "+str(conversation_path)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
def __init__(self, source_hash, app, initiator=False):
self.app = app
self.source_hash = source_hash
self.send_destination = None
self.messages = []
self.messages_path = app.conversationpath + "/" + source_hash
self.messages_load_time = None
self.source_known = False
self.source_trusted = False
self.source_blocked = False
self.unread = False
self.__changed_callback = None
self.source_identity = RNS.Identity.recall(bytes.fromhex(self.source_hash))
if self.source_identity:
self.source_known = True
self.send_destination = RNS.Destination(self.source_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
if initiator:
if not os.path.isdir(self.messages_path):
os.makedirs(self.messages_path)
if Conversation.created_callback != None:
Conversation.created_callback()
self.scan_storage()
self.trust_level = app.directory.trust_level(bytes.fromhex(self.source_hash))
Conversation.cache_conversation(self)
def scan_storage(self):
old_len = len(self.messages)
self.messages = []
for filename in os.listdir(self.messages_path):
if len(filename) == RNS.Identity.HASHLENGTH//8*2:
message_path = self.messages_path + "/" + filename
self.messages.append(ConversationMessage(message_path))
new_len = len(self.messages)
if new_len > old_len:
self.unread = True
if self.__changed_callback != None:
self.__changed_callback(self)
def purge_failed(self):
purged_messages = []
for conversation_message in self.messages:
if conversation_message.get_state() == LXMF.LXMessage.FAILED:
purged_messages.append(conversation_message)
conversation_message.purge()
for purged_message in purged_messages:
self.messages.remove(purged_message)
def clear_history(self):
purged_messages = []
for conversation_message in self.messages:
purged_messages.append(conversation_message)
conversation_message.purge()
for purged_message in purged_messages:
self.messages.remove(purged_message)
def register_changed_callback(self, callback):
self.__changed_callback = callback
def send(self, content="", title=""):
if self.send_destination:
dest = self.send_destination
source = self.app.lxmf_destination
desired_method = LXMF.LXMessage.DIRECT
if self.app.directory.preferred_delivery(dest.hash) == DirectoryEntry.PROPAGATED:
if self.app.message_router.get_outbound_propagation_node() != None:
desired_method = LXMF.LXMessage.PROPAGATED
lxm = LXMF.LXMessage(dest, source, content, title=title, desired_method=desired_method)
lxm.register_delivery_callback(self.message_notification)
lxm.register_failed_callback(self.message_notification)
if self.app.message_router.get_outbound_propagation_node() != None:
lxm.try_propagation_on_fail = self.app.try_propagation_on_fail
self.app.message_router.handle_outbound(lxm)
message_path = Conversation.ingest(lxm, self.app, originator=True)
self.messages.append(ConversationMessage(message_path))
return True
else:
RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE)
return False
def message_notification(self, message):
if message.state == LXMF.LXMessage.FAILED and hasattr(message, "try_propagation_on_fail") and message.try_propagation_on_fail:
RNS.log("Direct delivery of "+str(message)+" failed. Retrying as propagated message.", RNS.LOG_VERBOSE)
message.try_propagation_on_fail = None
message.delivery_attempts = 0
del message.next_delivery_attempt
message.packed = None
message.desired_method = LXMF.LXMessage.PROPAGATED
self.app.message_router.handle_outbound(message)
else:
message_path = Conversation.ingest(message, self.app, originator=True)
def __str__(self):
string = self.source_hash
if self.source_identity:
if self.source_identity.app_data:
# TODO: Sanitise for viewing
string += " | "+self.source.source_identity.app_data.decode("utf-8")
return string
class ConversationMessage:
def __init__(self, file_path):
self.file_path = file_path
self.loaded = False
self.timestamp = None
self.lxm = None
def load(self):
try:
self.lxm = LXMF.LXMessage.unpack_from_file(open(self.file_path, "rb"))
self.loaded = True
self.timestamp = self.lxm.timestamp
self.sort_timestamp = os.path.getmtime(self.file_path)
if self.lxm.state > LXMF.LXMessage.DRAFT and self.lxm.state < LXMF.LXMessage.SENT:
found = False
for pending in nomadnet.NomadNetworkApp.get_shared_instance().message_router.pending_outbound:
if pending.hash == self.lxm.hash:
found = True
if not found:
self.lxm.state = LXMF.LXMessage.FAILED
except Exception as e:
RNS.log("Error while loading LXMF message "+str(self.file_path)+" from disk. The contained exception was: "+str(e), RNS.LOG_ERROR)
def unload(self):
self.loaded = False
self.lxm = None
def purge(self):
self.unload()
if os.path.isfile(self.file_path):
os.unlink(self.file_path)
def get_timestamp(self):
if not self.loaded:
self.load()
return self.timestamp
def get_title(self):
if not self.loaded:
self.load()
return self.lxm.title_as_string()
def get_content(self):
if not self.loaded:
self.load()
return self.lxm.content_as_string()
def get_hash(self):
if not self.loaded:
self.load()
return self.lxm.hash
def get_state(self):
if not self.loaded:
self.load()
return self.lxm.state
def get_transport_encryption(self):
if not self.loaded:
self.load()
return self.lxm.transport_encryption
def get_transport_encrypted(self):
if not self.loaded:
self.load()
return self.lxm.transport_encrypted
def signature_validated(self):
if not self.loaded:
self.load()
return self.lxm.signature_validated
def get_signature_description(self):
if self.signature_validated():
return "Signature Verified"
else:
if self.lxm.unverified_reason == LXMF.LXMessage.SOURCE_UNKNOWN:
return "Unknown Origin"
elif self.lxm.unverified_reason == LXMF.LXMessage.SIGNATURE_INVALID:
return "Invalid Signature"
else:
return "Unknown signature validation failure"