diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index a88f6da..2b10987 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -63,12 +63,13 @@ class LXMPeer: for transient_id in dictionary["handled_ids"]: if transient_id in router.propagation_entries: - peer.handled_messages[transient_id] = router.propagation_entries[transient_id] + peer.handled_messages.append(transient_id) for transient_id in dictionary["unhandled_ids"]: if transient_id in router.propagation_entries: - peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id] + peer.unhandled_messages.append(transient_id) + del dictionary return peer def to_bytes(self): @@ -108,8 +109,8 @@ class LXMPeer: self.link = None self.state = LXMPeer.IDLE - self.unhandled_messages = {} - self.handled_messages = {} + self.unhandled_messages = [] + self.handled_messages = [] self.last_offer = [] self.router = router @@ -118,6 +119,7 @@ class LXMPeer: if self.identity != None: self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") else: + self.destination = None RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING) def sync(self): @@ -171,7 +173,7 @@ class LXMPeer: for transient_id in purged_ids: RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) - self.unhandled_messages.pop(transient_id) + self.unhandled_messages.remove(transient_id) unhandled_entries.sort(key=lambda e: e[1], reverse=False) per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now @@ -189,7 +191,7 @@ class LXMPeer: RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) self.last_offer = unhandled_ids - self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed) + self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) self.state = LXMPeer.REQUEST_SENT else: @@ -226,13 +228,14 @@ class LXMPeer: # Peer already has all advertised messages for transient_id in self.last_offer: if transient_id in self.unhandled_messages: - self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id) + self.handled_messages.append(transient_id) + self.unhandled_messages.remove(transient_id) elif response == True: # Peer wants all advertised messages for transient_id in self.last_offer: - wanted_messages.append(self.unhandled_messages[transient_id]) + wanted_messages.append(self.router.propagation_entries[transient_id]) wanted_message_ids.append(transient_id) else: @@ -242,10 +245,11 @@ class LXMPeer: # already received it from another peer. if not transient_id in response: if transient_id in self.unhandled_messages: - self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id) + self.handled_messages.append(transient_id) + self.unhandled_messages.remove(transient_id) for transient_id in response: - wanted_messages.append(self.unhandled_messages[transient_id]) + wanted_messages.append(self.router.propagation_entries[transient_id]) wanted_message_ids.append(transient_id) if len(wanted_messages) > 0: @@ -288,8 +292,8 @@ class LXMPeer: def resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: for transient_id in resource.transferred_messages: - message = self.unhandled_messages.pop(transient_id) - self.handled_messages[transient_id] = message + self.handled_messages.append(transient_id) + self.unhandled_messages.remove(transient_id) if self.link != None: self.link.teardown() @@ -330,7 +334,7 @@ class LXMPeer: def handle_message(self, transient_id): if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages: - self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id] + self.unhandled_messages.append(transient_id) def __str__(self): if self.destination_hash: diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 79678c6..a19f401 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -1,5 +1,6 @@ import os import time +import math import random import base64 import atexit @@ -427,6 +428,8 @@ class LXMRouter: os.makedirs(self.messagepath) self.propagation_entries = {} + + st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE) for filename in os.listdir(self.messagepath): components = filename.split("_") if len(components) == 2: @@ -452,9 +455,13 @@ class LXMRouter: except Exception as e: RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) + et = time.time(); RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {math.floor(len(self.propagation_entries)/(et-st))} msgs/s", RNS.LOG_NOTICE) + st = time.time(); RNS.log("Loading propagation node peers...", RNS.LOG_NOTICE) + if os.path.isfile(self.storagepath+"/peers"): peers_file = open(self.storagepath+"/peers", "rb") peers_data = peers_file.read() + peers_file.close() if len(peers_data) > 0: serialised_peers = msgpack.unpackb(peers_data) @@ -468,8 +475,13 @@ class LXMRouter: lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit" RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages"+lim_str, RNS.LOG_DEBUG) else: + del peer RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG) + del serialised_peers + del peers_data + + RNS.log(f"Loaded {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE) self.propagation_node = True self.propagation_destination.set_link_established_callback(self.propagation_link_established) @@ -1676,7 +1688,7 @@ class LXMRouter: if remote_hash != None and remote_hash in self.peers: transient_id = RNS.Identity.full_hash(lxmf_data) peer = self.peers[remote_hash] - peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data] + peer.handled_messages.append(transient_id) self.lxmf_propagation(lxmf_data) else: