diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py
index add54da..74a40c7 100644
--- a/LXMF/LXMPeer.py
+++ b/LXMF/LXMPeer.py
@@ -4,6 +4,7 @@ import time
 import RNS
 import RNS.vendor.umsgpack as msgpack
 
+from collections import deque
 from .LXMF import APP_NAME
 
 class LXMPeer:
@@ -122,6 +123,8 @@ class LXMPeer:
         self.link_establishment_rate = 0
         self.sync_transfer_rate = 0
         self.propagation_transfer_limit = None
+        self.handled_messages_queue = deque()
+        self.unhandled_messages_queue = deque()
 
         self._hm_count = 0
         self._um_count = 0
@@ -351,9 +354,38 @@ class LXMPeer:
         self.link = None
         self.state = LXMPeer.IDLE
 
-    def new_propagation_message(self, transient_id):
-        if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
-            self.add_unhandled_message(transient_id)
+    def queued_items(self):
+        return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
+
+    def queue_unhandled_message(self, transient_id):
+        self.unhandled_messages_queue.append(transient_id)
+
+    def queue_handled_message(self, transient_id):
+        self.handled_messages_queue.append(transient_id)
+
+    def process_queues(self):
+        if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
+            # TODO: Remove debug
+            # st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue)
+
+            handled_messages = self.handled_messages
+            unhandled_messages = self.unhandled_messages
+
+            while len(self.handled_messages_queue) > 0:
+                transient_id = self.handled_messages_queue.pop()
+                if not transient_id in handled_messages:
+                    self.add_handled_message(transient_id)
+                if transient_id in unhandled_messages:
+                    self.remove_unhandled_message(transient_id)
+
+            while len(self.unhandled_messages_queue) > 0:
+                transient_id = self.unhandled_messages_queue.pop()
+                if not transient_id in handled_messages and not transient_id in unhandled_messages:
+                    self.add_unhandled_message(transient_id)
+
+            del handled_messages, unhandled_messages
+            # TODO: Remove debug
+            # RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}")
 
     @property
     def handled_messages(self):
@@ -387,11 +419,9 @@ class LXMPeer:
 
     def _update_counts(self):
         if not self._hm_counts_synced:
-            RNS.log("UPDATE HM COUNTS")
             hm = self.handled_messages; del hm
 
         if not self._um_counts_synced:
-            RNS.log("UPDATE UM COUNTS")
             um = self.unhandled_messages; del um
 
     def add_handled_message(self, transient_id):
diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py
index 9163824..1e62914 100644
--- a/LXMF/LXMRouter.py
+++ b/LXMF/LXMRouter.py
@@ -8,6 +8,8 @@ import atexit
 import signal
 import threading
 
+from collections import deque
+
 import RNS
 import RNS.vendor.umsgpack as msgpack
 
@@ -143,6 +145,8 @@ class LXMRouter:
         self.peers = {}
         self.propagation_entries = {}
 
+        self.peer_distribution_queue = deque()
+
         RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self))
         RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self))
 
@@ -613,6 +617,7 @@
     JOB_TRANSIENT_INTERVAL = 60
     JOB_STORE_INTERVAL     = 120
     JOB_PEERSYNC_INTERVAL  = 12
+    JOB_PEERINGEST_INTERVAL= JOB_PEERSYNC_INTERVAL
     def jobs(self):
         if not self.exit_handler_running:
             self.processing_count += 1
@@ -632,6 +637,9 @@
             if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
                 self.clean_message_store()
 
+            if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0:
+                self.flush_queues()
+
             if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
                 self.sync_peers()
 
@@ -647,6 +655,17 @@ class LXMRouter:
                 RNS.trace_exception(e)
             time.sleep(LXMRouter.PROCESSING_INTERVAL)
 
+    def flush_queues(self):
+        self.flush_peer_distribution_queue()
+        RNS.log("Calculating peer distribution queue mappings...", RNS.LOG_DEBUG); st = time.time()
+        for peer_id in self.peers.copy():
+            if peer_id in self.peers:
+                peer = self.peers[peer_id]
+                if peer.queued_items():
+                    peer.process_queues()
+
+        RNS.log(f"Distribution queue mapping completed in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
+
     def clean_links(self):
         closed_links = []
         for link_hash in self.direct_links:
@@ -1047,6 +1066,7 @@ class LXMRouter:
                 RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
 
         RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE)
+        self.flush_queues()
         if self.propagation_node:
             try:
                 st = time.time(); RNS.log("Saving peer synchronisation states to storage...", RNS.LOG_NOTICE)
@@ -1608,8 +1628,9 @@ class LXMRouter:
         culled_peers = []
         waiting_peers = []
         unresponsive_peers = []
-        for peer_id in self.peers:
-            peer = self.peers[peer_id]
+        peers = self.peers.copy()
+        for peer_id in peers:
+            peer = peers[peer_id]
             if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
                 culled_peers.append(peer_id)
             else:
@@ -1754,7 +1775,7 @@ class LXMRouter:
 
                         self.lxmf_propagation(lxmf_data, from_peer=peer)
                         if peer != None:
-                            peer.add_handled_message(transient_id)
+                            peer.queue_handled_message(transient_id)
 
                 else:
                     RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
@@ -1763,6 +1784,24 @@ class LXMRouter:
             RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
             RNS.trace_exception(e)
 
+    def enqueue_peer_distribution(self, transient_id, from_peer):
+        self.peer_distribution_queue.append([transient_id, from_peer])
+
+    def flush_peer_distribution_queue(self):
+        if len(self.peer_distribution_queue) > 0:
+            entries = []
+            while len(self.peer_distribution_queue) > 0:
+                entries.append(self.peer_distribution_queue.pop())
+
+            for peer_id in self.peers.copy():
+                if peer_id in self.peers:
+                    peer = self.peers[peer_id]
+                    for entry in entries:
+                        transient_id = entry[0]
+                        from_peer = entry[1]
+                        if peer != from_peer:
+                            peer.queue_unhandled_message(transient_id)
+
     def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False, from_peer=None):
         no_stamp_enforcement = False
         if is_paper_message:
@@ -1797,13 +1836,9 @@ class LXMRouter:
                 msg_file.write(lxmf_data)
                 msg_file.close()
 
-                self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]
-
                 RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG)
-                for peer_id in self.peers:
-                    peer = self.peers[peer_id]
-                    if peer != from_peer:
-                        peer.new_propagation_message(transient_id)
+                self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]
+                self.enqueue_peer_distribution(transient_id, from_peer)
 
             else:
                 # TODO: Add message to sneakernet queues when implemented
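
Note on the pattern above: the patch replaces the old per-ingest fan-out in lxmf_propagation, which walked every peer and updated its handled/unhandled message maps for each received message, with deferred queues. enqueue_peer_distribution() now only appends a [transient_id, from_peer] pair to a deque, each LXMPeer buffers IDs in handled_messages_queue/unhandled_messages_queue, and the periodic flush_queues() job (run every JOB_PEERINGEST_INTERVAL passes) drains both levels via flush_peer_distribution_queue() and LXMPeer.process_queues(). The standalone sketch below illustrates that queue-then-flush flow; SketchPeer and SketchRouter are hypothetical, simplified stand-ins, not the LXMF API.

# Minimal sketch of the deferred-distribution pattern (hypothetical names,
# not the LXMF implementation): ingest is O(1), peer state is mapped on flush.
from collections import deque

class SketchPeer:
    def __init__(self, name):
        self.name = name
        self.unhandled_queue = deque()   # IDs queued for this peer, not yet mapped
        self.unhandled = set()           # IDs this peer still needs to receive

    def queue_unhandled(self, message_id):
        self.unhandled_queue.append(message_id)

    def process_queue(self):
        # Drain the buffered IDs in one pass instead of updating state per ingest
        while self.unhandled_queue:
            self.unhandled.add(self.unhandled_queue.pop())

class SketchRouter:
    def __init__(self, peers):
        self.peers = peers
        self.distribution_queue = deque()

    def ingest(self, message_id, from_peer=None):
        # Cheap at ingest time: just remember the ID and where it came from
        self.distribution_queue.append((message_id, from_peer))

    def flush(self):
        # Periodic job: map every queued ID onto every peer except its source
        entries = []
        while self.distribution_queue:
            entries.append(self.distribution_queue.pop())
        for peer in self.peers:
            for message_id, from_peer in entries:
                if peer is not from_peer:
                    peer.queue_unhandled(message_id)
            peer.process_queue()

peers = [SketchPeer("a"), SketchPeer("b")]
router = SketchRouter(peers)
router.ingest("msg-1", from_peer=peers[0])
router.flush()
assert "msg-1" in peers[1].unhandled and "msg-1" not in peers[0].unhandled

The trade-off mirrored here is that message ingest stays cheap regardless of peer count, while per-peer distribution state only catches up at the next flush.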