From 093d4f1317a920bfdb3f98cc4586e801af25e970 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sun, 3 Oct 2021 10:15:29 +0200 Subject: [PATCH] Implemented propagation node auto reverse peering --- LXMF/LXMF.py | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index 9002b7e..cccad31 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -547,6 +547,9 @@ class LXMPeer: # TODO: Remove RNS.log("Sending sync request to peer") self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) + else: + # TODO: Remove + RNS.log("No unsynced messages") def request_failed(self, request_receipt): RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) @@ -577,8 +580,8 @@ class LXMPeer: RNS.log("Peer had all advertised messages", RNS.LOG_DEBUG) for transient_id in self.unhandled_messages: - message = self.unhandled_messages[transient_id] - self.handled_messages[transient_id] = message + message_entry = self.unhandled_messages[transient_id] + self.handled_messages[transient_id] = message_entry self.unhandled_messages = {} @@ -594,8 +597,8 @@ class LXMPeer: # If the peer did not want the message, it has # already received it from another peer. if not transient_id in response: - message = self.unhandled_messages.pop(transient_id) - self.handled_messages[transient_id] = message + message_entry = self.unhandled_messages.pop(transient_id) + self.handled_messages[transient_id] = message_entry for transient_id in response: wanted_messages.append(self.unhandled_messages[transient_id][2]) @@ -605,7 +608,7 @@ class LXMPeer: # TODO: Remove RNS.log("Peer wanted: "+str(wanted_messages), RNS.LOG_DEBUG) - data = msgpack.packb(wanted_messages) + data = msgpack.packb([time.time(), wanted_messages]) resource = RNS.Resource(data, self.link, callback = self.resource_concluded) resource.transferred_messages = wanted_message_ids self.state = LXMPeer.RESOURCE_TRANSFERRING @@ -856,15 +859,32 @@ class LXMRouter: def propagation_resource_concluded(self, resource): - RNS.log("Transfer concluded for delivery resource "+str(resource), RNS.LOG_DEBUG) + RNS.log("Transfer concluded for propagation resource "+str(resource), RNS.LOG_DEBUG) if resource.status == RNS.Resource.COMPLETE: # TODO: The peer this was received from should # have the transient id added to it's list of # already handled messages. try: - messages = msgpack.unpackb(resource.data.read()) - for message in messages: - self.lxmf_propagation(lxmessage) + data = msgpack.unpackb(resource.data.read()) + remote_timebase = data[0] + remote_hash = None + remote_identity = resource.link.get_remote_identity() + + if remote_identity != None: + remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + remote_hash = remote_destination.hash + + if not remote_hash in self.peers: + self.peer(remote_hash, remote_timebase) + + messages = data[1] + for lxmf_data in messages: + if remote_hash in self.peers: + transient_id = RNS.Identity.full_hash(lxmf_data) + peer = self.peers[remote_hash] + peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data] + + self.lxmf_propagation(lxmf_data) except Exception as e: RNS.log("Error while unpacking received propagation messages", RNS.LOG_DEBUG) @@ -880,6 +900,9 @@ class LXMRouter: self.propagation_entries[transient_id] = propagation_entry + # TODO: Remove? + RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG) + for peer_id in self.peers: peer = self.peers[peer_id] peer.handle_message(transient_id)