From ad773dcbc5a6ec3d487aeedcfb013dca1fb66ce4 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 2 Oct 2021 22:04:31 +0200 Subject: [PATCH] General structure for LXMF propagation --- LXMF/LXMF.py | 336 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 328 insertions(+), 8 deletions(-) diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index f33f64f..75b77e9 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -1,3 +1,4 @@ +import os import math import time import threading @@ -476,6 +477,167 @@ class LXMFDeliveryAnnounceHandler: self.lxmrouter.process_outbound() +class LXMFPropagationAnnounceHandler: + def __init__(self, lxmrouter): + self.aspect_filter = APP_NAME+".propagation" + self.lxmrouter = lxmrouter + + def received_announce(self, destination_hash, announced_identity, app_data): + try: + data = msgpack.unpackb(app_data) + + if self.lxmrouter.autopeer: + if data[0] == True: + if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: + self.lxmrouter.peer(destination_hash, data[1]) + elif data[0] == False: + self.lxmrouter.unpeer(destination_hash, data[1]) + + except Exception as e: + RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG) + RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + + +class LXMPeer: + OFFER_REQUEST_PATH = "o" + + IDLE = 0x00 + LINK_ESTABLISHING = 0x01 + LINK_READY = 0x02 + REQUEST_SENT = 0x03 + RESPONSE_RECEIVED = 0x04 + RESOURCE_TRANSFERRING = 0x05 + + ERROR_NO_IDENTITY = 0xf0 + + @staticmethod + def from_bytes(bytes): + pass + + def to_bytes(self): + pass + + def __init__(self, router, destination_hash): + self.alive = False + self.last_heard = None + self.peering_timebase = 0 + + self.link = None + self.state = LXMPeer.IDLE + + self.unhandled_messages = {} + self.handled_messages = {} + + self.router = router + self.destination_hash = destination_hash + self.identity = RNS.Identity.recall(destination_hash) + self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + + def sync(self, initiator=True): + if len(self.unhandled_messages) > 0: + if self.state == LXMPeer.IDLE: + self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed) + self.state = LXMPeer.LINK_ESTABLISHING + else: + if self.state == LXMPeer.LINK_READY: + unhandled_ids = [] + for transient_id in self.unhandled_messages: + unhandled_ids.append(transient_id) + + # TODO: Remove + RNS.log("Sending sync request to peer") + self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) + + def request_failed(self, request_receipt): + RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) + if self.link != None: + self.link.teardown() + + def offer_response(self, request_receipt): + # TODO: Remove + RNS.log("Got offer response from peer") + try: + self.state = LXMPeer.RESPONSE_RECEIVED + response = request_receipt.response + + RNS.log(str(response)) + + wanted_messages = [] + wanted_message_ids = [] + + if response == LXMPeer.ERROR_NO_IDENTITY: + if self.link != None: + self.link.indentify() + self.state = LXMPeer.LINK_READY + + elif response == False: + # Peer already has all advertised messages + + # TODO: Remove + RNS.log("Peer had all advertised messages", RNS.LOG_DEBUG) + + for transient_id in self.unhandled_messages: + message = self.unhandled_messages[transient_id] + self.handled_messages[transient_id] = message + + self.unhandled_messages = {} + + elif response == True: + # Peer wants all advertised messages + for transient_id in self.unhandled_messages: + wanted_messages.append(self.unhandled_messages[transient_id][2]) + wanted_message_ids.append(transient_id) + + else: + # Peer wants some advertised messages + for transient_id in self.unhandled_messages: + # If the peer did not want the message, it has + # already received it from another peer. + if not transient_id in response: + message = self.unhandled_messages.pop(transient_id) + self.handled_messages[transient_id] = message + + for transient_id in response: + wanted_messages.append(self.unhandled_messages[transient_id][2]) + wanted_message_ids.append(transient_id) + + if len(wanted_messages) > 0: + # TODO: Remove + RNS.log("Peer wanted: "+str(wanted_messages), RNS.LOG_DEBUG) + + data = msgpack.packb(wanted_messages) + resource = RNS.Resource(data, self.link, callback = self.resource_concluded) + resource.transferred_messages = wanted_message_ids + self.state = LXMPeer.RESOURCE_TRANSFERRING + + except Exception as e: + RNS.log("Error while handling offer response from peer "+str(self.destination), RNS.LOG_DEBUG) + RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + + + def resource_concluded(self, resource): + if resource.status == RNS.Resource.COMPLETE: + for transient_id in resource.transferred_messages: + message = self.unhandled_messages.pop(transient_id) + self.handled_messages[transient_id] = message + else: + RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG) + + + + def link_established(self, link): + self.link.identify(self.router.identity) + self.state = LXMPeer.LINK_READY + + def link_closed(self, link): + self.link = None + self.state = LXMPeer.IDLE + + def handle_message(self, transient_id): + if not transient_id in self.handled_messages and not self.unhandled_messages: + self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id] + + class LXMRouter: MAX_DELIVERY_ATTEMPTS = 3 PROCESSING_INTERVAL = 5 @@ -483,7 +645,21 @@ class LXMRouter: PATH_REQUEST_WAIT = 5 LINK_MAX_INACTIVITY = 10*60 - def __init__(self): + AUTOPEER = True + AUTOPEER_MAXDEPTH = 4 + + # TODO: Remove + def ptest(self): + i1 = RNS.Identity() + i2 = RNS.Identity() + d1 = RNS.Destination(i1, RNS.Destination.IN, RNS.Destination.SINGLE, "test", "delivery") + d2 = RNS.Destination(i2, RNS.Destination.IN, RNS.Destination.SINGLE, "test", "delivery") + + lxm = LXMessage(d1, d2, content="No content here") + + return lxm + + def __init__(self, identity = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH): self.pending_inbound = [] self.pending_outbound = [] self.failed_outbound = [] @@ -492,12 +668,26 @@ class LXMRouter: self.processing_outbound = False self.processing_inbound = False + self.processing_count = 0 - self.identity = RNS.Identity() + self.propagation_node = False + + if identity == None: + identity = RNS.Identity() + + self.identity = identity self.lxmf_query_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, APP_NAME, "query") self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation") + self.autopeer = autopeer + self.autopeer_maxdepth = autopeer_maxdepth + + self.peers = {} + self.propagation_entries = {} + self.propagated_ids = {} + RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self)) + RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self)) self.__delivery_callback = None @@ -544,7 +734,7 @@ class LXMRouter: if destination_type == RNS.Destination.SINGLE: message.transport_encrypted = True - message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_RSA + message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC elif destination_type == RNS.Destination.GROUP: message.transport_encrypted = True message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES @@ -586,7 +776,7 @@ class LXMRouter: link.set_packet_callback(self.delivery_packet) link.set_resource_strategy(RNS.Link.ACCEPT_ALL) link.set_resource_started_callback(self.resource_transfer_began) - link.set_resource_concluded_callback(self.resource_transfer_concluded) + link.set_resource_concluded_callback(self.delivery_resource_concluded) def delivery_link_closed(self, link): pass @@ -594,11 +784,133 @@ class LXMRouter: def resource_transfer_began(self, resource): RNS.log("Transfer began for resource "+str(resource), RNS.LOG_DEBUG) - def resource_transfer_concluded(self, resource): - RNS.log("Transfer concluded for resource "+str(resource), RNS.LOG_DEBUG) + def delivery_resource_concluded(self, resource): + RNS.log("Transfer concluded for delivery resource "+str(resource), RNS.LOG_DEBUG) if resource.status == RNS.Resource.COMPLETE: self.lxmf_delivery(resource.data.read(), resource.link.type) + + def enable_propagation(self, storagepath): + try: + self.storagepath = storagepath+"/lxmf" + + if not os.path.isdir(self.storagepath): + os.makedirs(self.storagepath) + + # TODO: Load peers and data + + self.propagation_node = True + self.propagation_destination.set_link_established_callback(self.propagation_link_established) + + self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) + + self.announce_propagation_node() + + except Exception as e: + RNS.log("Could not enable propagation node. The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.panic() + + def disable_propagation(self): + self.propagation_node = False + self.announce_propagation_node() + + def announce_propagation_node(self): + data = msgpack.packb([self.propagation_node, int(time.time())]) + self.propagation_destination.announce(app_data=data) + + def offer_request(self, path, data, request_id, remote_identity, requested_at): + # TODO: Remove + RNS.log("Offer request") + + if remote_identity == None: + return LXMPeer.ERROR_NO_IDENTITY + else: + try: + # TODO: Remove + RNS.log(str(data)) + + transient_ids = data + wanted_ids = [] + + for transient_id in transient_ids: + if not transient_id in self.propagation_entries: + wanted_ids.append(transient_id) + + if len(wanted_ids) == 0: + return False + + elif len(wanted_ids) == len(transient_ids): + return True + + else: + return wanted_ids + + except Exception as e: + RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG) + return None + + def propagation_link_established(self, link): + link.set_resource_strategy(RNS.Link.ACCEPT_ALL) + link.set_resource_started_callback(self.resource_transfer_began) + link.set_resource_concluded_callback(self.propagation_resource_concluded) + + + def propagation_resource_concluded(self, resource): + RNS.log("Transfer concluded for delivery resource "+str(resource), RNS.LOG_DEBUG) + if resource.status == RNS.Resource.COMPLETE: + # TODO: The peer this was received from should + # have the transient id added to it's list of + # already handled messages. + self.lxmf_propagation(resource.data.read()) + + + def lxmf_propagation(self, lxmf_data): + try: + if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD: + transient_id = RNS.Identity.full_hash(lxmf_data) + if not transient_id in self.propagation_entries and not transient_id in self.propagated_ids: + received = time.time() + propagation_entry = [transient_id, received, lxmf_data] + + self.propagation_entries[transient_id] = propagation_entry + + for peer_id in self.peers: + peer = self.peers[peer_id] + peer.handle_message(transient_id) + + return True + + return False + + except Exception as e: + RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_NOTICE) + RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + return False + + + def peer(self, destination_hash, timestamp): + if destination_hash in self.peers: + peer = self.peers[destination_hash] + peer.alive = True + peer.peering_timebase = timestamp + peer.last_heard = time.time() + else: + peer = LXMPeer(self, destination_hash) + self.peers[destination_hash] = peer + RNS.log(str(self.propagation_destination)+" peered with "+str(peer.destination)) + + def unpeer(self, destination_hash, timestamp = None): + if timestamp == None: + timestamp = int(time.time()) + + if destination_hash in self.peers: + peer = self.peers[destination_hash] + + if timestamp >= peer.peering_timebase: + self.peers.pop(destination_hash) + RNS.log(str(self.propagation_destination)+" broke peering with "+str(peer.destination)) + + def jobloop(self): while (True): # TODO: Improve this to scheduling, so manual @@ -606,9 +918,17 @@ class LXMRouter: self.jobs() time.sleep(LXMRouter.PROCESSING_INTERVAL) + JOB_OUTBOUND_INTERVAL = 1 + JOB_LINKS_INTERVAL = 1 def jobs(self): - self.process_outbound() - self.clean_links() + if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0: + self.process_outbound() + + if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0: + self.clean_links() + + self.processing_count += 1 + def clean_links(self): closed_links = []