Enqueue and batch process distribution queue mappings

This commit is contained in:
Mark Qvist 2025-01-21 20:20:39 +01:00
parent 1c9c744107
commit 1430b1ce90
2 changed files with 79 additions and 14 deletions

View File

@ -4,6 +4,7 @@ import time
import RNS import RNS
import RNS.vendor.umsgpack as msgpack import RNS.vendor.umsgpack as msgpack
from collections import deque
from .LXMF import APP_NAME from .LXMF import APP_NAME
class LXMPeer: class LXMPeer:
@ -122,6 +123,8 @@ class LXMPeer:
self.link_establishment_rate = 0 self.link_establishment_rate = 0
self.sync_transfer_rate = 0 self.sync_transfer_rate = 0
self.propagation_transfer_limit = None self.propagation_transfer_limit = None
self.handled_messages_queue = deque()
self.unhandled_messages_queue = deque()
self._hm_count = 0 self._hm_count = 0
self._um_count = 0 self._um_count = 0
@ -351,10 +354,39 @@ class LXMPeer:
self.link = None self.link = None
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
def new_propagation_message(self, transient_id): def queued_items(self):
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages: return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
def queue_unhandled_message(self, transient_id):
self.unhandled_messages_queue.append(transient_id)
def queue_handled_message(self, transient_id):
self.handled_messages_queue.append(transient_id)
def process_queues(self):
if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
# TODO: Remove debug
# st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue)
handled_messages = self.handled_messages
unhandled_messages = self.unhandled_messages
while len(self.handled_messages_queue) > 0:
transient_id = self.handled_messages_queue.pop()
if not transient_id in handled_messages:
self.add_handled_message(transient_id)
if transient_id in unhandled_messages:
self.remove_unhandled_message(transient_id)
while len(self.unhandled_messages_queue) > 0:
transient_id = self.unhandled_messages_queue.pop()
if not transient_id in handled_messages and not transient_id in unhandled_messages:
self.add_unhandled_message(transient_id) self.add_unhandled_message(transient_id)
del handled_messages, unhandled_messages
# TODO: Remove debug
# RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}")
@property @property
def handled_messages(self): def handled_messages(self):
pes = self.router.propagation_entries.copy() pes = self.router.propagation_entries.copy()
@ -387,11 +419,9 @@ class LXMPeer:
def _update_counts(self): def _update_counts(self):
if not self._hm_counts_synced: if not self._hm_counts_synced:
RNS.log("UPDATE HM COUNTS")
hm = self.handled_messages; del hm hm = self.handled_messages; del hm
if not self._um_counts_synced: if not self._um_counts_synced:
RNS.log("UPDATE UM COUNTS")
um = self.unhandled_messages; del um um = self.unhandled_messages; del um
def add_handled_message(self, transient_id): def add_handled_message(self, transient_id):

View File

@ -8,6 +8,8 @@ import atexit
import signal import signal
import threading import threading
from collections import deque
import RNS import RNS
import RNS.vendor.umsgpack as msgpack import RNS.vendor.umsgpack as msgpack
@ -143,6 +145,8 @@ class LXMRouter:
self.peers = {} self.peers = {}
self.propagation_entries = {} self.propagation_entries = {}
self.peer_distribution_queue = deque()
RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self)) RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self))
RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self)) RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self))
@ -613,6 +617,7 @@ class LXMRouter:
JOB_TRANSIENT_INTERVAL = 60 JOB_TRANSIENT_INTERVAL = 60
JOB_STORE_INTERVAL = 120 JOB_STORE_INTERVAL = 120
JOB_PEERSYNC_INTERVAL = 12 JOB_PEERSYNC_INTERVAL = 12
JOB_PEERINGEST_INTERVAL= JOB_PEERSYNC_INTERVAL
def jobs(self): def jobs(self):
if not self.exit_handler_running: if not self.exit_handler_running:
self.processing_count += 1 self.processing_count += 1
@ -632,6 +637,9 @@ class LXMRouter:
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
self.clean_message_store() self.clean_message_store()
if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0:
self.flush_queues()
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
self.sync_peers() self.sync_peers()
@ -647,6 +655,17 @@ class LXMRouter:
RNS.trace_exception(e) RNS.trace_exception(e)
time.sleep(LXMRouter.PROCESSING_INTERVAL) time.sleep(LXMRouter.PROCESSING_INTERVAL)
def flush_queues(self):
self.flush_peer_distribution_queue()
RNS.log("Calculating peer distribution queue mappings...", RNS.LOG_DEBUG); st = time.time()
for peer_id in self.peers.copy():
if peer_id in self.peers:
peer = self.peers[peer_id]
if peer.queued_items():
peer.process_queues()
RNS.log(f"Distribution queue mapping completed in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
def clean_links(self): def clean_links(self):
closed_links = [] closed_links = []
for link_hash in self.direct_links: for link_hash in self.direct_links:
@ -1047,6 +1066,7 @@ class LXMRouter:
RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR) RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE) RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE)
self.flush_queues()
if self.propagation_node: if self.propagation_node:
try: try:
st = time.time(); RNS.log("Saving peer synchronisation states to storage...", RNS.LOG_NOTICE) st = time.time(); RNS.log("Saving peer synchronisation states to storage...", RNS.LOG_NOTICE)
@ -1608,8 +1628,9 @@ class LXMRouter:
culled_peers = [] culled_peers = []
waiting_peers = [] waiting_peers = []
unresponsive_peers = [] unresponsive_peers = []
for peer_id in self.peers: peers = self.peers.copy()
peer = self.peers[peer_id] for peer_id in peers:
peer = peers[peer_id]
if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE: if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
culled_peers.append(peer_id) culled_peers.append(peer_id)
else: else:
@ -1754,7 +1775,7 @@ class LXMRouter:
self.lxmf_propagation(lxmf_data, from_peer=peer) self.lxmf_propagation(lxmf_data, from_peer=peer)
if peer != None: if peer != None:
peer.add_handled_message(transient_id) peer.queue_handled_message(transient_id)
else: else:
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG) RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
@ -1763,6 +1784,24 @@ class LXMRouter:
RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG) RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
RNS.trace_exception(e) RNS.trace_exception(e)
def enqueue_peer_distribution(self, transient_id, from_peer):
self.peer_distribution_queue.append([transient_id, from_peer])
def flush_peer_distribution_queue(self):
if len(self.peer_distribution_queue) > 0:
entries = []
while len(self.peer_distribution_queue) > 0:
entries.append(self.peer_distribution_queue.pop())
for peer_id in self.peers.copy():
if peer_id in self.peers:
peer = self.peers[peer_id]
for entry in entries:
transient_id = entry[0]
from_peer = entry[1]
if peer != from_peer:
peer.queue_unhandled_message(transient_id)
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False, from_peer=None): def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False, from_peer=None):
no_stamp_enforcement = False no_stamp_enforcement = False
if is_paper_message: if is_paper_message:
@ -1797,13 +1836,9 @@ class LXMRouter:
msg_file.write(lxmf_data) msg_file.write(lxmf_data)
msg_file.close() msg_file.close()
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG) RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG)
for peer_id in self.peers: self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]
peer = self.peers[peer_id] self.enqueue_peer_distribution(transient_id, from_peer)
if peer != from_peer:
peer.new_propagation_message(transient_id)
else: else:
# TODO: Add message to sneakernet queues when implemented # TODO: Add message to sneakernet queues when implemented