Compare commits


48 Commits

SHA1        Date                        Author      Message
1bdcf6ad53  2025-04-15 20:21:54 +02:00  Mark Qvist  Updated license
e6021b8fed  2025-04-15 20:21:16 +02:00  Mark Qvist  Updated license
326c0eed8f  2025-03-13 19:46:11 +01:00  Mark Qvist  Updated version
336792c07a  2025-03-13 19:45:15 +01:00  Mark Qvist  Updated dependencies
570d2c6846  2025-03-07 11:05:50 +01:00  Mark Qvist  Added configuration options to default config file
1ef4665073  2025-02-18 20:05:19 +01:00  Mark Qvist  Cleanup
d5540b927f  2025-01-31 13:38:56 +01:00  Mark Qvist  Added allow_duplicate option to message ingest API
a6cf585109  2025-01-30 15:11:26 +01:00  Mark Qvist  Cleanup
c0a8f3be49  2025-01-30 15:04:21 +01:00  Mark Qvist  Cleanup
7b4780cfb7  2025-01-30 11:36:11 +01:00  Mark Qvist  Automatically clean messages exceeding propagation transfer limit for peer from unhandled message queues
b94a712bb6  2025-01-30 11:30:45 +01:00  Mark Qvist  Automatically clean messages exceeding propagation transfer limit for peer from unhandled message queues
f42ccfc4e9  2025-01-30 11:23:18 +01:00  Mark Qvist  Automatically clean messages exceeding propagation transfer limit for peer from unhandled message queues
9eca747757  2025-01-30 10:46:31 +01:00  Mark Qvist  Updated peer rotation timing to align with distribution queue mapping
b7b6753640  2025-01-30 00:37:50 +01:00  Mark Qvist  Fixed potential division by zero. Fixes #25.
40d0b9a5de  2025-01-29 21:21:51 +01:00  Mark Qvist  Added acceptance rate threshold to peer rotation
40fc75f559  2025-01-29 14:24:09 +01:00  Mark Qvist  Refined peer rotation algorithm
f1d060a92e  2025-01-29 01:26:36 +01:00  Mark Qvist  Added peer rotation
e0e901291e  2025-01-27 12:04:16 +01:00  Mark Qvist  Updated logging
886ac69a82  2025-01-27 12:04:05 +01:00  Mark Qvist  Tear down control link after use
e0163e100a  2025-01-27 10:26:11 +01:00  Mark Qvist  Updated issue template
26a10cce8f  2025-01-26 01:13:11 +01:00  Mark Qvist  Status query return code
cec903a4dc  2025-01-24 14:05:12 +01:00  Mark Qvist  Added status query API function
962d9c90d1  2025-01-24 13:50:56 +01:00  Mark Qvist  Added wanted inbound peers to PN announce data
6d2eb4f973  2025-01-24 00:26:47 +01:00  Mark Qvist  Updated default config
a8cc5f41cf  2025-01-24 00:21:37 +01:00  Mark Qvist  Fixed typo
aa57b16cf5  2025-01-24 00:09:36 +01:00  Mark Qvist  Fixed #23
cdea838a6c  2025-01-23 17:43:24 +01:00  Mark Qvist  Updated status output
fb4bf9b0b9  2025-01-23 17:36:30 +01:00  Mark Qvist  Cleanup
a3e3868f92  2025-01-23 17:09:40 +01:00  Mark Qvist  Changed formatting
70186cf8d9  2025-01-23 17:07:20 +01:00  Mark Qvist  Fixed typo
fe59b265c5  2025-01-23 16:54:12 +01:00  Mark Qvist  Fixed fstrings not working on Python < 3.12
a87458d25f  2025-01-23 16:28:11 +01:00  Mark Qvist  Updated version
35dd70c59e  2025-01-23 16:27:48 +01:00  Mark Qvist  Format status and peers output
a198e96064  2025-01-23 16:27:23 +01:00  Mark Qvist  Include unhandled message count in stats
e3be7e0cfd  2025-01-23 16:27:01 +01:00  Mark Qvist  Persist last sync attempt
460645cea2  2025-01-23 14:15:31 +01:00  Mark Qvist  Added lxmd status getter
f683e03891  2025-01-23 14:15:12 +01:00  Mark Qvist  Added lxmd status getter
2c71cea7a0  2025-01-23 14:13:08 +01:00  Mark Qvist  Added local node stats request handler
61b1ecce27  2025-01-22 10:10:57 +01:00  Mark Qvist  Updated readme
68257a441f  2025-01-22 09:44:03 +01:00  Mark Qvist  Set transfer limit on reverse auto-peer
e69da2ed2a  2025-01-22 01:37:09 +01:00  Mark Qvist  Added static peers and peering limit
c2a08ef355  2025-01-21 20:44:11 +01:00  Mark Qvist  Enqueue and batch process distribution queue mappings
1430b1ce90  2025-01-21 20:20:39 +01:00  Mark Qvist  Enqueue and batch process distribution queue mappings
1c9c744107  2025-01-21 16:51:25 +01:00  Mark Qvist  Memory optimisations
bfed126a7c  2025-01-21 16:44:24 +01:00  Mark Qvist  Memory optimisations
44d1d992f8  2025-01-21 16:34:00 +01:00  Mark Qvist  Updated version
7701f326d9  2025-01-21 16:33:39 +01:00  Mark Qvist  Memory optimisations
356cb6412f  2025-01-21 10:46:59 +01:00  Mark Qvist  Optimise structure overhead
11 changed files with 980 additions and 127 deletions


@@ -12,7 +12,7 @@ Before creating a bug report on this issue tracker, you **must** read the [Contr
 - The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**.
 - Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**.
-- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), delete this section from your bug report.
+- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**.
 
 **Describe the Bug**
 A clear and concise description of what the bug is.

LICENSE

@@ -1,6 +1,6 @@
-MIT License
+Reticulum License
 
-Copyright (c) 2020 Mark Qvist / unsigned.io
+Copyright (c) 2020-2025 Mark Qvist
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
@@ -9,8 +9,16 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
 
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
+- The Software shall not be used in any kind of system which includes amongst
+  its functions the ability to purposefully do harm to human beings.
+
+- The Software shall not be used, directly or indirectly, in the creation of
+  an artificial intelligence, machine learning or language model training
+  dataset, including but not limited to any use that contributes to the
+  training or development of such a model or algorithm.
+
+- The above copyright notice and this permission notice shall be included in
+  all copies or substantial portions of the Software.
 
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,


@@ -45,15 +45,28 @@ class LXMFPropagationAnnounceHandler:
             if pn_announce_data_is_valid(data):
                 node_timebase = data[1]
                 propagation_transfer_limit = None
+                wanted_inbound_peers = None
+                if len(data) >= 4:
+                    # TODO: Rethink, probably not necessary anymore
+                    # try:
+                    #     wanted_inbound_peers = int(data[3])
+                    # except:
+                    #     wanted_inbound_peers = None
+                    pass
+
                 if len(data) >= 3:
                     try:
                         propagation_transfer_limit = float(data[2])
                     except:
                         propagation_transfer_limit = None
 
+                if destination_hash in self.lxmrouter.static_peers:
+                    self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
+
+                else:
                     if data[0] == True:
                         if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
-                            self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
+                            self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
 
                     elif data[0] == False:
                         self.lxmrouter.unpeer(destination_hash, node_timebase)
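The hunk above reads the propagation-node announce payload as a msgpack list: `[node_state, node_timebase, propagation_transfer_limit, wanted_inbound_peers]`, where older nodes announce only the first three fields. A minimal standalone sketch of that defensive parsing (the function name and returned keys are illustrative labels, not identifiers from LXMF):

```python
def parse_pn_announce(data):
    # data is the list decoded from the announce payload; fields beyond
    # index 1 are optional and must not crash parsing when absent or invalid.
    parsed = {
        "node_state": bool(data[0]),
        "node_timebase": int(data[1]),
        "propagation_transfer_limit": None,
        "wanted_inbound_peers": None,
    }
    if len(data) >= 3:
        try:
            parsed["propagation_transfer_limit"] = float(data[2])
        except (TypeError, ValueError):
            pass
    if len(data) >= 4 and data[3] is not None:
        try:
            parsed["wanted_inbound_peers"] = int(data[3])
        except (TypeError, ValueError):
            pass
    return parsed
```

A three-field announce from an older node simply leaves the fourth field as `None`, matching the tolerant behaviour in the handler.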


@@ -4,6 +4,7 @@ import time
 
 import RNS
 import RNS.vendor.umsgpack as msgpack
+from collections import deque
 
 from .LXMF import APP_NAME
 
 class LXMPeer:
@@ -19,6 +20,7 @@ class LXMPeer:
 
     ERROR_NO_IDENTITY = 0xf0
     ERROR_NO_ACCESS = 0xf1
+    ERROR_TIMEOUT = 0xfe
 
     # Maximum amount of time a peer can
     # be unreachable before it is removed
@@ -38,11 +40,16 @@ class LXMPeer:
     @staticmethod
     def from_bytes(peer_bytes, router):
         dictionary = msgpack.unpackb(peer_bytes)
-        peer_destination_hash = dictionary["destination_hash"]
-        peer_peering_timebase = dictionary["peering_timebase"]
-        peer_alive = dictionary["alive"]
-        peer_last_heard = dictionary["last_heard"]
-
-        peer = LXMPeer(router, peer_destination_hash)
-        peer.peering_timebase = peer_peering_timebase
-        peer.alive = peer_alive
-        peer.last_heard = peer_last_heard
+        peer = LXMPeer(router, dictionary["destination_hash"])
+        peer.peering_timebase = dictionary["peering_timebase"]
+        peer.alive = dictionary["alive"]
+        peer.last_heard = dictionary["last_heard"]
 
         if "link_establishment_rate" in dictionary:
             peer.link_establishment_rate = dictionary["link_establishment_rate"]
         else:
@@ -61,14 +68,54 @@ class LXMPeer:
         else:
             peer.propagation_transfer_limit = None
 
+        if "offered" in dictionary:
+            peer.offered = dictionary["offered"]
+        else:
+            peer.offered = 0
+
+        if "outgoing" in dictionary:
+            peer.outgoing = dictionary["outgoing"]
+        else:
+            peer.outgoing = 0
+
+        if "incoming" in dictionary:
+            peer.incoming = dictionary["incoming"]
+        else:
+            peer.incoming = 0
+
+        if "rx_bytes" in dictionary:
+            peer.rx_bytes = dictionary["rx_bytes"]
+        else:
+            peer.rx_bytes = 0
+
+        if "tx_bytes" in dictionary:
+            peer.tx_bytes = dictionary["tx_bytes"]
+        else:
+            peer.tx_bytes = 0
+
+        if "last_sync_attempt" in dictionary:
+            peer.last_sync_attempt = dictionary["last_sync_attempt"]
+        else:
+            peer.last_sync_attempt = 0
+
+        hm_count = 0
         for transient_id in dictionary["handled_ids"]:
             if transient_id in router.propagation_entries:
-                peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
+                peer.add_handled_message(transient_id)
+                hm_count += 1
 
+        um_count = 0
         for transient_id in dictionary["unhandled_ids"]:
             if transient_id in router.propagation_entries:
-                peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
+                peer.add_unhandled_message(transient_id)
+                um_count += 1
 
+        peer._hm_count = hm_count
+        peer._um_count = um_count
+        peer._hm_counts_synced = True
+        peer._um_counts_synced = True
+
+        del dictionary
         return peer
 
     def to_bytes(self):
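The repeated `if key in dictionary ... else default` blocks in `from_bytes` above implement backwards-compatible deserialisation: peers persisted by older versions simply lack the newer stats fields, which then fall back to zero. A compact sketch of the same pattern (the `STAT_DEFAULTS` table and `restore_stats` helper are illustrative, not part of LXMF):

```python
# Fields added to the persisted peer record, with their fallback values.
STAT_DEFAULTS = {"offered": 0, "outgoing": 0, "incoming": 0,
                 "rx_bytes": 0, "tx_bytes": 0, "last_sync_attempt": 0}

def restore_stats(dictionary):
    # dictionary would come from msgpack.unpackb() in the real code;
    # dict.get() collapses each existence check into one expression.
    return {field: dictionary.get(field, default)
            for field, default in STAT_DEFAULTS.items()}
```

Any record written before these fields existed loads with zeroed statistics rather than raising `KeyError`.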
@@ -80,6 +127,12 @@ class LXMPeer:
         dictionary["link_establishment_rate"] = self.link_establishment_rate
         dictionary["sync_transfer_rate"] = self.sync_transfer_rate
         dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
+        dictionary["last_sync_attempt"] = self.last_sync_attempt
+        dictionary["offered"] = self.offered
+        dictionary["outgoing"] = self.outgoing
+        dictionary["incoming"] = self.incoming
+        dictionary["rx_bytes"] = self.rx_bytes
+        dictionary["tx_bytes"] = self.tx_bytes
 
         handled_ids = []
         for transient_id in self.handled_messages:
@@ -92,7 +145,10 @@ class LXMPeer:
         dictionary["handled_ids"] = handled_ids
         dictionary["unhandled_ids"] = unhandled_ids
 
-        return msgpack.packb(dictionary)
+        peer_bytes = msgpack.packb(dictionary)
+        del dictionary
+        return peer_bytes
 
     def __init__(self, router, destination_hash):
         self.alive = False
@@ -104,12 +160,23 @@ class LXMPeer:
         self.link_establishment_rate = 0
         self.sync_transfer_rate = 0
         self.propagation_transfer_limit = None
+        self.handled_messages_queue = deque()
+        self.unhandled_messages_queue = deque()
+
+        self.offered = 0   # Messages offered to this peer
+        self.outgoing = 0  # Messages transferred to this peer
+        self.incoming = 0  # Messages received from this peer
+        self.rx_bytes = 0  # Bytes received from this peer
+        self.tx_bytes = 0  # Bytes sent to this peer
+
+        self._hm_count = 0
+        self._um_count = 0
+        self._hm_counts_synced = False
+        self._um_counts_synced = False
 
         self.link = None
         self.state = LXMPeer.IDLE
-        self.unhandled_messages = {}
-        self.handled_messages = {}
         self.last_offer = []
 
         self.router = router
@@ -118,6 +185,7 @@ class LXMPeer:
         if self.identity != None:
             self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
         else:
+            self.destination = None
             RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
 
     def sync(self):
@@ -171,7 +239,7 @@ class LXMPeer:
             for transient_id in purged_ids:
                 RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
-                self.unhandled_messages.pop(transient_id)
+                self.remove_unhandled_message(transient_id)
 
             unhandled_entries.sort(key=lambda e: e[1], reverse=False)
             per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
@@ -182,14 +250,17 @@ class LXMPeer:
                 lxm_size = unhandled_entry[2]
                 next_size = cumulative_size + (lxm_size+per_message_overhead)
                 if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000):
-                    pass
+                    if lxm_size+per_message_overhead > (self.propagation_transfer_limit*1000):
+                        self.remove_unhandled_message(transient_id)
+                        self.add_handled_message(transient_id)
+                        RNS.log(f"Message {RNS.prettyhexrep(transient_id)} exceeds transfer limit for {self}, considering handled", RNS.LOG_DEBUG)
                 else:
                     cumulative_size += (lxm_size+per_message_overhead)
                     unhandled_ids.append(transient_id)
 
-            RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
+            RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_VERBOSE)
             self.last_offer = unhandled_ids
-            self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed)
+            self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
             self.state = LXMPeer.REQUEST_SENT
 
         else:
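The selection loop above packs oldest-first messages into an offer until the per-transfer limit (in kilobytes) is reached, and with this change additionally marks messages that can never fit on their own as handled. A standalone sketch of that logic (entry tuples, function and variable names are illustrative):

```python
PER_MESSAGE_OVERHEAD = 16  # bytes; the real code notes it is really only 2

def select_for_offer(entries, limit_kb):
    # entries: (transient_id, timestamp, size_bytes) sorted oldest-first.
    # Returns ids to offer, plus ids that exceed the limit even alone,
    # which the caller should consider handled for this peer.
    offered, never_fits, cumulative = [], [], 0
    for transient_id, _timestamp, size in entries:
        needed = size + PER_MESSAGE_OVERHEAD
        if limit_kb is not None and cumulative + needed > limit_kb * 1000:
            if needed > limit_kb * 1000:
                never_fits.append(transient_id)  # can never be transferred
            # otherwise: skipped for this sync, retried on a later one
        else:
            cumulative += needed
            offered.append(transient_id)
    return offered, never_fits
```

Messages that merely overflow the current batch are skipped and stay queued for a later sync; only individually oversized ones are dropped from the unhandled queue.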
@@ -217,22 +288,29 @@ class LXMPeer:
             if response == LXMPeer.ERROR_NO_IDENTITY:
                 if self.link != None:
-                    RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG)
+                    RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE)
                     self.link.identify()
                     self.state = LXMPeer.LINK_READY
                     self.sync()
+                    return
+
+            elif response == LXMPeer.ERROR_NO_ACCESS:
+                RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE)
+                self.router.unpeer(self.destination_hash)
+                return
 
             elif response == False:
                 # Peer already has all advertised messages
                 for transient_id in self.last_offer:
                     if transient_id in self.unhandled_messages:
-                        self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
+                        self.add_handled_message(transient_id)
+                        self.remove_unhandled_message(transient_id)
 
             elif response == True:
                 # Peer wants all advertised messages
                 for transient_id in self.last_offer:
-                    wanted_messages.append(self.unhandled_messages[transient_id])
+                    wanted_messages.append(self.router.propagation_entries[transient_id])
                     wanted_message_ids.append(transient_id)
 
             else:
@@ -241,18 +319,17 @@ class LXMPeer:
                     # If the peer did not want the message, it has
                     # already received it from another peer.
                     if not transient_id in response:
-                        if transient_id in self.unhandled_messages:
-                            self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
+                        self.add_handled_message(transient_id)
+                        self.remove_unhandled_message(transient_id)
 
                 for transient_id in response:
-                    wanted_messages.append(self.unhandled_messages[transient_id])
+                    wanted_messages.append(self.router.propagation_entries[transient_id])
                     wanted_message_ids.append(transient_id)
 
             if len(wanted_messages) > 0:
-                RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG)
+                RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_VERBOSE)
 
                 lxm_list = []
                 for message_entry in wanted_messages:
                     file_path = message_entry[1]
                     if os.path.isfile(file_path):
@@ -268,7 +345,8 @@ class LXMPeer:
                 self.state = LXMPeer.RESOURCE_TRANSFERRING
 
             else:
-                RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
+                RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_VERBOSE)
+                self.offered += len(self.last_offer)
                 if self.link != None:
                     self.link.teardown()
@@ -288,8 +366,8 @@ class LXMPeer:
     def resource_concluded(self, resource):
         if resource.status == RNS.Resource.COMPLETE:
             for transient_id in resource.transferred_messages:
-                message = self.unhandled_messages.pop(transient_id)
-                self.handled_messages[transient_id] = message
+                self.add_handled_message(transient_id)
+                self.remove_unhandled_message(transient_id)
 
             if self.link != None:
                 self.link.teardown()
@@ -302,12 +380,15 @@ class LXMPeer:
             self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started)
             rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
 
-            RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed"+rate_str, RNS.LOG_DEBUG)
+            RNS.log(f"Syncing {len(resource.transferred_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
             self.alive = True
             self.last_heard = time.time()
+            self.offered += len(self.last_offer)
+            self.outgoing += len(resource.transferred_messages)
+            self.tx_bytes += resource.get_data_size()
 
         else:
-            RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
+            RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
             if self.link != None:
                 self.link.teardown()
@@ -328,9 +409,103 @@ class LXMPeer:
         self.link = None
         self.state = LXMPeer.IDLE
 
-    def handle_message(self, transient_id):
-        if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
-            self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
+    def queued_items(self):
+        return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
+
+    def queue_unhandled_message(self, transient_id):
+        self.unhandled_messages_queue.append(transient_id)
+
+    def queue_handled_message(self, transient_id):
+        self.handled_messages_queue.append(transient_id)
+
+    def process_queues(self):
+        if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
+            # TODO: Remove debug
+            # st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue)
+
+            handled_messages = self.handled_messages
+            unhandled_messages = self.unhandled_messages
+
+            while len(self.handled_messages_queue) > 0:
+                transient_id = self.handled_messages_queue.pop()
+                if not transient_id in handled_messages:
+                    self.add_handled_message(transient_id)
+                if transient_id in unhandled_messages:
+                    self.remove_unhandled_message(transient_id)
+
+            while len(self.unhandled_messages_queue) > 0:
+                transient_id = self.unhandled_messages_queue.pop()
+                if not transient_id in handled_messages and not transient_id in unhandled_messages:
+                    self.add_unhandled_message(transient_id)
+
+            del handled_messages, unhandled_messages
+            # TODO: Remove debug
+            # RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}")
+
+    @property
+    def handled_messages(self):
+        pes = self.router.propagation_entries.copy()
+        hm = list(filter(lambda tid: self.destination_hash in pes[tid][4], pes))
+        self._hm_count = len(hm); del pes
+        self._hm_counts_synced = True
+        return hm
+
+    @property
+    def unhandled_messages(self):
+        pes = self.router.propagation_entries.copy()
+        um = list(filter(lambda tid: self.destination_hash in pes[tid][5], pes))
+        self._um_count = len(um); del pes
+        self._um_counts_synced = True
+        return um
+
+    @property
+    def handled_message_count(self):
+        if not self._hm_counts_synced:
+            self._update_counts()
+        return self._hm_count
+
+    @property
+    def unhandled_message_count(self):
+        if not self._um_counts_synced:
+            self._update_counts()
+        return self._um_count
+
+    @property
+    def acceptance_rate(self):
+        return 0 if self.offered == 0 else (self.outgoing/self.offered)
+
+    def _update_counts(self):
+        if not self._hm_counts_synced:
+            hm = self.handled_messages; del hm
+        if not self._um_counts_synced:
+            um = self.unhandled_messages; del um
+
+    def add_handled_message(self, transient_id):
+        if transient_id in self.router.propagation_entries:
+            if not self.destination_hash in self.router.propagation_entries[transient_id][4]:
+                self.router.propagation_entries[transient_id][4].append(self.destination_hash)
+                self._hm_counts_synced = False
+
+    def add_unhandled_message(self, transient_id):
+        if transient_id in self.router.propagation_entries:
+            if not self.destination_hash in self.router.propagation_entries[transient_id][5]:
+                self.router.propagation_entries[transient_id][5].append(self.destination_hash)
+                self._um_count += 1
+
+    def remove_handled_message(self, transient_id):
+        if transient_id in self.router.propagation_entries:
+            if self.destination_hash in self.router.propagation_entries[transient_id][4]:
+                self.router.propagation_entries[transient_id][4].remove(self.destination_hash)
+                self._hm_counts_synced = False
+
+    def remove_unhandled_message(self, transient_id):
+        if transient_id in self.router.propagation_entries:
+            if self.destination_hash in self.router.propagation_entries[transient_id][5]:
+                self.router.propagation_entries[transient_id][5].remove(self.destination_hash)
+                self._um_counts_synced = False
 
     def __str__(self):
         if self.destination_hash:
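The memory-optimisation change above inverts the ownership of sync state: instead of every peer holding its own handled/unhandled message dicts, each propagation entry carries two lists of peer destination hashes (entry indices 4 and 5), and per-peer views are derived by filtering. A minimal model of that inversion (helper names are illustrative; the entry layout `[hash, path, received, size, handled_peers, unhandled_peers]` follows the indices used above):

```python
HANDLED, UNHANDLED = 4, 5  # indices of the peer lists in a propagation entry

def add_peer(entries, transient_id, peer_hash, idx):
    # Record peer_hash in the given list of the entry, without duplicates.
    entry = entries.get(transient_id)
    if entry is not None and peer_hash not in entry[idx]:
        entry[idx].append(peer_hash)

def remove_peer(entries, transient_id, peer_hash, idx):
    entry = entries.get(transient_id)
    if entry is not None and peer_hash in entry[idx]:
        entry[idx].remove(peer_hash)

def messages_for_peer(entries, peer_hash, idx):
    # Derived per-peer view, replacing the per-peer dicts of the old design.
    return [tid for tid, entry in entries.items() if peer_hash in entry[idx]]
```

With N peers and M messages this stores each transient ID once plus small membership lists, instead of up to N copies of per-message state, which is the point of the "Memory optimisations" commits in the list above.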


@@ -1,10 +1,15 @@
 import os
+import sys
 import time
+import math
 import random
 import base64
 import atexit
+import signal
 import threading
+from collections import deque
 
 import RNS
 import RNS.vendor.umsgpack as msgpack
@@ -32,9 +37,12 @@ class LXMRouter:
     NODE_ANNOUNCE_DELAY = 20
 
+    MAX_PEERS = 50
     AUTOPEER = True
     AUTOPEER_MAXDEPTH = 4
     FASTEST_N_RANDOM_POOL = 2
+    ROTATION_HEADROOM_PCT = 10
+    ROTATION_AR_MAX = 0.5
 
     PROPAGATION_LIMIT = 256
     DELIVERY_LIMIT = 1000
@@ -58,11 +66,16 @@ class LXMRouter:
     PR_ALL_MESSAGES = 0x00
 
+    STATS_GET_PATH = "/pn/get/stats"
+
     ### Developer-facing API ##############################
     #######################################################
 
-    def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None, propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT, enforce_ratchets = False, enforce_stamps = False):
+    def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None,
+                 propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, enforce_ratchets=False,
+                 enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False):
+
         random.seed(os.urandom(10))
 
         self.pending_inbound = []
@@ -83,6 +96,7 @@ class LXMRouter:
         self.processing_count = 0
 
         self.propagation_node = False
+        self.propagation_node_start_time = None
 
         if storagepath == None:
             raise ValueError("LXMF cannot be initialised without a storage path")
@@ -93,6 +107,9 @@ class LXMRouter:
         self.outbound_propagation_node = None
         self.outbound_propagation_link = None
 
+        if delivery_limit == None:
+            delivery_limit = LXMRouter.DELIVERY_LIMIT
+
         self.message_storage_limit = None
         self.information_storage_limit = None
         self.propagation_per_transfer_limit = propagation_limit
@@ -107,6 +124,7 @@ class LXMRouter:
         self.propagation_transfer_progress = 0.0
         self.propagation_transfer_last_result = None
         self.propagation_transfer_max_messages = None
+        self.prioritise_rotating_unreachable_peers = False
         self.active_propagation_links = []
         self.locally_delivered_transient_ids = {}
         self.locally_processed_transient_ids = {}
@@ -116,12 +134,18 @@ class LXMRouter:
         self.cost_file_lock = threading.Lock()
         self.ticket_file_lock = threading.Lock()
         self.stamp_gen_lock = threading.Lock()
+        self.exit_handler_running = False
 
         if identity == None:
             identity = RNS.Identity()
 
         self.identity = identity
         self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
+        self.control_destination = None
+        self.client_propagation_messages_received = 0
+        self.client_propagation_messages_served = 0
+        self.unpeered_propagation_incoming = 0
+        self.unpeered_propagation_rx_bytes = 0
 
         if autopeer != None:
             self.autopeer = autopeer
@@ -133,9 +157,32 @@ class LXMRouter:
         else:
             self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH
 
+        if max_peers == None:
+            self.max_peers = LXMRouter.MAX_PEERS
+        else:
+            if type(max_peers) == int and max_peers >= 0:
+                self.max_peers = max_peers
+            else:
+                raise ValueError(f"Invalid value for max_peers: {max_peers}")
+
+        self.from_static_only = from_static_only
+        if type(static_peers) != list:
+            raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}")
+        else:
+            for static_peer in static_peers:
+                if type(static_peer) != bytes:
+                    raise ValueError(f"Invalid static peer destination hash: {static_peer}")
+                else:
+                    if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
+                        raise ValueError(f"Invalid static peer destination hash: {static_peer}")
+
+            self.static_peers = static_peers
+
         self.peers = {}
         self.propagation_entries = {}
+        self.peer_distribution_queue = deque()
 
         RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self))
         RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self))
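The hunk above validates `static_peers` as a list of raw destination hashes sized to `RNS.Reticulum.TRUNCATED_HASHLENGTH//8`. A standalone sketch of that check, assuming the truncated hash length is 128 bits (16 bytes) as in current RNS releases; the constant is hard-coded here instead of reading it from RNS:

```python
TRUNCATED_HASHLENGTH_BYTES = 16  # assumed; RNS.Reticulum.TRUNCATED_HASHLENGTH // 8

def validate_static_peers(static_peers):
    # Mirrors the constructor checks: list of bytes, each exactly one
    # truncated destination hash long. Returns the list unchanged on success.
    if not isinstance(static_peers, list):
        raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}")
    for static_peer in static_peers:
        if not isinstance(static_peer, bytes) or len(static_peer) != TRUNCATED_HASHLENGTH_BYTES:
            raise ValueError(f"Invalid static peer destination hash: {static_peer}")
    return static_peers
```

Failing fast in the constructor means a typo in a configured peer hash surfaces at startup rather than as a silent non-peering at runtime.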
@@ -220,6 +267,8 @@ class LXMRouter:
             RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
 
         atexit.register(self.exit_handler)
+        signal.signal(signal.SIGINT, self.sigint_handler)
+        signal.signal(signal.SIGTERM, self.sigterm_handler)
 
         job_thread = threading.Thread(target=self.jobloop)
         job_thread.setDaemon(True)
@ -232,10 +281,12 @@ class LXMRouter:
def announce_propagation_node(self): def announce_propagation_node(self):
def delayed_announce(): def delayed_announce():
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
node_state = self.propagation_node and not self.from_static_only
announce_data = [
node_state, # Boolean flag signalling propagation node state
int(time.time()), # Current node timebase
self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes
None, # How many more inbound peers this node wants
]
data = msgpack.packb(announce_data)
@@ -427,6 +478,8 @@ class LXMRouter:
os.makedirs(self.messagepath)
self.propagation_entries = {}
st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE)
for filename in os.listdir(self.messagepath):
components = filename.split("_")
if len(components) == 2:
@@ -443,41 +496,94 @@
file.close()
self.propagation_entries[transient_id] = [
destination_hash, # 0: Destination hash
filepath, # 1: Storage location
received, # 2: Receive timestamp
msg_size, # 3: Message size
[], # 4: Handled peers
[], # 5: Unhandled peers
]
except Exception as e:
RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st))
RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE)
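The indexing rate above guards against a zero elapsed time before dividing (the `0 if et-st == 0 else ...` expression), which matters when the store is empty or the timer resolution rounds the interval to zero. A minimal sketch of that guard, with a hypothetical helper name:

```python
import math

def index_rate(count, elapsed):
    # Guard against a zero elapsed interval before dividing, mirroring the
    # `0 if et-st == 0 else math.floor(...)` expression in the indexing code.
    return 0 if elapsed == 0 else math.floor(count / elapsed)
```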
RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
st = time.time();
if os.path.isfile(self.storagepath+"/peers"):
peers_file = open(self.storagepath+"/peers", "rb")
peers_data = peers_file.read()
peers_file.close()
if len(peers_data) > 0:
serialised_peers = msgpack.unpackb(peers_data)
del peers_data
while len(serialised_peers) > 0:
serialised_peer = serialised_peers.pop()
peer = LXMPeer.from_bytes(serialised_peer, self)
del serialised_peer
if peer.destination_hash in self.static_peers and peer.last_heard == 0:
# TODO: Allow path request responses through announce handler
# momentarily here, so peering config can be updated even if
# the static peer is not available to directly send an announce.
RNS.Transport.request_path(peer.destination_hash)
if peer.identity != None:
self.peers[peer.destination_hash] = peer
lim_str = ", no transfer limit"
if peer.propagation_transfer_limit != None:
lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit"
RNS.log("Rebuilt peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(peer.unhandled_message_count)+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
else:
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
del peer
del serialised_peers
if len(self.static_peers) > 0:
for static_peer in self.static_peers:
if not static_peer in self.peers:
RNS.log(f"Activating static peering with {RNS.prettyhexrep(static_peer)}", RNS.LOG_NOTICE)
self.peers[static_peer] = LXMPeer(self, static_peer)
if self.peers[static_peer].last_heard == 0:
# TODO: Allow path request responses through announce handler
# momentarily here, so peering config can be updated even if
# the static peer is not available to directly send an announce.
RNS.Transport.request_path(static_peer)
RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
try:
if os.path.isfile(self.storagepath+"/node_stats"):
node_stats_file = open(self.storagepath+"/node_stats", "rb")
data = node_stats_file.read()
node_stats_file.close()
node_stats = msgpack.unpackb(data)
if not type(node_stats) == dict:
RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR)
else:
self.client_propagation_messages_received = node_stats["client_propagation_messages_received"]
self.client_propagation_messages_served = node_stats["client_propagation_messages_served"]
self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"]
self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"]
except Exception as e:
RNS.log("Could not load local node stats. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.propagation_node = True
self.propagation_node_start_time = time.time()
self.propagation_destination.set_link_established_callback(self.propagation_link_established)
self.propagation_destination.set_packet_callback(self.propagation_packet)
self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)
self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash])
if self.message_storage_limit != None:
limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit)
else:
@@ -580,6 +686,76 @@ class LXMRouter:
return False
### Propagation Node Control ##########################
#######################################################
def compile_stats(self):
if not self.propagation_node:
return None
else:
peer_stats = {}
for peer_id in self.peers.copy():
peer = self.peers[peer_id]
peer_stats[peer_id] = {
"type": "static" if peer_id in self.static_peers else "discovered",
"state": peer.state,
"alive": peer.alive,
"last_heard": int(peer.last_heard),
"next_sync_attempt": peer.next_sync_attempt,
"last_sync_attempt": peer.last_sync_attempt,
"sync_backoff": peer.sync_backoff,
"peering_timebase": peer.peering_timebase,
"ler": int(peer.link_establishment_rate),
"str": int(peer.sync_transfer_rate),
"transfer_limit": peer.propagation_transfer_limit,
"network_distance": RNS.Transport.hops_to(peer_id),
"rx_bytes": peer.rx_bytes,
"tx_bytes": peer.tx_bytes,
"messages": {
"offered": peer.offered,
"outgoing": peer.outgoing,
"incoming": peer.incoming,
"unhandled": peer.unhandled_message_count
},
}
node_stats = {
"identity_hash": self.identity.hash,
"destination_hash": self.propagation_destination.hash,
"uptime": time.time()-self.propagation_node_start_time,
"delivery_limit": self.delivery_per_transfer_limit,
"propagation_limit": self.propagation_per_transfer_limit,
"autopeer_maxdepth": self.autopeer_maxdepth,
"from_static_only": self.from_static_only,
"messagestore": {
"count": len(self.propagation_entries),
"bytes": self.message_storage_size(),
"limit": self.message_storage_limit,
},
"clients" : {
"client_propagation_messages_received": self.client_propagation_messages_received,
"client_propagation_messages_served": self.client_propagation_messages_served,
},
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
"static_peers": len(self.static_peers),
"discovered_peers": len(self.peers)-len(self.static_peers),
"total_peers": len(self.peers),
"max_peers": self.max_peers,
"peers": peer_stats,
}
return node_stats
def stats_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
elif remote_identity.hash != self.identity.hash:
return LXMPeer.ERROR_NO_ACCESS
else:
return self.compile_stats()
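The per-peer entries returned by `compile_stats` carry the raw counters from which derived metrics can be computed. A hedged sketch of one such derivation, the acceptance rate used elsewhere for peer rotation, with a guard for peers that have not been offered anything yet (`peer_acceptance_rate` is an illustrative name, not an LXMF function):

```python
def peer_acceptance_rate(peer_entry):
    # Acceptance rate = accepted (outgoing) / offered, guarding offered == 0,
    # computed from the "messages" sub-dict of a compile_stats() peer entry.
    offered = peer_entry["messages"]["offered"]
    outgoing = peer_entry["messages"]["outgoing"]
    return 0.0 if offered == 0 else outgoing / offered
```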
### Utility & Maintenance #############################
#######################################################
@@ -589,7 +765,10 @@
JOB_TRANSIENT_INTERVAL = 60
JOB_STORE_INTERVAL = 120
JOB_PEERSYNC_INTERVAL = 12
JOB_PEERINGEST_INTERVAL = JOB_PEERSYNC_INTERVAL
JOB_ROTATE_INTERVAL = 56*JOB_PEERINGEST_INTERVAL
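The interval constants drive the modulo-based scheduling in `jobs()`: the processing counter increments once per tick, and a job runs when the counter is a multiple of its interval. A minimal sketch of that dispatch logic (`due_jobs` and the job labels are illustrative, not LXMF names):

```python
# Interval values mirror the constants above; the counter increments once
# per PROCESSING_INTERVAL tick.
JOB_PEERSYNC_INTERVAL = 12
JOB_PEERINGEST_INTERVAL = JOB_PEERSYNC_INTERVAL
JOB_ROTATE_INTERVAL = 56 * JOB_PEERINGEST_INTERVAL

def due_jobs(tick):
    # A job is due whenever the tick counter is a multiple of its interval
    jobs = []
    if tick % JOB_PEERINGEST_INTERVAL == 0:
        jobs.append("ingest")
    if tick % JOB_ROTATE_INTERVAL == 0:
        jobs.append("rotate")
    if tick % JOB_PEERSYNC_INTERVAL == 0:
        jobs.append("sync")
    return jobs
```

Since `JOB_ROTATE_INTERVAL` is a multiple of the ingest interval, peer rotation always coincides with a distribution queue mapping tick, which is what the "align with distribution queue mapping" commit arranges.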
def jobs(self):
if not self.exit_handler_running:
self.processing_count += 1
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
@@ -605,23 +784,45 @@
self.clean_transient_id_caches()
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
if self.propagation_node == True:
self.clean_message_store()
if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0:
if self.propagation_node == True:
self.flush_queues()
if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0:
if self.propagation_node == True:
self.rotate_peers()
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
if self.propagation_node == True:
self.sync_peers()
def jobloop(self):
while (True):
# TODO: Improve this to scheduling, so manual
# triggers can delay next run
try:
self.jobs()
except Exception as e:
RNS.log("An error occurred while running LXMF Router jobs.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e)
time.sleep(LXMRouter.PROCESSING_INTERVAL)
def flush_queues(self):
if len(self.peers) > 0:
self.flush_peer_distribution_queue()
RNS.log("Calculating peer distribution queue mappings...", RNS.LOG_DEBUG); st = time.time()
for peer_id in self.peers.copy():
if peer_id in self.peers:
peer = self.peers[peer_id]
if peer.queued_items():
peer.process_queues()
RNS.log(f"Distribution queue mapping completed in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
def clean_links(self):
closed_links = []
for link_hash in self.direct_links:
@@ -693,6 +894,11 @@
self.save_outbound_stamp_costs()
threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start()
def get_wanted_inbound_peers(self):
# TODO: Implement/rethink.
# Probably not necessary anymore.
return None
def get_announce_app_data(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
@@ -794,12 +1000,12 @@
lxm_size = self.propagation_entries[transient_id][3]
return lxm_size
def clean_message_store(self):
RNS.log("Cleaning message store", RNS.LOG_VERBOSE)
# Check and remove expired messages
now = time.time()
removed_entries = {}
for transient_id in self.propagation_entries.copy():
entry = self.propagation_entries[transient_id]
filepath = entry[1]
components = filepath.split("_")
@@ -807,7 +1013,7 @@
if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2:
timestamp = float(components[1])
if now > timestamp+LXMRouter.MESSAGE_EXPIRY:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_EXTREME)
removed_entries[transient_id] = filepath
else:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
@@ -825,7 +1031,7 @@
RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
if removed_count > 0:
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_VERBOSE)
# Check size of message store and cull if needed
try:
@@ -837,7 +1043,7 @@
bytes_cleaned = 0
weighted_entries = []
for transient_id in self.propagation_entries.copy():
weighted_entries.append([
self.propagation_entries[transient_id],
self.get_weight(transient_id),
@@ -876,6 +1082,7 @@
def save_locally_delivered_transient_ids(self):
try:
if len(self.locally_delivered_transient_ids) > 0:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
@@ -887,6 +1094,7 @@
def save_locally_processed_transient_ids(self):
try:
if len(self.locally_processed_transient_ids) > 0:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
@@ -896,6 +1104,24 @@
except Exception as e:
RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def save_node_stats(self):
try:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
with open(self.storagepath+"/node_stats", "wb") as stats_file:
node_stats = {
"client_propagation_messages_received": self.client_propagation_messages_received,
"client_propagation_messages_served": self.client_propagation_messages_served,
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
}
stats_file.write(msgpack.packb(node_stats))
except Exception as e:
RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def clean_outbound_stamp_costs(self):
try:
expired = []
@@ -989,10 +1215,45 @@
RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR)
def exit_handler(self):
if self.exit_handler_running:
return
self.exit_handler_running = True
RNS.log("Tearing down delivery destinations...", RNS.LOG_NOTICE)
for destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
delivery_destination.set_packet_callback(None)
delivery_destination.set_link_established_callback(None)
for link in delivery_destination.links:
try:
if link.status == RNS.Link.ACTIVE:
link.teardown()
except Exception as e:
RNS.log(f"Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
if self.propagation_node:
RNS.log("Tearing down propagation node destination...", RNS.LOG_NOTICE)
self.propagation_destination.set_link_established_callback(None)
self.propagation_destination.set_packet_callback(None)
self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH)
self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH)
self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH)
for link in self.active_propagation_links:
try:
if link.status == RNS.Link.ACTIVE:
link.teardown()
except Exception as e:
RNS.log(f"Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE)
self.flush_queues()
if self.propagation_node:
try:
st = time.time(); RNS.log(f"Saving {len(self.peers)} peer synchronisation states to storage...", RNS.LOG_NOTICE)
serialised_peers = []
peer_dict = self.peers.copy()
for peer_id in peer_dict:
peer = self.peers[peer_id]
serialised_peers.append(peer.to_bytes())
@@ -1000,13 +1261,28 @@
peers_file.write(msgpack.packb(serialised_peers))
peers_file.close()
RNS.log(f"Saved {len(serialised_peers)} peers to storage in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_NOTICE)
except Exception as e:
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.save_locally_delivered_transient_ids()
self.save_locally_processed_transient_ids()
self.save_node_stats()
def sigint_handler(self, signal, frame):
if not self.exit_handler_running:
RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING)
sys.exit(0)
else:
RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
def sigterm_handler(self, signal, frame):
if not self.exit_handler_running:
RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING)
sys.exit(0)
else:
RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
def __str__(self):
return "<LXMRouter "+RNS.hexrep(self.identity.hash, delimit=False)+">"
@@ -1121,6 +1397,7 @@
except Exception as e:
RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
self.client_propagation_messages_served += len(response_messages)
return response_messages
@@ -1341,7 +1618,7 @@
### Message Routing & Delivery ########################
#######################################################
def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False, allow_duplicate=False):
try:
message = LXMessage.unpack_from_bytes(lxmf_data)
if ratchet_id and not message.ratchet_id:
@@ -1408,7 +1685,7 @@
RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
if not allow_duplicate and self.has_message(message.hash):
RNS.log(str(self)+" ignored already received message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
else:
@@ -1500,7 +1777,7 @@
### Peer Sync & Propagation ###########################
#######################################################
def peer(self, destination_hash, timestamp, propagation_transfer_limit, wanted_inbound_peers = None):
if destination_hash in self.peers:
peer = self.peers[destination_hash]
if timestamp > peer.peering_timebase:
@@ -1510,14 +1787,18 @@
peer.peering_timebase = timestamp
peer.last_heard = time.time()
peer.propagation_transfer_limit = propagation_transfer_limit
RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE)
else:
if len(self.peers) < self.max_peers:
peer = LXMPeer(self, destination_hash)
peer.alive = True
peer.last_heard = time.time()
peer.propagation_transfer_limit = propagation_transfer_limit
self.peers[destination_hash] = peer
RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE)
else:
RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
def unpeer(self, destination_hash, timestamp = None):
if timestamp == None:
@@ -1530,13 +1811,91 @@
self.peers.pop(destination_hash)
RNS.log("Broke peering with "+str(peer.destination))
def rotate_peers(self):
try:
rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0)))
required_drops = len(self.peers) - (self.max_peers - rotation_headroom)
if required_drops > 0 and len(self.peers) - required_drops > 1:
peers = self.peers.copy()
untested_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
if peer.last_sync_attempt == 0:
untested_peers.append(peer)
if len(untested_peers) >= rotation_headroom:
RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG)
return
fully_synced_peers = {}
for peer_id in peers:
peer = peers[peer_id]
if peer.unhandled_message_count == 0:
fully_synced_peers[peer_id] = peer
if len(fully_synced_peers) > 0:
peers = fully_synced_peers
ms = "" if len(fully_synced_peers) == 1 else "s"
RNS.log(f"Found {len(fully_synced_peers)} fully synced peer{ms}, using as peer rotation pool basis", RNS.LOG_DEBUG)
culled_peers = []
waiting_peers = []
unresponsive_peers = []
for peer_id in peers:
peer = peers[peer_id]
if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE:
if peer.alive:
if peer.offered == 0:
# Don't consider for unpeering until at
# least one message has been offered
pass
else:
waiting_peers.append(peer)
else:
unresponsive_peers.append(peer)
drop_pool = []
if len(unresponsive_peers) > 0:
drop_pool.extend(unresponsive_peers)
if not self.prioritise_rotating_unreachable_peers:
drop_pool.extend(waiting_peers)
else:
drop_pool.extend(waiting_peers)
if len(drop_pool) > 0:
drop_count = min(required_drops, len(drop_pool))
low_acceptance_rate_peers = sorted(
drop_pool,
key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ),
reverse=False
)[0:drop_count]
dropped_peers = 0
for peer in low_acceptance_rate_peers:
ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2)
if ar < LXMRouter.ROTATION_AR_MAX*100:
reachable_str = "reachable" if peer.alive else "unreachable"
RNS.log(f"Acceptance rate for {reachable_str} peer {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing}/{peer.offered}, {peer.unhandled_message_count} unhandled messages)", RNS.LOG_DEBUG)
self.unpeer(peer.destination_hash)
dropped_peers += 1
ms = "" if dropped_peers == 1 else "s"
RNS.log(f"Dropped {dropped_peers} low acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG)
except Exception as e:
RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
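The drop selection above sorts the drop pool ascending by acceptance rate (accepted/offered, with a zero-offered guard), takes at most `required_drops` candidates, and unpeers only those still below the rotation threshold. A simplified sketch of that selection; `select_drops` and the tuple shape are illustrative, and the threshold is expressed as a fraction rather than the code's percentage comparison:

```python
def select_drops(drop_pool, required_drops, ar_max=0.5):
    # drop_pool entries are hypothetical (peer_id, outgoing, offered) tuples;
    # the real code sorts LXMPeer objects by p.outgoing/p.offered instead.
    def rate(p):
        return 0 if p[2] == 0 else p[1] / p[2]
    # Lowest acceptance rate first, take at most required_drops candidates,
    # and only drop peers whose rate is below the ar_max threshold
    rated = sorted(drop_pool, key=rate)
    return [p[0] for p in rated[:required_drops] if rate(p) < ar_max]
```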
def sync_peers(self):
culled_peers = []
waiting_peers = []
unresponsive_peers = []
peers = self.peers.copy()
for peer_id in peers:
peer = peers[peer_id]
if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
if not peer_id in self.static_peers:
culled_peers.append(peer_id)
else:
if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
@@ -1597,10 +1956,23 @@
self.active_propagation_links.append(link)
def propagation_resource_advertised(self, resource):
if self.from_static_only:
remote_identity = resource.link.get_remote_identity()
if remote_identity == None:
RNS.log(f"Rejecting propagation resource from unidentified peer", RNS.LOG_DEBUG)
return False
else:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.static_peers:
RNS.log(f"Rejecting propagation resource from {remote_str} not in static peers list", RNS.LOG_DEBUG)
return False
size = resource.get_data_size()
limit = self.propagation_per_transfer_limit*1000
if limit != None and size > limit:
RNS.log(f"Rejecting {RNS.prettysize(size)} incoming propagation resource, since it exceeds the limit of {RNS.prettysize(limit)}", RNS.LOG_DEBUG)
return False
else:
return True
@@ -1616,6 +1988,7 @@
messages = data[1]
for lxmf_data in messages:
self.lxmf_propagation(lxmf_data)
self.client_propagation_messages_received += 1
packet.prove()
@@ -1627,6 +2000,14 @@
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
else:
if self.from_static_only:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.static_peers:
RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG)
return LXMPeer.ERROR_NO_ACCESS
try:
transient_ids = data
wanted_ids = []
@@ -1649,7 +2030,6 @@
return None
def propagation_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
# TODO: The peer this was received from should
# have the transient id added to its list of
@@ -1661,31 +2041,73 @@
# This is a series of propagation messages from a peer or originator
remote_timebase = data[0]
remote_hash = None
remote_str = "unknown peer"
remote_identity = resource.link.get_remote_identity()
if remote_identity != None:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.peers:
if self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
# TODO: Query cache for an announce and get propagation
# transfer limit from that. For now, initialise it to a
# sane default value, and wait for an announce to arrive
# that will update the peering config to the actual limit.
propagation_transfer_limit = LXMRouter.PROPAGATION_LIMIT//4
wanted_inbound_peers = None
self.peer(remote_hash, remote_timebase, propagation_transfer_limit, wanted_inbound_peers)
else:
remote_str = f"peer {remote_str}"
messages = data[1]
ms = "" if len(messages) == 1 else "s"
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE)
for lxmf_data in messages:
peer = None
transient_id = RNS.Identity.full_hash(lxmf_data)
if remote_hash != None and remote_hash in self.peers:
peer = self.peers[remote_hash]
peer.incoming += 1
peer.rx_bytes += len(lxmf_data)
else:
if remote_identity != None:
self.unpeered_propagation_incoming += 1
self.unpeered_propagation_rx_bytes += len(lxmf_data)
else:
self.client_propagation_messages_received += 1
self.lxmf_propagation(lxmf_data, from_peer=peer)
if peer != None:
peer.queue_handled_message(transient_id)
else:
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
RNS.trace_exception(e)
def enqueue_peer_distribution(self, transient_id, from_peer):
self.peer_distribution_queue.append([transient_id, from_peer])
def flush_peer_distribution_queue(self):
if len(self.peer_distribution_queue) > 0:
entries = []
while len(self.peer_distribution_queue) > 0:
entries.append(self.peer_distribution_queue.pop())
for peer_id in self.peers.copy():
if peer_id in self.peers:
peer = self.peers[peer_id]
for entry in entries:
transient_id = entry[0]
from_peer = entry[1]
if peer != from_peer:
peer.queue_unhandled_message(transient_id)
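The flush logic above drains the queue in one pass and fans each transient ID out to every peer except the one it arrived from. A standalone sketch of that pattern, assuming illustrative `Peer`/`flush` names rather than the actual LXMF classes:

```python
class Peer:
    """Hypothetical stand-in for LXMPeer, tracking unhandled message IDs."""
    def __init__(self, peer_id):
        self.peer_id = peer_id
        self.unhandled = []

    def queue_unhandled_message(self, transient_id):
        if transient_id not in self.unhandled:
            self.unhandled.append(transient_id)

def flush(queue, peers):
    # Drain the queue first, so entries appended while distributing
    # are picked up by the next flush rather than this one.
    entries = []
    while queue:
        entries.append(queue.pop())
    # Iterate over a snapshot of the peer set, since peers may
    # appear or disappear while flushing.
    for peer in list(peers.values()):
        for transient_id, from_peer in entries:
            if peer is not from_peer:
                peer.queue_unhandled_message(transient_id)

peers = {b"a": Peer(b"a"), b"b": Peer(b"b")}
queue = [(b"msg1", peers[b"a"])]  # msg1 arrived from peer a
flush(queue, peers)
```

After the flush, only peer `b` has `msg1` queued as unhandled; the originating peer `a` is skipped.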
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None):
no_stamp_enforcement = False
if is_paper_message:
no_stamp_enforcement = True
@@ -1694,9 +2116,8 @@ class LXMRouter:
if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
transient_id = RNS.Identity.full_hash(lxmf_data)
if (not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids) or allow_duplicate == True:
received = time.time()
destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH]
self.locally_processed_transient_ids[transient_id] = received
@@ -1707,7 +2128,7 @@ class LXMRouter:
decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data)
if decrypted_lxmf_data != None:
delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data
self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement, allow_duplicate=allow_duplicate)
self.locally_delivered_transient_ids[transient_id] = time.time()
if signal_local_delivery != None:
@@ -1720,12 +2141,9 @@ class LXMRouter:
msg_file.write(lxmf_data)
msg_file.close()
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME)
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]
self.enqueue_peer_distribution(transient_id, from_peer)
else:
# TODO: Add message to sneakernet queues when implemented
@@ -1745,9 +2163,10 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
RNS.trace_exception(e)
return False
def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False):
try:
if not uri.lower().startswith(LXMessage.URI_SCHEMA+"://"):
RNS.log("Cannot ingest LXM, invalid URI provided.", RNS.LOG_ERROR)
@@ -1757,7 +2176,7 @@ class LXMRouter:
lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==")
transient_id = RNS.Identity.full_hash(lxmf_data)
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True)
if router_propagation_result != False:
RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG)
return router_propagation_result

View File

@@ -380,7 +380,7 @@ class LXMessage:
if self.desired_method == LXMessage.OPPORTUNISTIC:
if self.__destination.type == RNS.Destination.SINGLE:
if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT:
RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
self.desired_method = LXMessage.DIRECT
# Set delivery parameters according to delivery method

View File

@@ -35,6 +35,7 @@ import time
import os
from LXMF._version import __version__
from LXMF import APP_NAME
from RNS.vendor.configobj import ConfigObj
@@ -126,7 +127,7 @@ def apply_config():
if active_configuration["message_storage_limit"] < 0.005:
active_configuration["message_storage_limit"] = 0.005
else:
active_configuration["message_storage_limit"] = 500
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
@@ -140,6 +141,24 @@ def apply_config():
else:
active_configuration["prioritised_lxmf_destinations"] = []
if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]:
static_peers = lxmd_config["propagation"].as_list("static_peers")
active_configuration["static_peers"] = []
for static_peer in static_peers:
active_configuration["static_peers"].append(bytes.fromhex(static_peer))
else:
active_configuration["static_peers"] = []
if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]:
active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers")
else:
active_configuration["max_peers"] = None
if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]:
active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only")
else:
active_configuration["from_static_only"] = False
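The block above reads the three new `[propagation]` options: static peer hashes arrive as hex strings and are stored as bytes, with fallbacks when a key is absent. A minimal standalone sketch of that parsing, using a hypothetical `parse_propagation_options` helper over a plain dict instead of the ConfigObj section:

```python
def parse_propagation_options(section):
    """Hypothetical helper mirroring the option parsing shown in the diff."""
    options = {"static_peers": [], "max_peers": None, "from_static_only": False}
    if "static_peers" in section:
        # Hex destination hashes become raw bytes, as in bytes.fromhex(static_peer)
        options["static_peers"] = [bytes.fromhex(h) for h in section["static_peers"]]
    if "max_peers" in section:
        options["max_peers"] = int(section["max_peers"])
    if "from_static_only" in section:
        options["from_static_only"] = str(section["from_static_only"]).lower() == "true"
    return options

opts = parse_propagation_options({
    "static_peers": ["e17f833c4ddf8890dd3a79a6fea8161d"],
    "max_peers": "25",
    "from_static_only": "True"})
```

The real code uses ConfigObj's `as_list`, `as_int` and `as_bool` accessors, which perform the equivalent coercions.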
# Load various settings
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
targetloglevel = lxmd_config["logging"].as_int("loglevel")
@@ -305,7 +324,10 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
max_peers = active_configuration["max_peers"],
static_peers = active_configuration["static_peers"],
from_static_only = active_configuration["from_static_only"])
message_router.register_delivery_callback(lxmf_delivery)
for destination_hash in active_configuration["ignored_lxmf_destinations"]:
@@ -362,13 +384,13 @@ def jobs():
try:
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE)
message_router.announce(lxmf_destination.hash)
last_peer_announce = time.time()
if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None:
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE)
message_router.announce_propagation_node()
last_node_announce = time.time()
@@ -381,7 +403,7 @@ def deferred_start_jobs():
global active_configuration, last_peer_announce, last_node_announce
global message_router, lxmf_destination
time.sleep(DEFFERED_JOBS_DELAY)
RNS.log("Running deferred start jobs", RNS.LOG_DEBUG)
if active_configuration["peer_announce_at_start"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
message_router.announce(lxmf_destination.hash)
@@ -394,6 +416,190 @@ def deferred_start_jobs():
last_node_announce = time.time()
threading.Thread(target=jobs, daemon=True).start()
def query_status(identity, timeout=5, exit_on_fail=False):
control_destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
timeout = time.time()+timeout
def check_timeout():
if time.time() > timeout:
if exit_on_fail:
RNS.log("Getting lxmd statistics timed out, exiting now", RNS.LOG_ERROR)
exit(200)
else:
return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
else:
time.sleep(0.1)
if not RNS.Transport.has_path(control_destination.hash):
RNS.Transport.request_path(control_destination.hash)
while not RNS.Transport.has_path(control_destination.hash):
tc = check_timeout()
if tc:
return tc
link = RNS.Link(control_destination)
while not link.status == RNS.Link.ACTIVE:
tc = check_timeout()
if tc:
return tc
link.identify(identity)
request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None)
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
tc = check_timeout()
if tc:
return tc
link.teardown()
return request_receipt.get_response()
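query_status polls path discovery, link establishment and the request receipt against one shared deadline via check_timeout. The same deadline-polling pattern in isolation, with a hypothetical `wait_until` helper (not part of lxmd):

```python
import time

def wait_until(condition, deadline, interval=0.1):
    # Poll condition() until it holds, or give up once the shared
    # deadline passes; mirrors the check_timeout loop structure above.
    while not condition():
        if time.time() > deadline:
            return False
        time.sleep(interval)
    return True

deadline = time.time() + 0.2
state = {"ready": False}
state["ready"] = True  # simulate e.g. a path or link becoming available in time
result = wait_until(lambda: state["ready"], deadline)
```

Sharing one deadline across the successive waits (path, link, request) bounds the total query time, instead of granting each stage its own full timeout.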
def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = 0, timeout=5, show_status=False, show_peers=False, identity_path=None):
global configpath, identitypath, storagedir, lxmdir
global lxmd_config, active_configuration, targetloglevel
targetlogdest = RNS.LOG_STDOUT
if identity_path == None:
if configdir == None:
if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"):
configdir = "/etc/lxmd"
elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(RNS.Reticulum.userdir+"/.config/lxmd/config"):
configdir = RNS.Reticulum.userdir+"/.config/lxmd"
else:
configdir = RNS.Reticulum.userdir+"/.lxmd"
configpath = configdir+"/config"
identitypath = configdir+"/identity"
identity = None
if not os.path.isdir(configdir):
RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR)
exit(201)
if not os.path.isfile(identitypath):
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
exit(202)
else:
identity = RNS.Identity.from_file(identitypath)
if identity == None:
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
exit(4)
else:
if not os.path.isfile(identity_path):
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
exit(202)
else:
identity = RNS.Identity.from_file(identity_path)
if identity == None:
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
exit(4)
if targetloglevel == None:
targetloglevel = 3
if verbosity != 0 or quietness != 0:
targetloglevel = targetloglevel+verbosity-quietness
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
response = query_status(identity, timeout=timeout, exit_on_fail=True)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Remote received no identity")
exit(203)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
RNS.log("Access denied")
exit(204)
else:
s = response
mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2)
ms_util = f"{mutil}%"
if s["from_static_only"]:
who_str = "static peers only"
else:
who_str = "all nodes"
available_peers = 0
unreachable_peers = 0
peered_incoming = 0
peered_outgoing = 0
peered_rx_bytes = 0
peered_tx_bytes = 0
for peer_id in s["peers"]:
p = s["peers"][peer_id]
pm = p["messages"]
peered_incoming += pm["incoming"]
peered_outgoing += pm["outgoing"]
peered_rx_bytes += p["rx_bytes"]
peered_tx_bytes += p["tx_bytes"]
if p["alive"]:
available_peers += 1
else:
unreachable_peers += 1
total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"]
total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"]
df = round(peered_outgoing/total_incoming, 2)
dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"])
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
if show_status:
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit")
print(f"")
print(f"Peers : {stp} total (peer limit is {smp})")
print(f" {sdp} discovered, {ssp} static")
print(f" {available_peers} available, {unreachable_peers} unreachable")
print(f"")
print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})")
print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})")
print(f" {upi} messages received from unpeered nodes ({uprx})")
print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})")
print(f" {cprr} propagation messages received directly from clients")
print(f" {cprs} propagation messages served to clients")
print(f" Distribution factor is {df}")
print(f"")
if show_peers:
if not show_status:
print("")
for peer_id in s["peers"]:
ind = " "
p = s["peers"][peer_id]
if p["type"] == "static":
t = "Static peer "
elif p["type"] == "discovered":
t = "Discovered peer "
else:
t = "Unknown peer "
a = "Available" if p["alive"] == True else "Unreachable"
h = max(time.time()-p["last_heard"], 0)
hops = p["network_distance"]
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
pm = p["messages"]
if p["last_sync_attempt"] != 0:
lsa = p["last_sync_attempt"]
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
else:
ls = "never synced"
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000)
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
pmi = pm["incoming"]; pmuh = pm["unhandled"]
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit")
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming")
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
ms = "" if pm["unhandled"] == 1 else "s"
print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}")
print("")
def main():
try:
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
@@ -404,6 +610,10 @@ def main():
parser.add_argument("-v", "--verbose", action="count", default=0)
parser.add_argument("-q", "--quiet", action="count", default=0)
parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file")
parser.add_argument("--status", action="store_true", default=False, help="display node status")
parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes")
parser.add_argument("--timeout", action="store", default=5, help="timeout in seconds for query operations", type=float)
parser.add_argument("--identity", action="store", default=None, help="path to identity used for query request", type=str)
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
@@ -413,15 +623,24 @@ def main():
print(__default_lxmd_config__)
exit()
if args.status or args.peers:
get_status(configdir = args.config,
rnsconfigdir=args.rnsconfig,
verbosity=args.verbose,
quietness=args.quiet,
timeout=args.timeout,
show_status=args.status,
show_peers=args.peers,
identity_path=args.identity)
exit()
program_setup(configdir = args.config,
rnsconfigdir=args.rnsconfig,
run_pn=args.propagation_node,
on_inbound=args.on_inbound,
verbosity=args.verbose,
quietness=args.quiet,
service=args.service)
except KeyboardInterrupt:
print("")
@@ -477,9 +696,9 @@ propagation_transfer_max_accepted_size = 256
# LXMF prioritises keeping messages that are
# new and small. Large and old messages will
# be removed first. This setting is optional
# and defaults to 500 megabytes.
# message_storage_limit = 500
# You can tell the LXMF message router to
# prioritise storage for one or more
@@ -491,6 +710,25 @@ propagation_transfer_max_accepted_size = 256
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
# You can configure the maximum number of other
# propagation nodes that this node will peer
# with automatically. The default is 50.
# max_peers = 25
# You can configure a list of static propagation
# node peers, that this node will always be
# peered with, by specifying a list of
# destination hashes.
# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
# You can configure the propagation node to
# only accept incoming propagation messages
# from configured static peers.
# from_static_only = True
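Taken together, the three new peering options documented above could be combined in a `[propagation]` section like this (a sketch using the example hashes from the comments, not a recommended configuration):

```
[propagation]
  max_peers = 25
  static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
  from_static_only = True
```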
# By default, any destination is allowed to
# connect and download messages, but you can
# optionally restrict this. If you enable

View File

@@ -1 +1 @@
__version__ = "0.6.3"

View File

@@ -12,6 +12,7 @@ User-facing clients built on LXMF include:
Community-provided tools and utilities for LXMF include:
- [LXMFy](https://lxmfy.quad4.io/)
- [LXMF-Bot](https://github.com/randogoth/lxmf-bot)
- [LXMF Messageboard](https://github.com/chengtripp/lxmf_messageboard)
- [LXMEvent](https://github.com/faragher/LXMEvent)

View File

@@ -1,3 +1,2 @@
qrcode>=7.4.2
rns>=0.9.1

View File

@@ -25,6 +25,6 @@ setuptools.setup(
'lxmd=LXMF.Utilities.lxmd:main',
]
},
install_requires=["rns>=0.9.3"],
python_requires=">=3.7",
)