diff --git a/.github/ISSUE_TEMPLATE/🐛-bug-report.md b/.github/ISSUE_TEMPLATE/🐛-bug-report.md index 65b492e..77ad6c2 100644 --- a/.github/ISSUE_TEMPLATE/🐛-bug-report.md +++ b/.github/ISSUE_TEMPLATE/🐛-bug-report.md @@ -12,7 +12,7 @@ Before creating a bug report on this issue tracker, you **must** read the [Contr - The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**. - Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**. -- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**. +- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), delete this section from your bug report. **Describe the Bug** A clear and concise description of what the bug is. diff --git a/FUNDING.yml b/FUNDING.yml deleted file mode 100644 index d125d55..0000000 --- a/FUNDING.yml +++ /dev/null @@ -1,3 +0,0 @@ -liberapay: Reticulum -ko_fi: markqvist -custom: "https://unsigned.io/donate" diff --git a/LICENSE b/LICENSE index f5fb92d..a25bd7a 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -Reticulum License +MIT License -Copyright (c) 2020-2025 Mark Qvist +Copyright (c) 2020 Mark Qvist / unsigned.io Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -9,16 +9,8 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -- The Software shall not be used in any kind of system which includes amongst - its functions the ability to purposefully do harm to human beings. - -- The Software shall not be used, directly or indirectly, in the creation of - an artificial intelligence, machine learning or language model training - dataset, including but not limited to any use that contributes to the - training or development of such a model or algorithm. - -- The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, diff --git a/LXMF/Handlers.py b/LXMF/Handlers.py index 5671170..7420ea5 100644 --- a/LXMF/Handlers.py +++ b/LXMF/Handlers.py @@ -1,5 +1,4 @@ import time -import threading import RNS import RNS.vendor.umsgpack as msgpack @@ -18,11 +17,10 @@ class LXMFDeliveryAnnounceHandler: if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC: lxmessage.next_delivery_attempt = time.time() - def outbound_trigger(): - while self.lxmrouter.processing_outbound: time.sleep(0.1) - self.lxmrouter.process_outbound() + while self.lxmrouter.processing_outbound: + time.sleep(0.1) - threading.Thread(target=outbound_trigger, daemon=True).start() + self.lxmrouter.process_outbound() try: stamp_cost = stamp_cost_from_app_data(app_data) @@ -47,29 +45,18 @@ class LXMFPropagationAnnounceHandler: if pn_announce_data_is_valid(data): node_timebase = data[1] propagation_transfer_limit = None - wanted_inbound_peers = None - if len(data) >= 4: - # TODO: Rethink, probably not necessary anymore - # try: - # wanted_inbound_peers = int(data[3]) - # except: - # wanted_inbound_peers = None - pass - if len(data) >= 3: - try: propagation_transfer_limit = float(data[2]) - except: propagation_transfer_limit = None + try: + propagation_transfer_limit = float(data[2]) + except: + propagation_transfer_limit = None - if destination_hash in self.lxmrouter.static_peers: - self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers) + if data[0] == True: + if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: + self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit) - else: - if data[0] == True: - if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: - self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers) - - elif data[0] == False: - self.lxmrouter.unpeer(destination_hash, node_timebase) + elif data[0] == False: + self.lxmrouter.unpeer(destination_hash, node_timebase) except Exception as e: RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index c1294bd..a88f6da 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -4,7 +4,6 @@ import time import RNS import RNS.vendor.umsgpack as msgpack -from collections import deque from .LXMF import APP_NAME class LXMPeer: @@ -20,7 +19,6 @@ class LXMPeer: ERROR_NO_IDENTITY = 0xf0 ERROR_NO_ACCESS = 0xf1 - ERROR_TIMEOUT = 0xfe # Maximum amount of time a peer can # be unreachable before it is removed @@ -40,16 +38,11 @@ class LXMPeer: @staticmethod def from_bytes(peer_bytes, router): dictionary = msgpack.unpackb(peer_bytes) - peer_destination_hash = dictionary["destination_hash"] - peer_peering_timebase = dictionary["peering_timebase"] - peer_alive = dictionary["alive"] - peer_last_heard = dictionary["last_heard"] - - peer = LXMPeer(router, peer_destination_hash) - peer.peering_timebase = peer_peering_timebase - peer.alive = peer_alive - peer.last_heard = peer_last_heard + peer = LXMPeer(router, dictionary["destination_hash"]) + peer.peering_timebase = dictionary["peering_timebase"] + peer.alive = dictionary["alive"] + peer.last_heard = dictionary["last_heard"] if "link_establishment_rate" in dictionary: peer.link_establishment_rate = dictionary["link_establishment_rate"] else: @@ -67,55 +60,15 @@ class LXMPeer: peer.propagation_transfer_limit = None else: peer.propagation_transfer_limit = None - - if "offered" in dictionary: - peer.offered = dictionary["offered"] - else: - peer.offered = 0 - if "outgoing" in dictionary: - peer.outgoing = dictionary["outgoing"] - else: - peer.outgoing = 0 - - if "incoming" in dictionary: - peer.incoming = dictionary["incoming"] - else: - peer.incoming = 0 - - if "rx_bytes" in dictionary: - peer.rx_bytes = dictionary["rx_bytes"] - else: - peer.rx_bytes = 0 - - if "tx_bytes" in dictionary: - peer.tx_bytes = dictionary["tx_bytes"] - else: - peer.tx_bytes = 0 - - if "last_sync_attempt" in dictionary: - peer.last_sync_attempt = dictionary["last_sync_attempt"] - else: - peer.last_sync_attempt = 0 - - hm_count = 0 for transient_id in dictionary["handled_ids"]: if transient_id in router.propagation_entries: - peer.add_handled_message(transient_id) - hm_count += 1 + peer.handled_messages[transient_id] = router.propagation_entries[transient_id] - um_count = 0 for transient_id in dictionary["unhandled_ids"]: if transient_id in router.propagation_entries: - peer.add_unhandled_message(transient_id) - um_count += 1 + peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id] - peer._hm_count = hm_count - peer._um_count = um_count - peer._hm_counts_synced = True - peer._um_counts_synced = True - - del dictionary return peer def to_bytes(self): @@ -127,12 +80,6 @@ class LXMPeer: dictionary["link_establishment_rate"] = self.link_establishment_rate dictionary["sync_transfer_rate"] = self.sync_transfer_rate dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit - dictionary["last_sync_attempt"] = self.last_sync_attempt - dictionary["offered"] = self.offered - dictionary["outgoing"] = self.outgoing - dictionary["incoming"] = self.incoming - dictionary["rx_bytes"] = self.rx_bytes - dictionary["tx_bytes"] = self.tx_bytes handled_ids = [] for transient_id in self.handled_messages: @@ -145,10 +92,7 @@ class LXMPeer: dictionary["handled_ids"] = handled_ids dictionary["unhandled_ids"] = unhandled_ids - peer_bytes = msgpack.packb(dictionary) - del dictionary - - return peer_bytes + return msgpack.packb(dictionary) def __init__(self, router, destination_hash): self.alive = False @@ -160,23 +104,12 @@ class LXMPeer: self.link_establishment_rate = 0 self.sync_transfer_rate = 0 self.propagation_transfer_limit = None - self.handled_messages_queue = deque() - self.unhandled_messages_queue = deque() - - self.offered = 0 # Messages offered to this peer - self.outgoing = 0 # Messages transferred to this peer - self.incoming = 0 # Messages received from this peer - self.rx_bytes = 0 # Bytes received from this peer - self.tx_bytes = 0 # Bytes sent to this peer - - self._hm_count = 0 - self._um_count = 0 - self._hm_counts_synced = False - self._um_counts_synced = False self.link = None self.state = LXMPeer.IDLE + self.unhandled_messages = {} + self.handled_messages = {} self.last_offer = [] self.router = router @@ -185,7 +118,6 @@ class LXMPeer: if self.identity != None: self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") else: - self.destination = None RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING) def sync(self): @@ -239,7 +171,7 @@ class LXMPeer: for transient_id in purged_ids: RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) - self.remove_unhandled_message(transient_id) + self.unhandled_messages.pop(transient_id) unhandled_entries.sort(key=lambda e: e[1], reverse=False) per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now @@ -250,17 +182,14 @@ class LXMPeer: lxm_size = unhandled_entry[2] next_size = cumulative_size + (lxm_size+per_message_overhead) if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000): - if lxm_size+per_message_overhead > (self.propagation_transfer_limit*1000): - self.remove_unhandled_message(transient_id) - self.add_handled_message(transient_id) - RNS.log(f"Message {RNS.prettyhexrep(transient_id)} exceeds transfer limit for {self}, considering handled", RNS.LOG_DEBUG) + pass else: cumulative_size += (lxm_size+per_message_overhead) unhandled_ids.append(transient_id) - RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_VERBOSE) + RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) self.last_offer = unhandled_ids - self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) + self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed) self.state = LXMPeer.REQUEST_SENT else: @@ -288,29 +217,22 @@ class LXMPeer: if response == LXMPeer.ERROR_NO_IDENTITY: if self.link != None: - RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE) + RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG) self.link.identify() self.state = LXMPeer.LINK_READY self.sync() - return - - elif response == LXMPeer.ERROR_NO_ACCESS: - RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE) - self.router.unpeer(self.destination_hash) - return elif response == False: # Peer already has all advertised messages for transient_id in self.last_offer: if transient_id in self.unhandled_messages: - self.add_handled_message(transient_id) - self.remove_unhandled_message(transient_id) + self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id) elif response == True: # Peer wants all advertised messages for transient_id in self.last_offer: - wanted_messages.append(self.router.propagation_entries[transient_id]) + wanted_messages.append(self.unhandled_messages[transient_id]) wanted_message_ids.append(transient_id) else: @@ -319,17 +241,18 @@ class LXMPeer: # If the peer did not want the message, it has # already received it from another peer. if not transient_id in response: - self.add_handled_message(transient_id) - self.remove_unhandled_message(transient_id) + if transient_id in self.unhandled_messages: + self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id) for transient_id in response: - wanted_messages.append(self.router.propagation_entries[transient_id]) + wanted_messages.append(self.unhandled_messages[transient_id]) wanted_message_ids.append(transient_id) if len(wanted_messages) > 0: - RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_VERBOSE) + RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG) lxm_list = [] + for message_entry in wanted_messages: file_path = message_entry[1] if os.path.isfile(file_path): @@ -345,8 +268,7 @@ class LXMPeer: self.state = LXMPeer.RESOURCE_TRANSFERRING else: - RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_VERBOSE) - self.offered += len(self.last_offer) + RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG) if self.link != None: self.link.teardown() @@ -366,8 +288,8 @@ class LXMPeer: def resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: for transient_id in resource.transferred_messages: - self.add_handled_message(transient_id) - self.remove_unhandled_message(transient_id) + message = self.unhandled_messages.pop(transient_id) + self.handled_messages[transient_id] = message if self.link != None: self.link.teardown() @@ -380,15 +302,12 @@ class LXMPeer: self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started) rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}" - RNS.log(f"Syncing {len(resource.transferred_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE) + RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed"+rate_str, RNS.LOG_DEBUG) self.alive = True self.last_heard = time.time() - self.offered += len(self.last_offer) - self.outgoing += len(resource.transferred_messages) - self.tx_bytes += resource.get_data_size() else: - RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE) + RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG) if self.link != None: self.link.teardown() @@ -409,103 +328,9 @@ class LXMPeer: self.link = None self.state = LXMPeer.IDLE - def queued_items(self): - return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0 - - def queue_unhandled_message(self, transient_id): - self.unhandled_messages_queue.append(transient_id) - - def queue_handled_message(self, transient_id): - self.handled_messages_queue.append(transient_id) - - def process_queues(self): - if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0: - # TODO: Remove debug - # st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue) - - handled_messages = self.handled_messages - unhandled_messages = self.unhandled_messages - - while len(self.handled_messages_queue) > 0: - transient_id = self.handled_messages_queue.pop() - if not transient_id in handled_messages: - self.add_handled_message(transient_id) - if transient_id in unhandled_messages: - self.remove_unhandled_message(transient_id) - - while len(self.unhandled_messages_queue) > 0: - transient_id = self.unhandled_messages_queue.pop() - if not transient_id in handled_messages and not transient_id in unhandled_messages: - self.add_unhandled_message(transient_id) - - del handled_messages, unhandled_messages - # TODO: Remove debug - # RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}") - - @property - def handled_messages(self): - pes = self.router.propagation_entries.copy() - hm = list(filter(lambda tid: self.destination_hash in pes[tid][4], pes)) - self._hm_count = len(hm); del pes - self._hm_counts_synced = True - return hm - - @property - def unhandled_messages(self): - pes = self.router.propagation_entries.copy() - um = list(filter(lambda tid: self.destination_hash in pes[tid][5], pes)) - self._um_count = len(um); del pes - self._um_counts_synced = True - return um - - @property - def handled_message_count(self): - if not self._hm_counts_synced: - self._update_counts() - - return self._hm_count - - @property - def unhandled_message_count(self): - if not self._um_counts_synced: - self._update_counts() - - return self._um_count - - @property - def acceptance_rate(self): - return 0 if self.offered == 0 else (self.outgoing/self.offered) - - def _update_counts(self): - if not self._hm_counts_synced: - hm = self.handled_messages; del hm - - if not self._um_counts_synced: - um = self.unhandled_messages; del um - - def add_handled_message(self, transient_id): - if transient_id in self.router.propagation_entries: - if not self.destination_hash in self.router.propagation_entries[transient_id][4]: - self.router.propagation_entries[transient_id][4].append(self.destination_hash) - self._hm_counts_synced = False - - def add_unhandled_message(self, transient_id): - if transient_id in self.router.propagation_entries: - if not self.destination_hash in self.router.propagation_entries[transient_id][5]: - self.router.propagation_entries[transient_id][5].append(self.destination_hash) - self._um_count += 1 - - def remove_handled_message(self, transient_id): - if transient_id in self.router.propagation_entries: - if self.destination_hash in self.router.propagation_entries[transient_id][4]: - self.router.propagation_entries[transient_id][4].remove(self.destination_hash) - self._hm_counts_synced = False - - def remove_unhandled_message(self, transient_id): - if transient_id in self.router.propagation_entries: - if self.destination_hash in self.router.propagation_entries[transient_id][5]: - self.router.propagation_entries[transient_id][5].remove(self.destination_hash) - self._um_counts_synced = False + def handle_message(self, transient_id): + if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages: + self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id] def __str__(self): if self.destination_hash: diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 5b7a5c2..79678c6 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -1,15 +1,10 @@ import os -import sys import time -import math import random import base64 import atexit -import signal import threading -from collections import deque - import RNS import RNS.vendor.umsgpack as msgpack @@ -37,12 +32,9 @@ class LXMRouter: NODE_ANNOUNCE_DELAY = 20 - MAX_PEERS = 50 AUTOPEER = True AUTOPEER_MAXDEPTH = 4 FASTEST_N_RANDOM_POOL = 2 - ROTATION_HEADROOM_PCT = 10 - ROTATION_AR_MAX = 0.5 PROPAGATION_LIMIT = 256 DELIVERY_LIMIT = 1000 @@ -66,16 +58,11 @@ class LXMRouter: PR_ALL_MESSAGES = 0x00 - STATS_GET_PATH = "/pn/get/stats" - ### Developer-facing API ############################## ####################################################### - def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None, - propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, enforce_ratchets=False, - enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False): - + def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None, propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT, enforce_ratchets = False, enforce_stamps = False): random.seed(os.urandom(10)) self.pending_inbound = [] @@ -96,7 +83,6 @@ class LXMRouter: self.processing_count = 0 self.propagation_node = False - self.propagation_node_start_time = None if storagepath == None: raise ValueError("LXMF cannot be initialised without a storage path") @@ -107,9 +93,6 @@ class LXMRouter: self.outbound_propagation_node = None self.outbound_propagation_link = None - if delivery_limit == None: - delivery_limit = LXMRouter.DELIVERY_LIMIT - self.message_storage_limit = None self.information_storage_limit = None self.propagation_per_transfer_limit = propagation_limit @@ -124,7 +107,6 @@ class LXMRouter: self.propagation_transfer_progress = 0.0 self.propagation_transfer_last_result = None self.propagation_transfer_max_messages = None - self.prioritise_rotating_unreachable_peers = False self.active_propagation_links = [] self.locally_delivered_transient_ids = {} self.locally_processed_transient_ids = {} @@ -134,18 +116,12 @@ class LXMRouter: self.cost_file_lock = threading.Lock() self.ticket_file_lock = threading.Lock() self.stamp_gen_lock = threading.Lock() - self.exit_handler_running = False if identity == None: identity = RNS.Identity() self.identity = identity self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation") - self.control_destination = None - self.client_propagation_messages_received = 0 - self.client_propagation_messages_served = 0 - self.unpeered_propagation_incoming = 0 - self.unpeered_propagation_rx_bytes = 0 if autopeer != None: self.autopeer = autopeer @@ -157,32 +133,9 @@ class LXMRouter: else: self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH - if max_peers == None: - self.max_peers = LXMRouter.MAX_PEERS - else: - if type(max_peers) == int and max_peers >= 0: - self.max_peers = max_peers - else: - raise ValueError(f"Invalid value for max_peers: {max_peers}") - - self.from_static_only = from_static_only - if type(static_peers) != list: - raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}") - else: - for static_peer in static_peers: - if type(static_peer) != bytes: - raise ValueError(f"Invalid static peer destination hash: {static_peer}") - else: - if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: - raise ValueError(f"Invalid static peer destination hash: {static_peer}") - - self.static_peers = static_peers - self.peers = {} self.propagation_entries = {} - self.peer_distribution_queue = deque() - RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self)) RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self)) @@ -267,8 +220,6 @@ class LXMRouter: RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) atexit.register(self.exit_handler) - signal.signal(signal.SIGINT, self.sigint_handler) - signal.signal(signal.SIGTERM, self.sigterm_handler) job_thread = threading.Thread(target=self.jobloop) job_thread.setDaemon(True) @@ -281,12 +232,10 @@ class LXMRouter: def announce_propagation_node(self): def delayed_announce(): time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) - node_state = self.propagation_node and not self.from_static_only announce_data = [ - node_state, # Boolean flag signalling propagation node state + self.propagation_node, # Boolean flag signalling propagation node state int(time.time()), # Current node timebase self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes - None, # How many more inbound peers this node wants ] data = msgpack.packb(announce_data) @@ -478,8 +427,6 @@ class LXMRouter: os.makedirs(self.messagepath) self.propagation_entries = {} - - st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE) for filename in os.listdir(self.messagepath): components = filename.split("_") if len(components) == 2: @@ -496,94 +443,41 @@ class LXMRouter: file.close() self.propagation_entries[transient_id] = [ - destination_hash, # 0: Destination hash - filepath, # 1: Storage location - received, # 2: Receive timestamp - msg_size, # 3: Message size - [], # 4: Handled peers - [], # 5: Unhandled peers + destination_hash, + filepath, + received, + msg_size, ] except Exception as e: RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) - et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st)) - RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE) - RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE) - st = time.time(); - if os.path.isfile(self.storagepath+"/peers"): peers_file = open(self.storagepath+"/peers", "rb") peers_data = peers_file.read() - peers_file.close() if len(peers_data) > 0: serialised_peers = msgpack.unpackb(peers_data) - del peers_data - while len(serialised_peers) > 0: - serialised_peer = serialised_peers.pop() + for serialised_peer in serialised_peers: peer = LXMPeer.from_bytes(serialised_peer, self) - del serialised_peer - if peer.destination_hash in self.static_peers and peer.last_heard == 0: - # TODO: Allow path request responses through announce handler - # momentarily here, so peering config can be updated even if - # the static peer is not available to directly send an announce. - RNS.Transport.request_path(peer.destination_hash) if peer.identity != None: self.peers[peer.destination_hash] = peer lim_str = ", no transfer limit" if peer.propagation_transfer_limit != None: lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit" - RNS.log("Rebuilt peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(peer.unhandled_message_count)+" unhandled messages"+lim_str, RNS.LOG_DEBUG) + RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages"+lim_str, RNS.LOG_DEBUG) else: RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG) - del peer - del serialised_peers - - if len(self.static_peers) > 0: - for static_peer in self.static_peers: - if not static_peer in self.peers: - RNS.log(f"Activating static peering with {RNS.prettyhexrep(static_peer)}", RNS.LOG_NOTICE) - self.peers[static_peer] = LXMPeer(self, static_peer) - if self.peers[static_peer].last_heard == 0: - # TODO: Allow path request responses through announce handler - # momentarily here, so peering config can be updated even if - # the static peer is not available to directly send an announce. - RNS.Transport.request_path(static_peer) - - RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE) - - try: - if os.path.isfile(self.storagepath+"/node_stats"): - node_stats_file = open(self.storagepath+"/node_stats", "rb") - data = node_stats_file.read() - node_stats_file.close() - node_stats = msgpack.unpackb(data) - - if not type(node_stats) == dict: - RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR) - else: - self.client_propagation_messages_received = node_stats["client_propagation_messages_received"] - self.client_propagation_messages_served = node_stats["client_propagation_messages_served"] - self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"] - self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"] - - except Exception as e: - RNS.log("Could not load local node stats. The contained exception was: "+str(e), RNS.LOG_ERROR) self.propagation_node = True - self.propagation_node_start_time = time.time() self.propagation_destination.set_link_established_callback(self.propagation_link_established) self.propagation_destination.set_packet_callback(self.propagation_packet) self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL) - self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") - self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash]) - if self.message_storage_limit != None: limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit) else: @@ -686,76 +580,6 @@ class LXMRouter: return False - ### Propagation Node Control ########################## - ####################################################### - - def compile_stats(self): - if not self.propagation_node: - return None - else: - peer_stats = {} - for peer_id in self.peers.copy(): - peer = self.peers[peer_id] - peer_stats[peer_id] = { - "type": "static" if peer_id in self.static_peers else "discovered", - "state": peer.state, - "alive": peer.alive, - "last_heard": int(peer.last_heard), - "next_sync_attempt": peer.next_sync_attempt, - "last_sync_attempt": peer.last_sync_attempt, - "sync_backoff": peer.sync_backoff, - "peering_timebase": peer.peering_timebase, - "ler": int(peer.link_establishment_rate), - "str": int(peer.sync_transfer_rate), - "transfer_limit": peer.propagation_transfer_limit, - "network_distance": RNS.Transport.hops_to(peer_id), - "rx_bytes": peer.rx_bytes, - "tx_bytes": peer.tx_bytes, - "messages": { - "offered": peer.offered, - "outgoing": peer.outgoing, - "incoming": peer.incoming, - "unhandled": peer.unhandled_message_count - }, - } - - node_stats = { - "identity_hash": self.identity.hash, - "destination_hash": self.propagation_destination.hash, - "uptime": time.time()-self.propagation_node_start_time, - "delivery_limit": self.delivery_per_transfer_limit, - "propagation_limit": self.propagation_per_transfer_limit, - "autopeer_maxdepth": self.autopeer_maxdepth, - "from_static_only": self.from_static_only, - "messagestore": { - "count": len(self.propagation_entries), - "bytes": self.message_storage_size(), - "limit": self.message_storage_limit, - }, - "clients" : { - "client_propagation_messages_received": self.client_propagation_messages_received, - "client_propagation_messages_served": self.client_propagation_messages_served, - }, - "unpeered_propagation_incoming": self.unpeered_propagation_incoming, - "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes, - "static_peers": len(self.static_peers), - "discovered_peers": len(self.peers)-len(self.static_peers), - "total_peers": len(self.peers), - "max_peers": self.max_peers, - "peers": peer_stats, - } - - return node_stats - - def stats_get_request(self, path, data, request_id, remote_identity, requested_at): - if remote_identity == None: - return LXMPeer.ERROR_NO_IDENTITY - elif remote_identity.hash != self.identity.hash: - return LXMPeer.ERROR_NO_ACCESS - else: - return self.compile_stats() - - ### Utility & Maintenance ############################# ####################################################### @@ -765,69 +589,44 @@ class LXMRouter: JOB_TRANSIENT_INTERVAL = 60 JOB_STORE_INTERVAL = 120 JOB_PEERSYNC_INTERVAL = 12 - JOB_PEERINGEST_INTERVAL= JOB_PEERSYNC_INTERVAL - JOB_ROTATE_INTERVAL = 56*JOB_PEERINGEST_INTERVAL def jobs(self): - if not self.exit_handler_running: - self.processing_count += 1 + self.processing_count += 1 - if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0: - self.process_outbound() + if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0: + self.process_outbound() - if self.processing_count % LXMRouter.JOB_STAMPS_INTERVAL == 0: - threading.Thread(target=self.process_deferred_stamps, daemon=True).start() + if self.processing_count % LXMRouter.JOB_STAMPS_INTERVAL == 0: + threading.Thread(target=self.process_deferred_stamps, daemon=True).start() - if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0: - self.clean_links() + if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0: + self.clean_links() - if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0: - self.clean_transient_id_caches() + if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0: + self.clean_transient_id_caches() - if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: - if self.propagation_node == True: - self.clean_message_store() + if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: + self.clean_message_store() - if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0: - if self.propagation_node == True: - self.flush_queues() - - if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0: - if self.propagation_node == True: - self.rotate_peers() - - if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: - if self.propagation_node == True: - self.sync_peers() + if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: + self.sync_peers() def jobloop(self): while (True): # TODO: Improve this to scheduling, so manual # triggers can delay next run + try: self.jobs() except Exception as e: RNS.log("An error ocurred while running LXMF Router jobs.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - RNS.trace_exception(e) time.sleep(LXMRouter.PROCESSING_INTERVAL) - def flush_queues(self): - if len(self.peers) > 0: - self.flush_peer_distribution_queue() - RNS.log("Calculating peer distribution queue mappings...", RNS.LOG_DEBUG); st = time.time() - for peer_id in self.peers.copy(): - if peer_id in self.peers: - peer = self.peers[peer_id] - if peer.queued_items(): - peer.process_queues() - - RNS.log(f"Distribution queue mapping completed in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG) - def clean_links(self): closed_links = [] for link_hash in self.direct_links: link = self.direct_links[link_hash] - inactive_time = link.no_data_for() + inactive_time = link.inactive_for() if inactive_time > LXMRouter.LINK_MAX_INACTIVITY: link.teardown() @@ -894,11 +693,6 @@ class LXMRouter: self.save_outbound_stamp_costs() threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start() - def get_wanted_inbound_peers(self): - # TODO: Implement/rethink. - # Probably not necessary anymore. - return None - def get_announce_app_data(self, destination_hash): if destination_hash in self.delivery_destinations: delivery_destination = self.delivery_destinations[destination_hash] @@ -1000,12 +794,12 @@ class LXMRouter: lxm_size = self.propagation_entries[transient_id][3] return lxm_size + def clean_message_store(self): - RNS.log("Cleaning message store", RNS.LOG_VERBOSE) # Check and remove expired messages now = time.time() removed_entries = {} - for transient_id in self.propagation_entries.copy(): + for transient_id in self.propagation_entries: entry = self.propagation_entries[transient_id] filepath = entry[1] components = filepath.split("_") @@ -1013,7 +807,7 @@ class LXMRouter: if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2: timestamp = float(components[1]) if now > timestamp+LXMRouter.MESSAGE_EXPIRY: - RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_EXTREME) + RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG) removed_entries[transient_id] = filepath else: RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING) @@ -1031,7 +825,7 @@ class LXMRouter: RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) if removed_count > 0: - RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_VERBOSE) + RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG) # Check size of message store and cull if needed try: @@ -1043,7 +837,7 @@ class LXMRouter: bytes_cleaned = 0 weighted_entries = [] - for transient_id in self.propagation_entries.copy(): + for transient_id in self.propagation_entries: weighted_entries.append([ self.propagation_entries[transient_id], self.get_weight(transient_id), @@ -1082,46 +876,26 @@ class LXMRouter: def save_locally_delivered_transient_ids(self): try: - if len(self.locally_delivered_transient_ids) > 0: - if not os.path.isdir(self.storagepath): + if not os.path.isdir(self.storagepath): os.makedirs(self.storagepath) - with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file: - locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) + with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file: + locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) except Exception as e: RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) def save_locally_processed_transient_ids(self): try: - if len(self.locally_processed_transient_ids) > 0: - if not os.path.isdir(self.storagepath): + if not os.path.isdir(self.storagepath): os.makedirs(self.storagepath) - with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file: - locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids)) + with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file: + locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids)) except Exception as e: RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) - def save_node_stats(self): - try: - if not os.path.isdir(self.storagepath): - os.makedirs(self.storagepath) - - with open(self.storagepath+"/node_stats", "wb") as stats_file: - node_stats = { - "client_propagation_messages_received": self.client_propagation_messages_received, - "client_propagation_messages_served": self.client_propagation_messages_served, - "unpeered_propagation_incoming": self.unpeered_propagation_incoming, - "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes, - } - stats_file.write(msgpack.packb(node_stats)) - - except Exception as e: - RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) - - def clean_outbound_stamp_costs(self): try: expired = [] @@ -1215,45 +989,10 @@ class LXMRouter: RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR) def exit_handler(self): - if self.exit_handler_running: - return - - self.exit_handler_running = True - - RNS.log("Tearing down delivery destinations...", RNS.LOG_NOTICE) - for destination_hash in self.delivery_destinations: - delivery_destination = self.delivery_destinations[destination_hash] - delivery_destination.set_packet_callback(None) - delivery_destination.set_link_established_callback(None) - for link in delivery_destination.links: - try: - if link.status == RNS.Link.ACTIVE: - link.teardown() - except Exception as e: - RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR) - - if self.propagation_node: - RNS.log("Tearing down propagation node destination...", RNS.LOG_NOTICE) - self.propagation_destination.set_link_established_callback(None) - self.propagation_destination.set_packet_callback(None) - self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH) - self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH) - self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH) - for link in self.active_propagation_links: - try: - if link.status == RNS.Link.ACTIVE: - link.teardown() - except Exception as e: - RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR) - - RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE) - self.flush_queues() if self.propagation_node: try: - st = time.time(); RNS.log(f"Saving {len(self.peers)} peer synchronisation states to storage...", RNS.LOG_NOTICE) serialised_peers = [] - peer_dict = self.peers.copy() - for peer_id in peer_dict: + for peer_id in self.peers: peer = self.peers[peer_id] serialised_peers.append(peer.to_bytes()) @@ -1261,28 +1000,13 @@ class LXMRouter: peers_file.write(msgpack.packb(serialised_peers)) peers_file.close() - RNS.log(f"Saved {len(serialised_peers)} peers to storage in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_NOTICE) + RNS.log("Saved "+str(len(serialised_peers))+" peers to storage", RNS.LOG_DEBUG) except Exception as e: RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) self.save_locally_delivered_transient_ids() self.save_locally_processed_transient_ids() - self.save_node_stats() - - def sigint_handler(self, signal, frame): - if not self.exit_handler_running: - RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING) - sys.exit(0) - else: - RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) - - def sigterm_handler(self, signal, frame): - if not self.exit_handler_running: - RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING) - sys.exit(0) - else: - RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) def __str__(self): return "" @@ -1397,7 +1121,6 @@ class LXMRouter: except Exception as e: RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR) - self.client_propagation_messages_served += len(response_messages) return response_messages @@ -1618,7 +1341,7 @@ class LXMRouter: ### Message Routing & Delivery ######################## ####################################################### - def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False, allow_duplicate=False): + def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False): try: message = LXMessage.unpack_from_bytes(lxmf_data) if ratchet_id and not message.ratchet_id: @@ -1685,7 +1408,7 @@ class LXMRouter: RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG) return False - if not allow_duplicate and self.has_message(message.hash): + if self.has_message(message.hash): RNS.log(str(self)+" ignored already received message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG) return False else: @@ -1777,7 +1500,7 @@ class LXMRouter: ### Peer Sync & Propagation ########################### ####################################################### - def peer(self, destination_hash, timestamp, propagation_transfer_limit, wanted_inbound_peers = None): + def peer(self, destination_hash, timestamp, propagation_transfer_limit): if destination_hash in self.peers: peer = self.peers[destination_hash] if timestamp > peer.peering_timebase: @@ -1787,18 +1510,14 @@ class LXMRouter: peer.peering_timebase = timestamp peer.last_heard = time.time() peer.propagation_transfer_limit = propagation_transfer_limit - RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE) else: - if len(self.peers) < self.max_peers: - peer = LXMPeer(self, destination_hash) - peer.alive = True - peer.last_heard = time.time() - peer.propagation_transfer_limit = propagation_transfer_limit - self.peers[destination_hash] = peer - RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE) - else: - RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG) + peer = LXMPeer(self, destination_hash) + peer.alive = True + peer.last_heard = time.time() + peer.propagation_transfer_limit = propagation_transfer_limit + self.peers[destination_hash] = peer + RNS.log("Peered with "+str(peer.destination)) def unpeer(self, destination_hash, timestamp = None): if timestamp == None: @@ -1811,92 +1530,14 @@ class LXMRouter: self.peers.pop(destination_hash) RNS.log("Broke peering with "+str(peer.destination)) - def rotate_peers(self): - try: - rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0))) - required_drops = len(self.peers) - (self.max_peers - rotation_headroom) - if required_drops > 0 and len(self.peers) - required_drops > 1: - peers = self.peers.copy() - untested_peers = [] - for peer_id in self.peers: - peer = self.peers[peer_id] - if peer.last_sync_attempt == 0: - untested_peers.append(peer) - - if len(untested_peers) >= rotation_headroom: - RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG) - return - - fully_synced_peers = {} - for peer_id in peers: - peer = peers[peer_id] - if peer.unhandled_message_count == 0: - fully_synced_peers[peer_id] = peer - - if len(fully_synced_peers) > 0: - peers = fully_synced_peers - ms = "" if len(fully_synced_peers) == 1 else "s" - RNS.log(f"Found {len(fully_synced_peers)} fully synced peer{ms}, using as peer rotation pool basis", RNS.LOG_DEBUG) - - culled_peers = [] - waiting_peers = [] - unresponsive_peers = [] - for peer_id in peers: - peer = peers[peer_id] - if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE: - if peer.alive: - if peer.offered == 0: - # Don't consider for unpeering until at - # least one message has been offered - pass - else: - waiting_peers.append(peer) - else: - unresponsive_peers.append(peer) - - drop_pool = [] - if len(unresponsive_peers) > 0: - drop_pool.extend(unresponsive_peers) - if not self.prioritise_rotating_unreachable_peers: - drop_pool.extend(waiting_peers) - - else: - drop_pool.extend(waiting_peers) - - if len(drop_pool) > 0: - drop_count = min(required_drops, len(drop_pool)) - low_acceptance_rate_peers = sorted( - drop_pool, - key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ), - reverse=False - )[0:drop_count] - - dropped_peers = 0 - for peer in low_acceptance_rate_peers: - ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2) - if ar < LXMRouter.ROTATION_AR_MAX*100: - reachable_str = "reachable" if peer.alive else "unreachable" - RNS.log(f"Acceptance rate for {reachable_str} peer {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing}/{peer.offered}, {peer.unhandled_message_count} unhandled messages)", RNS.LOG_DEBUG) - self.unpeer(peer.destination_hash) - dropped_peers += 1 - - ms = "" if dropped_peers == 1 else "s" - RNS.log(f"Dropped {dropped_peers} low acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG) - - except Exception as e: - RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR) - RNS.trace_exception(e) - def sync_peers(self): culled_peers = [] waiting_peers = [] unresponsive_peers = [] - peers = self.peers.copy() - for peer_id in peers: - peer = peers[peer_id] + for peer_id in self.peers: + peer = self.peers[peer_id] if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE: - if not peer_id in self.static_peers: - culled_peers.append(peer_id) + culled_peers.append(peer_id) else: if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: if peer.alive: @@ -1956,23 +1597,10 @@ class LXMRouter: self.active_propagation_links.append(link) def propagation_resource_advertised(self, resource): - if self.from_static_only: - remote_identity = resource.link.get_remote_identity() - if remote_identity == None: - RNS.log(f"Rejecting propagation resource from unidentified peer", RNS.LOG_DEBUG) - return False - else: - remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - remote_hash = remote_destination.hash - remote_str = RNS.prettyhexrep(remote_hash) - if not remote_hash in self.static_peers: - RNS.log(f"Rejecting propagation resource from {remote_str} not in static peers list", RNS.LOG_DEBUG) - return False - size = resource.get_data_size() limit = self.propagation_per_transfer_limit*1000 if limit != None and size > limit: - RNS.log(f"Rejecting {RNS.prettysize(size)} incoming propagation resource, since it exceeds the limit of {RNS.prettysize(limit)}", RNS.LOG_DEBUG) + RNS.log("Rejecting "+RNS.prettysize(size)+" incoming LXMF propagation resource, since it exceeds the limit of "+RNS.prettysize(limit), RNS.LOG_DEBUG) return False else: return True @@ -1988,7 +1616,6 @@ class LXMRouter: messages = data[1] for lxmf_data in messages: self.lxmf_propagation(lxmf_data) - self.client_propagation_messages_received += 1 packet.prove() @@ -2000,14 +1627,6 @@ class LXMRouter: if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY else: - if self.from_static_only: - remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - remote_hash = remote_destination.hash - remote_str = RNS.prettyhexrep(remote_hash) - if not remote_hash in self.static_peers: - RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG) - return LXMPeer.ERROR_NO_ACCESS - try: transient_ids = data wanted_ids = [] @@ -2030,6 +1649,7 @@ class LXMRouter: return None def propagation_resource_concluded(self, resource): + RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG) if resource.status == RNS.Resource.COMPLETE: # TODO: The peer this was received from should # have the transient id added to its list of @@ -2041,73 +1661,31 @@ class LXMRouter: # This is a series of propagation messages from a peer or originator remote_timebase = data[0] remote_hash = None - remote_str = "unknown peer" remote_identity = resource.link.get_remote_identity() if remote_identity != None: remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") remote_hash = remote_destination.hash - remote_str = RNS.prettyhexrep(remote_hash) if not remote_hash in self.peers: if self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: - # TODO: Query cache for an announce and get propagation - # transfer limit from that. For now, initialise it to a - # sane default value, and wait for an announce to arrive - # that will update the peering config to the actual limit. - propagation_transfer_limit = LXMRouter.PROPAGATION_LIMIT//4 - wanted_inbound_peers = None - self.peer(remote_hash, remote_timebase, propagation_transfer_limit, wanted_inbound_peers) - else: - remote_str = f"peer {remote_str}" + self.peer(remote_hash, remote_timebase) messages = data[1] - ms = "" if len(messages) == 1 else "s" - RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE) for lxmf_data in messages: - peer = None - transient_id = RNS.Identity.full_hash(lxmf_data) if remote_hash != None and remote_hash in self.peers: + transient_id = RNS.Identity.full_hash(lxmf_data) peer = self.peers[remote_hash] - peer.incoming += 1 - peer.rx_bytes += len(lxmf_data) - else: - if remote_identity != None: - self.unpeered_propagation_incoming += 1 - self.unpeered_propagation_rx_bytes += len(lxmf_data) - else: - self.client_propagation_messages_received += 1 - - self.lxmf_propagation(lxmf_data, from_peer=peer) - if peer != None: - peer.queue_handled_message(transient_id) + peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data] + self.lxmf_propagation(lxmf_data) else: RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG) except Exception as e: RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG) - RNS.trace_exception(e) - def enqueue_peer_distribution(self, transient_id, from_peer): - self.peer_distribution_queue.append([transient_id, from_peer]) - - def flush_peer_distribution_queue(self): - if len(self.peer_distribution_queue) > 0: - entries = [] - while len(self.peer_distribution_queue) > 0: - entries.append(self.peer_distribution_queue.pop()) - - for peer_id in self.peers.copy(): - if peer_id in self.peers: - peer = self.peers[peer_id] - for entry in entries: - transient_id = entry[0] - from_peer = entry[1] - if peer != from_peer: - peer.queue_unhandled_message(transient_id) - - def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None): + def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False): no_stamp_enforcement = False if is_paper_message: no_stamp_enforcement = True @@ -2116,8 +1694,9 @@ class LXMRouter: if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD: transient_id = RNS.Identity.full_hash(lxmf_data) - if (not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids) or allow_duplicate == True: + if not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids: received = time.time() + propagation_entry = [transient_id, received, lxmf_data] destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH] self.locally_processed_transient_ids[transient_id] = received @@ -2128,7 +1707,7 @@ class LXMRouter: decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data) if decrypted_lxmf_data != None: delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data - self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement, allow_duplicate=allow_duplicate) + self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement) self.locally_delivered_transient_ids[transient_id] = time.time() if signal_local_delivery != None: @@ -2141,9 +1720,12 @@ class LXMRouter: msg_file.write(lxmf_data) msg_file.close() - RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME) - self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []] - self.enqueue_peer_distribution(transient_id, from_peer) + self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data)] + + RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG) + for peer_id in self.peers: + peer = self.peers[peer_id] + peer.handle_message(transient_id) else: # TODO: Add message to sneakernet queues when implemented @@ -2163,10 +1745,9 @@ class LXMRouter: except Exception as e: RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG) RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) - RNS.trace_exception(e) return False - def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False): + def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None): try: if not uri.lower().startswith(LXMessage.URI_SCHEMA+"://"): RNS.log("Cannot ingest LXM, invalid URI provided.", RNS.LOG_ERROR) @@ -2176,7 +1757,7 @@ class LXMRouter: lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==") transient_id = RNS.Identity.full_hash(lxmf_data) - router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True) + router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, is_paper_message=True) if router_propagation_result != False: RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG) return router_propagation_result @@ -2301,7 +1882,8 @@ class LXMRouter: else: RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 + if lxmessage.progress == None or lxmessage.progress < 0.01: + lxmessage.progress = 0.01 # Outbound handling for opportunistic messages if lxmessage.method == LXMessage.OPPORTUNISTIC: diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 515ab11..2342708 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -380,7 +380,7 @@ class LXMessage: if self.desired_method == LXMessage.OPPORTUNISTIC: if self.__destination.type == RNS.Destination.SINGLE: if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT: - RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG) + RNS.log(f"Opportunistic delivery was requested for {self}, but content exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG) self.desired_method = LXMessage.DIRECT # Set delivery parameters according to delivery method diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index a4ccaf5..38e71b1 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -35,7 +35,6 @@ import time import os from LXMF._version import __version__ -from LXMF import APP_NAME from RNS.vendor.configobj import ConfigObj @@ -127,7 +126,7 @@ def apply_config(): if active_configuration["message_storage_limit"] < 0.005: active_configuration["message_storage_limit"] = 0.005 else: - active_configuration["message_storage_limit"] = 500 + active_configuration["message_storage_limit"] = 2000 if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]: active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size") @@ -141,24 +140,6 @@ def apply_config(): else: active_configuration["prioritised_lxmf_destinations"] = [] - if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]: - static_peers = lxmd_config["propagation"].as_list("static_peers") - active_configuration["static_peers"] = [] - for static_peer in static_peers: - active_configuration["static_peers"].append(bytes.fromhex(static_peer)) - else: - active_configuration["static_peers"] = [] - - if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]: - active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers") - else: - active_configuration["max_peers"] = None - - if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]: - active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only") - else: - active_configuration["from_static_only"] = False - # Load various settings if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]: targetloglevel = lxmd_config["logging"].as_int("loglevel") @@ -324,10 +305,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo autopeer_maxdepth = active_configuration["autopeer_maxdepth"], propagation_limit = active_configuration["propagation_transfer_max_accepted_size"], delivery_limit = active_configuration["delivery_transfer_max_accepted_size"], - max_peers = active_configuration["max_peers"], - static_peers = active_configuration["static_peers"], - from_static_only = active_configuration["from_static_only"]) - + ) message_router.register_delivery_callback(lxmf_delivery) for destination_hash in active_configuration["ignored_lxmf_destinations"]: @@ -384,13 +362,13 @@ def jobs(): try: if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None: if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]: - RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE) + RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME) message_router.announce(lxmf_destination.hash) last_peer_announce = time.time() if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None: if time.time() > last_node_announce + active_configuration["node_announce_interval"]: - RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE) + RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME) message_router.announce_propagation_node() last_node_announce = time.time() @@ -403,7 +381,7 @@ def deferred_start_jobs(): global active_configuration, last_peer_announce, last_node_announce global message_router, lxmf_destination time.sleep(DEFFERED_JOBS_DELAY) - RNS.log("Running deferred start jobs", RNS.LOG_DEBUG) + RNS.log("Running deferred start jobs") if active_configuration["peer_announce_at_start"]: RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME) message_router.announce(lxmf_destination.hash) @@ -416,190 +394,6 @@ def deferred_start_jobs(): last_node_announce = time.time() threading.Thread(target=jobs, daemon=True).start() -def query_status(identity, timeout=5, exit_on_fail=False): - control_destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") - - timeout = time.time()+timeout - def check_timeout(): - if time.time() > timeout: - if exit_on_fail: - RNS.log("Getting lxmd statistics timed out, exiting now", RNS.LOG_ERROR) - exit(200) - else: - return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT - else: - time.sleep(0.1) - - if not RNS.Transport.has_path(control_destination.hash): - RNS.Transport.request_path(control_destination.hash) - while not RNS.Transport.has_path(control_destination.hash): - tc = check_timeout() - if tc: - return tc - - link = RNS.Link(control_destination) - while not link.status == RNS.Link.ACTIVE: - tc = check_timeout() - if tc: - return tc - - link.identify(identity) - request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None) - while not request_receipt.get_status() == RNS.RequestReceipt.READY: - tc = check_timeout() - if tc: - return tc - - link.teardown() - return request_receipt.get_response() - -def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = 0, timeout=5, show_status=False, show_peers=False, identity_path=None): - global configpath, identitypath, storagedir, lxmdir - global lxmd_config, active_configuration, targetloglevel - targetlogdest = RNS.LOG_STDOUT - - if identity_path == None: - if configdir == None: - if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"): - configdir = "/etc/lxmd" - elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(Reticulum.userdir+"/.config/lxmd/config"): - configdir = RNS.Reticulum.userdir+"/.config/lxmd" - else: - configdir = RNS.Reticulum.userdir+"/.lxmd" - - configpath = configdir+"/config" - identitypath = configdir+"/identity" - identity = None - - if not os.path.isdir(configdir): - RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR) - exit(201) - if not os.path.isfile(identitypath): - RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR) - exit(202) - else: - identity = RNS.Identity.from_file(identitypath) - if identity == None: - RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR) - exit(4) - - else: - if not os.path.isfile(identity_path): - RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR) - exit(202) - else: - identity = RNS.Identity.from_file(identity_path) - if identity == None: - RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR) - exit(4) - - if targetloglevel == None: - targetloglevel = 3 - if verbosity != 0 or quietness != 0: - targetloglevel = targetloglevel+verbosity-quietness - - reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest) - response = query_status(identity, timeout=timeout, exit_on_fail=True) - - if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY: - RNS.log("Remote received no identity") - exit(203) - if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS: - RNS.log("Access denied") - exit(204) - else: - s = response - mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2) - ms_util = f"{mutil}%" - if s["from_static_only"]: - who_str = "static peers only" - else: - who_str = "all nodes" - - available_peers = 0 - unreachable_peers = 0 - peered_incoming = 0 - peered_outgoing = 0 - peered_rx_bytes = 0 - peered_tx_bytes = 0 - for peer_id in s["peers"]: - p = s["peers"][peer_id] - pm = p["messages"] - peered_incoming += pm["incoming"] - peered_outgoing += pm["outgoing"] - peered_rx_bytes += p["rx_bytes"] - peered_tx_bytes += p["tx_bytes"] - - if p["alive"]: available_peers += 1 - else: unreachable_peers += 1 - - total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"] - total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"] - if total_incoming != 0: df = round(peered_outgoing/total_incoming, 2) - else: df = 0 - - dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"]) - print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}") - - if show_status: - msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"]) - ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"]) - mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"] - ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"] - cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"] - print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})") - print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit") - print(f"") - print(f"Peers : {stp} total (peer limit is {smp})") - print(f" {sdp} discovered, {ssp} static") - print(f" {available_peers} available, {unreachable_peers} unreachable") - print(f"") - print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})") - print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})") - print(f" {upi} messages received from unpeered nodes ({uprx})") - print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})") - print(f" {cprr} propagation messages received directly from clients") - print(f" {cprs} propagation messages served to clients") - print(f" Distribution factor is {df}") - print(f"") - - if show_peers: - if not show_status: - print("") - - for peer_id in s["peers"]: - ind = " " - p = s["peers"][peer_id] - if p["type"] == "static": - t = "Static peer " - elif p["type"] == "discovered": - t = "Discovered peer " - else: - t = "Unknown peer " - a = "Available" if p["alive"] == True else "Unreachable" - h = max(time.time()-p["last_heard"], 0) - hops = p["network_distance"] - hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away" - pm = p["messages"] - if p["last_sync_attempt"] != 0: - lsa = p["last_sync_attempt"] - ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago" - else: - ls = "never synced" - - sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000) - srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"] - pmi = pm["incoming"]; pmuh = pm["unhandled"] - print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}") - print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago") - print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit") - print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming") - print(f"{ind*2}Traffic : {srxb} received, {stxb} sent") - ms = "" if pm["unhandled"] == 1 else "s" - print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}") - print("") - - def main(): try: parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon") @@ -610,10 +404,6 @@ def main(): parser.add_argument("-v", "--verbose", action="count", default=0) parser.add_argument("-q", "--quiet", action="count", default=0) parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file") - parser.add_argument("--status", action="store_true", default=False, help="display node status") - parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes") - parser.add_argument("--timeout", action="store", default=5, help="timeout in seconds for query operations", type=float) - parser.add_argument("--identity", action="store", default=None, help="path to identity used for query request", type=str) parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit") parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__)) @@ -623,24 +413,15 @@ def main(): print(__default_lxmd_config__) exit() - if args.status or args.peers: - get_status(configdir = args.config, - rnsconfigdir=args.rnsconfig, - verbosity=args.verbose, - quietness=args.quiet, - timeout=args.timeout, - show_status=args.status, - show_peers=args.peers, - identity_path=args.identity) - exit() - - program_setup(configdir = args.config, - rnsconfigdir=args.rnsconfig, - run_pn=args.propagation_node, - on_inbound=args.on_inbound, - verbosity=args.verbose, - quietness=args.quiet, - service=args.service) + program_setup( + configdir = args.config, + rnsconfigdir=args.rnsconfig, + run_pn=args.propagation_node, + on_inbound=args.on_inbound, + verbosity=args.verbose, + quietness=args.quiet, + service=args.service + ) except KeyboardInterrupt: print("") @@ -696,9 +477,9 @@ propagation_transfer_max_accepted_size = 256 # LXMF prioritises keeping messages that are # new and small. Large and old messages will # be removed first. This setting is optional -# and defaults to 500 megabytes. +# and defaults to 2 gigabytes. -# message_storage_limit = 500 +# message_storage_limit = 2000 # You can tell the LXMF message router to # prioritise storage for one or more @@ -710,25 +491,6 @@ propagation_transfer_max_accepted_size = 256 # prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf -# You can configure the maximum number of other -# propagation nodes that this node will peer -# with automatically. The default is 50. - -# max_peers = 25 - -# You can configure a list of static propagation -# node peers, that this node will always be -# peered with, by specifying a list of -# destination hashes. - -# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4 - -# You can configure the propagation node to -# only accept incoming propagation messages -# from configured static peers. - -# from_static_only = True - # By default, any destination is allowed to # connect and download messages, but you can # optionally restrict this. If you enable diff --git a/LXMF/_version.py b/LXMF/_version.py index a5f830a..906d362 100644 --- a/LXMF/_version.py +++ b/LXMF/_version.py @@ -1 +1 @@ -__version__ = "0.7.1" +__version__ = "0.6.0" diff --git a/README.md b/README.md index ed7e4f0..faced95 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,6 @@ User-facing clients built on LXMF include: Community-provided tools and utilities for LXMF include: -- [LXMFy](https://lxmfy.quad4.io/) - [LXMF-Bot](https://github.com/randogoth/lxmf-bot) - [LXMF Messageboard](https://github.com/chengtripp/lxmf_messageboard) - [LXMEvent](https://github.com/faragher/LXMEvent) diff --git a/requirements.txt b/requirements.txt index 2f4f642..6b7926a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ -qrcode>=7.4.2 -rns>=0.9.1 +qrcode==7.4.2 +rns==0.7.8 +setuptools==70.0.0 diff --git a/setup.py b/setup.py index 724705f..cabf20a 100644 --- a/setup.py +++ b/setup.py @@ -15,10 +15,9 @@ setuptools.setup( long_description_content_type="text/markdown", url="https://github.com/markqvist/lxmf", packages=["LXMF", "LXMF.Utilities"], - license="Reticulum License", - license_files = ("LICENSE"), classifiers=[ "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ], entry_points= { @@ -26,6 +25,6 @@ setuptools.setup( 'lxmd=LXMF.Utilities.lxmd:main', ] }, - install_requires=["rns>=0.9.5"], - python_requires=">=3.7", + install_requires=['rns>=0.9.1'], + python_requires='>=3.7', )