diff --git a/.github/ISSUE_TEMPLATE/🐛-bug-report.md b/.github/ISSUE_TEMPLATE/🐛-bug-report.md index 77ad6c2..65b492e 100644 --- a/.github/ISSUE_TEMPLATE/🐛-bug-report.md +++ b/.github/ISSUE_TEMPLATE/🐛-bug-report.md @@ -12,7 +12,7 @@ Before creating a bug report on this issue tracker, you **must** read the [Contr - The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**. - Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**. -- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), delete this section from your bug report. +- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**. **Describe the Bug** A clear and concise description of what the bug is. 
diff --git a/FUNDING.yml b/FUNDING.yml new file mode 100644 index 0000000..d125d55 --- /dev/null +++ b/FUNDING.yml @@ -0,0 +1,3 @@ +liberapay: Reticulum +ko_fi: markqvist +custom: "https://unsigned.io/donate" diff --git a/LICENSE b/LICENSE index a25bd7a..f5fb92d 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -MIT License +Reticulum License -Copyright (c) 2020 Mark Qvist / unsigned.io +Copyright (c) 2020-2025 Mark Qvist Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -9,8 +9,16 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. +- The Software shall not be used in any kind of system which includes amongst + its functions the ability to purposefully do harm to human beings. + +- The Software shall not be used, directly or indirectly, in the creation of + an artificial intelligence, machine learning or language model training + dataset, including but not limited to any use that contributes to the + training or development of such a model or algorithm. + +- The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. 
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, diff --git a/LXMF/Handlers.py b/LXMF/Handlers.py index 22c6cd3..01841f9 100644 --- a/LXMF/Handlers.py +++ b/LXMF/Handlers.py @@ -1,4 +1,5 @@ import time +import threading import RNS import RNS.vendor.umsgpack as msgpack @@ -7,20 +8,21 @@ from .LXMessage import LXMessage class LXMFDeliveryAnnounceHandler: def __init__(self, lxmrouter): - self.aspect_filter = APP_NAME+".delivery" + self.aspect_filter = APP_NAME+".delivery" self.receive_path_responses = True - self.lxmrouter = lxmrouter + self.lxmrouter = lxmrouter def received_announce(self, destination_hash, announced_identity, app_data): for lxmessage in self.lxmrouter.pending_outbound: - if destination_hash == lxmessage.destination_hash: + if destination_hash == lxmessage.destination_hash: if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC: lxmessage.next_delivery_attempt = time.time() - while self.lxmrouter.processing_outbound: - time.sleep(0.1) + def outbound_trigger(): + while self.lxmrouter.processing_outbound: time.sleep(0.1) + self.lxmrouter.process_outbound() - self.lxmrouter.process_outbound() + threading.Thread(target=outbound_trigger, daemon=True).start() try: stamp_cost = stamp_cost_from_app_data(app_data) @@ -32,36 +34,58 @@ class LXMFDeliveryAnnounceHandler: class LXMFPropagationAnnounceHandler: def __init__(self, lxmrouter): - self.aspect_filter = APP_NAME+".propagation" - self.receive_path_responses = False - self.lxmrouter = lxmrouter + self.aspect_filter = APP_NAME+".propagation" + self.receive_path_responses = True + self.lxmrouter = lxmrouter - def received_announce(self, destination_hash, announced_identity, app_data): + def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash, is_path_response): try: if type(app_data) == bytes: - if self.lxmrouter.propagation_node and 
self.lxmrouter.autopeer: - data = msgpack.unpackb(app_data) - - if pn_announce_data_is_valid(data): - node_timebase = data[1] - propagation_transfer_limit = None - if len(data) >= 3: - try: - propagation_transfer_limit = float(data[2]) - except: - propagation_transfer_limit = None - + if self.lxmrouter.propagation_node: + if pn_announce_data_is_valid(app_data): + data = msgpack.unpackb(app_data) + node_timebase = int(data[1]) + propagation_enabled = data[2] + propagation_transfer_limit = int(data[3]) + propagation_sync_limit = int(data[4]) + propagation_stamp_cost = int(data[5][0]) + propagation_stamp_cost_flexibility = int(data[5][1]) + peering_cost = int(data[5][2]) + metadata = data[6] + if destination_hash in self.lxmrouter.static_peers: - self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit) + static_peer = self.lxmrouter.peers[destination_hash] + if not is_path_response or static_peer.last_heard == 0: + self.lxmrouter.peer(destination_hash=destination_hash, + timestamp=node_timebase, + propagation_transfer_limit=propagation_transfer_limit, + propagation_sync_limit=propagation_sync_limit, + propagation_stamp_cost=propagation_stamp_cost, + propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility, + peering_cost=peering_cost, + metadata=metadata) else: - if data[0] == True: - if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: - self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit) + if self.lxmrouter.autopeer and not is_path_response: + if propagation_enabled == True: + if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: + self.lxmrouter.peer(destination_hash=destination_hash, + timestamp=node_timebase, + propagation_transfer_limit=propagation_transfer_limit, + propagation_sync_limit=propagation_sync_limit, + propagation_stamp_cost=propagation_stamp_cost, + propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility, + 
peering_cost=peering_cost, + metadata=metadata) - elif data[0] == False: - self.lxmrouter.unpeer(destination_hash, node_timebase) + else: + if destination_hash in self.lxmrouter.peers: + RNS.log(f"Peer {self.lxmrouter.peers[destination_hash]} moved outside auto-peering range, breaking peering...") + self.lxmrouter.unpeer(destination_hash, node_timebase) + + elif propagation_enabled == False: + self.lxmrouter.unpeer(destination_hash, node_timebase) except Exception as e: RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG) - RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + RNS.log(f"The contained exception was: {str(e)}", RNS.LOG_DEBUG) diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index db0edb7..ede9c3a 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -91,6 +91,18 @@ RENDERER_MICRON = 0x01 RENDERER_MARKDOWN = 0x02 RENDERER_BBCODE = 0x03 +# Optional propagation node metadata fields. These +# fields may be highly unstable in allocation and +# availability until the version 1.0.0 release, so use +# at your own risk until then, and expect changes! 
+PN_META_VERSION = 0x00 +PN_META_NAME = 0x01 +PN_META_SYNC_STRATUM = 0x02 +PN_META_SYNC_THROTTLE = 0x03 +PN_META_AUTH_BAND = 0x04 +PN_META_UTIL_PRESSURE = 0x05 +PN_META_CUSTOM = 0xFF + ########################################################## # The following helper functions makes it easier to # # handle and operate on LXMF data in client programs # @@ -99,21 +111,17 @@ RENDERER_BBCODE = 0x03 import RNS import RNS.vendor.umsgpack as msgpack def display_name_from_app_data(app_data=None): - if app_data == None: - return None - elif len(app_data) == 0: - return None + if app_data == None: return None + elif len(app_data) == 0: return None else: # Version 0.5.0+ announce format if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc: peer_data = msgpack.unpackb(app_data) if type(peer_data) == list: - if len(peer_data) < 1: - return None + if len(peer_data) < 1: return None else: dn = peer_data[0] - if dn == None: - return None + if dn == None: return None else: try: decoded = dn.decode("utf-8") @@ -127,36 +135,61 @@ def display_name_from_app_data(app_data=None): return app_data.decode("utf-8") def stamp_cost_from_app_data(app_data=None): - if app_data == None or app_data == b"": - return None + if app_data == None or app_data == b"": return None else: # Version 0.5.0+ announce format if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc: peer_data = msgpack.unpackb(app_data) if type(peer_data) == list: - if len(peer_data) < 2: - return None - else: - return peer_data[1] + if len(peer_data) < 2: return None + else: return peer_data[1] # Original announce format + else: return None + +def pn_name_from_app_data(app_data=None): + if app_data == None: return None + else: + if pn_announce_data_is_valid(app_data): + data = msgpack.unpackb(app_data) + metadata = data[6] + if not PN_META_NAME in metadata: return None + else: + try: return metadata[PN_META_NAME].decode("utf-8") + except: return None + + return None + +def 
pn_stamp_cost_from_app_data(app_data=None): + if app_data == None: return None + else: + if pn_announce_data_is_valid(app_data): + data = msgpack.unpackb(app_data) + return data[5][0] else: return None def pn_announce_data_is_valid(data): try: - if type(data) == bytes: - data = msgpack.unpackb(data) - - if len(data) < 3: - raise ValueError("Invalid announce data: Insufficient peer data") + if type(data) != bytes: return False + else: data = msgpack.unpackb(data) + if len(data) < 7: raise ValueError("Invalid announce data: Insufficient peer data, likely from deprecated LXMF version") else: - if data[0] != True and data[0] != False: - raise ValueError("Invalid announce data: Indeterminate propagation node status") - try: - int(data[1]) - except: - raise ValueError("Invalid announce data: Could not decode peer timebase") + try: int(data[1]) + except: raise ValueError("Invalid announce data: Could not decode timebase") + if data[2] != True and data[2] != False: raise ValueError("Invalid announce data: Indeterminate propagation node status") + try: int(data[3]) + except: raise ValueError("Invalid announce data: Could not decode propagation transfer limit") + try: int(data[4]) + except: raise ValueError("Invalid announce data: Could not decode propagation sync limit") + if type(data[5]) != list: raise ValueError("Invalid announce data: Could not decode stamp costs") + try: int(data[5][0]) + except: raise ValueError("Invalid announce data: Could not decode target stamp cost") + try: int(data[5][1]) + except: raise ValueError("Invalid announce data: Could not decode stamp cost flexibility") + try: int(data[5][2]) + except: raise ValueError("Invalid announce data: Could not decode peering cost") + if type(data[6]) != dict: raise ValueError("Invalid announce data: Could not decode metadata") except Exception as e: RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index ec0cfe2..0dbf8ce 100644 --- 
a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -1,25 +1,38 @@ import os import time +import threading import RNS import RNS.vendor.umsgpack as msgpack +import LXMF.LXStamper as LXStamper from collections import deque from .LXMF import APP_NAME +from .LXMF import PN_META_NAME class LXMPeer: OFFER_REQUEST_PATH = "/offer" MESSAGE_GET_PATH = "/get" - IDLE = 0x00 - LINK_ESTABLISHING = 0x01 - LINK_READY = 0x02 - REQUEST_SENT = 0x03 - RESPONSE_RECEIVED = 0x04 + IDLE = 0x00 + LINK_ESTABLISHING = 0x01 + LINK_READY = 0x02 + REQUEST_SENT = 0x03 + RESPONSE_RECEIVED = 0x04 RESOURCE_TRANSFERRING = 0x05 - ERROR_NO_IDENTITY = 0xf0 - ERROR_NO_ACCESS = 0xf1 + ERROR_NO_IDENTITY = 0xf0 + ERROR_NO_ACCESS = 0xf1 + ERROR_INVALID_KEY = 0xf3 + ERROR_INVALID_DATA = 0xf4 + ERROR_INVALID_STAMP = 0xf5 + ERROR_THROTTLED = 0xf6 + ERROR_NOT_FOUND = 0xfd + ERROR_TIMEOUT = 0xfe + + STRATEGY_LAZY = 0x01 + STRATEGY_PERSISTENT = 0x02 + DEFAULT_SYNC_STRATEGY = STRATEGY_PERSISTENT # Maximum amount of time a peer can # be unreachable before it is removed @@ -49,48 +62,58 @@ class LXMPeer: peer.alive = peer_alive peer.last_heard = peer_last_heard - if "link_establishment_rate" in dictionary: - peer.link_establishment_rate = dictionary["link_establishment_rate"] - else: - peer.link_establishment_rate = 0 + if "link_establishment_rate" in dictionary: peer.link_establishment_rate = dictionary["link_establishment_rate"] + else: peer.link_establishment_rate = 0 - if "sync_transfer_rate" in dictionary: - peer.sync_transfer_rate = dictionary["sync_transfer_rate"] - else: - peer.sync_transfer_rate = 0 + if "sync_transfer_rate" in dictionary: peer.sync_transfer_rate = dictionary["sync_transfer_rate"] + else: peer.sync_transfer_rate = 0 if "propagation_transfer_limit" in dictionary: - try: - peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"]) - except Exception as e: - peer.propagation_transfer_limit = None - else: - peer.propagation_transfer_limit = None + try: 
peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"]) + except Exception as e: peer.propagation_transfer_limit = None + else: peer.propagation_transfer_limit = None + + if "propagation_sync_limit" in dictionary: + try: peer.propagation_sync_limit = int(dictionary["propagation_sync_limit"]) + except: peer.propagation_sync_limit = peer.propagation_transfer_limit + else: peer.propagation_sync_limit = peer.propagation_transfer_limit + + if "propagation_stamp_cost" in dictionary: + try: peer.propagation_stamp_cost = int(dictionary["propagation_stamp_cost"]) + except: peer.propagation_stamp_cost = None + else: peer.propagation_stamp_cost = None + + if "propagation_stamp_cost_flexibility" in dictionary: + try: peer.propagation_stamp_cost_flexibility = int(dictionary["propagation_stamp_cost_flexibility"]) + except: peer.propagation_stamp_cost_flexibility = None + else: peer.propagation_stamp_cost_flexibility = None + + if "peering_cost" in dictionary: + try: peer.peering_cost = int(dictionary["peering_cost"]) + except: peer.peering_cost = None + else: peer.peering_cost = None + + if "sync_strategy" in dictionary: + try: peer.sync_strategy = int(dictionary["sync_strategy"]) + except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY + else: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY - if "offered" in dictionary: - peer.offered = dictionary["offered"] - else: - peer.offered = 0 - - if "outgoing" in dictionary: - peer.outgoing = dictionary["outgoing"] - else: - peer.outgoing = 0 - - if "incoming" in dictionary: - peer.incoming = dictionary["incoming"] - else: - peer.incoming = 0 - - if "rx_bytes" in dictionary: - peer.rx_bytes = dictionary["rx_bytes"] - else: - peer.rx_bytes = 0 - - if "tx_bytes" in dictionary: - peer.tx_bytes = dictionary["tx_bytes"] - else: - peer.tx_bytes = 0 + if "offered" in dictionary: peer.offered = dictionary["offered"] + else: peer.offered = 0 + if "outgoing" in dictionary: peer.outgoing = dictionary["outgoing"] + 
else: peer.outgoing = 0 + if "incoming" in dictionary: peer.incoming = dictionary["incoming"] + else: peer.incoming = 0 + if "rx_bytes" in dictionary: peer.rx_bytes = dictionary["rx_bytes"] + else: peer.rx_bytes = 0 + if "tx_bytes" in dictionary: peer.tx_bytes = dictionary["tx_bytes"] + else: peer.tx_bytes = 0 + if "last_sync_attempt" in dictionary: peer.last_sync_attempt = dictionary["last_sync_attempt"] + else: peer.last_sync_attempt = 0 + if "peering_key" in dictionary: peer.peering_key = dictionary["peering_key"] + else: peer.peering_key = None + if "metadata" in dictionary: peer.metadata = dictionary["metadata"] + else: peer.metadata = None hm_count = 0 for transient_id in dictionary["handled_ids"]: @@ -116,12 +139,20 @@ class LXMPeer: dictionary = {} dictionary["peering_timebase"] = self.peering_timebase dictionary["alive"] = self.alive + dictionary["metadata"] = self.metadata dictionary["last_heard"] = self.last_heard + dictionary["sync_strategy"] = self.sync_strategy + dictionary["peering_key"] = self.peering_key dictionary["destination_hash"] = self.destination_hash dictionary["link_establishment_rate"] = self.link_establishment_rate dictionary["sync_transfer_rate"] = self.sync_transfer_rate dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit - dictionary["offered"] = self.offered + dictionary["propagation_sync_limit"] = self.propagation_sync_limit + dictionary["propagation_stamp_cost"] = self.propagation_stamp_cost + dictionary["propagation_stamp_cost_flexibility"] = self.propagation_stamp_cost_flexibility + dictionary["peering_cost"] = self.peering_cost + dictionary["last_sync_attempt"] = self.last_sync_attempt + dictionary["offered"] = self.offered dictionary["outgoing"] = self.outgoing dictionary["incoming"] = self.incoming dictionary["rx_bytes"] = self.rx_bytes @@ -143,30 +174,42 @@ class LXMPeer: return peer_bytes - def __init__(self, router, destination_hash): - self.alive = False - self.last_heard = 0 - self.next_sync_attempt 
= 0 - self.last_sync_attempt = 0 - self.sync_backoff = 0 - self.peering_timebase = 0 - self.link_establishment_rate = 0 - self.sync_transfer_rate = 0 - self.propagation_transfer_limit = None - self.handled_messages_queue = deque() - self.unhandled_messages_queue = deque() + def __init__(self, router, destination_hash, sync_strategy=DEFAULT_SYNC_STRATEGY): + self.alive = False + self.last_heard = 0 + self.sync_strategy = sync_strategy + self.peering_key = None + self.peering_cost = None + self.metadata = None - self.offered = 0 # Messages offered to this peer - self.outgoing = 0 # Messages transferred to this peer - self.incoming = 0 # Messages received from this peer - self.rx_bytes = 0 # Bytes received from this peer - self.tx_bytes = 0 # Bytes sent to this peer + self.next_sync_attempt = 0 + self.last_sync_attempt = 0 + self.sync_backoff = 0 + self.peering_timebase = 0 + self.link_establishment_rate = 0 + self.sync_transfer_rate = 0 + + self.propagation_transfer_limit = None + self.propagation_sync_limit = None + self.propagation_stamp_cost = None + self.propagation_stamp_cost_flexibility = None + self.currently_transferring_messages = None + self.handled_messages_queue = deque() + self.unhandled_messages_queue = deque() + + self.offered = 0 # Messages offered to this peer + self.outgoing = 0 # Messages transferred to this peer + self.incoming = 0 # Messages received from this peer + self.rx_bytes = 0 # Bytes received from this peer + self.tx_bytes = 0 # Bytes sent to this peer self._hm_count = 0 self._um_count = 0 self._hm_counts_synced = False self._um_counts_synced = False + self._peering_key_lock = threading.Lock() + self.link = None self.state = LXMPeer.IDLE @@ -181,11 +224,74 @@ class LXMPeer: self.destination = None RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING) + def peering_key_ready(self): + if not self.peering_cost: return False + if 
type(self.peering_key) == list and len(self.peering_key) == 2: + value = self.peering_key[1] + if value >= self.peering_cost: return True + else: + RNS.log(f"Peering key value mismatch for {self}. Current value is {value}, but peer requires {self.peering_cost}. Scheduling regeneration...", RNS.LOG_WARNING) + self.peering_key = None + + return False + + def peering_key_value(self): + if type(self.peering_key) == list and len(self.peering_key) == 2: return self.peering_key[1] + else: return None + + def generate_peering_key(self): + if self.peering_cost == None: return False + with self._peering_key_lock: + if self.peering_key != None: return True + else: + RNS.log(f"Generating peering key for {self}", RNS.LOG_NOTICE) + if self.router.identity == None: + RNS.log(f"Could not update peering key for {self} since the local LXMF router identity is not configured", RNS.LOG_ERROR) + return False + + if self.identity == None: + self.identity = RNS.Identity.recall(self.destination_hash) + if self.identity == None: + RNS.log(f"Could not update peering key for {self} since its identity could not be recalled", RNS.LOG_ERROR) + return False + + key_material = self.identity.hash+self.router.identity.hash + peering_key, value = LXStamper.generate_stamp(key_material, self.peering_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PEERING) + if value >= self.peering_cost: + self.peering_key = [peering_key, value] + RNS.log(f"Peering key successfully generated for {self}", RNS.LOG_NOTICE) + return True + + return False + def sync(self): RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) self.last_sync_attempt = time.time() - if time.time() > self.next_sync_attempt: + sync_time_reached = time.time() > self.next_sync_attempt + stamp_costs_known = self.propagation_stamp_cost != None and self.propagation_stamp_cost_flexibility != None and self.peering_cost != None + peering_key_ready = self.peering_key_ready() + sync_checks = 
sync_time_reached and stamp_costs_known and peering_key_ready + + if not sync_checks: + try: + if not sync_time_reached: + postpone_reason = " due to previous failures" + if self.last_sync_attempt > self.last_heard: self.alive = False + elif not stamp_costs_known: + postpone_reason = " since its required stamp costs are not yet known" + elif not peering_key_ready: + postpone_reason = " since a peering key has not been generated yet" + def job(): self.generate_peering_key() + threading.Thread(target=job, daemon=True).start() + + delay = self.next_sync_attempt-time.time() + postpone_delay = f" for {RNS.prettytime(delay)}" if delay > 0 else "" + RNS.log(f"Postponing sync with peer {RNS.prettyhexrep(self.destination_hash)}{postpone_delay}{postpone_reason}", RNS.LOG_DEBUG) + except Exception as e: + RNS.trace_exception(e) + + else: if not RNS.Transport.has_path(self.destination_hash): RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG) RNS.Transport.request_path(self.destination_hash) @@ -201,7 +307,15 @@ class LXMPeer: self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") if self.destination != None: + if len(self.unhandled_messages) == 0: + RNS.log(f"Sync requested for {self}, but no unhandled messages exist for peer. Sync complete.", RNS.LOG_DEBUG) + return + if len(self.unhandled_messages) > 0: + if self.currently_transferring_messages != None: + RNS.log(f"Sync requested for {self}, but current message transfer index was not clear. 
Aborting.", RNS.LOG_ERROR) + return + if self.state == LXMPeer.IDLE: RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG) self.sync_backoff += LXMPeer.SYNC_BACKOFF_STEP @@ -214,58 +328,69 @@ self.alive = True self.last_heard = time.time() self.sync_backoff = 0 + min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) - RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG) + RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing sync offer...", RNS.LOG_DEBUG) unhandled_entries = [] - unhandled_ids = [] - purged_ids = [] + unhandled_ids = [] + purged_ids = [] + low_value_ids = [] for transient_id in self.unhandled_messages: if transient_id in self.router.propagation_entries: - unhandled_entry = [ - transient_id, - self.router.get_weight(transient_id), - self.router.get_size(transient_id), - ] - unhandled_entries.append(unhandled_entry) - else: - purged_ids.append(transient_id) + if self.router.get_stamp_value(transient_id) < min_accepted_cost: low_value_ids.append(transient_id) + else: + unhandled_entry = [ transient_id, + self.router.get_weight(transient_id), + self.router.get_size(transient_id) ] + + unhandled_entries.append(unhandled_entry) + + else: purged_ids.append(transient_id) for transient_id in purged_ids: - RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) + RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since it no longer exists in the message store.", RNS.LOG_DEBUG) + self.remove_unhandled_message(transient_id) + + for transient_id in low_value_ids: + RNS.log(f"Dropping unhandled message 
{RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since its stamp value is lower than peer requirement of {min_accepted_cost}.", RNS.LOG_DEBUG) self.remove_unhandled_message(transient_id) unhandled_entries.sort(key=lambda e: e[1], reverse=False) - per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now - cumulative_size = 24 # Initialised to highest reasonable binary structure overhead - for unhandled_entry in unhandled_entries: - transient_id = unhandled_entry[0] - weight = unhandled_entry[1] - lxm_size = unhandled_entry[2] - next_size = cumulative_size + (lxm_size+per_message_overhead) - if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000): - pass - else: - cumulative_size += (lxm_size+per_message_overhead) - unhandled_ids.append(transient_id) + per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now + cumulative_size = 24 # Initialised to highest reasonable binary structure overhead + RNS.log(f"Syncing to peer with per-message limit {RNS.prettysize(self.propagation_transfer_limit*1000)} and sync limit {RNS.prettysize(self.propagation_sync_limit*1000)}") # TODO: Remove debug - RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_VERBOSE) + for unhandled_entry in unhandled_entries: + transient_id = unhandled_entry[0] + weight = unhandled_entry[1] + lxm_size = unhandled_entry[2] + lxm_transfer_size = lxm_size+per_message_overhead + next_size = cumulative_size + lxm_transfer_size + + if self.propagation_transfer_limit != None and lxm_transfer_size > (self.propagation_transfer_limit*1000): + self.remove_unhandled_message(transient_id) + self.add_handled_message(transient_id) + continue + + if self.propagation_sync_limit != None and next_size >= (self.propagation_sync_limit*1000): + continue + + cumulative_size += lxm_transfer_size + unhandled_ids.append(transient_id) + + offer = 
[self.peering_key[0], unhandled_ids] + + RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)} ({RNS.prettysize(len(msgpack.packb(unhandled_ids)))})", RNS.LOG_VERBOSE) self.last_offer = unhandled_ids - self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) + self.link.request(LXMPeer.OFFER_REQUEST_PATH, offer, response_callback=self.offer_response, failed_callback=self.request_failed) self.state = LXMPeer.REQUEST_SENT else: - RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) - - else: - RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG) - if self.last_sync_attempt > self.last_heard: - self.alive = False + RNS.log(f"Could not request sync to peer {RNS.prettyhexrep(self.destination_hash)} since its identity could not be recalled.", RNS.LOG_ERROR) def request_failed(self, request_receipt): - RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) - if self.link != None: - self.link.teardown() - + RNS.log(f"Sync request to peer {self.destination} failed", RNS.LOG_DEBUG) + if self.link != None: self.link.teardown() self.state = LXMPeer.IDLE def offer_response(self, request_receipt): @@ -289,6 +414,12 @@ class LXMPeer: self.router.unpeer(self.destination_hash) return + elif response == LXMPeer.ERROR_THROTTLED: + throttle_time = self.router.PN_STAMP_THROTTLE + RNS.log(f"Remote indicated that we're throttled, postponing sync for {RNS.prettytime(throttle_time)}", RNS.LOG_VERBOSE) + self.next_sync_attempt = time.time()+throttle_time + return + elif response == False: # Peer already has all advertised messages for transient_id in self.last_offer: @@ -317,7 +448,7 @@ class LXMPeer: 
wanted_message_ids.append(transient_id) if len(wanted_messages) > 0: - RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_VERBOSE) + RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} wanted {str(len(wanted_messages))} of the available messages", RNS.LOG_VERBOSE) lxm_list = [] for message_entry in wanted_messages: @@ -329,13 +460,14 @@ class LXMPeer: lxm_list.append(lxmf_data) data = msgpack.packb([time.time(), lxm_list]) + RNS.log(f"Total transfer size for this sync is {RNS.prettysize(len(data))}", RNS.LOG_VERBOSE) resource = RNS.Resource(data, self.link, callback = self.resource_concluded) - resource.transferred_messages = wanted_message_ids - resource.sync_transfer_started = time.time() + self.currently_transferring_messages = wanted_message_ids + self.current_sync_transfer_started = time.time() self.state = LXMPeer.RESOURCE_TRANSFERRING else: - RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_VERBOSE) + RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} did not request any of the available messages, sync completed", RNS.LOG_VERBOSE) self.offered += len(self.last_offer) if self.link != None: self.link.teardown() @@ -355,35 +487,45 @@ class LXMPeer: def resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: - for transient_id in resource.transferred_messages: + if self.currently_transferring_messages == None: + RNS.log(f"Sync transfer completed on {self}, but transferred message index was unavailable. 
Aborting.", RNS.LOG_ERROR) + if self.link != None: self.link.teardown() + self.link = None + self.state = LXMPeer.IDLE; return + + for transient_id in self.currently_transferring_messages: self.add_handled_message(transient_id) self.remove_unhandled_message(transient_id) - if self.link != None: - self.link.teardown() - - self.link = None - self.state = LXMPeer.IDLE + if self.link != None: self.link.teardown() + self.link = None + self.state = LXMPeer.IDLE rate_str = "" - if hasattr(resource, "sync_transfer_started") and resource.sync_transfer_started: - self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started) + if self.current_sync_transfer_started != None: + self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-self.current_sync_transfer_started) rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}" - RNS.log(f"Syncing {len(resource.transferred_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE) - self.alive = True + RNS.log(f"Syncing {len(self.currently_transferring_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE) + self.alive = True self.last_heard = time.time() self.offered += len(self.last_offer) - self.outgoing += len(resource.transferred_messages) + self.outgoing += len(self.currently_transferring_messages) self.tx_bytes += resource.get_data_size() + + self.currently_transferring_messages = None + self.current_sync_transfer_started = None + + if self.sync_strategy == self.STRATEGY_PERSISTENT: + if self.unhandled_message_count > 0: self.sync() else: RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE) + if self.link != None: self.link.teardown() + self.link = None + self.state = LXMPeer.IDLE + self.currently_transferring_messages = None + 
self.current_sync_transfer_started = None def link_established(self, link): self.link.identify(self.router.identity) @@ -396,7 +538,7 @@ class LXMPeer: self.sync() def link_closed(self, link): - self.link = None + self.link = None self.state = LXMPeer.IDLE def queued_items(self): @@ -409,19 +551,14 @@ class LXMPeer: self.handled_messages_queue.append(transient_id) def process_queues(self): - if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0: - # TODO: Remove debug - # st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue) - + if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0: handled_messages = self.handled_messages unhandled_messages = self.unhandled_messages while len(self.handled_messages_queue) > 0: transient_id = self.handled_messages_queue.pop() - if not transient_id in handled_messages: - self.add_handled_message(transient_id) - if transient_id in unhandled_messages: - self.remove_unhandled_message(transient_id) + if not transient_id in handled_messages: self.add_handled_message(transient_id) + if transient_id in unhandled_messages: self.remove_unhandled_message(transient_id) while len(self.unhandled_messages_queue) > 0: transient_id = self.unhandled_messages_queue.pop() @@ -429,8 +566,6 @@ class LXMPeer: self.add_unhandled_message(transient_id) del handled_messages, unhandled_messages - # TODO: Remove debug - # RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}") @property def handled_messages(self): @@ -450,18 +585,18 @@ class LXMPeer: @property def handled_message_count(self): - if not self._hm_counts_synced: - self._update_counts() - + if not self._hm_counts_synced: self._update_counts() return self._hm_count @property def unhandled_message_count(self): - if not self._um_counts_synced: - self._update_counts() - + if not self._um_counts_synced: self._update_counts() return self._um_count + @property + def acceptance_rate(self): + 
return 0 if self.offered == 0 else (self.outgoing/self.offered) + def _update_counts(self): if not self._hm_counts_synced: hm = self.handled_messages; del hm @@ -493,8 +628,15 @@ class LXMPeer: self.router.propagation_entries[transient_id][5].remove(self.destination_hash) self._um_counts_synced = False - def __str__(self): - if self.destination_hash: - return RNS.prettyhexrep(self.destination_hash) + @property + def name(self): + if type(self.metadata) != dict: return None else: - return "" \ No newline at end of file + if not PN_META_NAME in self.metadata: return None + else: + try: return self.metadata[PN_META_NAME].decode("utf-8") + except: return None + + def __str__(self): + if self.destination_hash: return RNS.prettyhexrep(self.destination_hash) + else: return "" \ No newline at end of file diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 5465356..9abef7c 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -15,6 +15,8 @@ import RNS.vendor.umsgpack as msgpack from .LXMF import APP_NAME from .LXMF import FIELD_TICKET +from .LXMF import PN_META_NAME +from .LXMF import pn_announce_data_is_valid from .LXMPeer import LXMPeer from .LXMessage import LXMessage @@ -37,15 +39,24 @@ class LXMRouter: NODE_ANNOUNCE_DELAY = 20 - MAX_PEERS = 50 + MAX_PEERS = 20 AUTOPEER = True AUTOPEER_MAXDEPTH = 4 FASTEST_N_RANDOM_POOL = 2 + ROTATION_HEADROOM_PCT = 10 + ROTATION_AR_MAX = 0.5 + PEERING_COST = 18 + MAX_PEERING_COST = 26 + PROPAGATION_COST_MIN = 13 + PROPAGATION_COST_FLEX = 3 + PROPAGATION_COST = 16 PROPAGATION_LIMIT = 256 + SYNC_LIMIT = PROPAGATION_LIMIT*40 DELIVERY_LIMIT = 1000 PR_PATH_TIMEOUT = 10 + PN_STAMP_THROTTLE = 180 PR_IDLE = 0x00 PR_PATH_REQUESTED = 0x01 @@ -64,13 +75,22 @@ class LXMRouter: PR_ALL_MESSAGES = 0x00 + DUPLICATE_SIGNAL = "lxmf_duplicate" + + STATS_GET_PATH = "/pn/get/stats" + SYNC_REQUEST_PATH = "/pn/peer/sync" + UNPEER_REQUEST_PATH = "/pn/peer/unpeer" + ### Developer-facing API ############################## 
####################################################### def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None, - propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, enforce_ratchets=False, - enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False): + propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT, + enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None, + from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT, + propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX, + peering_cost=PEERING_COST, max_peering_cost=MAX_PEERING_COST, name=None): random.seed(os.urandom(10)) @@ -84,17 +104,20 @@ class LXMRouter: self.prioritised_list = [] self.ignored_list = [] self.allowed_list = [] + self.control_allowed_list = [] self.auth_required = False self.retain_synced_on_node = False - self.processing_outbound = False - self.processing_inbound = False - self.processing_count = 0 + self.default_sync_strategy = sync_strategy + self.processing_outbound = False + self.processing_inbound = False + self.processing_count = 0 + self.name = name self.propagation_node = False + self.propagation_node_start_time = None - if storagepath == None: - raise ValueError("LXMF cannot be initialised without a storage path") + if storagepath == None: raise ValueError("LXMF cannot be initialised without a storage path") else: self.storagepath = storagepath+"/lxmf" self.ratchetpath = self.storagepath+"/ratchets" @@ -102,24 +125,36 @@ class LXMRouter: self.outbound_propagation_node = None self.outbound_propagation_link = None - if delivery_limit == None: - delivery_limit = LXMRouter.DELIVERY_LIMIT + if delivery_limit == None: delivery_limit = LXMRouter.DELIVERY_LIMIT + if propagation_cost < LXMRouter.PROPAGATION_COST_MIN: propagation_cost = LXMRouter.PROPAGATION_COST_MIN - self.message_storage_limit = None - 
self.information_storage_limit = None - self.propagation_per_transfer_limit = propagation_limit - self.delivery_per_transfer_limit = delivery_limit - self.enforce_ratchets = enforce_ratchets - self._enforce_stamps = enforce_stamps - self.pending_deferred_stamps = {} + self.message_storage_limit = None + self.information_storage_limit = None + self.propagation_per_transfer_limit = propagation_limit + self.propagation_per_sync_limit = sync_limit + self.delivery_per_transfer_limit = delivery_limit + self.propagation_stamp_cost = propagation_cost + self.propagation_stamp_cost_flexibility = propagation_cost_flexibility + self.peering_cost = peering_cost + self.max_peering_cost = max_peering_cost + self.enforce_ratchets = enforce_ratchets + self._enforce_stamps = enforce_stamps + self.pending_deferred_stamps = {} + self.throttled_peers = {} + + if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit: + self.propagation_per_sync_limit = self.propagation_per_transfer_limit self.wants_download_on_path_available_from = None self.wants_download_on_path_available_to = None self.propagation_transfer_state = LXMRouter.PR_IDLE self.propagation_transfer_progress = 0.0 self.propagation_transfer_last_result = None + self.propagation_transfer_last_duplicates = None self.propagation_transfer_max_messages = None + self.prioritise_rotating_unreachable_peers = False self.active_propagation_links = [] + self.validated_peer_links = {} self.locally_delivered_transient_ids = {} self.locally_processed_transient_ids = {} self.outbound_stamp_costs = {} @@ -135,35 +170,31 @@ class LXMRouter: self.identity = identity self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation") + self.propagation_destination.set_default_app_data(self.get_propagation_node_app_data) + self.control_destination = None + self.client_propagation_messages_received = 0 + self.client_propagation_messages_served = 0 
+ self.unpeered_propagation_incoming = 0 + self.unpeered_propagation_rx_bytes = 0 - if autopeer != None: - self.autopeer = autopeer - else: - self.autopeer = LXMRouter.AUTOPEER + if autopeer != None: self.autopeer = autopeer + else: self.autopeer = LXMRouter.AUTOPEER - if autopeer_maxdepth != None: - self.autopeer_maxdepth = autopeer_maxdepth - else: - self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH + if autopeer_maxdepth != None: self.autopeer_maxdepth = autopeer_maxdepth + else: self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH - if max_peers == None: - self.max_peers = LXMRouter.MAX_PEERS + if max_peers == None: self.max_peers = LXMRouter.MAX_PEERS else: - if type(max_peers) == int and max_peers >= 0: - self.max_peers = max_peers - else: - raise ValueError(f"Invalid value for max_peers: {max_peers}") + if type(max_peers) == int and max_peers >= 0: self.max_peers = max_peers + else: raise ValueError(f"Invalid value for max_peers: {max_peers}") self.from_static_only = from_static_only - if type(static_peers) != list: - raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}") + if type(static_peers) != list: raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}") else: for static_peer in static_peers: - if type(static_peer) != bytes: - raise ValueError(f"Invalid static peer destination hash: {static_peer}") + if type(static_peer) != bytes: raise ValueError(f"Invalid static peer destination hash: {static_peer}") else: - if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: - raise ValueError(f"Invalid static peer destination hash: {static_peer}") + if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Invalid static peer destination hash: {static_peer}") self.static_peers = static_peers @@ -188,7 +219,7 @@ class LXMRouter: self.locally_delivered_transient_ids = {} except Exception as e: - RNS.log("Could not load locally delivered message ID cache from storage. 
The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.log(f"Could not load locally delivered message ID cache from storage. The contained exception was: {e}", RNS.LOG_ERROR) self.locally_delivered_transient_ids = {} try: @@ -267,18 +298,29 @@ class LXMRouter: if destination_hash in self.delivery_destinations: self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash), attached_interface=attached_interface) + def get_propagation_node_announce_metadata(self): + metadata = {} + if self.name: metadata[PN_META_NAME] = str(self.name).encode("utf-8") + return metadata + + def get_propagation_node_app_data(self): + metadata = self.get_propagation_node_announce_metadata() + node_state = self.propagation_node and not self.from_static_only + stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility, self.peering_cost] + announce_data = [ False, # 0: Legacy LXMF PN support + int(time.time()), # 1: Current node timebase + node_state, # 2: Boolean flag signalling propagation node state + self.propagation_per_transfer_limit, # 3: Per-transfer limit for message propagation in kilobytes + self.propagation_per_sync_limit, # 4: Limit for incoming propagation node syncs + stamp_cost, # 5: Propagation stamp cost for this node + metadata ] # 6: Node metadata + + return msgpack.packb(announce_data) + def announce_propagation_node(self): def delayed_announce(): time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) - node_state = self.propagation_node and not self.from_static_only - announce_data = [ - node_state, # Boolean flag signalling propagation node state - int(time.time()), # Current node timebase - self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes - ] - - data = msgpack.packb(announce_data) - self.propagation_destination.announce(app_data=data) + self.propagation_destination.announce(app_data=self.get_propagation_node_app_data()) da_thread = 
threading.Thread(target=delayed_announce) da_thread.setDaemon(True) @@ -357,6 +399,29 @@ class LXMRouter: def get_outbound_propagation_node(self): return self.outbound_propagation_node + def get_outbound_propagation_cost(self): + target_propagation_cost = None + pn_destination_hash = self.get_outbound_propagation_node() + pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash) + if pn_announce_data_is_valid(pn_app_data): + pn_config = msgpack.unpackb(pn_app_data) + target_propagation_cost = pn_config[5][0] + + if not target_propagation_cost: + RNS.log(f"Could not retrieve cached propagation node config. Requesting path to propagation node to get target propagation cost...", RNS.LOG_DEBUG) + RNS.Transport.request_path(pn_destination_hash) + timeout = time.time() + LXMRouter.PATH_REQUEST_WAIT + while not RNS.Identity.recall_app_data(pn_destination_hash) and time.time() < timeout: + time.sleep(0.5) + + pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash) + if pn_announce_data_is_valid(pn_app_data): + pn_config = msgpack.unpackb(pn_app_data) + target_propagation_cost = pn_config[5][0] + + if not target_propagation_cost: RNS.log("Propagation node stamp cost still unavailable after path request", RNS.LOG_ERROR) + return target_propagation_cost + def set_inbound_propagation_node(self, destination_hash): # TODO: Implement raise NotImplementedError("Inbound/outbound propagation node differentiation is currently not implemented") @@ -391,6 +456,16 @@ class LXMRouter: else: raise ValueError("Disallowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes") + def allow_control(self, identity_hash=None): + if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8: + if not identity_hash in self.control_allowed_list: self.control_allowed_list.append(identity_hash) + else: raise ValueError("Allowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes") + + def 
disallow_control(self, identity_hash=None): + if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8: + if identity_hash in self.control_allowed_list: self.control_allowed_list.pop(identity_hash) + else: raise ValueError("Disallowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes") + def prioritise(self, destination_hash=None): if isinstance(destination_hash, bytes) and len(destination_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: if not destination_hash in self.prioritised_list: @@ -470,17 +545,17 @@ class LXMRouter: st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE) for filename in os.listdir(self.messagepath): components = filename.split("_") - if len(components) == 2: + if len(components) >= 3: if float(components[1]) > 0: if len(components[0]) == RNS.Identity.HASHLENGTH//8*2: try: - transient_id = bytes.fromhex(components[0]) - received = float(components[1]) - - filepath = self.messagepath+"/"+filename - msg_size = os.path.getsize(filepath) - file = open(filepath, "rb") - destination_hash = file.read(LXMessage.DESTINATION_LENGTH) + transient_id = bytes.fromhex(components[0]) + received = float(components[1]) + stamp_value = int(components[2]) + filepath = self.messagepath+"/"+filename + msg_size = os.path.getsize(filepath) + file = open(filepath, "rb") + destination_hash = file.read(LXMessage.DESTINATION_LENGTH) file.close() self.propagation_entries[transient_id] = [ @@ -490,13 +565,16 @@ class LXMRouter: msg_size, # 3: Message size [], # 4: Handled peers [], # 5: Unhandled peers + stamp_value, # 6: Stamp value ] except Exception as e: RNS.log("Could not read LXM from message store. 
The contained exception was: "+str(e), RNS.LOG_ERROR) - et = time.time(); RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {math.floor(len(self.propagation_entries)/(et-st))} msgs/s", RNS.LOG_NOTICE) - st = time.time(); RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE) + et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st)) + RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE) + RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE) + st = time.time() if os.path.isfile(self.storagepath+"/peers"): peers_file = open(self.storagepath+"/peers", "rb") @@ -512,9 +590,6 @@ class LXMRouter: peer = LXMPeer.from_bytes(serialised_peer, self) del serialised_peer if peer.destination_hash in self.static_peers and peer.last_heard == 0: - # TODO: Allow path request responses through announce handler - # momentarily here, so peering config can be updated even if - # the static peer is not available to directly send an announce. 
RNS.Transport.request_path(peer.destination_hash) if peer.identity != None: self.peers[peer.destination_hash] = peer @@ -532,7 +607,7 @@ class LXMRouter: for static_peer in self.static_peers: if not static_peer in self.peers: RNS.log(f"Activating static peering with {RNS.prettyhexrep(static_peer)}", RNS.LOG_NOTICE) - self.peers[static_peer] = LXMPeer(self, static_peer) + self.peers[static_peer] = LXMPeer(self, static_peer, sync_strategy=self.default_sync_strategy) if self.peers[static_peer].last_heard == 0: # TODO: Allow path request responses through announce handler # momentarily here, so peering config can be updated even if @@ -541,13 +616,38 @@ class LXMRouter: RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE) + try: + if os.path.isfile(self.storagepath+"/node_stats"): + node_stats_file = open(self.storagepath+"/node_stats", "rb") + data = node_stats_file.read() + node_stats_file.close() + node_stats = msgpack.unpackb(data) + + if not type(node_stats) == dict: + RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR) + else: + self.client_propagation_messages_received = node_stats["client_propagation_messages_received"] + self.client_propagation_messages_served = node_stats["client_propagation_messages_served"] + self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"] + self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"] + + except Exception as e: + RNS.log("Could not load local node stats. 
The contained exception was: "+str(e), RNS.LOG_ERROR) + self.propagation_node = True + self.propagation_node_start_time = time.time() self.propagation_destination.set_link_established_callback(self.propagation_link_established) self.propagation_destination.set_packet_callback(self.propagation_packet) self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL) + self.control_allowed_list = [self.identity.hash] + self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") + self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.control_allowed_list) + self.control_destination.register_request_handler(LXMRouter.SYNC_REQUEST_PATH, self.peer_sync_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.control_allowed_list) + self.control_destination.register_request_handler(LXMRouter.UNPEER_REQUEST_PATH, self.peer_unpeer_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.control_allowed_list) + if self.message_storage_limit != None: limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit) else: @@ -615,27 +715,15 @@ class LXMRouter: def set_information_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None): limit_bytes = 0 - - if kilobytes != None: - limit_bytes += kilobytes*1000 - - if megabytes != None: - limit_bytes += megabytes*1000*1000 - - if gigabytes != None: - limit_bytes += gigabytes*1000*1000*1000 - - if limit_bytes == 0: - limit_bytes = None + if kilobytes != None: limit_bytes += kilobytes*1000 + if megabytes != None: limit_bytes += megabytes*1000*1000 + if gigabytes != None: limit_bytes += gigabytes*1000*1000*1000 + if 
limit_bytes == 0: limit_bytes = None try: - if limit_bytes == None or int(limit_bytes) > 0: - self.information_storage_limit = int(limit_bytes) - else: - raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) - - except Exception as e: - raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) + if limit_bytes == None or int(limit_bytes) > 0: self.information_storage_limit = int(limit_bytes) + else: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) + except Exception as e: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) def information_storage_limit(self): return self.information_storage_limit @@ -644,10 +732,110 @@ class LXMRouter: pass def delivery_link_available(self, destination_hash): - if destination_hash in self.direct_links or destination_hash in self.backchannel_links: - return True + if destination_hash in self.direct_links or destination_hash in self.backchannel_links: return True + else: return False + + + ### Propagation Node Control ########################## + ####################################################### + + def compile_stats(self): + if not self.propagation_node: return None else: - return False + peer_stats = {} + for peer_id in self.peers.copy(): + peer = self.peers[peer_id] + peer_stats[peer_id] = { + "type": "static" if peer_id in self.static_peers else "discovered", + "state": peer.state, + "alive": peer.alive, + "name": peer.name, + "last_heard": int(peer.last_heard), + "next_sync_attempt": peer.next_sync_attempt, + "last_sync_attempt": peer.last_sync_attempt, + "sync_backoff": peer.sync_backoff, + "peering_timebase": peer.peering_timebase, + "ler": int(peer.link_establishment_rate), + "str": int(peer.sync_transfer_rate), + "transfer_limit": peer.propagation_transfer_limit, + "sync_limit": peer.propagation_sync_limit, + "target_stamp_cost": peer.propagation_stamp_cost, + "stamp_cost_flexibility": 
peer.propagation_stamp_cost_flexibility, + "peering_cost": peer.peering_cost, + "peering_key": peer.peering_key_value(), + "network_distance": RNS.Transport.hops_to(peer_id), + "rx_bytes": peer.rx_bytes, + "tx_bytes": peer.tx_bytes, + "acceptance_rate": peer.acceptance_rate, + "messages": { + "offered": peer.offered, + "outgoing": peer.outgoing, + "incoming": peer.incoming, + "unhandled": peer.unhandled_message_count + }, + } + + node_stats = { + "identity_hash": self.identity.hash, + "destination_hash": self.propagation_destination.hash, + "uptime": time.time()-self.propagation_node_start_time, + "delivery_limit": self.delivery_per_transfer_limit, + "propagation_limit": self.propagation_per_transfer_limit, + "sync_limit": self.propagation_per_sync_limit, + "target_stamp_cost": self.propagation_stamp_cost, + "stamp_cost_flexibility": self.propagation_stamp_cost_flexibility, + "peering_cost": self.peering_cost, + "max_peering_cost": self.max_peering_cost, + "autopeer_maxdepth": self.autopeer_maxdepth, + "from_static_only": self.from_static_only, + "messagestore": { + "count": len(self.propagation_entries), + "bytes": self.message_storage_size(), + "limit": self.message_storage_limit, + }, + "clients" : { + "client_propagation_messages_received": self.client_propagation_messages_received, + "client_propagation_messages_served": self.client_propagation_messages_served, + }, + "unpeered_propagation_incoming": self.unpeered_propagation_incoming, + "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes, + "static_peers": len(self.static_peers), + "discovered_peers": len(self.peers)-len(self.static_peers), + "total_peers": len(self.peers), + "max_peers": self.max_peers, + "peers": peer_stats, + } + + return node_stats + + def stats_get_request(self, path, data, request_id, remote_identity, requested_at): + if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY + elif remote_identity.hash not in self.control_allowed_list: return LXMPeer.ERROR_NO_ACCESS + 
else: return self.compile_stats() + + def peer_sync_request(self, path, data, request_id, remote_identity, requested_at): + if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY + elif remote_identity.hash not in self.control_allowed_list: return LXMPeer.ERROR_NO_ACCESS + else: + if type(data) != bytes: return LXMPeer.ERROR_INVALID_DATA + elif len(data) != RNS.Identity.TRUNCATED_HASHLENGTH//8: return LXMPeer.ERROR_INVALID_DATA + else: + if not data in self.peers: return LXMPeer.ERROR_NOT_FOUND + else: + self.peers[data].sync() + return True + + def peer_unpeer_request(self, path, data, request_id, remote_identity, requested_at): + if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY + elif remote_identity.hash not in self.control_allowed_list: return LXMPeer.ERROR_NO_ACCESS + else: + if type(data) != bytes: return LXMPeer.ERROR_INVALID_DATA + elif len(data) != RNS.Identity.TRUNCATED_HASHLENGTH//8: return LXMPeer.ERROR_INVALID_DATA + else: + if not data in self.peers: return LXMPeer.ERROR_NOT_FOUND + else: + self.unpeer(data) + return True ### Utility & Maintenance ############################# @@ -658,8 +846,9 @@ class LXMRouter: JOB_LINKS_INTERVAL = 1 JOB_TRANSIENT_INTERVAL = 60 JOB_STORE_INTERVAL = 120 - JOB_PEERSYNC_INTERVAL = 12 + JOB_PEERSYNC_INTERVAL = 6 JOB_PEERINGEST_INTERVAL= JOB_PEERSYNC_INTERVAL + JOB_ROTATE_INTERVAL = 56*JOB_PEERINGEST_INTERVAL def jobs(self): if not self.exit_handler_running: self.processing_count += 1 @@ -677,18 +866,17 @@ class LXMRouter: self.clean_transient_id_caches() if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: - self.clean_message_store() + if self.propagation_node == True: self.clean_message_store() if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0: - self.flush_queues() + if self.propagation_node == True: self.flush_queues() + + if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0: + if self.propagation_node == True: self.rotate_peers() if self.processing_count % 
LXMRouter.JOB_PEERSYNC_INTERVAL == 0: - self.sync_peers() - - # def syncstats(self): - # for peer_id in self.peers: - # p = self.peers[peer_id] - # RNS.log(f"{RNS.prettyhexrep(peer_id)} O={p.offered} S={p.outgoing} I={p.incoming} TX={RNS.prettysize(p.tx_bytes)} RX={RNS.prettysize(p.rx_bytes)}") + if self.propagation_node == True: self.sync_peers() + self.clean_throttled_peers() def jobloop(self): while (True): @@ -718,11 +906,13 @@ class LXMRouter: closed_links = [] for link_hash in self.direct_links: link = self.direct_links[link_hash] - inactive_time = link.inactive_for() + inactive_time = link.no_data_for() if inactive_time > LXMRouter.LINK_MAX_INACTIVITY: link.teardown() closed_links.append(link_hash) + if link.link_id in self.validated_peer_links: + self.validated_peer_links.pop(link.link_id) for link_hash in closed_links: cleaned_link = self.direct_links.pop(link_hash) @@ -781,8 +971,7 @@ class LXMRouter: RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG) self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost] - def job(): - self.save_outbound_stamp_costs() + def job(): self.save_outbound_stamp_costs() threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start() def get_announce_app_data(self, destination_hash): @@ -802,22 +991,26 @@ class LXMRouter: return msgpack.packb(peer_data) - def get_weight(self, transient_id): - dst_hash = self.propagation_entries[transient_id][0] - lxm_rcvd = self.propagation_entries[transient_id][2] + def get_size(self, transient_id): lxm_size = self.propagation_entries[transient_id][3] + return lxm_size - now = time.time() + def get_weight(self, transient_id): + dst_hash = self.propagation_entries[transient_id][0] + lxm_rcvd = self.propagation_entries[transient_id][2] + lxm_size = self.propagation_entries[transient_id][3] + + now = time.time() age_weight = max(1, (now - lxm_rcvd)/60/60/24/4) - if dst_hash in self.prioritised_list: - 
priority_weight = 0.1 - else: - priority_weight = 1.0 + if dst_hash in self.prioritised_list: priority_weight = 0.1 + else: priority_weight = 1.0 - weight = priority_weight * age_weight * lxm_size + return priority_weight * age_weight * lxm_size - return weight + def get_stamp_value(self, transient_id): + if not transient_id in self.propagation_entries: return None + else: return self.propagation_entries[transient_id][6] def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY): now = time.time() @@ -882,24 +1075,30 @@ class LXMRouter: else: return available_tickets - def get_size(self, transient_id): - lxm_size = self.propagation_entries[transient_id][3] - return lxm_size + def clean_throttled_peers(self): + expired_entries = [] + now = time.time() + for peer_hash in self.throttled_peers: + if now > self.throttled_peers[peer_hash]: expired_entries.append(peer_hash) + for peer_hash in expired_entries: self.throttled_peers.pop(peer_hash) def clean_message_store(self): + RNS.log("Cleaning message store", RNS.LOG_VERBOSE) # Check and remove expired messages now = time.time() removed_entries = {} - for transient_id in self.propagation_entries: - entry = self.propagation_entries[transient_id] - filepath = entry[1] - components = filepath.split("_") + for transient_id in self.propagation_entries.copy(): + entry = self.propagation_entries[transient_id] + filepath = entry[1] + stamp_value = entry[6] + filename = os.path.split(filepath)[-1] + components = filename.split("_") - if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2: + if len(components) == 3 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2 and int(components[2]) == stamp_value: timestamp = float(components[1]) if now > timestamp+LXMRouter.MESSAGE_EXPIRY: - RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG) + RNS.log("Purging 
message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_EXTREME) removed_entries[transient_id] = filepath else: RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING) @@ -917,7 +1116,7 @@ class LXMRouter: RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) if removed_count > 0: - RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG) + RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_VERBOSE) # Check size of message store and cull if needed try: @@ -929,7 +1128,7 @@ class LXMRouter: bytes_cleaned = 0 weighted_entries = [] - for transient_id in self.propagation_entries: + for transient_id in self.propagation_entries.copy(): weighted_entries.append([ self.propagation_entries[transient_id], self.get_weight(transient_id), @@ -970,7 +1169,7 @@ class LXMRouter: try: if len(self.locally_delivered_transient_ids) > 0: if not os.path.isdir(self.storagepath): - os.makedirs(self.storagepath) + os.makedirs(self.storagepath) with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file: locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) @@ -982,7 +1181,7 @@ class LXMRouter: try: if len(self.locally_processed_transient_ids) > 0: if not os.path.isdir(self.storagepath): - os.makedirs(self.storagepath) + os.makedirs(self.storagepath) with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file: locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids)) @@ -990,6 +1189,24 @@ class LXMRouter: except Exception as e: RNS.log("Could not save locally processed transient ID cache to storage. 
The contained exception was: "+str(e), RNS.LOG_ERROR) + def save_node_stats(self): + try: + if not os.path.isdir(self.storagepath): + os.makedirs(self.storagepath) + + with open(self.storagepath+"/node_stats", "wb") as stats_file: + node_stats = { + "client_propagation_messages_received": self.client_propagation_messages_received, + "client_propagation_messages_served": self.client_propagation_messages_served, + "unpeered_propagation_incoming": self.unpeered_propagation_incoming, + "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes, + } + stats_file.write(msgpack.packb(node_stats)) + + except Exception as e: + RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + + def clean_outbound_stamp_costs(self): try: expired = [] @@ -1106,6 +1323,9 @@ class LXMRouter: self.propagation_destination.set_packet_callback(None) self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH) self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH) + self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH) + self.propagation_destination.deregister_request_handler(LXMRouter.SYNC_REQUEST_PATH) + self.propagation_destination.deregister_request_handler(LXMRouter.UNPEER_REQUEST_PATH) for link in self.active_propagation_links: try: if link.status == RNS.Link.ACTIVE: @@ -1135,18 +1355,21 @@ class LXMRouter: self.save_locally_delivered_transient_ids() self.save_locally_processed_transient_ids() + self.save_node_stats() def sigint_handler(self, signal, frame): if not self.exit_handler_running: RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING) - sys.exit(0) + self.exit_handler() + RNS.exit(0) else: RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) def sigterm_handler(self, signal, frame): if not self.exit_handler_running: RNS.log("Received SIGTERM, shutting 
down now!", RNS.LOG_WARNING) - sys.exit(0) + self.exit_handler() + RNS.exit(0) else: RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) @@ -1184,12 +1407,8 @@ class LXMRouter: return True def message_get_request(self, path, data, request_id, remote_identity, requested_at): - if remote_identity == None: - return LXMPeer.ERROR_NO_IDENTITY - - elif not self.identity_allowed(remote_identity): - return LXMPeer.ERROR_NO_ACCESS - + if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY + elif not self.identity_allowed(remote_identity): return LXMPeer.ERROR_NO_ACCESS else: try: remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery") @@ -1208,9 +1427,7 @@ class LXMRouter: available_messages.sort(key=lambda e: e[1], reverse=False) transient_ids = [] - for available_entry in available_messages: - transient_ids.append(available_entry[0]) - + for available_entry in available_messages: transient_ids.append(available_entry[0]) return transient_ids else: @@ -1222,7 +1439,8 @@ class LXMRouter: filepath = self.propagation_entries[transient_id][1] self.propagation_entries.pop(transient_id) os.unlink(filepath) - RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG) + # TODO: Remove debug + # RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG) except Exception as e: RNS.log("Error while processing message purge request from "+RNS.prettyhexrep(remote_destination.hash)+". 
The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -1236,8 +1454,7 @@ class LXMRouter: try: client_transfer_limit = float(data[2])*1000 RNS.log("Client indicates transfer limit of "+RNS.prettysize(client_transfer_limit), RNS.LOG_DEBUG) - except: - pass + except: pass per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now cumulative_size = 24 # Initialised to highest reasonable binary structure overhead @@ -1254,18 +1471,17 @@ class LXMRouter: lxm_size = len(lxmf_data) next_size = cumulative_size + (lxm_size+per_message_overhead) - if client_transfer_limit != None and next_size > client_transfer_limit: - pass + if client_transfer_limit != None and next_size > client_transfer_limit: pass else: - response_messages.append(lxmf_data) + response_messages.append(lxmf_data[:-LXStamper.STAMP_SIZE]) cumulative_size += (lxm_size+per_message_overhead) except Exception as e: RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + self.client_propagation_messages_served += len(response_messages) return response_messages - except Exception as e: RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG) return None @@ -1329,10 +1545,12 @@ class LXMRouter: self.propagation_transfer_state = LXMRouter.PR_NO_ACCESS else: + duplicates = 0 if request_receipt.response != None and len(request_receipt.response) > 0: haves = [] for lxmf_data in request_receipt.response: - self.lxmf_propagation(lxmf_data) + result = self.lxmf_propagation(lxmf_data, signal_duplicate=LXMRouter.DUPLICATE_SIGNAL) + if result == LXMRouter.DUPLICATE_SIGNAL: duplicates += 1 haves.append(RNS.Identity.full_hash(lxmf_data)) # Return a list of successfully received messages to the node. 
@@ -1348,6 +1566,7 @@ class LXMRouter: self.propagation_transfer_state = LXMRouter.PR_COMPLETE self.propagation_transfer_progress = 1.0 + self.propagation_transfer_last_duplicates = duplicates self.propagation_transfer_last_result = len(request_receipt.response) self.save_locally_delivered_transient_ids() @@ -1378,12 +1597,12 @@ class LXMRouter: else: return False - def cancel_outbound(self, message_id): + def cancel_outbound(self, message_id, cancel_state=LXMessage.CANCELLED): try: if message_id in self.pending_deferred_stamps: lxm = self.pending_deferred_stamps[message_id] RNS.log(f"Cancelling deferred stamp generation for {lxm}", RNS.LOG_DEBUG) - lxm.state = LXMessage.CANCELLED + lxm.state = cancel_state LXStamper.cancel_work(message_id) lxmessage = None @@ -1392,7 +1611,7 @@ class LXMRouter: lxmessage = lxm if lxmessage != None: - lxmessage.state = LXMessage.CANCELLED + lxmessage.state = cancel_state if lxmessage in self.pending_outbound: RNS.log(f"Cancelling {lxmessage} in outbound queue", RNS.LOG_DEBUG) if lxmessage.representation == LXMessage.RESOURCE: @@ -1427,11 +1646,9 @@ class LXMRouter: # destination to reply without generating a stamp. 
if lxmessage.include_ticket: ticket = self.generate_ticket(lxmessage.destination_hash) - if ticket: - lxmessage.fields[FIELD_TICKET] = ticket + if ticket: lxmessage.fields[FIELD_TICKET] = ticket - if not lxmessage.packed: - lxmessage.pack() + if not lxmessage.packed: lxmessage.pack() unknown_path_requested = False if not RNS.Transport.has_path(destination_hash) and lxmessage.method == LXMessage.OPPORTUNISTIC: @@ -1446,16 +1663,13 @@ class LXMRouter: RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but no stamp is required, processing immediately", RNS.LOG_DEBUG) lxmessage.defer_stamp = False - if not lxmessage.defer_stamp: - while not unknown_path_requested and self.processing_outbound: - time.sleep(0.05) + if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp): + while not unknown_path_requested and self.processing_outbound: time.sleep(0.05) self.pending_outbound.append(lxmessage) - if not unknown_path_requested: - self.process_outbound() + if not unknown_path_requested: self.process_outbound() - else: - self.pending_deferred_stamps[lxmessage.message_id] = lxmessage + else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage def get_outbound_progress(self, lxm_hash): for lxm in self.pending_outbound: @@ -1471,11 +1685,25 @@ class LXMRouter: def get_outbound_lxm_stamp_cost(self, lxm_hash): for lxm in self.pending_outbound: if lxm.hash == lxm_hash: - return lxm.stamp_cost + if lxm.outbound_ticket: return None + else: return lxm.stamp_cost for lxm_id in self.pending_deferred_stamps: if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: - return self.pending_deferred_stamps[lxm_id].stamp_cost + lxm = self.pending_deferred_stamps[lxm_id] + if lxm.outbound_ticket: return None + else: return lxm.stamp_cost + + return None + + def get_outbound_lxm_propagation_stamp_cost(self, lxm_hash): + for lxm in self.pending_outbound: + if lxm.hash == lxm_hash: + return 
lxm.propagation_target_cost + + for lxm_id in self.pending_deferred_stamps: + if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: + return self.pending_deferred_stamps[lxm_id].propagation_target_cost return None @@ -1483,7 +1711,7 @@ class LXMRouter: ### Message Routing & Delivery ######################## ####################################################### - def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False): + def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False, allow_duplicate=False): try: message = LXMessage.unpack_from_bytes(lxmf_data) if ratchet_id and not message.ratchet_id: @@ -1550,7 +1778,7 @@ class LXMRouter: RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG) return False - if self.has_message(message.hash): + if not allow_duplicate and self.has_message(message.hash): RNS.log(str(self)+" ignored already received message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG) return False else: @@ -1642,28 +1870,50 @@ class LXMRouter: ### Peer Sync & Propagation ########################### ####################################################### - def peer(self, destination_hash, timestamp, propagation_transfer_limit): - if destination_hash in self.peers: - peer = self.peers[destination_hash] - if timestamp > peer.peering_timebase: - peer.alive = True - peer.sync_backoff = 0 - peer.next_sync_attempt = 0 - peer.peering_timebase = timestamp - peer.last_heard = time.time() - peer.propagation_transfer_limit = propagation_transfer_limit - RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE) - - else: - if len(self.peers) < self.max_peers: - peer = LXMPeer(self, destination_hash) - peer.alive = True - peer.last_heard = time.time() - peer.propagation_transfer_limit = 
propagation_transfer_limit - self.peers[destination_hash] = peer - RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE) + def peer(self, destination_hash, timestamp, propagation_transfer_limit, propagation_sync_limit, propagation_stamp_cost, propagation_stamp_cost_flexibility, peering_cost, metadata): + if peering_cost > self.max_peering_cost: + if destination_hash in self.peers: + RNS.log(f"Peer {RNS.prettyhexrep(destination_hash)} increased peering cost beyond local accepted maximum, breaking peering...", RNS.LOG_NOTICE) + self.unpeer(destination_hash, timestamp) else: - RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG) + RNS.log(f"Not peering with {RNS.prettyhexrep(destination_hash)}, since its peering cost of {peering_cost} exceeds local maximum of {self.max_peering_cost}", RNS.LOG_NOTICE) + + else: + if destination_hash in self.peers: + peer = self.peers[destination_hash] + if timestamp > peer.peering_timebase: + peer.alive = True + peer.metadata = metadata + peer.sync_backoff = 0 + peer.next_sync_attempt = 0 + peer.peering_timebase = timestamp + peer.last_heard = time.time() + peer.propagation_stamp_cost = propagation_stamp_cost + peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility + peer.peering_cost = peering_cost + peer.propagation_transfer_limit = propagation_transfer_limit + if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit + else: peer.propagation_sync_limit = propagation_transfer_limit + + RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE) + + else: + if len(self.peers) >= self.max_peers: RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG) + else: + peer = LXMPeer(self, destination_hash, sync_strategy=self.default_sync_strategy) + peer.alive = True + peer.metadata = metadata + peer.last_heard = time.time() + 
peer.propagation_stamp_cost = propagation_stamp_cost + peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility + peer.peering_cost = peering_cost + peer.propagation_transfer_limit = propagation_transfer_limit + if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit + else: peer.propagation_sync_limit = propagation_transfer_limit + + self.peers[destination_hash] = peer + RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE) + def unpeer(self, destination_hash, timestamp = None): if timestamp == None: @@ -1676,6 +1926,82 @@ class LXMRouter: self.peers.pop(destination_hash) RNS.log("Broke peering with "+str(peer.destination)) + def rotate_peers(self): + try: + rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0))) + required_drops = len(self.peers) - (self.max_peers - rotation_headroom) + if required_drops > 0 and len(self.peers) - required_drops > 1: + peers = self.peers.copy() + untested_peers = [] + for peer_id in self.peers: + peer = self.peers[peer_id] + if peer.last_sync_attempt == 0: + untested_peers.append(peer) + + if len(untested_peers) >= rotation_headroom: + RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG) + return + + fully_synced_peers = {} + for peer_id in peers: + peer = peers[peer_id] + if peer.unhandled_message_count == 0: + fully_synced_peers[peer_id] = peer + + if len(fully_synced_peers) > 0: + peers = fully_synced_peers + ms = "" if len(fully_synced_peers) == 1 else "s" + RNS.log(f"Found {len(fully_synced_peers)} fully synced peer{ms}, using as peer rotation pool basis", RNS.LOG_DEBUG) + + culled_peers = [] + waiting_peers = [] + unresponsive_peers = [] + for peer_id in peers: + peer = peers[peer_id] + if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE: + if peer.alive: + if peer.offered == 0: + # Don't consider for unpeering until at + # least one message has been offered + pass 
+ else: + waiting_peers.append(peer) + else: + unresponsive_peers.append(peer) + + drop_pool = [] + if len(unresponsive_peers) > 0: + drop_pool.extend(unresponsive_peers) + if not self.prioritise_rotating_unreachable_peers: + drop_pool.extend(waiting_peers) + + else: + drop_pool.extend(waiting_peers) + + if len(drop_pool) > 0: + drop_count = min(required_drops, len(drop_pool)) + low_acceptance_rate_peers = sorted( + drop_pool, + key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ), + reverse=False + )[0:drop_count] + + dropped_peers = 0 + for peer in low_acceptance_rate_peers: + ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2) + if ar < LXMRouter.ROTATION_AR_MAX*100: + reachable_str = "reachable" if peer.alive else "unreachable" + RNS.log(f"Acceptance rate for {reachable_str} peer {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing}/{peer.offered}, {peer.unhandled_message_count} unhandled messages)", RNS.LOG_DEBUG) + self.unpeer(peer.destination_hash) + dropped_peers += 1 + + ms = "" if dropped_peers == 1 else "s" + RNS.log(f"Dropped {dropped_peers} low acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG) + + except Exception as e: + RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + def sync_peers(self): culled_peers = [] waiting_peers = [] @@ -1684,18 +2010,14 @@ class LXMRouter: for peer_id in peers: peer = peers[peer_id] if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE: - if not peer_id in self.static_peers: - culled_peers.append(peer_id) + if not peer_id in self.static_peers: culled_peers.append(peer_id) + else: if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: - if peer.alive: - waiting_peers.append(peer) + if peer.alive: waiting_peers.append(peer) else: - if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt: - unresponsive_peers.append(peer) - else: - pass - # RNS.log("Not adding peer 
"+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG) + if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt: unresponsive_peers.append(peer) + else: pass # RNS.log("Not adding peer "+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG) peer_pool = [] if len(waiting_peers) > 0: @@ -1759,7 +2081,7 @@ class LXMRouter: return False size = resource.get_data_size() - limit = self.propagation_per_transfer_limit*1000 + limit = self.propagation_per_sync_limit*1000 if limit != None and size > limit: RNS.log(f"Rejecting {RNS.prettysize(size)} incoming propagation resource, since it exceeds the limit of {RNS.prettysize(limit)}", RNS.LOG_DEBUG) return False @@ -1768,17 +2090,31 @@ class LXMRouter: def propagation_packet(self, data, packet): try: - if packet.destination_type != RNS.Destination.LINK: - pass + if packet.destination_type != RNS.Destination.LINK: return else: - data = msgpack.unpackb(data) - remote_timebase = data[0] + data = msgpack.unpackb(data) + remote_timebase = data[0] + messages = data[1] - messages = data[1] - for lxmf_data in messages: - self.lxmf_propagation(lxmf_data) + min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) + validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost) - packet.prove() + for validated_entry in validated_messages: + lxmf_data = validated_entry[1] + stamp_value = validated_entry[2] + stamp_data = validated_entry[3] + self.lxmf_propagation(lxmf_data, stamp_value=stamp_value, stamp_data=stamp_data) + self.client_propagation_messages_received += 1 + + if len(validated_messages) == len(messages): + ms = "" if len(messages) == 1 else "s" + RNS.log(f"Received {len(messages)} propagation message{ms} from client with valid stamp{ms}", RNS.LOG_DEBUG) + packet.prove() + else: + RNS.log("Propagation transfer from client contained messages with invalid stamps", RNS.LOG_NOTICE) + reject_data = msgpack.packb([LXMPeer.ERROR_INVALID_STAMP]) + 
RNS.Packet(packet.link, reject_data).send() + packet.link.teardown() except Exception as e: RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR) @@ -1788,79 +2124,144 @@ class LXMRouter: if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY else: + remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + remote_hash = remote_destination.hash + remote_str = RNS.prettyhexrep(remote_hash) + + if remote_hash in self.throttled_peers: + throttle_remaining = self.throttled_peers[remote_hash]-time.time() + if throttle_remaining > 0: + RNS.log(f"Propagation offer from node {remote_str} rejected, throttled for {RNS.prettytime(throttle_remaining)} more", RNS.LOG_NOTICE) + return LXMPeer.ERROR_THROTTLED + else: self.throttled_peers.pop(remote_hash) + if self.from_static_only: - remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - remote_hash = remote_destination.hash - remote_str = RNS.prettyhexrep(remote_hash) if not remote_hash in self.static_peers: RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG) return LXMPeer.ERROR_NO_ACCESS try: - transient_ids = data - wanted_ids = [] + if type(data) != list and len(data) < 2: return LXMPeer.ERROR_INVALID_DATA - for transient_id in transient_ids: - if not transient_id in self.propagation_entries: - wanted_ids.append(transient_id) + peering_id = self.identity.hash+remote_identity.hash + target_cost = self.peering_cost + peering_key = data[0] + transient_ids = data[1] + wanted_ids = [] - if len(wanted_ids) == 0: - return False + ts = time.time() + peering_key_valid = LXStamper.validate_peering_key(peering_id, peering_key, target_cost) + td = time.time() - ts - elif len(wanted_ids) == len(transient_ids): - return True + if not peering_key_valid: + RNS.log(f"Invalid peering key for incoming sync offer", 
RNS.LOG_DEBUG) + return LXMPeer.ERROR_INVALID_KEY else: - return wanted_ids + RNS.log(f"Peering key validated for incoming offer in {RNS.prettytime(td)}", RNS.LOG_DEBUG) + self.validated_peer_links[link_id] = True + for transient_id in transient_ids: + if not transient_id in self.propagation_entries: wanted_ids.append(transient_id) + + if len(wanted_ids) == 0: return False + elif len(wanted_ids) == len(transient_ids): return True + else: return wanted_ids except Exception as e: RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG) + RNS.trace_exception(e) return None def propagation_resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: - # TODO: The peer this was received from should - # have the transient id added to its list of - # already handled messages. try: data = msgpack.unpackb(resource.data.read()) if type(data) == list and len(data) == 2 and type(data[0] == float) and type(data[1]) == list: # This is a series of propagation messages from a peer or originator - remote_timebase = data[0] - remote_hash = None - remote_str = "unknown peer" remote_identity = resource.link.get_remote_identity() + remote_timebase = data[0] + messages = data[1] + remote_hash = None + remote_str = "unknown client" if remote_identity != None: remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - remote_hash = remote_destination.hash - remote_str = RNS.prettyhexrep(remote_hash) + remote_hash = remote_destination.hash + remote_app_data = RNS.Identity.recall_app_data(remote_hash) + remote_str = RNS.prettyhexrep(remote_hash) - if not remote_hash in self.peers: - if self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: - # TODO: Query cache for an announce and get propagation - # transfer limit from that. 
For now, initialise it to a - # sane default value, and wait for an announce to arrive - # that will update the peering config to the actual limit. - propagation_transfer_limit = LXMRouter.PROPAGATION_LIMIT//4 - self.peer(remote_hash, remote_timebase, propagation_transfer_limit) + if remote_hash in self.peers: remote_str = f"peer {remote_str}" else: - remote_str = f"peer {remote_str}" + if pn_announce_data_is_valid(remote_app_data): + # 1: Current node timebase + # 2: Boolean flag signalling propagation node state + # 3: Per-transfer limit for message propagation in kilobytes + # 4: Limit for incoming propagation node syncs + # 5: Propagation stamp costs for this node + # 6: Node metadata + pn_config = msgpack.unpackb(remote_app_data) + if pn_config[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: + remote_timebase = pn_config[1] + remote_transfer_limit = pn_config[3] + remote_sync_limit = pn_config[4] + remote_stamp_cost = pn_config[5][0] + remote_stamp_flex = pn_config[5][1] + remote_peering_cost = pn_config[5][2] + remote_metadata = pn_config[6] - messages = data[1] - RNS.log(f"Received {len(messages)} messages from {remote_str}", RNS.LOG_VERBOSE) - for lxmf_data in messages: - peer = None - transient_id = RNS.Identity.full_hash(lxmf_data) - if remote_hash != None and remote_hash in self.peers: - peer = self.peers[remote_hash] - peer.incoming += 1 - peer.rx_bytes += len(lxmf_data) + RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug + self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata) - self.lxmf_propagation(lxmf_data, from_peer=peer) - if peer != None: - peer.queue_handled_message(transient_id) + peering_key_valid = False + if remote_identity != None: + if resource.link.link_id in self.validated_peer_links and self.validated_peer_links[resource.link.link_id] == 
True: + peering_key_valid = True + + if not peering_key_valid and len(messages) > 1: + resource.link.teardown() + RNS.log(f"Received multiple propagation messages from {remote_str} without valid peering key presentation. This is not supposed to happen, ignoring.", RNS.LOG_WARNING) + RNS.log(f"Clients and peers without a valid peering key can only deliver 1 message per transfer.", RNS.LOG_WARNING) + else: + ms = "" if len(messages) == 1 else "s" + RNS.log(f"Received {len(messages)} message{ms} from {remote_str}, validating stamps...", RNS.LOG_VERBOSE) + + min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) + validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost) + invalid_stamps = len(messages)-len(validated_messages) + ms = "" if invalid_stamps == 1 else "s" + if len(validated_messages) == len(messages): RNS.log(f"All message stamps validated from {remote_str}", RNS.LOG_VERBOSE) + else: RNS.log(f"Transfer from {remote_str} contained {invalid_stamps} invalid stamp{ms}", RNS.LOG_WARNING) + + for validated_entry in validated_messages: + transient_id = validated_entry[0] + lxmf_data = validated_entry[1] + stamp_value = validated_entry[2] + stamp_data = validated_entry[3] + peer = None + + if remote_hash != None and remote_hash in self.peers: + peer = self.peers[remote_hash] + peer.incoming += 1 + peer.rx_bytes += len(lxmf_data) + else: + if remote_identity != None: + self.unpeered_propagation_incoming += 1 + self.unpeered_propagation_rx_bytes += len(lxmf_data) + else: + self.client_propagation_messages_received += 1 + + self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value, stamp_data=stamp_data) + if peer != None: peer.queue_handled_message(transient_id) + + invalid_message_count = len(messages) - len(validated_messages) + if invalid_message_count > 0: + resource.link.teardown() + if remote_identity != None: + throttle_time = LXMRouter.PN_STAMP_THROTTLE + 
self.throttled_peers[remote_hash] = time.time()+throttle_time + ms = "" if invalid_message_count == 1 else "s" + RNS.log(f"Propagation transfer from {remote_str} contained {invalid_message_count} message{ms} with invalid stamps, throttled for {RNS.prettytime(throttle_time)}", RNS.LOG_NOTICE) else: RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG) @@ -1887,16 +2288,16 @@ class LXMRouter: if peer != from_peer: peer.queue_unhandled_message(transient_id) - def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False, from_peer=None): - no_stamp_enforcement = False - if is_paper_message: - no_stamp_enforcement = True + def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, + from_peer=None, stamp_value=None, stamp_data=None): + if is_paper_message: no_stamp_enforcement = True + else: no_stamp_enforcement = False try: if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD: transient_id = RNS.Identity.full_hash(lxmf_data) - if not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids: + if (not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids) or allow_duplicate == True: received = time.time() destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH] @@ -1908,7 +2309,7 @@ class LXMRouter: decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data) if decrypted_lxmf_data != None: delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data - self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement) + self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, 
no_stamp_enforcement=no_stamp_enforcement, allow_duplicate=allow_duplicate) self.locally_delivered_transient_ids[transient_id] = time.time() if signal_local_delivery != None: @@ -1916,18 +2317,19 @@ class LXMRouter: else: if self.propagation_node: - file_path = self.messagepath+"/"+RNS.hexrep(transient_id, delimit=False)+"_"+str(received) - msg_file = open(file_path, "wb") - msg_file.write(lxmf_data) - msg_file.close() + stamped_data = lxmf_data+stamp_data + value_component = f"_{stamp_value}" if stamp_value and stamp_value > 0 else "" + file_path = f"{self.messagepath}/{RNS.hexrep(transient_id, delimit=False)}_{received}{value_component}" + msg_file = open(file_path, "wb") + msg_file.write(stamped_data); msg_file.close() - RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME) - self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []] + RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)} with stamp value {stamp_value}, adding to peer distribution queues...", RNS.LOG_EXTREME) + self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(stamped_data), [], [], stamp_value] self.enqueue_peer_distribution(transient_id, from_peer) else: # TODO: Add message to sneakernet queues when implemented - RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG) + RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)}, but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG) return True @@ -1946,7 +2348,7 @@ class LXMRouter: RNS.trace_exception(e) return False - def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None): + def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, 
allow_duplicate=False): try: if not uri.lower().startswith(LXMessage.URI_SCHEMA+"://"): RNS.log("Cannot ingest LXM, invalid URI provided.", RNS.LOG_ERROR) @@ -1956,7 +2358,7 @@ class LXMRouter: lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==") transient_id = RNS.Identity.full_hash(lxmf_data) - router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, is_paper_message=True) + router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True) if router_propagation_result != False: RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG) return router_propagation_result @@ -2008,29 +2410,88 @@ class LXMRouter: return - RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) - generated_stamp = selected_lxm.get_stamp() - if generated_stamp: - selected_lxm.stamp = generated_stamp - selected_lxm.defer_stamp = False - selected_lxm.packed = None - selected_lxm.pack() - self.pending_deferred_stamps.pop(selected_message_id) - self.pending_outbound.append(selected_lxm) - RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) - else: - if selected_lxm.state == LXMessage.CANCELLED: - RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) - selected_lxm.stamp_generation_failed = True - self.pending_deferred_stamps.pop(selected_message_id) - if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): - selected_lxm.failed_callback(lxmessage) + if selected_lxm.defer_stamp: + if selected_lxm.stamp == None: stamp_generation_success = False + else: stamp_generation_success = True + else: stamp_generation_success = True + + if selected_lxm.desired_method == LXMessage.PROPAGATED: + 
if selected_lxm.propagation_stamp == None: propagation_stamp_generation_success = False + else: propagation_stamp_generation_success = True + else: propagation_stamp_generation_success = True + + if stamp_generation_success == False: + RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) + generated_stamp = selected_lxm.get_stamp() + if generated_stamp: + selected_lxm.stamp = generated_stamp + selected_lxm.defer_stamp = False + selected_lxm.packed = None + selected_lxm.pack(payload_updated=True) + stamp_generation_success = True + RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) else: - RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR) + if selected_lxm.state == LXMessage.CANCELLED: + RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): + selected_lxm.failed_callback(lxmessage) + else: + RNS.log(f"Deferred stamp generation did not succeed. 
Failing {selected_lxm}.", RNS.LOG_ERROR) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + self.fail_message(selected_lxm) + + if propagation_stamp_generation_success == False: + RNS.log(f"Starting propagation stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) + pn_target_cost = self.get_outbound_propagation_cost() + if pn_target_cost == None: + RNS.log("Failed to get propagation node stamp cost, cannot generate propagation stamp", RNS.LOG_ERROR) selected_lxm.stamp_generation_failed = True self.pending_deferred_stamps.pop(selected_message_id) self.fail_message(selected_lxm) + else: + propagation_stamp = selected_lxm.get_propagation_stamp(target_cost=pn_target_cost) + if propagation_stamp: + selected_lxm.propagation_stamp = propagation_stamp + selected_lxm.defer_propagation_stamp = False + selected_lxm.packed = None + selected_lxm.pack() + propagation_stamp_generation_success = True + RNS.log(f"Propagation stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) + else: + if selected_lxm.state == LXMessage.CANCELLED: + RNS.log(f"Message cancelled during deferred propagation stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): + selected_lxm.failed_callback(lxmessage) + else: + RNS.log(f"Deferred propagation stamp generation did not succeed. 
Failing {selected_lxm}.", RNS.LOG_ERROR) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + self.fail_message(selected_lxm) + + if stamp_generation_success and propagation_stamp_generation_success: + self.pending_deferred_stamps.pop(selected_message_id) + self.pending_outbound.append(selected_lxm) + + def propagation_transfer_signalling_packet(self, data, packet): + try: + unpacked = msgpack.unpackb(data) + if type(unpacked) == list and len(unpacked) >= 1: + signal = unpacked[0] + if signal == LXMPeer.ERROR_INVALID_STAMP: + RNS.log("Message rejected by propagation node", RNS.LOG_ERROR) + if hasattr(packet, "link") and hasattr(packet.link, "for_lxmessage"): + lxm = packet.link.for_lxmessage + RNS.log(f"Invalid propagation stamp on {lxm}", RNS.LOG_ERROR) + self.cancel_outbound(lxm.message_id, cancel_state=LXMessage.REJECTED) + + except Exception as e: + RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR) def process_outbound(self, sender = None): if self.processing_outbound: @@ -2074,15 +2535,14 @@ class LXMRouter: elif lxmessage.state == LXMessage.REJECTED: RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) - self.pending_outbound.remove(lxmessage) + if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage) if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): lxmessage.failed_callback(lxmessage) else: RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - if lxmessage.progress == None or lxmessage.progress < 0.01: - lxmessage.progress = 0.01 + if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 # Outbound handling for opportunistic messages if lxmessage.method == LXMessage.OPPORTUNISTIC: @@ -2149,7 +2609,8 @@ class LXMRouter: RNS.log("Waiting for 
proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) elif direct_link.status == RNS.Link.CLOSED: if direct_link.activated_at != None: - RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed", RNS.LOG_DEBUG) + RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG) + RNS.Transport.request_path(lxmessage.get_destination().hash) else: if not hasattr(lxmessage, "path_request_retried"): RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG) @@ -2239,6 +2700,8 @@ class LXMRouter: propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) + self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet) + self.outbound_propagation_link.for_lxmessage = lxmessage else: RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". 
Requesting path...", RNS.LOG_DEBUG) RNS.Transport.request_path(self.outbound_propagation_node) diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 2342708..baf951a 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -145,26 +145,32 @@ class LXMessage: self.set_fields(fields) - self.payload = None - self.timestamp = None - self.signature = None - self.hash = None - self.packed = None - self.state = LXMessage.GENERATING - self.method = LXMessage.UNKNOWN - self.progress = 0.0 - self.rssi = None - self.snr = None - self.q = None + self.payload = None + self.timestamp = None + self.signature = None + self.hash = None + self.transient_id = None + self.packed = None + self.state = LXMessage.GENERATING + self.method = LXMessage.UNKNOWN + self.progress = 0.0 + self.rssi = None + self.snr = None + self.q = None - self.stamp = None - self.stamp_cost = stamp_cost - self.stamp_value = None - self.stamp_valid = False - self.stamp_checked = False - self.defer_stamp = True - self.outbound_ticket = None - self.include_ticket = include_ticket + self.stamp = None + self.stamp_cost = stamp_cost + self.stamp_value = None + self.stamp_valid = False + self.stamp_checked = False + self.propagation_stamp = None + self.propagation_stamp_value = None + self.propagation_stamp_valid = False + self.propagation_target_cost = None + self.defer_stamp = True + self.defer_propagation_stamp = True + self.outbound_ticket = None + self.include_ticket = include_ticket self.propagation_packed = None self.paper_packed = None @@ -184,6 +190,7 @@ class LXMessage: self.resource_representation = None self.__delivery_destination = None self.__delivery_callback = None + self.__pn_encrypted_data = None self.failed_callback = None self.deferred_stamp_generating = False @@ -268,15 +275,6 @@ class LXMessage: def register_failed_callback(self, callback): self.failed_callback = callback - @staticmethod - def stamp_valid(stamp, target_cost, workblock): - target = 0b1 << 256-target_cost - result = 
RNS.Identity.full_hash(workblock+stamp) - if int.from_bytes(result, byteorder="big") > target: - return False - else: - return True - def validate_stamp(self, target_cost, tickets=None): if tickets != None: for ticket in tickets: @@ -293,7 +291,7 @@ class LXMessage: return False else: workblock = LXStamper.stamp_workblock(self.message_id) - if LXMessage.stamp_valid(self.stamp, target_cost, workblock): + if LXStamper.stamp_valid(self.stamp, target_cost, workblock): RNS.log(f"Stamp on {self} validated", RNS.LOG_DEBUG) # TODO: Remove at some point self.stamp_value = LXStamper.stamp_value(workblock, self.stamp) return True @@ -333,10 +331,35 @@ class LXMessage: else: return None - def pack(self): + def get_propagation_stamp(self, target_cost, timeout=None): + # If a stamp was already generated, return + # it immediately. + if self.propagation_stamp != None: + return self.propagation_stamp + + # Otherwise, we will need to generate a + # valid stamp according to the cost that + # the propagation node has specified. 
+ else: + self.propagation_target_cost = target_cost + if self.propagation_target_cost == None: + raise ValueError("Cannot generate propagation stamp without configured target propagation cost") + + + if not self.transient_id: self.pack() + generated_stamp, value = LXStamper.generate_stamp(self.transient_id, target_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN) + if generated_stamp: + self.propagation_stamp = generated_stamp + self.propagation_stamp_value = value + self.propagation_stamp_valid = True + return generated_stamp + + else: + return None + + def pack(self, payload_updated=False): if not self.packed: - if self.timestamp == None: - self.timestamp = time.time() + if self.timestamp == None: self.timestamp = time.time() self.propagation_packed = None self.paper_packed = None @@ -352,9 +375,8 @@ class LXMessage: if not self.defer_stamp: self.stamp = self.get_stamp() - if self.stamp != None: - self.payload.append(self.stamp) - + if self.stamp != None: self.payload.append(self.stamp) + signed_part = b"" signed_part += hashed_part signed_part += self.hash @@ -380,7 +402,7 @@ class LXMessage: if self.desired_method == LXMessage.OPPORTUNISTIC: if self.__destination.type == RNS.Destination.SINGLE: if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT: - RNS.log(f"Opportunistic delivery was requested for {self}, but content exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG) + RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. 
Falling back to link-based delivery.", RNS.LOG_DEBUG) self.desired_method = LXMessage.DIRECT # Set delivery parameters according to delivery method @@ -409,9 +431,14 @@ class LXMessage: elif self.desired_method == LXMessage.PROPAGATED: single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT - encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:]) - self.ratchet_id = self.__destination.latest_ratchet_id - self.propagation_packed = msgpack.packb([time.time(), [self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data]]) + if self.__pn_encrypted_data == None or payload_updated: + self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:]) + self.ratchet_id = self.__destination.latest_ratchet_id + + lxmf_data = self.packed[:LXMessage.DESTINATION_LENGTH]+self.__pn_encrypted_data + self.transient_id = RNS.Identity.full_hash(lxmf_data) + if self.propagation_stamp != None: lxmf_data += self.propagation_stamp + self.propagation_packed = msgpack.packb([time.time(), [lxmf_data]]) content_size = len(self.propagation_packed) if content_size <= single_packet_content_limit: diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index bcfa95b..39b541b 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -3,25 +3,30 @@ import RNS.vendor.umsgpack as msgpack import os import time +import math +import itertools import multiprocessing -WORKBLOCK_EXPAND_ROUNDS = 3000 +WORKBLOCK_EXPAND_ROUNDS = 3000 +WORKBLOCK_EXPAND_ROUNDS_PN = 1000 +WORKBLOCK_EXPAND_ROUNDS_PEERING = 25 +STAMP_SIZE = RNS.Identity.HASHLENGTH//8 +PN_VALIDATION_POOL_MIN_SIZE = 256 active_jobs = {} -def stamp_workblock(message_id): +if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork") + +def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): wb_st = time.time() - expand_rounds = WORKBLOCK_EXPAND_ROUNDS workblock = b"" for n in range(expand_rounds): - workblock += RNS.Cryptography.hkdf( - length=256, - 
derive_from=message_id, - salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)), - context=None, - ) + workblock += RNS.Cryptography.hkdf(length=256, + derive_from=material, + salt=RNS.Identity.full_hash(material+msgpack.packb(n)), + context=None) wb_time = time.time() - wb_st - RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG) + # RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG) return workblock @@ -36,28 +41,70 @@ def stamp_value(workblock, stamp): return value -def generate_stamp(message_id, stamp_cost): +def stamp_valid(stamp, target_cost, workblock): + target = 0b1 << 256-target_cost + result = RNS.Identity.full_hash(workblock+stamp) + if int.from_bytes(result, byteorder="big") > target: return False + else: return True + +def validate_peering_key(peering_id, peering_key, target_cost): + workblock = stamp_workblock(peering_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING) + if not stamp_valid(peering_key, target_cost, workblock): return False + else: return True + +def validate_pn_stamp(transient_data, target_cost): + from .LXMessage import LXMessage + if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return None, None, None, None + else: + lxm_data = transient_data[:-STAMP_SIZE] + stamp = transient_data[-STAMP_SIZE:] + transient_id = RNS.Identity.full_hash(lxm_data) + workblock = stamp_workblock(transient_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN) + + if not stamp_valid(stamp, target_cost, workblock): return None, None, None, None + else: + value = stamp_value(workblock, stamp) + return transient_id, lxm_data, value, stamp + +def validate_pn_stamps_job_simple(transient_list, target_cost): + validated_messages = [] + for transient_data in transient_list: + transient_id, lxm_data, value, stamp_data = validate_pn_stamp(transient_data, target_cost) + if transient_id: 
validated_messages.append([transient_id, lxm_data, value, stamp_data]) + + return validated_messages + +def validate_pn_stamps_job_multip(transient_list, target_cost): + cores = multiprocessing.cpu_count() + pool_count = min(cores, math.ceil(len(transient_list) / PN_VALIDATION_POOL_MIN_SIZE)) + + RNS.log(f"Validating {len(transient_list)} stamps using {pool_count} processes...", RNS.LOG_VERBOSE) + with multiprocessing.Pool(pool_count) as p: + validated_entries = p.starmap(validate_pn_stamp, zip(transient_list, itertools.repeat(target_cost))) + + return [e for e in validated_entries if e[0] != None] + +def validate_pn_stamps(transient_list, target_cost): + non_mp_platform = RNS.vendor.platformutils.is_android() + if len(transient_list) <= PN_VALIDATION_POOL_MIN_SIZE or non_mp_platform: return validate_pn_stamps_job_simple(transient_list, target_cost) + else: return validate_pn_stamps_job_multip(transient_list, target_cost) + +def generate_stamp(message_id, stamp_cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG) - workblock = stamp_workblock(message_id) + workblock = stamp_workblock(message_id, expand_rounds=expand_rounds) start_time = time.time() stamp = None rounds = 0 value = 0 - if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin(): - stamp, rounds = job_simple(stamp_cost, workblock, message_id) - - elif RNS.vendor.platformutils.is_android(): - stamp, rounds = job_android(stamp_cost, workblock, message_id) - - else: - stamp, rounds = job_linux(stamp_cost, workblock, message_id) + if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin(): stamp, rounds = job_simple(stamp_cost, workblock, message_id) + elif RNS.vendor.platformutils.is_android(): stamp, rounds = job_android(stamp_cost, workblock, message_id) + else: stamp, rounds = job_linux(stamp_cost, workblock, message_id) duration = time.time() - start_time speed 
= rounds/duration - if stamp != None: - value = stamp_value(workblock, stamp) + if stamp != None: value = stamp_value(workblock, stamp) RNS.log(f"Stamp with value {value} generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG) @@ -113,10 +160,8 @@ def job_simple(stamp_cost, workblock, message_id): def sv(s, c, w): target = 0b1<<256-c; m = w+s result = RNS.Identity.full_hash(m) - if int.from_bytes(result, byteorder="big") > target: - return False - else: - return True + if int.from_bytes(result, byteorder="big") > target: return False + else: return True while not sv(pstamp, stamp_cost, workblock) and not active_jobs[message_id]: pstamp = os.urandom(256//8); rounds += 1 @@ -135,7 +180,8 @@ def job_linux(stamp_cost, workblock, message_id): allow_kill = True stamp = None total_rounds = 0 - jobs = multiprocessing.cpu_count() + cores = multiprocessing.cpu_count() + jobs = cores if cores <= 12 else int(cores/2) stop_event = multiprocessing.Event() result_queue = multiprocessing.Queue(1) rounds_queue = multiprocessing.Queue() @@ -310,6 +356,13 @@ def job_android(stamp_cost, workblock, message_id): return stamp, total_rounds +# def stamp_value_linear(workblock, stamp): +# value = 0 +# bits = 256 +# material = RNS.Identity.full_hash(workblock+stamp) +# s = int.from_bytes(material, byteorder="big") +# return s.bit_count() + if __name__ == "__main__": import sys if len(sys.argv) < 2: @@ -325,4 +378,14 @@ if __name__ == "__main__": RNS.loglevel = RNS.LOG_DEBUG RNS.log("Testing LXMF stamp generation", RNS.LOG_DEBUG) message_id = os.urandom(32) - generate_stamp(message_id, cost) \ No newline at end of file + generate_stamp(message_id, cost) + + RNS.log("", RNS.LOG_DEBUG) + RNS.log("Testing propagation stamp generation", RNS.LOG_DEBUG) + message_id = os.urandom(32) + generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN) + + RNS.log("", RNS.LOG_DEBUG) + RNS.log("Testing peering key generation", RNS.LOG_DEBUG) + 
message_id = os.urandom(32) + generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING) \ No newline at end of file diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index 0c87a73..ab8e30f 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 -# MIT License +# Reticulum License # -# Copyright (c) 2016-2022 Mark Qvist / unsigned.io +# Copyright (c) 2020-2025 Mark Qvist # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -11,8 +11,16 @@ # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. +# - The Software shall not be used in any kind of system which includes amongst +# its functions the ability to purposefully do harm to human beings. +# +# - The Software shall not be used, directly or indirectly, in the creation of +# an artificial intelligence, machine learning or language model training +# dataset, including but not limited to any use that contributes to the +# training or development of such a model or algorithm. +# +# - The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. 
# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, @@ -35,6 +43,7 @@ import time import os from LXMF._version import __version__ +from LXMF import APP_NAME from RNS.vendor.configobj import ConfigObj @@ -96,6 +105,11 @@ def apply_config(): else: active_configuration["enable_propagation_node"] = False + if "propagation" in lxmd_config and "node_name" in lxmd_config["propagation"]: + active_configuration["node_name"] = lxmd_config["propagation"].get("node_name") + else: + active_configuration["node_name"] = None + if "propagation" in lxmd_config and "auth_required" in lxmd_config["propagation"]: active_configuration["auth_required"] = lxmd_config["propagation"].as_bool("auth_required") else: @@ -126,7 +140,7 @@ def apply_config(): if active_configuration["message_storage_limit"] < 0.005: active_configuration["message_storage_limit"] = 0.005 else: - active_configuration["message_storage_limit"] = 2000 + active_configuration["message_storage_limit"] = 500 if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]: active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size") @@ -134,11 +148,58 @@ def apply_config(): active_configuration["propagation_transfer_max_accepted_size"] = 0.38 else: active_configuration["propagation_transfer_max_accepted_size"] = 256 + + if "propagation" in lxmd_config and "propagation_message_max_accepted_size" in lxmd_config["propagation"]: + active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_message_max_accepted_size") + if active_configuration["propagation_transfer_max_accepted_size"] < 0.38: + active_configuration["propagation_transfer_max_accepted_size"] = 0.38 + else: + active_configuration["propagation_transfer_max_accepted_size"] = 256 + + if 
"propagation" in lxmd_config and "propagation_sync_max_accepted_size" in lxmd_config["propagation"]: + active_configuration["propagation_sync_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_sync_max_accepted_size") + if active_configuration["propagation_sync_max_accepted_size"] < 0.38: + active_configuration["propagation_sync_max_accepted_size"] = 0.38 + else: + active_configuration["propagation_sync_max_accepted_size"] = 256*40 + + if "propagation" in lxmd_config and "propagation_stamp_cost_target" in lxmd_config["propagation"]: + active_configuration["propagation_stamp_cost_target"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_target") + if active_configuration["propagation_stamp_cost_target"] < LXMF.LXMRouter.PROPAGATION_COST_MIN: + active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST_MIN + else: + active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST + + if "propagation" in lxmd_config and "propagation_stamp_cost_flexibility" in lxmd_config["propagation"]: + active_configuration["propagation_stamp_cost_flexibility"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_flexibility") + if active_configuration["propagation_stamp_cost_flexibility"] < 0: + active_configuration["propagation_stamp_cost_flexibility"] = 0 + else: + active_configuration["propagation_stamp_cost_flexibility"] = LXMF.LXMRouter.PROPAGATION_COST_FLEX + + if "propagation" in lxmd_config and "peering_cost" in lxmd_config["propagation"]: + active_configuration["peering_cost"] = lxmd_config["propagation"].as_int("peering_cost") + if active_configuration["peering_cost"] < 0: + active_configuration["peering_cost"] = 0 + else: + active_configuration["peering_cost"] = LXMF.LXMRouter.PEERING_COST + + if "propagation" in lxmd_config and "remote_peering_cost_max" in lxmd_config["propagation"]: + active_configuration["remote_peering_cost_max"] = 
lxmd_config["propagation"].as_int("remote_peering_cost_max") + if active_configuration["remote_peering_cost_max"] < 0: + active_configuration["remote_peering_cost_max"] = 0 + else: + active_configuration["remote_peering_cost_max"] = LXMF.LXMRouter.MAX_PEERING_COST if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") else: active_configuration["prioritised_lxmf_destinations"] = [] + + if "propagation" in lxmd_config and "control_allowed" in lxmd_config["propagation"]: + active_configuration["control_allowed_identities"] = lxmd_config["propagation"].as_list("control_allowed") + else: + active_configuration["control_allowed_identities"] = [] if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]: static_peers = lxmd_config["propagation"].as_list("static_peers") @@ -322,10 +383,16 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo autopeer = active_configuration["autopeer"], autopeer_maxdepth = active_configuration["autopeer_maxdepth"], propagation_limit = active_configuration["propagation_transfer_max_accepted_size"], + propagation_cost = active_configuration["propagation_stamp_cost_target"], + propagation_cost_flexibility = active_configuration["propagation_stamp_cost_flexibility"], + peering_cost = active_configuration["peering_cost"], + max_peering_cost = active_configuration["remote_peering_cost_max"], + sync_limit = active_configuration["propagation_sync_max_accepted_size"], delivery_limit = active_configuration["delivery_transfer_max_accepted_size"], max_peers = active_configuration["max_peers"], static_peers = active_configuration["static_peers"], - from_static_only = active_configuration["from_static_only"]) + from_static_only = active_configuration["from_static_only"], + name = active_configuration["node_name"]) 
message_router.register_delivery_callback(lxmf_delivery) @@ -358,13 +425,16 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo for dest_str in active_configuration["prioritised_lxmf_destinations"]: try: dest_hash = bytes.fromhex(dest_str) - if len(dest_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: - message_router.prioritise(dest_hash) - - except Exception as e: - RNS.log("Cannot prioritise "+str(dest_str)+", it is not a valid destination hash", RNS.LOG_ERROR) + if len(dest_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: message_router.prioritise(dest_hash) + except Exception as e: RNS.log("Cannot prioritise "+str(dest_str)+", it is not a valid destination hash", RNS.LOG_ERROR) message_router.enable_propagation() + + for ident_str in active_configuration["control_allowed_identities"]: + try: + identity_hash = bytes.fromhex(ident_str) + if len(identity_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: message_router.allow_control(identity_hash) + except Exception as e: RNS.log(f"Cannot allow control from {ident_str}, it is not a valid identity hash", RNS.LOG_ERROR) RNS.log("LXMF Propagation Node started on "+RNS.prettyhexrep(message_router.propagation_destination.hash)) @@ -415,6 +485,379 @@ def deferred_start_jobs(): last_node_announce = time.time() threading.Thread(target=jobs, daemon=True).start() +def _request_sync(identity, destination_hash, remote_identity, timeout=15, exit_on_fail=False): + control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") + + timeout = time.time()+timeout + def check_timeout(): + if time.time() > timeout: + if exit_on_fail: + print("Requesting lxmd peer sync timed out, exiting now") + exit(200) + else: + return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT + else: + time.sleep(0.1) + + if not RNS.Transport.has_path(control_destination.hash): + RNS.Transport.request_path(control_destination.hash) + while not 
RNS.Transport.has_path(control_destination.hash): + tc = check_timeout() + if tc: + return tc + + link = RNS.Link(control_destination) + while not link.status == RNS.Link.ACTIVE: + tc = check_timeout() + if tc: + return tc + + link.identify(identity) + request_receipt = link.request(LXMF.LXMRouter.SYNC_REQUEST_PATH, data=destination_hash, response_callback=None, failed_callback=None) + while not request_receipt.get_status() == RNS.RequestReceipt.READY: + tc = check_timeout() + if tc: + return tc + + link.teardown() + return request_receipt.get_response() + + +def request_sync(target, remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=15, identity_path=None): + global configpath, identitypath, storagedir, lxmdir + global lxmd_config, active_configuration, targetloglevel + + try: + peer_destination_hash = bytes.fromhex(target) + if len(peer_destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters") + except Exception as e: + print(f"Invalid peer destination hash: {e}") + exit(203) + remote + _remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path) + response = _request_sync(identity, peer_destination_hash, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True) + + if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY: + print("Remote received no identity") + exit(203) + elif response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS: + print("Access denied") + exit(204) + elif response == LXMF.LXMPeer.LXMPeer.ERROR_INVALID_DATA: + print("Invalid data received by remote") + exit(205) + elif response == LXMF.LXMPeer.LXMPeer.ERROR_NOT_FOUND: + print("The requested peer was not found") + exit(206) + elif response == None: + print("Empty response received") + exit(207) + else: + print(f"Sync requested for peer {RNS.prettyhexrep(peer_destination_hash)}") + exit(0) + +def _request_unpeer(identity, 
destination_hash, remote_identity, timeout=15, exit_on_fail=False): + control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") + + timeout = time.time()+timeout + def check_timeout(): + if time.time() > timeout: + if exit_on_fail: + print("Requesting lxmd peering break timed out, exiting now") + exit(200) + else: return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT + else: time.sleep(0.1) + + if not RNS.Transport.has_path(control_destination.hash): + RNS.Transport.request_path(control_destination.hash) + while not RNS.Transport.has_path(control_destination.hash): + tc = check_timeout() + if tc: + return tc + + link = RNS.Link(control_destination) + while not link.status == RNS.Link.ACTIVE: + tc = check_timeout() + if tc: + return tc + + link.identify(identity) + request_receipt = link.request(LXMF.LXMRouter.UNPEER_REQUEST_PATH, data=destination_hash, response_callback=None, failed_callback=None) + while not request_receipt.get_status() == RNS.RequestReceipt.READY: + tc = check_timeout() + if tc: + return tc + + link.teardown() + return request_receipt.get_response() + + +def request_unpeer(target, remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=15, identity_path=None): + global configpath, identitypath, storagedir, lxmdir + global lxmd_config, active_configuration, targetloglevel + + try: + peer_destination_hash = bytes.fromhex(target) + if len(peer_destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters") + except Exception as e: + print(f"Invalid peer destination hash: {e}") + exit(203) + remote + _remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path) + response = _request_unpeer(identity, peer_destination_hash, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True) + + if response == 
LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY: + print("Remote received no identity") + exit(203) + elif response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS: + print("Access denied") + exit(204) + elif response == LXMF.LXMPeer.LXMPeer.ERROR_INVALID_DATA: + print("Invalid data received by remote") + exit(205) + elif response == LXMF.LXMPeer.LXMPeer.ERROR_NOT_FOUND: + print("The requested peer was not found") + exit(206) + elif response == None: + print("Empty response received") + exit(207) + else: + print(f"Broke peering with {RNS.prettyhexrep(peer_destination_hash)}") + exit(0) + +def query_status(identity, remote_identity=None, timeout=5, exit_on_fail=False): + if remote_identity == None: remote_identity = identity + control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") + + timeout = time.time()+timeout + def check_timeout(): + if time.time() > timeout: + if exit_on_fail: + print("Getting lxmd statistics timed out, exiting now") + exit(200) + else: return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT + else: time.sleep(0.1) + + if not RNS.Transport.has_path(control_destination.hash): + RNS.Transport.request_path(control_destination.hash) + while not RNS.Transport.has_path(control_destination.hash): + tc = check_timeout() + if tc: return tc + + link = RNS.Link(control_destination) + while not link.status == RNS.Link.ACTIVE: + tc = check_timeout() + if tc: return tc + + link.identify(identity) + request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None) + while not request_receipt.get_status() == RNS.RequestReceipt.READY: + tc = check_timeout() + if tc: return tc + + link.teardown() + return request_receipt.get_response() + +def get_status(remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=5, + show_status=False, show_peers=False, identity_path=None): + + global identity + _remote_init(configdir, rnsconfigdir, 
verbosity, quietness, identity_path) + response = query_status(identity, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True) + + if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY: + print("Remote received no identity") + exit(203) + if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS: + print("Access denied") + exit(204) + elif response == None: + print("Empty response received") + exit(207) + else: + s = response + mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2) + ms_util = f"{mutil}%" + if s["from_static_only"]: + who_str = "static peers only" + else: + who_str = "all nodes" + + available_peers = 0 + unreachable_peers = 0 + peered_incoming = 0 + peered_outgoing = 0 + peered_rx_bytes = 0 + peered_tx_bytes = 0 + for peer_id in s["peers"]: + p = s["peers"][peer_id] + pm = p["messages"] + peered_incoming += pm["incoming"] + peered_outgoing += pm["outgoing"] + peered_rx_bytes += p["rx_bytes"] + peered_tx_bytes += p["tx_bytes"] + + if p["alive"]: available_peers += 1 + else: unreachable_peers += 1 + + total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"] + total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"] + if total_incoming != 0: df = round(peered_outgoing/total_incoming, 2) + else: df = 0 + + dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"]) + print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}") + + if show_status: + msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"]) + ptl = RNS.prettysize(s["propagation_limit"]*1000); psl = RNS.prettysize(s["sync_limit"]*1000); + uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"]) + mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"] + ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"] + cprs = 
s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"] + psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"] + pc = s["peering_cost"]; pcm = s["max_peering_cost"] + print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})") + print(f"Required propagation stamp cost is {psc}, flexibility is {scf}") + print(f"Peering cost is {pc}, max remote peering cost is {pcm}") + print(f"Accepting propagated messages from {who_str}") + print(f"{ptl} message limit, {psl} sync limit") + print(f"") + print(f"Peers : {stp} total (peer limit is {smp})") + print(f" {sdp} discovered, {ssp} static") + print(f" {available_peers} available, {unreachable_peers} unreachable") + print(f"") + print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})") + print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})") + print(f" {upi} messages received from unpeered nodes ({uprx})") + print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})") + print(f" {cprr} propagation messages received directly from clients") + print(f" {cprs} propagation messages served to clients") + print(f" Distribution factor is {df}") + print(f"") + + if show_peers: + if not show_status: + print("") + + for peer_id in s["peers"]: + ind = " " + p = s["peers"][peer_id] + if p["type"] == "static": + t = "Static peer " + elif p["type"] == "discovered": + t = "Discovered peer " + else: + t = "Unknown peer " + a = "Available" if p["alive"] == True else "Unreachable" + h = max(time.time()-p["last_heard"], 0) + hops = p["network_distance"] + hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away" + pm = p["messages"]; pk = p["peering_key"] + pc = p["peering_cost"]; psc = p["target_stamp_cost"]; psf = p["stamp_cost_flexibility"] + if pc == None: pc = "unknown" + if psc == 
None: psc = "unknown" + if psf == None: psf = "unknown" + if pk == None: pk = "Not generated" + else: pk = f"Generated, value is {pk}" + if p["last_sync_attempt"] != 0: + lsa = p["last_sync_attempt"] + ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago" + else: + ls = "never synced" + + sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]) + stl = RNS.prettysize(p["transfer_limit"]*1000) if p["transfer_limit"] else "Unknown" + ssl = RNS.prettysize(p["sync_limit"]*1000) if p["sync_limit"] else "unknown" + srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"] + pmi = pm["incoming"]; pmuh = pm["unhandled"]; ar = round(p["acceptance_rate"]*100, 2) + if p["name"] == None: nn = "" + else: nn = p["name"].strip().replace("\n", "").replace("\r", "") + if len(nn) > 45: nn = f"{nn[:45]}..." + print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}") + if len(nn): print(f"{ind*2}Name : {nn}") + print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago") + print(f"{ind*2}Costs : Propagation {psc} (flex {psf}), peering {pc}") + print(f"{ind*2}Sync key : {pk}") + print(f"{ind*2}Speeds : {sstr} STR, {sler} LER") + print(f"{ind*2}Limits : {stl} message limit, {ssl} sync limit") + print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming, {ar}% acceptance rate") + print(f"{ind*2}Traffic : {srxb} received, {stxb} sent") + ms = "" if pm["unhandled"] == 1 else "s" + print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}") + print("") + +def _get_target_identity(remote=None, timeout=5): + global identity + timeout = time.time()+timeout + def check_timeout(): + if time.time() > timeout: + print("Resolving remote identity timed out, exiting now") + exit(200) + else: time.sleep(0.1) + + if remote == None: return identity + else: + try: + destination_hash = bytes.fromhex(remote) + if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination 
hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters") + except Exception as e: + print(f"Invalid remote destination hash: {e}") + exit(203) + + remote_identity = RNS.Identity.recall(destination_hash) + if remote_identity: return remote_identity + else: + if not RNS.Transport.has_path(destination_hash): + RNS.Transport.request_path(destination_hash) + while not RNS.Transport.has_path(destination_hash): + tc = check_timeout() + if tc: return tc + + return RNS.Identity.recall(destination_hash) + +def _remote_init(configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, identity_path=None): + global configpath, identitypath, storagedir, lxmdir, identity + global lxmd_config, active_configuration, targetloglevel + targetlogdest = RNS.LOG_STDOUT + + if identity_path == None: + if configdir == None: + if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"): configdir = "/etc/lxmd" + elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(Reticulum.userdir+"/.config/lxmd/config"): configdir = RNS.Reticulum.userdir+"/.config/lxmd" + else: configdir = RNS.Reticulum.userdir+"/.lxmd" + + configpath = configdir+"/config" + identitypath = configdir+"/identity" + identity = None + + if not os.path.isdir(configdir): + RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR) + exit(201) + if not os.path.isfile(identitypath): + RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR) + exit(202) + else: + identity = RNS.Identity.from_file(identitypath) + if identity == None: + RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR) + exit(4) + + else: + if not os.path.isfile(identity_path): + RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR) + exit(202) + else: + identity = RNS.Identity.from_file(identity_path) + if identity == None: + RNS.log("Could not load the Primary Identity 
from "+identity_path, RNS.LOG_ERROR) + exit(4) + + if targetloglevel == None: targetloglevel = 3 + if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness + + reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest) + def main(): try: parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon") @@ -425,6 +868,13 @@ def main(): parser.add_argument("-v", "--verbose", action="count", default=0) parser.add_argument("-q", "--quiet", action="count", default=0) parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file") + parser.add_argument("--status", action="store_true", default=False, help="display node status") + parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes") + parser.add_argument("--sync", action="store", default=None, help="request a sync with the specified peer", type=str) + parser.add_argument("-b", "--break", dest="unpeer", action="store", default=None, help="break peering with the specified peer", type=str) + parser.add_argument("--timeout", action="store", default=None, help="timeout in seconds for query operations", type=float) + parser.add_argument("-r", "--remote", action="store", default=None, help="remote propagation node destination hash", type=str) + parser.add_argument("--identity", action="store", default=None, help="path to identity used for remote requests", type=str) parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit") parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__)) @@ -434,15 +884,50 @@ def main(): print(__default_lxmd_config__) exit() - program_setup( - configdir = args.config, - rnsconfigdir=args.rnsconfig, - run_pn=args.propagation_node, - on_inbound=args.on_inbound, - 
verbosity=args.verbose, - quietness=args.quiet, - service=args.service - ) + if args.status or args.peers: + if not args.timeout: args.timeout = 5 + get_status(configdir = args.config, + rnsconfigdir=args.rnsconfig, + verbosity=args.verbose, + quietness=args.quiet, + timeout=args.timeout, + show_status=args.status, + show_peers=args.peers, + identity_path=args.identity, + remote=args.remote) + exit() + + if args.sync: + if not args.timeout: args.timeout = 10 + request_sync(target=args.sync, + configdir = args.config, + rnsconfigdir=args.rnsconfig, + verbosity=args.verbose, + quietness=args.quiet, + timeout=args.timeout, + identity_path=args.identity, + remote=args.remote) + exit() + + if args.unpeer: + if not args.timeout: args.timeout = 10 + request_unpeer(target=args.unpeer, + configdir = args.config, + rnsconfigdir=args.rnsconfig, + verbosity=args.verbose, + quietness=args.quiet, + timeout=args.timeout, + identity_path=args.identity, + remote=args.remote) + exit() + + program_setup(configdir = args.config, + rnsconfigdir=args.rnsconfig, + run_pn=args.propagation_node, + on_inbound=args.on_inbound, + verbosity=args.verbose, + quietness=args.quiet, + service=args.service) except KeyboardInterrupt: print("") @@ -458,6 +943,17 @@ __default_lxmd_config__ = """# This is an example LXM Daemon config file. enable_node = no +# You can specify identity hashes for remotes +# that are allowed to control and query status +# for this propagation node. + +# control_allowed = 7d7e542829b40f32364499b27438dba8, 437229f8e29598b2282b88bad5e44698 + +# An optional name for this node, included +# in announces. + +# node_name = Anonymous Propagation Node + # Automatic announce interval in minutes. # 6 hours by default. @@ -477,19 +973,6 @@ autopeer = yes autopeer_maxdepth = 4 -# The maximum accepted transfer size per in- -# coming propagation transfer, in kilobytes. -# This also sets the upper limit for the size -# of single messages accepted onto this node. 
-# -# If a node wants to propagate a larger number -# of messages to this node, than what can fit -# within this limit, it will prioritise sending -# the smallest messages first, and try again -# with any remaining messages at a later point. - -propagation_transfer_max_accepted_size = 256 - # The maximum amount of storage to use for # the LXMF Propagation Node message store, # specified in megabytes. When this limit @@ -498,9 +981,57 @@ propagation_transfer_max_accepted_size = 256 # LXMF prioritises keeping messages that are # new and small. Large and old messages will # be removed first. This setting is optional -# and defaults to 2 gigabytes. +# and defaults to 500 megabytes. -# message_storage_limit = 2000 +# message_storage_limit = 500 + +# The maximum accepted transfer size per in- +# coming propagation message, in kilobytes. +# This sets the upper limit for the size of +# single messages accepted onto this node. + +# propagation_message_max_accepted_size = 256 + +# The maximum accepted transfer size per in- +# coming propagation node sync. +# +# If a node wants to propagate a larger number +# of messages to this node, than what can fit +# within this limit, it will prioritise sending +# the smallest messages first, and try again +# with any remaining messages at a later point. + +# propagation_sync_max_accepted_size = 10240 + +# You can configure the target stamp cost +# required to deliver messages via this node. + +# propagation_stamp_cost_target = 16 + +# If set higher than 0, the stamp cost flexi- +# bility option will make this node accept +# messages with a lower stamp cost than the +# target from other propagation nodes (but +# not from peers directly). This allows the +# network to gradually adjust stamp cost. + +# propagation_stamp_cost_flexibility = 3 + +# The peering_cost option configures the target +# value required for a remote node to peer with +# and deliver messages to this node. 
+ +# peering_cost = 18 + +# You can configure the maximum peering cost +# of remote nodes that this node will peer with. +# Setting this to a higher number will allow +# this node to peer with other nodes requiring +# a higher peering key value, but will require +# more computation time during initial peering +# when generating the peering key. + +# remote_peering_cost_max = 26 # You can tell the LXMF message router to # prioritise storage for one or more @@ -512,6 +1043,25 @@ propagation_transfer_max_accepted_size = 256 # prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf +# You can configure the maximum number of other +# propagation nodes that this node will peer +# with automatically. The default is 20. + +# max_peers = 20 + +# You can configure a list of static propagation +# node peers, that this node will always be +# peered with, by specifying a list of +# destination hashes. + +# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4 + +# You can configure the propagation node to +# only accept incoming propagation messages +# from configured static peers. + +# from_static_only = True + # By default, any destination is allowed to # connect and download messages, but you can # optionally restrict this. 
If you enable diff --git a/LXMF/_version.py b/LXMF/_version.py index 43c4ab0..c598173 100644 --- a/LXMF/_version.py +++ b/LXMF/_version.py @@ -1 +1 @@ -__version__ = "0.6.1" +__version__ = "0.9.3" diff --git a/requirements.txt b/requirements.txt index 6b7926a..f0f3fc8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ -qrcode==7.4.2 -rns==0.7.8 -setuptools==70.0.0 +qrcode>=7.4.2 +rns>=1.0.0 diff --git a/setup.py b/setup.py index cabf20a..16d8d3c 100644 --- a/setup.py +++ b/setup.py @@ -15,9 +15,10 @@ setuptools.setup( long_description_content_type="text/markdown", url="https://github.com/markqvist/lxmf", packages=["LXMF", "LXMF.Utilities"], + license="Reticulum License", + license_files = ("LICENSE"), classifiers=[ "Programming Language :: Python :: 3", - "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ], entry_points= { @@ -25,6 +26,6 @@ setuptools.setup( 'lxmd=LXMF.Utilities.lxmd:main', ] }, - install_requires=['rns>=0.9.1'], - python_requires='>=3.7', + install_requires=["rns>=1.0.1"], + python_requires=">=3.7", )