Implemented paper message handling

This commit is contained in:
Mark Qvist 2022-11-19 20:07:00 +01:00
parent 540aa1a496
commit edbb887d81
3 changed files with 78 additions and 8 deletions

View File

@ -1,6 +1,7 @@
import os
import time
import random
import base64
import atexit
import threading
@ -85,6 +86,7 @@ class LXMRouter:
self.propagation_transfer_last_result = None
self.propagation_transfer_max_messages = None
self.locally_delivered_transient_ids = {}
self.locally_processed_transient_ids = {}
if identity == None:
identity = RNS.Identity()
@ -105,7 +107,6 @@ class LXMRouter:
self.peers = {}
self.propagation_entries = {}
self.propagated_ids = {}
RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self))
RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self))
@ -118,7 +119,14 @@ class LXMRouter:
data = locally_delivered_file.read()
self.locally_delivered_transient_ids = msgpack.unpackb(data)
locally_delivered_file.close()
self.clean_transient_id_cache()
if os.path.isfile(self.storagepath+"/locally_processed"):
locally_processed_file = open(self.storagepath+"/locally_processed", "rb")
data = locally_processed_file.read()
self.locally_processed_transient_ids = msgpack.unpackb(data)
locally_processed_file.close()
self.clean_transient_id_caches()
except Exception as e:
RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -424,7 +432,7 @@ class LXMRouter:
self.clean_links()
if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0:
self.clean_transient_id_cache()
self.clean_transient_id_caches()
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
self.clean_message_store()
@ -458,7 +466,7 @@ class LXMRouter:
self.acknowledge_sync_completion()
RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG)
def clean_transient_id_cache(self):
def clean_transient_id_caches(self):
now = time.time()
removed_entries = []
for transient_id in self.locally_delivered_transient_ids:
@ -470,6 +478,16 @@ class LXMRouter:
self.locally_delivered_transient_ids.pop(transient_id)
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG)
removed_entries = []
for transient_id in self.locally_processed_transient_ids:
timestamp = self.locally_processed_transient_ids[transient_id]
if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.25:
removed_entries.append(transient_id)
for transient_id in removed_entries:
self.locally_processed_transient_ids.pop(transient_id)
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from locally processed cache", RNS.LOG_DEBUG)
def clean_message_store(self):
# Check and remove expired messages
now = time.time()
@ -571,6 +589,18 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def save_locally_processed_transient_ids(self):
try:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
locally_processed_file = open(self.storagepath+"/locally_processed", "wb")
locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids))
locally_processed_file.close()
except Exception as e:
RNS.log("Could not save locally processed message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def exit_handler(self):
if self.propagation_node:
try:
@ -589,6 +619,7 @@ class LXMRouter:
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.save_locally_delivered_transient_ids()
self.save_locally_processed_transient_ids()
def __str__(self):
return "<LXMRouter "+RNS.hexrep(self.identity.hash, delimit=False)+">"
@ -1012,15 +1043,18 @@ class LXMRouter:
except Exception as e:
RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
def lxmf_propagation(self, lxmf_data):
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None):
try:
if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
transient_id = RNS.Identity.full_hash(lxmf_data)
if not transient_id in self.propagation_entries and not transient_id in self.propagated_ids:
if not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids:
received = time.time()
propagation_entry = [transient_id, received, lxmf_data]
destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH]
self.locally_processed_transient_ids[transient_id] = received
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
encrypted_lxmf_data = lxmf_data[LXMessage.DESTINATION_LENGTH:]
@ -1029,6 +1063,9 @@ class LXMRouter:
self.lxmf_delivery(delivery_data, delivery_destination.type)
self.locally_delivered_transient_ids[transient_id] = time.time()
if signal_local_delivery != None:
return signal_local_delivery
else:
if self.propagation_node:
file_path = self.messagepath+"/"+RNS.hexrep(transient_id, delimit=False)+"_"+str(received)
@ -1043,8 +1080,19 @@ class LXMRouter:
peer = self.peers[peer_id]
peer.handle_message(transient_id)
else:
# TODO: Add message to sneakernet queues when implemented
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG)
return True
else:
if signal_duplicate != None:
return signal_duplicate
else:
return False
return False
except Exception as e:
@ -1052,6 +1100,28 @@ class LXMRouter:
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
return False
def ingest_lxm_url(self, url, signal_local_delivery=None, signal_duplicate=None):
try:
if not url.lower().startswith("lxm://"):
RNS.log("Cannot ingest LXM, invalid URL provided.", RNS.LOG_ERROR)
return False
else:
lxmf_data = base64.urlsafe_b64decode(url.replace(LXMessage.URL_PROTO_SPECIFIER+"://", "").replace("/", "")+"==")
transient_id = RNS.Identity.full_hash(lxmf_data)
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate)
if router_propagation_result != False:
RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG)
return router_propagation_result
else:
RNS.log("No valid LXM could be ingested from the provided URL", RNS.LOG_DEBUG)
return False
except Exception as e:
RNS.log("Error while decoding URL-encoded LXMF message. The contained exception was: "+str(e), RNS.LOG_ERROR)
return False
def fail_message(self, lxmessage):
RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)

View File

@ -276,7 +276,7 @@ class LXMessage:
paper_content_limit = LXMessage.PAPER_MDU
encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
self.paper_packed = msgpack.packb(self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data)
self.paper_packed = self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data
content_size = len(self.paper_packed)
if content_size <= paper_content_limit:

View File

@ -1 +1 @@
__version__ = "0.2.4"
__version__ = "0.2.5"