Compare commits

..

112 commits

Author SHA1 Message Date
Mark Qvist
bc7522b63d Updated version 2025-11-13 19:42:24 +01:00
Mark Qvist
39e398be65 Fixed missing PN config unpack on incoming sync auto-peering 2025-11-13 17:48:10 +01:00
Mark Qvist
ee15e9f0b6 Updated version 2025-11-08 14:30:47 +01:00
Mark Qvist
00ffbc09fe Using multiprocessing start method fork on Linux to avoid issues with Python 3.14. Fixes #35. 2025-11-08 01:20:31 +01:00
Mark Qvist
dca6cc2adc Ensure LXMF and RNS exit handlers are called on SIGINT and SIGTERM, since for some ungodly reason atexit events are not always called on some combinations of Python version and platforms, even though they have been registered. 2025-11-07 23:10:30 +01:00
Mark Qvist
62038573f1 Updated version 2025-11-03 22:21:13 +01:00
Mark Qvist
fa2d78c351 Fixed message stamps getting overwritten if propagation stamp was also present 2025-11-03 22:19:20 +01:00
Mark Qvist
f18ce9ea99 Cleanup 2025-11-03 00:08:50 +01:00
Mark Qvist
d7a2979dd0 Cleanup 2025-11-02 23:07:43 +01:00
Mark Qvist
0b067914ea Adjusted default max peering cost 2025-11-02 20:41:03 +01:00
Mark Qvist
bc3f4ecff5 Handle client propagation transfer limits separately from PN peers 2025-11-02 18:35:15 +01:00
Mark Qvist
99830b6e8b Only index messages with stamp value set 2025-11-02 12:02:21 +01:00
Mark Qvist
1b5dc419b5 Merge messagestore check 2025-11-02 11:38:15 +01:00
Mark Qvist
9c5fa4a066 Take stamp value into account when cleaning message store 2025-11-02 11:35:12 +01:00
Mark Qvist
383d953e06 Don't hog all cores on high core-count machines 2025-11-02 11:34:22 +01:00
markqvist
557887d13f Merge pull request #34 from busimus/master: Fix stored messages always getting deleted if config path has underscores 2025-11-02 11:33:00 +01:00
bus
e1905b85d7 Fix stored messages always getting deleted if config path has underscores 2025-11-02 14:12:33 +06:00
Mark Qvist
8e3ffb0d2a Stat query default identity as local 2025-11-02 02:48:19 +01:00
Mark Qvist
f383450b37 Implemented duplicate signalling on PN message download 2025-11-02 02:29:32 +01:00
Mark Qvist
747ddbddd5 Implemented duplicate signalling on PN message download 2025-11-02 02:27:57 +01:00
Mark Qvist
d0f3385f75 Added acceptance rate to lxmd stats output 2025-11-01 18:48:31 +01:00
Mark Qvist
401328fa16 Allow configuring max remote peering cost 2025-11-01 18:27:11 +01:00
Mark Qvist
4350a239e4 Cleanup 2025-11-01 17:35:36 +01:00
Mark Qvist
9dc998f149 Added peering break option to lxmd command line interface 2025-11-01 17:32:45 +01:00
Mark Qvist
fa9fd2ae01 Added remote status and control by allow-list for lxmd 2025-11-01 13:10:28 +01:00
Mark Qvist
0cebd5886d Allow specifying remote lxmd destination for status and control 2025-11-01 12:49:32 +01:00
Mark Qvist
b35b9213a6 Implemented throttling for naughty propagation node peers 2025-11-01 01:35:15 +01:00
Mark Qvist
df6271a026 Handle client message download for stamped propagation messages 2025-11-01 00:10:30 +01:00
Mark Qvist
4afb92bf3e Added peer sync option to lxmd command line interface 2025-11-01 00:09:23 +01:00
Mark Qvist
0a5edb2895 Implemented propagation node name configuration and inclusion in announce metadata 2025-10-31 22:24:55 +01:00
Mark Qvist
704b37dc16 Implemented client-side propagation stamp generation and inclusion in outbound propagation messages 2025-10-31 21:45:40 +01:00
Mark Qvist
606a723e31 Implemented getting static peer configuration from network if unknown at launch 2025-10-31 17:11:17 +01:00
Mark Qvist
a44c1f368a Validate peering key on incoming sync offer 2025-10-31 17:02:34 +01:00
Mark Qvist
434267784d Implemented propagation node peering key generation and peering cost signalling 2025-10-31 13:53:59 +01:00
Mark Qvist
9c646aead7 Stamp validation on incoming propagation node transfers 2025-10-31 02:19:24 +01:00
Mark Qvist
ebc8bb33c2 Cleanup 2025-10-31 00:28:06 +01:00
Mark Qvist
60bf99d151 Cleanup 2025-10-31 00:25:46 +01:00
Mark Qvist
c84aea745a Improved auto-peering on inbound PN sync. Added support for persisting and loading transient message stamp status. Implemented getting transient message stamp value. 2025-10-30 21:19:38 +01:00
Mark Qvist
a62ffa12b1 Cleanup 2025-10-30 19:45:40 +01:00
Mark Qvist
6446db4f11 Break peering when peer moves outside auto-peering range 2025-10-30 17:03:05 +01:00
Mark Qvist
81a6d503a3 Automatic legacy PN de-peering 2025-10-30 16:55:44 +01:00
Mark Qvist
c28d3b1432 Added metadate structure to propagation node announces 2025-10-30 16:44:15 +01:00
Mark Qvist
d8b25e092f Added metadate structure to propagation node announces 2025-10-30 16:43:26 +01:00
Mark Qvist
aca5bf9c14 Cleanup 2025-10-30 16:06:53 +01:00
Mark Qvist
bd6fe9f9d1 Handle propagation node stamp cost peering signalling 2025-10-30 15:39:00 +01:00
Mark Qvist
0f2d3b06c2 Also receive PN path response announces, but only update peering data on live announces 2025-10-30 15:18:09 +01:00
Mark Qvist
3f91e44a6d Updated RNS version 2025-10-30 14:10:14 +01:00
Mark Qvist
8f54d40abf Updated version 2025-10-30 14:09:54 +01:00
Mark Qvist
9beeafb0c8 Propagation node stamp cost handling 2025-10-30 14:08:39 +01:00
Mark Qvist
b572723a5e Cleanup 2025-10-30 13:49:26 +01:00
Mark Qvist
6cf7852271 Cleanup 2025-10-30 13:14:59 +01:00
Mark Qvist
e17263d25a Cleanup 2025-10-30 12:41:18 +01:00
Mark Qvist
16dfbc22cd Propagation stamp validation via pool dispatch 2025-10-30 12:38:49 +01:00
Mark Qvist
98347d3ad9 Increased PN peer sync frequency 2025-10-29 23:25:15 +01:00
Mark Qvist
61b75526d2 Added separate propagation node per-message and sync transfer limits. Added persistent PN sync strategy. Added concurrent PN peer sync. 2025-10-29 23:24:45 +01:00
Mark Qvist
85d8f4f583 Updated requirements 2025-07-13 13:42:01 +02:00
Mark Qvist
5b9f121593 Updated version and RNS dependency version 2025-07-13 13:24:52 +02:00
Mark Qvist
416ccf294f Retry path request on unexpectedly closed link 2025-07-13 13:24:46 +02:00
Mark Qvist
787cd069dc Fixed division by zero. Closes #30. 2025-05-26 20:57:46 +02:00
Mark Qvist
c2207d1eb7 Added funding 2025-05-17 10:27:21 +02:00
Mark Qvist
a9622e3a33 Updated version 2025-05-15 20:30:12 +02:00
Mark Qvist
499fe4cc53 Use no_data_for instead of inactive_for for cleaning links 2025-05-15 20:27:19 +02:00
Mark Qvist
37e99910ec Updated version and RNS dependency version 2025-05-12 11:58:24 +02:00
Mark Qvist
005d71707c Cleanup 2025-04-17 13:31:00 +02:00
Mark Qvist
1bdcf6ad53 Updated license 2025-04-15 20:21:54 +02:00
Mark Qvist
e6021b8fed Updated license 2025-04-15 20:21:16 +02:00
Mark Qvist
326c0eed8f Updated version 2025-03-13 19:46:11 +01:00
Mark Qvist
336792c07a Updated dependencies 2025-03-13 19:45:15 +01:00
Mark Qvist
570d2c6846 Added configuration options to default config file 2025-03-07 11:05:50 +01:00
Mark Qvist
1ef4665073 Cleanup 2025-02-18 20:05:19 +01:00
Mark Qvist
d5540b927f Added allow_duplicate option to message ingest API 2025-01-31 13:38:56 +01:00
Mark Qvist
a6cf585109 Cleanup 2025-01-30 15:11:26 +01:00
Mark Qvist
c0a8f3be49 Cleanup 2025-01-30 15:04:21 +01:00
Mark Qvist
7b4780cfb7 Automatically clean messages exceeding propagation transfer limit for peer from unhandled message queues 2025-01-30 11:36:11 +01:00
Mark Qvist
b94a712bb6 Automatically clean messages exceeding propagation transfer limit for peer from unhandled message queues 2025-01-30 11:30:45 +01:00
Mark Qvist
f42ccfc4e9 Automatically clean messages exceeding propagation transfer limit for peer from unhandled message queues 2025-01-30 11:23:18 +01:00
Mark Qvist
9eca747757 Updated peer rotation timing to align with distribution queue mapping 2025-01-30 10:46:31 +01:00
Mark Qvist
b7b6753640 Fixed potential division by zero. Fixes #25. 2025-01-30 00:37:50 +01:00
Mark Qvist
40d0b9a5de Added acceptance rate threshold to peer rotation 2025-01-29 21:21:51 +01:00
Mark Qvist
40fc75f559 Refined peer rotation algorithm 2025-01-29 14:24:09 +01:00
Mark Qvist
f1d060a92e Added peer rotation 2025-01-29 01:26:36 +01:00
Mark Qvist
e0e901291e Updated logging 2025-01-27 12:04:16 +01:00
Mark Qvist
886ac69a82 Tear down control link after use 2025-01-27 12:04:05 +01:00
Mark Qvist
e0163e100a Updated issue template 2025-01-27 10:26:11 +01:00
Mark Qvist
26a10cce8f Status query return code 2025-01-26 01:13:11 +01:00
Mark Qvist
cec903a4dc Added status query API function 2025-01-24 14:05:12 +01:00
Mark Qvist
962d9c90d1 Added wanted inbound peers to PN announce data 2025-01-24 13:50:56 +01:00
Mark Qvist
6d2eb4f973 Updated default config 2025-01-24 00:26:47 +01:00
Mark Qvist
a8cc5f41cf Fixed typo 2025-01-24 00:21:37 +01:00
Mark Qvist
aa57b16cf5 Fixed #23 2025-01-24 00:09:36 +01:00
Mark Qvist
cdea838a6c Updated status output 2025-01-23 17:43:24 +01:00
Mark Qvist
fb4bf9b0b9 Cleanup 2025-01-23 17:36:30 +01:00
Mark Qvist
a3e3868f92 Changed formatting 2025-01-23 17:09:40 +01:00
Mark Qvist
70186cf8d9 Fixed typo 2025-01-23 17:07:20 +01:00
Mark Qvist
fe59b265c5 Fixed fstrings not working on Python < 3.12 2025-01-23 16:54:12 +01:00
Mark Qvist
a87458d25f Updated version 2025-01-23 16:28:11 +01:00
Mark Qvist
35dd70c59e Format status and peers output 2025-01-23 16:27:48 +01:00
Mark Qvist
a198e96064 Include unhandled message count in stats 2025-01-23 16:27:23 +01:00
Mark Qvist
e3be7e0cfd Persist last sync attempt 2025-01-23 16:27:01 +01:00
Mark Qvist
460645cea2 Added lxmd status getter 2025-01-23 14:15:31 +01:00
Mark Qvist
f683e03891 Added lxmd status getter 2025-01-23 14:15:12 +01:00
Mark Qvist
2c71cea7a0 Added local node stats request handler 2025-01-23 14:13:08 +01:00
Mark Qvist
61b1ecce27 Updated readme 2025-01-22 10:10:57 +01:00
Mark Qvist
68257a441f Set transfer limit on reverse auto-peer 2025-01-22 09:44:03 +01:00
Mark Qvist
e69da2ed2a Added static peers and peering limit 2025-01-22 01:37:09 +01:00
Mark Qvist
c2a08ef355 Enqueue and batch process distribution queue mappings 2025-01-21 20:44:11 +01:00
Mark Qvist
1430b1ce90 Enqueue and batch process distribution queue mappings 2025-01-21 20:20:39 +01:00
Mark Qvist
1c9c744107 Memory optimisations 2025-01-21 16:51:25 +01:00
Mark Qvist
bfed126a7c Memory optimisations 2025-01-21 16:44:24 +01:00
Mark Qvist
44d1d992f8 Updated version 2025-01-21 16:34:00 +01:00
Mark Qvist
7701f326d9 Memory optimisations 2025-01-21 16:33:39 +01:00
Mark Qvist
356cb6412f Optimise structure overhead 2025-01-21 10:46:59 +01:00
14 changed files with 2233 additions and 534 deletions


@@ -12,7 +12,7 @@ Before creating a bug report on this issue tracker, you **must** read the [Contr
- The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**.
- Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**.
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), delete this section from your bug report.
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**.
**Describe the Bug**
A clear and concise description of what the bug is.

FUNDING.yml (new file, 3 additions)

@@ -0,0 +1,3 @@
liberapay: Reticulum
ko_fi: markqvist
custom: "https://unsigned.io/donate"

LICENSE (16 changes)

@@ -1,6 +1,6 @@
MIT License
Reticulum License
Copyright (c) 2020 Mark Qvist / unsigned.io
Copyright (c) 2020-2025 Mark Qvist
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -9,8 +9,16 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
- The Software shall not be used in any kind of system which includes amongst
its functions the ability to purposefully do harm to human beings.
- The Software shall not be used, directly or indirectly, in the creation of
an artificial intelligence, machine learning or language model training
dataset, including but not limited to any use that contributes to the
training or development of such a model or algorithm.
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,


@@ -1,4 +1,5 @@
import time
import threading
import RNS
import RNS.vendor.umsgpack as msgpack
@@ -7,20 +8,21 @@ from .LXMessage import LXMessage
class LXMFDeliveryAnnounceHandler:
def __init__(self, lxmrouter):
self.aspect_filter = APP_NAME+".delivery"
self.aspect_filter = APP_NAME+".delivery"
self.receive_path_responses = True
self.lxmrouter = lxmrouter
self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data):
for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash:
if destination_hash == lxmessage.destination_hash:
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
lxmessage.next_delivery_attempt = time.time()
while self.lxmrouter.processing_outbound:
time.sleep(0.1)
def outbound_trigger():
while self.lxmrouter.processing_outbound: time.sleep(0.1)
self.lxmrouter.process_outbound()
self.lxmrouter.process_outbound()
threading.Thread(target=outbound_trigger, daemon=True).start()
try:
stamp_cost = stamp_cost_from_app_data(app_data)
@@ -32,32 +34,58 @@ class LXMFDeliveryAnnounceHandler:
class LXMFPropagationAnnounceHandler:
def __init__(self, lxmrouter):
self.aspect_filter = APP_NAME+".propagation"
self.receive_path_responses = False
self.lxmrouter = lxmrouter
self.aspect_filter = APP_NAME+".propagation"
self.receive_path_responses = True
self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data):
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash, is_path_response):
try:
if type(app_data) == bytes:
if self.lxmrouter.propagation_node and self.lxmrouter.autopeer:
data = msgpack.unpackb(app_data)
if self.lxmrouter.propagation_node:
if pn_announce_data_is_valid(app_data):
data = msgpack.unpackb(app_data)
node_timebase = int(data[1])
propagation_enabled = data[2]
propagation_transfer_limit = int(data[3])
propagation_sync_limit = int(data[4])
propagation_stamp_cost = int(data[5][0])
propagation_stamp_cost_flexibility = int(data[5][1])
peering_cost = int(data[5][2])
metadata = data[6]
if destination_hash in self.lxmrouter.static_peers:
static_peer = self.lxmrouter.peers[destination_hash]
if not is_path_response or static_peer.last_heard == 0:
self.lxmrouter.peer(destination_hash=destination_hash,
timestamp=node_timebase,
propagation_transfer_limit=propagation_transfer_limit,
propagation_sync_limit=propagation_sync_limit,
propagation_stamp_cost=propagation_stamp_cost,
propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
peering_cost=peering_cost,
metadata=metadata)
if pn_announce_data_is_valid(data):
node_timebase = data[1]
propagation_transfer_limit = None
if len(data) >= 3:
try:
propagation_transfer_limit = float(data[2])
except:
propagation_transfer_limit = None
else:
if self.lxmrouter.autopeer and not is_path_response:
if propagation_enabled == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
self.lxmrouter.peer(destination_hash=destination_hash,
timestamp=node_timebase,
propagation_transfer_limit=propagation_transfer_limit,
propagation_sync_limit=propagation_sync_limit,
propagation_stamp_cost=propagation_stamp_cost,
propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
peering_cost=peering_cost,
metadata=metadata)
if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
else:
if destination_hash in self.lxmrouter.peers:
RNS.log(f"Peer {self.lxmrouter.peers[destination_hash]} moved outside auto-peering range, breaking peering...")
self.lxmrouter.unpeer(destination_hash, node_timebase)
elif data[0] == False:
self.lxmrouter.unpeer(destination_hash, node_timebase)
elif propagation_enabled == False:
self.lxmrouter.unpeer(destination_hash, node_timebase)
except Exception as e:
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
RNS.log(f"The contained exception was: {str(e)}", RNS.LOG_DEBUG)
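The handler above unpacks the announce payload into positional fields before peering. The index-to-meaning mapping can be sketched as follows; the field names here are assumptions inferred from the diff, not an official LXMF wire-format specification:

```python
# Hypothetical sketch of the PN announce field layout handled above.
# Index meanings are inferred from the diff and may differ from the
# actual LXMF wire format.
def unpack_pn_announce(data):
    """Map the announce list onto named fields for readability."""
    return {
        "node_timebase":          int(data[1]),
        "propagation_enabled":    data[2],
        "transfer_limit":         int(data[3]),
        "sync_limit":             int(data[4]),
        "stamp_cost":             int(data[5][0]),
        "stamp_cost_flexibility": int(data[5][1]),
        "peering_cost":           int(data[5][2]),
        "metadata":               data[6],
    }

# Example payload with illustrative values
announce = [1, 1761868795, True, 256, 4096, [12, 2, 18], {}]
fields = unpack_pn_announce(announce)
```

In the real handler the list arrives msgpack-encoded and is decoded with `msgpack.unpackb` before any of these indices are read.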


@@ -91,6 +91,18 @@ RENDERER_MICRON = 0x01
RENDERER_MARKDOWN = 0x02
RENDERER_BBCODE = 0x03
# Optional propagation node metadata fields. These
# fields may be highly unstable in allocation and
# availability until the version 1.0.0 release, so use
# at your own risk until then, and expect changes!
PN_META_VERSION = 0x00
PN_META_NAME = 0x01
PN_META_SYNC_STRATUM = 0x02
PN_META_SYNC_THROTTLE = 0x03
PN_META_AUTH_BAND = 0x04
PN_META_UTIL_PRESSURE = 0x05
PN_META_CUSTOM = 0xFF
##########################################################
# The following helper functions makes it easier to #
# handle and operate on LXMF data in client programs #
@@ -99,21 +111,17 @@ RENDERER_BBCODE = 0x03
import RNS
import RNS.vendor.umsgpack as msgpack
def display_name_from_app_data(app_data=None):
if app_data == None:
return None
elif len(app_data) == 0:
return None
if app_data == None: return None
elif len(app_data) == 0: return None
else:
# Version 0.5.0+ announce format
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
peer_data = msgpack.unpackb(app_data)
if type(peer_data) == list:
if len(peer_data) < 1:
return None
if len(peer_data) < 1: return None
else:
dn = peer_data[0]
if dn == None:
return None
if dn == None: return None
else:
try:
decoded = dn.decode("utf-8")
@@ -127,36 +135,61 @@ def display_name_from_app_data(app_data=None):
return app_data.decode("utf-8")
def stamp_cost_from_app_data(app_data=None):
if app_data == None or app_data == b"":
return None
if app_data == None or app_data == b"": return None
else:
# Version 0.5.0+ announce format
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
peer_data = msgpack.unpackb(app_data)
if type(peer_data) == list:
if len(peer_data) < 2:
return None
else:
return peer_data[1]
if len(peer_data) < 2: return None
else: return peer_data[1]
# Original announce format
else: return None
def pn_name_from_app_data(app_data=None):
if app_data == None: return None
else:
if pn_announce_data_is_valid(app_data):
data = msgpack.unpackb(app_data)
metadata = data[6]
if not PN_META_NAME in metadata: return None
else:
try: return metadata[PN_META_NAME].decode("utf-8")
except: return None
return None
def pn_stamp_cost_from_app_data(app_data=None):
if app_data == None: return None
else:
if pn_announce_data_is_valid(app_data):
data = msgpack.unpackb(app_data)
return data[5][0]
else:
return None
def pn_announce_data_is_valid(data):
try:
if type(data) == bytes:
data = msgpack.unpackb(data)
if len(data) < 3:
raise ValueError("Invalid announce data: Insufficient peer data")
if type(data) != bytes: return False
else: data = msgpack.unpackb(data)
if len(data) < 7: raise ValueError("Invalid announce data: Insufficient peer data, likely from deprecated LXMF version")
else:
if data[0] != True and data[0] != False:
raise ValueError("Invalid announce data: Indeterminate propagation node status")
try:
int(data[1])
except:
raise ValueError("Invalid announce data: Could not decode peer timebase")
try: int(data[1])
except: raise ValueError("Invalid announce data: Could not decode timebase")
if data[2] != True and data[2] != False: raise ValueError("Invalid announce data: Indeterminate propagation node status")
try: int(data[3])
except: raise ValueError("Invalid announce data: Could not decode propagation transfer limit")
try: int(data[4])
except: raise ValueError("Invalid announce data: Could not decode propagation sync limit")
if type(data[5]) != list: raise ValueError("Invalid announce data: Could not decode stamp costs")
try: int(data[5][0])
except: raise ValueError("Invalid announce data: Could not decode target stamp cost")
try: int(data[5][1])
except: raise ValueError("Invalid announce data: Could not decode stamp cost flexibility")
try: int(data[5][2])
except: raise ValueError("Invalid announce data: Could not decode peering cost")
if type(data[6]) != dict: raise ValueError("Invalid announce data: Could not decode metadata")
except Exception as e:
RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG)
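The shape checks performed by `pn_announce_data_is_valid` above can be condensed into a standalone sketch. This version operates on an already-unpacked list and returns a boolean; the real function additionally accepts msgpack-encoded bytes, raises descriptive `ValueError`s, and logs failures via `RNS.log`:

```python
# Minimal standalone sketch of the announce shape checks above.
# Operates on an already-unpacked list; the real implementation also
# handles msgpack decoding and detailed error reporting.
def announce_shape_ok(data):
    try:
        if len(data) < 7: return False          # deprecated/short format
        int(data[1])                            # timebase
        if data[2] not in (True, False): return False  # node status
        int(data[3]); int(data[4])              # transfer / sync limits
        if not isinstance(data[5], list): return False # stamp costs
        int(data[5][0]); int(data[5][1]); int(data[5][2])
        if not isinstance(data[6], dict): return False # metadata
        return True
    except Exception:
        return False
```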


@@ -1,24 +1,38 @@
import os
import time
import threading
import RNS
import RNS.vendor.umsgpack as msgpack
import LXMF.LXStamper as LXStamper
from collections import deque
from .LXMF import APP_NAME
from .LXMF import PN_META_NAME
class LXMPeer:
OFFER_REQUEST_PATH = "/offer"
MESSAGE_GET_PATH = "/get"
IDLE = 0x00
LINK_ESTABLISHING = 0x01
LINK_READY = 0x02
REQUEST_SENT = 0x03
RESPONSE_RECEIVED = 0x04
IDLE = 0x00
LINK_ESTABLISHING = 0x01
LINK_READY = 0x02
REQUEST_SENT = 0x03
RESPONSE_RECEIVED = 0x04
RESOURCE_TRANSFERRING = 0x05
ERROR_NO_IDENTITY = 0xf0
ERROR_NO_ACCESS = 0xf1
ERROR_NO_IDENTITY = 0xf0
ERROR_NO_ACCESS = 0xf1
ERROR_INVALID_KEY = 0xf3
ERROR_INVALID_DATA = 0xf4
ERROR_INVALID_STAMP = 0xf5
ERROR_THROTTLED = 0xf6
ERROR_NOT_FOUND = 0xfd
ERROR_TIMEOUT = 0xfe
STRATEGY_LAZY = 0x01
STRATEGY_PERSISTENT = 0x02
DEFAULT_SYNC_STRATEGY = STRATEGY_PERSISTENT
# Maximum amount of time a peer can
# be unreachable before it is removed
@@ -38,48 +52,111 @@ class LXMPeer:
@staticmethod
def from_bytes(peer_bytes, router):
dictionary = msgpack.unpackb(peer_bytes)
peer_destination_hash = dictionary["destination_hash"]
peer_peering_timebase = dictionary["peering_timebase"]
peer_alive = dictionary["alive"]
peer_last_heard = dictionary["last_heard"]
peer = LXMPeer(router, dictionary["destination_hash"])
peer.peering_timebase = dictionary["peering_timebase"]
peer.alive = dictionary["alive"]
peer.last_heard = dictionary["last_heard"]
if "link_establishment_rate" in dictionary:
peer.link_establishment_rate = dictionary["link_establishment_rate"]
else:
peer.link_establishment_rate = 0
peer = LXMPeer(router, peer_destination_hash)
peer.peering_timebase = peer_peering_timebase
peer.alive = peer_alive
peer.last_heard = peer_last_heard
if "link_establishment_rate" in dictionary: peer.link_establishment_rate = dictionary["link_establishment_rate"]
else: peer.link_establishment_rate = 0
if "sync_transfer_rate" in dictionary:
peer.sync_transfer_rate = dictionary["sync_transfer_rate"]
else:
peer.sync_transfer_rate = 0
if "sync_transfer_rate" in dictionary: peer.sync_transfer_rate = dictionary["sync_transfer_rate"]
else: peer.sync_transfer_rate = 0
if "propagation_transfer_limit" in dictionary:
try:
peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"])
except Exception as e:
peer.propagation_transfer_limit = None
else:
peer.propagation_transfer_limit = None
try: peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"])
except Exception as e: peer.propagation_transfer_limit = None
else: peer.propagation_transfer_limit = None
if "propagation_sync_limit" in dictionary:
try: peer.propagation_sync_limit = int(dictionary["propagation_sync_limit"])
except: peer.propagation_sync_limit = peer.propagation_transfer_limit
else: peer.propagation_sync_limit = peer.propagation_transfer_limit
if "propagation_stamp_cost" in dictionary:
try: peer.propagation_stamp_cost = int(dictionary["propagation_stamp_cost"])
except: peer.propagation_stamp_cost = None
else: peer.propagation_stamp_cost = None
if "propagation_stamp_cost_flexibility" in dictionary:
try: peer.propagation_stamp_cost_flexibility = int(dictionary["propagation_stamp_cost_flexibility"])
except: peer.propagation_stamp_cost_flexibility = None
else: peer.propagation_stamp_cost_flexibility = None
if "peering_cost" in dictionary:
try: peer.peering_cost = int(dictionary["peering_cost"])
except: peer.peering_cost = None
else: peer.peering_cost = None
if "sync_strategy" in dictionary:
try: peer.sync_strategy = int(dictionary["sync_strategy"])
except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY
else: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY
if "offered" in dictionary: peer.offered = dictionary["offered"]
else: peer.offered = 0
if "outgoing" in dictionary: peer.outgoing = dictionary["outgoing"]
else: peer.outgoing = 0
if "incoming" in dictionary: peer.incoming = dictionary["incoming"]
else: peer.incoming = 0
if "rx_bytes" in dictionary: peer.rx_bytes = dictionary["rx_bytes"]
else: peer.rx_bytes = 0
if "tx_bytes" in dictionary: peer.tx_bytes = dictionary["tx_bytes"]
else: peer.tx_bytes = 0
if "last_sync_attempt" in dictionary: peer.last_sync_attempt = dictionary["last_sync_attempt"]
else: peer.last_sync_attempt = 0
if "peering_key" in dictionary: peer.peering_key = dictionary["peering_key"]
else: peer.peering_key = None
if "metadata" in dictionary: peer.metadata = dictionary["metadata"]
else: peer.metadata = None
hm_count = 0
for transient_id in dictionary["handled_ids"]:
if transient_id in router.propagation_entries:
peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
peer.add_handled_message(transient_id)
hm_count += 1
um_count = 0
for transient_id in dictionary["unhandled_ids"]:
if transient_id in router.propagation_entries:
peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
peer.add_unhandled_message(transient_id)
um_count += 1
peer._hm_count = hm_count
peer._um_count = um_count
peer._hm_counts_synced = True
peer._um_counts_synced = True
del dictionary
return peer
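The long run of `if "key" in dictionary: … else: default` restores in `from_bytes` above amounts to `dict.get` with per-field defaults. A condensed sketch of that pattern (field names taken from the diff, defaults as shown there):

```python
# Condensed sketch of the defaulted-field restore pattern used in
# from_bytes above: each optional key falls back to its default.
DEFAULTS = {
    "link_establishment_rate": 0, "sync_transfer_rate": 0,
    "offered": 0, "outgoing": 0, "incoming": 0,
    "rx_bytes": 0, "tx_bytes": 0, "last_sync_attempt": 0,
    "peering_key": None, "metadata": None,
}

def restore_fields(obj, dictionary):
    """Set each optional attribute from the dict, or its default."""
    for key, default in DEFAULTS.items():
        setattr(obj, key, dictionary.get(key, default))

class Peer: pass
p = Peer()
restore_fields(p, {"offered": 5})
```

The explicit per-field form in the diff has the advantage of allowing per-field type coercion (`int(…)`, `float(…)`) with individual fallbacks, which a generic loop like this does not capture.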
def to_bytes(self):
dictionary = {}
dictionary["peering_timebase"] = self.peering_timebase
dictionary["alive"] = self.alive
dictionary["metadata"] = self.metadata
dictionary["last_heard"] = self.last_heard
dictionary["sync_strategy"] = self.sync_strategy
dictionary["peering_key"] = self.peering_key
dictionary["destination_hash"] = self.destination_hash
dictionary["link_establishment_rate"] = self.link_establishment_rate
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
dictionary["propagation_sync_limit"] = self.propagation_sync_limit
dictionary["propagation_stamp_cost"] = self.propagation_stamp_cost
dictionary["propagation_stamp_cost_flexibility"] = self.propagation_stamp_cost_flexibility
dictionary["peering_cost"] = self.peering_cost
dictionary["last_sync_attempt"] = self.last_sync_attempt
dictionary["offered"] = self.offered
dictionary["outgoing"] = self.outgoing
dictionary["incoming"] = self.incoming
dictionary["rx_bytes"] = self.rx_bytes
dictionary["tx_bytes"] = self.tx_bytes
handled_ids = []
for transient_id in self.handled_messages:
@@ -92,24 +169,50 @@ class LXMPeer:
dictionary["handled_ids"] = handled_ids
dictionary["unhandled_ids"] = unhandled_ids
return msgpack.packb(dictionary)
peer_bytes = msgpack.packb(dictionary)
del dictionary
def __init__(self, router, destination_hash):
self.alive = False
self.last_heard = 0
self.next_sync_attempt = 0
self.last_sync_attempt = 0
self.sync_backoff = 0
self.peering_timebase = 0
return peer_bytes
def __init__(self, router, destination_hash, sync_strategy=DEFAULT_SYNC_STRATEGY):
self.alive = False
self.last_heard = 0
self.sync_strategy = sync_strategy
self.peering_key = None
self.peering_cost = None
self.metadata = None
self.next_sync_attempt = 0
self.last_sync_attempt = 0
self.sync_backoff = 0
self.peering_timebase = 0
self.link_establishment_rate = 0
self.sync_transfer_rate = 0
self.propagation_transfer_limit = None
self.sync_transfer_rate = 0
self.propagation_transfer_limit = None
self.propagation_sync_limit = None
self.propagation_stamp_cost = None
self.propagation_stamp_cost_flexibility = None
self.currently_transferring_messages = None
self.handled_messages_queue = deque()
self.unhandled_messages_queue = deque()
self.offered = 0 # Messages offered to this peer
self.outgoing = 0 # Messages transferred to this peer
self.incoming = 0 # Messages received from this peer
self.rx_bytes = 0 # Bytes received from this peer
self.tx_bytes = 0 # Bytes sent to this peer
self._hm_count = 0
self._um_count = 0
self._hm_counts_synced = False
self._um_counts_synced = False
self._peering_key_lock = threading.Lock()
self.link = None
self.state = LXMPeer.IDLE
self.unhandled_messages = {}
self.handled_messages = {}
self.last_offer = []
self.router = router
@@ -118,13 +221,77 @@ class LXMPeer:
if self.identity != None:
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
else:
self.destination = None
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
def peering_key_ready(self):
if not self.peering_cost: return False
if type(self.peering_key) == list and len(self.peering_key) == 2:
value = self.peering_key[1]
if value >= self.peering_cost: return True
else:
RNS.log(f"Peering key value mismatch for {self}. Current value is {value}, but peer requires {self.peering_cost}. Scheduling regeneration...", RNS.LOG_WARNING)
self.peering_key = None
return False
def peering_key_value(self):
if type(self.peering_key) == list and len(self.peering_key) == 2: return self.peering_key[1]
else: return None
def generate_peering_key(self):
if self.peering_cost == None: return False
with self._peering_key_lock:
if self.peering_key != None: return True
else:
RNS.log(f"Generating peering key for {self}", RNS.LOG_NOTICE)
if self.router.identity == None:
RNS.log(f"Could not update peering key for {self} since the local LXMF router identity is not configured", RNS.LOG_ERROR)
return False
if self.identity == None:
self.identity = RNS.Identity.recall(self.destination_hash)
if self.identity == None:
RNS.log(f"Could not update peering key for {self} since its identity could not be recalled", RNS.LOG_ERROR)
return False
key_material = self.identity.hash+self.router.identity.hash
peering_key, value = LXStamper.generate_stamp(key_material, self.peering_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PEERING)
if value >= self.peering_cost:
self.peering_key = [peering_key, value]
RNS.log(f"Peering key successfully generated for {self}", RNS.LOG_NOTICE)
return True
return False
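`generate_peering_key` above delegates the actual work to `LXStamper.generate_stamp`, a proof-of-work search over the combined identity hashes. As a rough illustration only — the real LXStamper uses a different, memory-expanded workblock construction (`WORKBLOCK_EXPAND_ROUNDS_PEERING`) — a hashcash-style search over SHA-256 leading zero bits looks like:

```python
import hashlib
import os

# Toy hashcash-style stamp search, for illustration only. The real
# LXStamper.generate_stamp uses an expanded workblock, not bare SHA-256.
def leading_zero_bits(digest):
    """Count leading zero bits in a byte string."""
    bits = 0
    for byte in digest:
        if byte == 0:
            bits += 8
            continue
        for shift in range(7, -1, -1):
            if byte >> shift & 1: return bits
            bits += 1
    return bits

def generate_stamp(key_material, cost):
    """Search for a nonce whose hash value meets or exceeds the cost."""
    while True:
        nonce = os.urandom(16)
        value = leading_zero_bits(hashlib.sha256(key_material + nonce).digest())
        if value >= cost:
            return nonce, value

stamp, value = generate_stamp(b"peer-identity-hash", cost=8)
```

The returned `[peering_key, value]` pair in the diff mirrors this shape: the stamp itself plus the achieved value, so `peering_key_ready` can later re-check the value against a possibly changed `peering_cost`.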
def sync(self):
RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
self.last_sync_attempt = time.time()
if time.time() > self.next_sync_attempt:
sync_time_reached = time.time() > self.next_sync_attempt
stamp_costs_known = self.propagation_stamp_cost != None and self.propagation_stamp_cost_flexibility != None and self.peering_cost != None
peering_key_ready = self.peering_key_ready()
sync_checks = sync_time_reached and stamp_costs_known and peering_key_ready
if not sync_checks:
try:
if not sync_time_reached:
postpone_reason = " due to previous failures"
if self.last_sync_attempt > self.last_heard: self.alive = False
elif not stamp_costs_known:
postpone_reason = " since its required stamp costs are not yet known"
elif not peering_key_ready:
postpone_reason = " since a peering key has not been generated yet"
def job(): self.generate_peering_key()
threading.Thread(target=job, daemon=True).start()
delay = self.next_sync_attempt-time.time()
postpone_delay = f" for {RNS.prettytime(delay)}" if delay > 0 else ""
RNS.log(f"Postponing sync with peer {RNS.prettyhexrep(self.destination_hash)}{postpone_delay}{postpone_reason}", RNS.LOG_DEBUG)
except Exception as e:
RNS.trace_exception(e)
else:
if not RNS.Transport.has_path(self.destination_hash):
RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.destination_hash)
@@ -140,7 +307,15 @@ class LXMPeer:
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
if self.destination != None:
if len(self.unhandled_messages) == 0:
RNS.log(f"Sync requested for {self}, but no unhandled messages exist for peer. Sync complete.", RNS.LOG_DEBUG)
return
if self.currently_transferring_messages != None:
RNS.log(f"Sync requested for {self}, but current message transfer index was not clear. Aborting.", RNS.LOG_ERROR)
return
if self.state == LXMPeer.IDLE:
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
self.sync_backoff += LXMPeer.SYNC_BACKOFF_STEP
@@ -153,58 +328,69 @@ class LXMPeer:
self.alive = True
self.last_heard = time.time()
self.sync_backoff = 0
min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing sync offer...", RNS.LOG_DEBUG)
unhandled_entries = []
unhandled_ids = []
purged_ids = []
low_value_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
if self.router.get_stamp_value(transient_id) < min_accepted_cost: low_value_ids.append(transient_id)
else:
unhandled_entry = [ transient_id,
self.router.get_weight(transient_id),
self.router.get_size(transient_id) ]
unhandled_entries.append(unhandled_entry)
else: purged_ids.append(transient_id)
for transient_id in purged_ids:
RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since it no longer exists in the message store.", RNS.LOG_DEBUG)
self.remove_unhandled_message(transient_id)
for transient_id in low_value_ids:
RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since its stamp value is lower than peer requirement of {min_accepted_cost}.", RNS.LOG_DEBUG)
self.remove_unhandled_message(transient_id)
unhandled_entries.sort(key=lambda e: e[1], reverse=False)
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
RNS.log(f"Syncing to peer with per-message limit {RNS.prettysize(self.propagation_transfer_limit*1000)} and sync limit {RNS.prettysize(self.propagation_sync_limit*1000)}") # TODO: Remove debug
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
for unhandled_entry in unhandled_entries:
transient_id = unhandled_entry[0]
weight = unhandled_entry[1]
lxm_size = unhandled_entry[2]
lxm_transfer_size = lxm_size+per_message_overhead
next_size = cumulative_size + lxm_transfer_size
if self.propagation_transfer_limit != None and lxm_transfer_size > (self.propagation_transfer_limit*1000):
self.remove_unhandled_message(transient_id)
self.add_handled_message(transient_id)
continue
if self.propagation_sync_limit != None and next_size >= (self.propagation_sync_limit*1000):
continue
cumulative_size += lxm_transfer_size
unhandled_ids.append(transient_id)
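The offer-selection loop above distinguishes two cases: messages whose transfer size exceeds the peer's per-message limit are dropped permanently, while messages that merely overflow the cumulative sync limit are deferred to a later round. An illustrative sketch of that policy in isolation (not the LXMF implementation; sizes are plain byte counts, unlike the kilobyte-denominated limits in the router config):

```python
PER_MESSAGE_OVERHEAD = 16  # matches the constant above
BASE_OVERHEAD = 24         # initial cumulative_size above

def select_for_offer(entries, per_message_limit=None, sync_limit=None):
    """entries: iterable of (transient_id, weight, size) tuples."""
    selected, dropped = [], []
    cumulative = BASE_OVERHEAD
    for transient_id, weight, size in sorted(entries, key=lambda e: e[1]):
        transfer_size = size + PER_MESSAGE_OVERHEAD
        if per_message_limit is not None and transfer_size > per_message_limit:
            dropped.append(transient_id)  # peer can never accept this message
            continue
        if sync_limit is not None and cumulative + transfer_size >= sync_limit:
            continue                      # defer to a later sync round
        cumulative += transfer_size
        selected.append(transient_id)
    return selected, dropped, cumulative
```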
offer = [self.peering_key[0], unhandled_ids]
RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)} ({RNS.prettysize(len(msgpack.packb(unhandled_ids)))})", RNS.LOG_VERBOSE)
self.last_offer = unhandled_ids
self.link.request(LXMPeer.OFFER_REQUEST_PATH, offer, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT
else:
RNS.log(f"Could not request sync to peer {RNS.prettyhexrep(self.destination_hash)} since its identity could not be recalled.", RNS.LOG_ERROR)
def request_failed(self, request_receipt):
RNS.log(f"Sync request to peer {self.destination} failed", RNS.LOG_DEBUG)
if self.link != None: self.link.teardown()
self.state = LXMPeer.IDLE
def offer_response(self, request_receipt):
@@ -217,22 +403,35 @@ class LXMPeer:
if response == LXMPeer.ERROR_NO_IDENTITY:
if self.link != None:
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE)
self.link.identify()
self.state = LXMPeer.LINK_READY
self.sync()
return
elif response == LXMPeer.ERROR_NO_ACCESS:
RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE)
self.router.unpeer(self.destination_hash)
return
elif response == LXMPeer.ERROR_THROTTLED:
throttle_time = self.router.PN_STAMP_THROTTLE
RNS.log(f"Remote indicated that we're throttled, postponing sync for {RNS.prettytime(throttle_time)}", RNS.LOG_VERBOSE)
self.next_sync_attempt = time.time()+throttle_time
return
elif response == False:
# Peer already has all advertised messages
for transient_id in self.last_offer:
self.add_handled_message(transient_id)
self.remove_unhandled_message(transient_id)
elif response == True:
# Peer wants all advertised messages
for transient_id in self.last_offer:
wanted_messages.append(self.router.propagation_entries[transient_id])
wanted_message_ids.append(transient_id)
else:
@@ -241,18 +440,17 @@ class LXMPeer:
# If the peer did not want the message, it has
# already received it from another peer.
if not transient_id in response:
self.add_handled_message(transient_id)
self.remove_unhandled_message(transient_id)
for transient_id in response:
wanted_messages.append(self.router.propagation_entries[transient_id])
wanted_message_ids.append(transient_id)
if len(wanted_messages) > 0:
RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} wanted {str(len(wanted_messages))} of the available messages", RNS.LOG_VERBOSE)
lxm_list = []
for message_entry in wanted_messages:
file_path = message_entry[1]
if os.path.isfile(file_path):
@@ -262,13 +460,15 @@ class LXMPeer:
lxm_list.append(lxmf_data)
data = msgpack.packb([time.time(), lxm_list])
RNS.log(f"Total transfer size for this sync is {RNS.prettysize(len(data))}", RNS.LOG_VERBOSE)
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
self.currently_transferring_messages = wanted_message_ids
self.current_sync_transfer_started = time.time()
self.state = LXMPeer.RESOURCE_TRANSFERRING
else:
RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} did not request any of the available messages, sync completed", RNS.LOG_VERBOSE)
self.offered += len(self.last_offer)
if self.link != None:
self.link.teardown()
@@ -287,32 +487,45 @@ class LXMPeer:
def resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
if self.currently_transferring_messages == None:
RNS.log(f"Sync transfer completed on {self}, but transferred message index was unavailable. Aborting.", RNS.LOG_ERROR)
if self.link != None: self.link.teardown()
self.link = None
self.state = LXMPeer.IDLE
return
for transient_id in self.currently_transferring_messages:
self.add_handled_message(transient_id)
self.remove_unhandled_message(transient_id)
if self.link != None: self.link.teardown()
self.link = None
self.state = LXMPeer.IDLE
rate_str = ""
if self.current_sync_transfer_started != None:
self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-self.current_sync_transfer_started)
rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
RNS.log(f"Syncing {len(self.currently_transferring_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
self.alive = True
self.last_heard = time.time()
self.offered += len(self.last_offer)
self.outgoing += len(self.currently_transferring_messages)
self.tx_bytes += resource.get_data_size()
self.currently_transferring_messages = None
self.current_sync_transfer_started = None
if self.sync_strategy == self.STRATEGY_PERSISTENT:
if self.unhandled_message_count > 0: self.sync()
else:
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
if self.link != None: self.link.teardown()
self.link = None
self.state = LXMPeer.IDLE
self.currently_transferring_messages = None
self.current_sync_transfer_started = None
def link_established(self, link):
self.link.identify(self.router.identity)
@@ -325,15 +538,105 @@ class LXMPeer:
self.sync()
def link_closed(self, link):
self.link = None
self.state = LXMPeer.IDLE
def queued_items(self):
return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
def queue_unhandled_message(self, transient_id):
self.unhandled_messages_queue.append(transient_id)
def queue_handled_message(self, transient_id):
self.handled_messages_queue.append(transient_id)
def process_queues(self):
if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
handled_messages = self.handled_messages
unhandled_messages = self.unhandled_messages
while len(self.handled_messages_queue) > 0:
transient_id = self.handled_messages_queue.pop()
if not transient_id in handled_messages: self.add_handled_message(transient_id)
if transient_id in unhandled_messages: self.remove_unhandled_message(transient_id)
while len(self.unhandled_messages_queue) > 0:
transient_id = self.unhandled_messages_queue.pop()
if not transient_id in handled_messages and not transient_id in unhandled_messages:
self.add_unhandled_message(transient_id)
del handled_messages, unhandled_messages
@property
def handled_messages(self):
pes = self.router.propagation_entries.copy()
hm = list(filter(lambda tid: self.destination_hash in pes[tid][4], pes))
self._hm_count = len(hm); del pes
self._hm_counts_synced = True
return hm
@property
def unhandled_messages(self):
pes = self.router.propagation_entries.copy()
um = list(filter(lambda tid: self.destination_hash in pes[tid][5], pes))
self._um_count = len(um); del pes
self._um_counts_synced = True
return um
@property
def handled_message_count(self):
if not self._hm_counts_synced: self._update_counts()
return self._hm_count
@property
def unhandled_message_count(self):
if not self._um_counts_synced: self._update_counts()
return self._um_count
@property
def acceptance_rate(self):
return 0 if self.offered == 0 else (self.outgoing/self.offered)
def _update_counts(self):
if not self._hm_counts_synced:
hm = self.handled_messages; del hm
if not self._um_counts_synced:
um = self.unhandled_messages; del um
def add_handled_message(self, transient_id):
if transient_id in self.router.propagation_entries:
if not self.destination_hash in self.router.propagation_entries[transient_id][4]:
self.router.propagation_entries[transient_id][4].append(self.destination_hash)
self._hm_counts_synced = False
def add_unhandled_message(self, transient_id):
if transient_id in self.router.propagation_entries:
if not self.destination_hash in self.router.propagation_entries[transient_id][5]:
self.router.propagation_entries[transient_id][5].append(self.destination_hash)
self._um_count += 1
def remove_handled_message(self, transient_id):
if transient_id in self.router.propagation_entries:
if self.destination_hash in self.router.propagation_entries[transient_id][4]:
self.router.propagation_entries[transient_id][4].remove(self.destination_hash)
self._hm_counts_synced = False
def remove_unhandled_message(self, transient_id):
if transient_id in self.router.propagation_entries:
if self.destination_hash in self.router.propagation_entries[transient_id][5]:
self.router.propagation_entries[transient_id][5].remove(self.destination_hash)
self._um_counts_synced = False
@property
def name(self):
if type(self.metadata) != dict: return None
if not PN_META_NAME in self.metadata: return None
else:
try: return self.metadata[PN_META_NAME].decode("utf-8")
except: return None
def __str__(self):
if self.destination_hash: return RNS.prettyhexrep(self.destination_hash)
else: return "<Unknown>"
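The `add_`/`remove_` helpers above track per-peer sync state as lists of peer hashes stored inside each propagation entry, rather than as per-peer dicts. A minimal model of that layout (assumption suggested by the code, not confirmed elsewhere: indices 4 and 5 of an entry hold the handled-by and unhandled-for peer lists):

```python
HANDLED_IDX, UNHANDLED_IDX = 4, 5

def new_entry():
    # fields 0-3 stand in for file path, timestamp, and other metadata
    return [None, None, None, None, [], []]

def add_handled(entries, transient_id, peer_hash):
    entry = entries.get(transient_id)
    if entry is not None and peer_hash not in entry[HANDLED_IDX]:
        entry[HANDLED_IDX].append(peer_hash)

def remove_unhandled(entries, transient_id, peer_hash):
    entry = entries.get(transient_id)
    if entry is not None and peer_hash in entry[UNHANDLED_IDX]:
        entry[UNHANDLED_IDX].remove(peer_hash)

def unhandled_for(entries, peer_hash):
    # analogue of the unhandled_messages property above
    return [tid for tid, e in entries.items() if peer_hash in e[UNHANDLED_IDX]]
```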

File diff suppressed because it is too large


@@ -145,26 +145,32 @@ class LXMessage:
self.set_fields(fields)
self.payload = None
self.timestamp = None
self.signature = None
self.hash = None
self.transient_id = None
self.packed = None
self.state = LXMessage.GENERATING
self.method = LXMessage.UNKNOWN
self.progress = 0.0
self.rssi = None
self.snr = None
self.q = None
self.stamp = None
self.stamp_cost = stamp_cost
self.stamp_value = None
self.stamp_valid = False
self.stamp_checked = False
self.propagation_stamp = None
self.propagation_stamp_value = None
self.propagation_stamp_valid = False
self.propagation_target_cost = None
self.defer_stamp = True
self.defer_propagation_stamp = True
self.outbound_ticket = None
self.include_ticket = include_ticket
self.propagation_packed = None
self.paper_packed = None
@@ -184,6 +190,7 @@ class LXMessage:
self.resource_representation = None
self.__delivery_destination = None
self.__delivery_callback = None
self.__pn_encrypted_data = None
self.failed_callback = None
self.deferred_stamp_generating = False
@@ -268,15 +275,6 @@ class LXMessage:
def register_failed_callback(self, callback):
self.failed_callback = callback
def validate_stamp(self, target_cost, tickets=None):
if tickets != None:
for ticket in tickets:
@@ -293,7 +291,7 @@ class LXMessage:
return False
else:
workblock = LXStamper.stamp_workblock(self.message_id)
if LXStamper.stamp_valid(self.stamp, target_cost, workblock):
RNS.log(f"Stamp on {self} validated", RNS.LOG_DEBUG) # TODO: Remove at some point
self.stamp_value = LXStamper.stamp_value(workblock, self.stamp)
return True
@@ -333,10 +331,35 @@ class LXMessage:
else:
return None
def get_propagation_stamp(self, target_cost, timeout=None):
# If a stamp was already generated, return
# it immediately.
if self.propagation_stamp != None:
return self.propagation_stamp
# Otherwise, we will need to generate a
# valid stamp according to the cost that
# the propagation node has specified.
else:
self.propagation_target_cost = target_cost
if self.propagation_target_cost == None:
raise ValueError("Cannot generate propagation stamp without configured target propagation cost")
if not self.transient_id: self.pack()
generated_stamp, value = LXStamper.generate_stamp(self.transient_id, target_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN)
if generated_stamp:
self.propagation_stamp = generated_stamp
self.propagation_stamp_value = value
self.propagation_stamp_valid = True
return generated_stamp
else:
return None
def pack(self, payload_updated=False):
if not self.packed:
if self.timestamp == None: self.timestamp = time.time()
self.propagation_packed = None
self.paper_packed = None
@@ -352,9 +375,8 @@ class LXMessage:
if not self.defer_stamp:
self.stamp = self.get_stamp()
if self.stamp != None: self.payload.append(self.stamp)
signed_part = b""
signed_part += hashed_part
signed_part += self.hash
@@ -380,7 +402,7 @@ class LXMessage:
if self.desired_method == LXMessage.OPPORTUNISTIC:
if self.__destination.type == RNS.Destination.SINGLE:
if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT:
RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
self.desired_method = LXMessage.DIRECT
# Set delivery parameters according to delivery method
@@ -409,9 +431,14 @@ class LXMessage:
elif self.desired_method == LXMessage.PROPAGATED:
single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT
if self.__pn_encrypted_data == None or payload_updated:
self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
self.ratchet_id = self.__destination.latest_ratchet_id
lxmf_data = self.packed[:LXMessage.DESTINATION_LENGTH]+self.__pn_encrypted_data
self.transient_id = RNS.Identity.full_hash(lxmf_data)
if self.propagation_stamp != None: lxmf_data += self.propagation_stamp
self.propagation_packed = msgpack.packb([time.time(), [lxmf_data]])
content_size = len(self.propagation_packed)
if content_size <= single_packet_content_limit:


@@ -3,25 +3,30 @@ import RNS.vendor.umsgpack as msgpack
import os
import time
import math
import itertools
import multiprocessing
WORKBLOCK_EXPAND_ROUNDS = 3000
WORKBLOCK_EXPAND_ROUNDS_PN = 1000
WORKBLOCK_EXPAND_ROUNDS_PEERING = 25
STAMP_SIZE = RNS.Identity.HASHLENGTH//8
PN_VALIDATION_POOL_MIN_SIZE = 256
active_jobs = {}
if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork")
def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
wb_st = time.time()
workblock = b""
for n in range(expand_rounds):
workblock += RNS.Cryptography.hkdf(length=256,
derive_from=material,
salt=RNS.Identity.full_hash(material+msgpack.packb(n)),
context=None)
wb_time = time.time() - wb_st
# RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)
return workblock
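Since each expand round appends one 256-byte HKDF output, the three round constants above imply a fixed workblock size per stamp type. Simple arithmetic, not library code:

```python
ROUND_OUTPUT = 256  # bytes per round (hkdf length=256)

def workblock_size(expand_rounds):
    return expand_rounds * ROUND_OUTPUT

sizes = {rounds: workblock_size(rounds) for rounds in (3000, 1000, 25)}
# 3000 rounds (message stamps) -> 768000 bytes (~750 KiB)
# 1000 rounds (PN stamps)      -> 256000 bytes (~250 KiB)
#   25 rounds (peering keys)   ->   6400 bytes (~6.25 KiB)
```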
@@ -36,28 +41,70 @@ def stamp_value(workblock, stamp):
return value
def stamp_valid(stamp, target_cost, workblock):
target = 0b1 << 256-target_cost
result = RNS.Identity.full_hash(workblock+stamp)
if int.from_bytes(result, byteorder="big") > target: return False
else: return True
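`stamp_valid` accepts a stamp when the hash of workblock plus stamp, read as a big-endian integer, does not exceed a target with roughly `target_cost` leading zero bits. A sketch of the same check, using `hashlib.sha256` as a stand-in for `RNS.Identity.full_hash` (the real hash function comes from the RNS Identity class and may differ):

```python
import hashlib

def stamp_valid_sketch(stamp, target_cost, workblock):
    # stand-in for RNS.Identity.full_hash
    target = 1 << (256 - target_cost)
    result = hashlib.sha256(workblock + stamp).digest()
    # valid when the hash does not exceed the target, so each additional
    # unit of cost roughly halves the probability that a random stamp passes
    return int.from_bytes(result, byteorder="big") <= target
```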
def validate_peering_key(peering_id, peering_key, target_cost):
workblock = stamp_workblock(peering_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING)
if not stamp_valid(peering_key, target_cost, workblock): return False
else: return True
def validate_pn_stamp(transient_data, target_cost):
from .LXMessage import LXMessage
if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return None, None, None, None
else:
lxm_data = transient_data[:-STAMP_SIZE]
stamp = transient_data[-STAMP_SIZE:]
transient_id = RNS.Identity.full_hash(lxm_data)
workblock = stamp_workblock(transient_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN)
if not stamp_valid(stamp, target_cost, workblock): return None, None, None, None
else:
value = stamp_value(workblock, stamp)
return transient_id, lxm_data, value, stamp
def validate_pn_stamps_job_simple(transient_list, target_cost):
validated_messages = []
for transient_data in transient_list:
transient_id, lxm_data, value, stamp_data = validate_pn_stamp(transient_data, target_cost)
if transient_id: validated_messages.append([transient_id, lxm_data, value, stamp_data])
return validated_messages
def validate_pn_stamps_job_multip(transient_list, target_cost):
cores = multiprocessing.cpu_count()
pool_count = min(cores, math.ceil(len(transient_list) / PN_VALIDATION_POOL_MIN_SIZE))
RNS.log(f"Validating {len(transient_list)} stamps using {pool_count} processes...", RNS.LOG_VERBOSE)
with multiprocessing.Pool(pool_count) as p:
validated_entries = p.starmap(validate_pn_stamp, zip(transient_list, itertools.repeat(target_cost)))
return [e for e in validated_entries if e[0] != None]
def validate_pn_stamps(transient_list, target_cost):
non_mp_platform = RNS.vendor.platformutils.is_android()
if len(transient_list) <= PN_VALIDATION_POOL_MIN_SIZE or non_mp_platform: return validate_pn_stamps_job_simple(transient_list, target_cost)
else: return validate_pn_stamps_job_multip(transient_list, target_cost)
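The dispatch above only takes the multiprocessing path for batches larger than `PN_VALIDATION_POOL_MIN_SIZE`, and the pool is sized so that each worker process gets at least that many entries. The sizing rule in isolation:

```python
import math

PN_VALIDATION_POOL_MIN_SIZE = 256  # matches the constant above

def pool_size(batch_len, cores):
    # never more processes than cores, and at least
    # PN_VALIDATION_POOL_MIN_SIZE entries per process
    return min(cores, math.ceil(batch_len / PN_VALIDATION_POOL_MIN_SIZE))
```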
def generate_stamp(message_id, stamp_cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG)
workblock = stamp_workblock(message_id, expand_rounds=expand_rounds)
start_time = time.time()
stamp = None
rounds = 0
value = 0
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin(): stamp, rounds = job_simple(stamp_cost, workblock, message_id)
elif RNS.vendor.platformutils.is_android(): stamp, rounds = job_android(stamp_cost, workblock, message_id)
else: stamp, rounds = job_linux(stamp_cost, workblock, message_id)
duration = time.time() - start_time
speed = rounds/duration
if stamp != None: value = stamp_value(workblock, stamp)
RNS.log(f"Stamp with value {value} generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG)
@@ -113,10 +160,8 @@ def job_simple(stamp_cost, workblock, message_id):
def sv(s, c, w):
target = 0b1<<256-c; m = w+s
result = RNS.Identity.full_hash(m)
if int.from_bytes(result, byteorder="big") > target: return False
else: return True
while not sv(pstamp, stamp_cost, workblock) and not active_jobs[message_id]:
pstamp = os.urandom(256//8); rounds += 1
@@ -135,7 +180,8 @@ def job_linux(stamp_cost, workblock, message_id):
allow_kill = True
stamp = None
total_rounds = 0
cores = multiprocessing.cpu_count()
jobs = cores if cores <= 12 else int(cores/2)
stop_event = multiprocessing.Event()
result_queue = multiprocessing.Queue(1)
rounds_queue = multiprocessing.Queue()
@@ -310,6 +356,13 @@ def job_android(stamp_cost, workblock, message_id):
return stamp, total_rounds
# def stamp_value_linear(workblock, stamp):
# value = 0
# bits = 256
# material = RNS.Identity.full_hash(workblock+stamp)
# s = int.from_bytes(material, byteorder="big")
# return s.bit_count()
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
@@ -325,4 +378,14 @@ if __name__ == "__main__":
RNS.loglevel = RNS.LOG_DEBUG
RNS.log("Testing LXMF stamp generation", RNS.LOG_DEBUG)
message_id = os.urandom(32)
generate_stamp(message_id, cost)
RNS.log("", RNS.LOG_DEBUG)
RNS.log("Testing propagation stamp generation", RNS.LOG_DEBUG)
message_id = os.urandom(32)
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN)
RNS.log("", RNS.LOG_DEBUG)
RNS.log("Testing peering key generation", RNS.LOG_DEBUG)
message_id = os.urandom(32)
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING)


@@ -1,8 +1,8 @@
#!/usr/bin/env python3
# Reticulum License
#
# Copyright (c) 2020-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@@ -11,8 +11,16 @@
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# - The Software shall not be used in any kind of system which includes amongst
# its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
# an artificial intelligence, machine learning or language model training
# dataset, including but not limited to any use that contributes to the
# training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
@@ -35,6 +43,7 @@ import time
import os
from LXMF._version import __version__
from LXMF import APP_NAME
from RNS.vendor.configobj import ConfigObj
@@ -96,6 +105,11 @@ def apply_config():
else:
active_configuration["enable_propagation_node"] = False
if "propagation" in lxmd_config and "node_name" in lxmd_config["propagation"]:
active_configuration["node_name"] = lxmd_config["propagation"].get("node_name")
else:
active_configuration["node_name"] = None
if "propagation" in lxmd_config and "auth_required" in lxmd_config["propagation"]:
active_configuration["auth_required"] = lxmd_config["propagation"].as_bool("auth_required")
else:
@@ -126,7 +140,7 @@ def apply_config():
if active_configuration["message_storage_limit"] < 0.005:
active_configuration["message_storage_limit"] = 0.005
else:
active_configuration["message_storage_limit"] = 500
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
@@ -134,11 +148,76 @@ def apply_config():
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
else:
active_configuration["propagation_transfer_max_accepted_size"] = 256
if "propagation" in lxmd_config and "propagation_message_max_accepted_size" in lxmd_config["propagation"]:
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_message_max_accepted_size")
if active_configuration["propagation_transfer_max_accepted_size"] < 0.38:
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
else:
active_configuration["propagation_transfer_max_accepted_size"] = 256
if "propagation" in lxmd_config and "propagation_sync_max_accepted_size" in lxmd_config["propagation"]:
active_configuration["propagation_sync_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_sync_max_accepted_size")
if active_configuration["propagation_sync_max_accepted_size"] < 0.38:
active_configuration["propagation_sync_max_accepted_size"] = 0.38
else:
active_configuration["propagation_sync_max_accepted_size"] = 256*40
if "propagation" in lxmd_config and "propagation_stamp_cost_target" in lxmd_config["propagation"]:
active_configuration["propagation_stamp_cost_target"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_target")
if active_configuration["propagation_stamp_cost_target"] < LXMF.LXMRouter.PROPAGATION_COST_MIN:
active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST_MIN
else:
active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST
if "propagation" in lxmd_config and "propagation_stamp_cost_flexibility" in lxmd_config["propagation"]:
active_configuration["propagation_stamp_cost_flexibility"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_flexibility")
if active_configuration["propagation_stamp_cost_flexibility"] < 0:
active_configuration["propagation_stamp_cost_flexibility"] = 0
else:
active_configuration["propagation_stamp_cost_flexibility"] = LXMF.LXMRouter.PROPAGATION_COST_FLEX
if "propagation" in lxmd_config and "peering_cost" in lxmd_config["propagation"]:
active_configuration["peering_cost"] = lxmd_config["propagation"].as_int("peering_cost")
if active_configuration["peering_cost"] < 0:
active_configuration["peering_cost"] = 0
else:
active_configuration["peering_cost"] = LXMF.LXMRouter.PEERING_COST
if "propagation" in lxmd_config and "remote_peering_cost_max" in lxmd_config["propagation"]:
active_configuration["remote_peering_cost_max"] = lxmd_config["propagation"].as_int("remote_peering_cost_max")
if active_configuration["remote_peering_cost_max"] < 0:
active_configuration["remote_peering_cost_max"] = 0
else:
active_configuration["remote_peering_cost_max"] = LXMF.LXMRouter.MAX_PEERING_COST
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
else:
active_configuration["prioritised_lxmf_destinations"] = []
if "propagation" in lxmd_config and "control_allowed" in lxmd_config["propagation"]:
active_configuration["control_allowed_identities"] = lxmd_config["propagation"].as_list("control_allowed")
else:
active_configuration["control_allowed_identities"] = []
if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]:
static_peers = lxmd_config["propagation"].as_list("static_peers")
active_configuration["static_peers"] = []
for static_peer in static_peers:
active_configuration["static_peers"].append(bytes.fromhex(static_peer))
else:
active_configuration["static_peers"] = []
if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]:
active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers")
else:
active_configuration["max_peers"] = None
if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]:
active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only")
else:
active_configuration["from_static_only"] = False
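The option parsing above repeats one read-clamp-default pattern for every key. A minimal sketch of that pattern, using a hypothetical `read_option` helper and a plain dict in place of lxmd's parsed config sections:

```python
# Hypothetical helper mirroring lxmd's read-clamp-default option handling.
def read_option(section, key, default, minimum=None, cast=float):
    """Return section[key] cast to a number, falling back to a default
    and clamping to an optional lower bound."""
    if section is None or key not in section:
        return default
    value = cast(section[key])
    if minimum is not None and value < minimum:
        value = minimum
    return value

# Example: a sync size below the 0.38 KB floor is clamped up to it.
active_configuration = {}
propagation = {"propagation_sync_max_accepted_size": "0.1"}
active_configuration["propagation_sync_max_accepted_size"] = read_option(
    propagation, "propagation_sync_max_accepted_size",
    default=256*40, minimum=0.38)
```

This keeps each option to one line at the call site instead of a five-line if/else block.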
# Load various settings
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
@@ -304,8 +383,17 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
autopeer = active_configuration["autopeer"],
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
propagation_cost = active_configuration["propagation_stamp_cost_target"],
propagation_cost_flexibility = active_configuration["propagation_stamp_cost_flexibility"],
peering_cost = active_configuration["peering_cost"],
max_peering_cost = active_configuration["remote_peering_cost_max"],
sync_limit = active_configuration["propagation_sync_max_accepted_size"],
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
)
max_peers = active_configuration["max_peers"],
static_peers = active_configuration["static_peers"],
from_static_only = active_configuration["from_static_only"],
name = active_configuration["node_name"])
message_router.register_delivery_callback(lxmf_delivery)
for destination_hash in active_configuration["ignored_lxmf_destinations"]:
@@ -337,13 +425,16 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
for dest_str in active_configuration["prioritised_lxmf_destinations"]:
try:
dest_hash = bytes.fromhex(dest_str)
if len(dest_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
message_router.prioritise(dest_hash)
except Exception as e:
RNS.log("Cannot prioritise "+str(dest_str)+", it is not a valid destination hash", RNS.LOG_ERROR)
if len(dest_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: message_router.prioritise(dest_hash)
except Exception as e: RNS.log("Cannot prioritise "+str(dest_str)+", it is not a valid destination hash", RNS.LOG_ERROR)
message_router.enable_propagation()
for ident_str in active_configuration["control_allowed_identities"]:
try:
identity_hash = bytes.fromhex(ident_str)
if len(identity_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: message_router.allow_control(identity_hash)
except Exception as e: RNS.log(f"Cannot allow control from {ident_str}, it is not a valid identity hash", RNS.LOG_ERROR)
RNS.log("LXMF Propagation Node started on "+RNS.prettyhexrep(message_router.propagation_destination.hash))
@@ -362,13 +453,13 @@ def jobs():
try:
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE)
message_router.announce(lxmf_destination.hash)
last_peer_announce = time.time()
if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None:
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE)
message_router.announce_propagation_node()
last_node_announce = time.time()
@@ -381,7 +472,7 @@ def deferred_start_jobs():
global active_configuration, last_peer_announce, last_node_announce
global message_router, lxmf_destination
time.sleep(DEFFERED_JOBS_DELAY)
RNS.log("Running deferred start jobs")
RNS.log("Running deferred start jobs", RNS.LOG_DEBUG)
if active_configuration["peer_announce_at_start"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
message_router.announce(lxmf_destination.hash)
@@ -394,6 +485,379 @@ def deferred_start_jobs():
last_node_announce = time.time()
threading.Thread(target=jobs, daemon=True).start()
def _request_sync(identity, destination_hash, remote_identity, timeout=15, exit_on_fail=False):
control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
timeout = time.time()+timeout
def check_timeout():
if time.time() > timeout:
if exit_on_fail:
print("Requesting lxmd peer sync timed out, exiting now")
exit(200)
else:
return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
else:
time.sleep(0.1)
if not RNS.Transport.has_path(control_destination.hash):
RNS.Transport.request_path(control_destination.hash)
while not RNS.Transport.has_path(control_destination.hash):
tc = check_timeout()
if tc:
return tc
link = RNS.Link(control_destination)
while not link.status == RNS.Link.ACTIVE:
tc = check_timeout()
if tc:
return tc
link.identify(identity)
request_receipt = link.request(LXMF.LXMRouter.SYNC_REQUEST_PATH, data=destination_hash, response_callback=None, failed_callback=None)
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
tc = check_timeout()
if tc:
return tc
link.teardown()
return request_receipt.get_response()
def request_sync(target, remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=15, identity_path=None):
global configpath, identitypath, storagedir, lxmdir
global lxmd_config, active_configuration, targetloglevel
try:
peer_destination_hash = bytes.fromhex(target)
if len(peer_destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters")
except Exception as e:
print(f"Invalid peer destination hash: {e}")
exit(203)
_remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path)
response = _request_sync(identity, peer_destination_hash, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
print("Remote received no identity")
exit(203)
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
print("Access denied")
exit(204)
elif response == LXMF.LXMPeer.LXMPeer.ERROR_INVALID_DATA:
print("Invalid data received by remote")
exit(205)
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NOT_FOUND:
print("The requested peer was not found")
exit(206)
elif response == None:
print("Empty response received")
exit(207)
else:
print(f"Sync requested for peer {RNS.prettyhexrep(peer_destination_hash)}")
exit(0)
def _request_unpeer(identity, destination_hash, remote_identity, timeout=15, exit_on_fail=False):
control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
timeout = time.time()+timeout
def check_timeout():
if time.time() > timeout:
if exit_on_fail:
print("Requesting lxmd peering break timed out, exiting now")
exit(200)
else: return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
else: time.sleep(0.1)
if not RNS.Transport.has_path(control_destination.hash):
RNS.Transport.request_path(control_destination.hash)
while not RNS.Transport.has_path(control_destination.hash):
tc = check_timeout()
if tc:
return tc
link = RNS.Link(control_destination)
while not link.status == RNS.Link.ACTIVE:
tc = check_timeout()
if tc:
return tc
link.identify(identity)
request_receipt = link.request(LXMF.LXMRouter.UNPEER_REQUEST_PATH, data=destination_hash, response_callback=None, failed_callback=None)
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
tc = check_timeout()
if tc:
return tc
link.teardown()
return request_receipt.get_response()
def request_unpeer(target, remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=15, identity_path=None):
global configpath, identitypath, storagedir, lxmdir
global lxmd_config, active_configuration, targetloglevel
try:
peer_destination_hash = bytes.fromhex(target)
if len(peer_destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters")
except Exception as e:
print(f"Invalid peer destination hash: {e}")
exit(203)
_remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path)
response = _request_unpeer(identity, peer_destination_hash, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
print("Remote received no identity")
exit(203)
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
print("Access denied")
exit(204)
elif response == LXMF.LXMPeer.LXMPeer.ERROR_INVALID_DATA:
print("Invalid data received by remote")
exit(205)
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NOT_FOUND:
print("The requested peer was not found")
exit(206)
elif response == None:
print("Empty response received")
exit(207)
else:
print(f"Broke peering with {RNS.prettyhexrep(peer_destination_hash)}")
exit(0)
def query_status(identity, remote_identity=None, timeout=5, exit_on_fail=False):
if remote_identity == None: remote_identity = identity
control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
timeout = time.time()+timeout
def check_timeout():
if time.time() > timeout:
if exit_on_fail:
print("Getting lxmd statistics timed out, exiting now")
exit(200)
else: return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
else: time.sleep(0.1)
if not RNS.Transport.has_path(control_destination.hash):
RNS.Transport.request_path(control_destination.hash)
while not RNS.Transport.has_path(control_destination.hash):
tc = check_timeout()
if tc: return tc
link = RNS.Link(control_destination)
while not link.status == RNS.Link.ACTIVE:
tc = check_timeout()
if tc: return tc
link.identify(identity)
request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None)
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
tc = check_timeout()
if tc: return tc
link.teardown()
return request_receipt.get_response()
def get_status(remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=5,
show_status=False, show_peers=False, identity_path=None):
global identity
_remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path)
response = query_status(identity, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
print("Remote received no identity")
exit(203)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
print("Access denied")
exit(204)
elif response == None:
print("Empty response received")
exit(207)
else:
s = response
mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2)
ms_util = f"{mutil}%"
if s["from_static_only"]:
who_str = "static peers only"
else:
who_str = "all nodes"
available_peers = 0
unreachable_peers = 0
peered_incoming = 0
peered_outgoing = 0
peered_rx_bytes = 0
peered_tx_bytes = 0
for peer_id in s["peers"]:
p = s["peers"][peer_id]
pm = p["messages"]
peered_incoming += pm["incoming"]
peered_outgoing += pm["outgoing"]
peered_rx_bytes += p["rx_bytes"]
peered_tx_bytes += p["tx_bytes"]
if p["alive"]: available_peers += 1
else: unreachable_peers += 1
total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"]
total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"]
if total_incoming != 0: df = round(peered_outgoing/total_incoming, 2)
else: df = 0
dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"])
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
if show_status:
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
ptl = RNS.prettysize(s["propagation_limit"]*1000); psl = RNS.prettysize(s["sync_limit"]*1000)
uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"]
pc = s["peering_cost"]; pcm = s["max_peering_cost"]
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
print(f"Required propagation stamp cost is {psc}, flexibility is {scf}")
print(f"Peering cost is {pc}, max remote peering cost is {pcm}")
print(f"Accepting propagated messages from {who_str}")
print(f"{ptl} message limit, {psl} sync limit")
print(f"")
print(f"Peers : {stp} total (peer limit is {smp})")
print(f" {sdp} discovered, {ssp} static")
print(f" {available_peers} available, {unreachable_peers} unreachable")
print(f"")
print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})")
print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})")
print(f" {upi} messages received from unpeered nodes ({uprx})")
print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})")
print(f" {cprr} propagation messages received directly from clients")
print(f" {cprs} propagation messages served to clients")
print(f" Distribution factor is {df}")
print(f"")
if show_peers:
if not show_status:
print("")
for peer_id in s["peers"]:
ind = " "
p = s["peers"][peer_id]
if p["type"] == "static":
t = "Static peer "
elif p["type"] == "discovered":
t = "Discovered peer "
else:
t = "Unknown peer "
a = "Available" if p["alive"] == True else "Unreachable"
h = max(time.time()-p["last_heard"], 0)
hops = p["network_distance"]
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
pm = p["messages"]; pk = p["peering_key"]
pc = p["peering_cost"]; psc = p["target_stamp_cost"]; psf = p["stamp_cost_flexibility"]
if pc == None: pc = "unknown"
if psc == None: psc = "unknown"
if psf == None: psf = "unknown"
if pk == None: pk = "Not generated"
else: pk = f"Generated, value is {pk}"
if p["last_sync_attempt"] != 0:
lsa = p["last_sync_attempt"]
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
else:
ls = "never synced"
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"])
stl = RNS.prettysize(p["transfer_limit"]*1000) if p["transfer_limit"] else "Unknown"
ssl = RNS.prettysize(p["sync_limit"]*1000) if p["sync_limit"] else "unknown"
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
pmi = pm["incoming"]; pmuh = pm["unhandled"]; ar = round(p["acceptance_rate"]*100, 2)
if p["name"] == None: nn = ""
else: nn = p["name"].strip().replace("\n", "").replace("\r", "")
if len(nn) > 45: nn = f"{nn[:45]}..."
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
if len(nn): print(f"{ind*2}Name : {nn}")
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
print(f"{ind*2}Costs : Propagation {psc} (flex {psf}), peering {pc}")
print(f"{ind*2}Sync key : {pk}")
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER")
print(f"{ind*2}Limits : {stl} message limit, {ssl} sync limit")
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming, {ar}% acceptance rate")
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
ms = "" if pm["unhandled"] == 1 else "s"
print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}")
print("")
def _get_target_identity(remote=None, timeout=5):
global identity
timeout = time.time()+timeout
def check_timeout():
if time.time() > timeout:
print("Resolving remote identity timed out, exiting now")
exit(200)
else: time.sleep(0.1)
if remote == None: return identity
else:
try:
destination_hash = bytes.fromhex(remote)
if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters")
except Exception as e:
print(f"Invalid remote destination hash: {e}")
exit(203)
remote_identity = RNS.Identity.recall(destination_hash)
if remote_identity: return remote_identity
else:
if not RNS.Transport.has_path(destination_hash):
RNS.Transport.request_path(destination_hash)
while not RNS.Transport.has_path(destination_hash):
tc = check_timeout()
if tc: return tc
return RNS.Identity.recall(destination_hash)
def _remote_init(configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, identity_path=None):
global configpath, identitypath, storagedir, lxmdir, identity
global lxmd_config, active_configuration, targetloglevel
targetlogdest = RNS.LOG_STDOUT
if identity_path == None:
if configdir == None:
if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"): configdir = "/etc/lxmd"
elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(RNS.Reticulum.userdir+"/.config/lxmd/config"): configdir = RNS.Reticulum.userdir+"/.config/lxmd"
else: configdir = RNS.Reticulum.userdir+"/.lxmd"
configpath = configdir+"/config"
identitypath = configdir+"/identity"
identity = None
if not os.path.isdir(configdir):
RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR)
exit(201)
if not os.path.isfile(identitypath):
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
exit(202)
else:
identity = RNS.Identity.from_file(identitypath)
if identity == None:
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
exit(4)
else:
if not os.path.isfile(identity_path):
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
exit(202)
else:
identity = RNS.Identity.from_file(identity_path)
if identity == None:
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
exit(4)
if targetloglevel == None: targetloglevel = 3
if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
def main():
try:
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
@@ -404,6 +868,13 @@ def main():
parser.add_argument("-v", "--verbose", action="count", default=0)
parser.add_argument("-q", "--quiet", action="count", default=0)
parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file")
parser.add_argument("--status", action="store_true", default=False, help="display node status")
parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes")
parser.add_argument("--sync", action="store", default=None, help="request a sync with the specified peer", type=str)
parser.add_argument("-b", "--break", dest="unpeer", action="store", default=None, help="break peering with the specified peer", type=str)
parser.add_argument("--timeout", action="store", default=None, help="timeout in seconds for query operations", type=float)
parser.add_argument("-r", "--remote", action="store", default=None, help="remote propagation node destination hash", type=str)
parser.add_argument("--identity", action="store", default=None, help="path to identity used for remote requests", type=str)
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
@@ -413,15 +884,50 @@
print(__default_lxmd_config__)
exit()
program_setup(
configdir = args.config,
rnsconfigdir=args.rnsconfig,
run_pn=args.propagation_node,
on_inbound=args.on_inbound,
verbosity=args.verbose,
quietness=args.quiet,
service=args.service
)
if args.status or args.peers:
if not args.timeout: args.timeout = 5
get_status(configdir = args.config,
rnsconfigdir=args.rnsconfig,
verbosity=args.verbose,
quietness=args.quiet,
timeout=args.timeout,
show_status=args.status,
show_peers=args.peers,
identity_path=args.identity,
remote=args.remote)
exit()
if args.sync:
if not args.timeout: args.timeout = 10
request_sync(target=args.sync,
configdir = args.config,
rnsconfigdir=args.rnsconfig,
verbosity=args.verbose,
quietness=args.quiet,
timeout=args.timeout,
identity_path=args.identity,
remote=args.remote)
exit()
if args.unpeer:
if not args.timeout: args.timeout = 10
request_unpeer(target=args.unpeer,
configdir = args.config,
rnsconfigdir=args.rnsconfig,
verbosity=args.verbose,
quietness=args.quiet,
timeout=args.timeout,
identity_path=args.identity,
remote=args.remote)
exit()
program_setup(configdir = args.config,
rnsconfigdir=args.rnsconfig,
run_pn=args.propagation_node,
on_inbound=args.on_inbound,
verbosity=args.verbose,
quietness=args.quiet,
service=args.service)
except KeyboardInterrupt:
print("")
@@ -437,6 +943,17 @@ __default_lxmd_config__ = """# This is an example LXM Daemon config file.
enable_node = no
# You can specify identity hashes for remotes
# that are allowed to control and query status
# for this propagation node.
# control_allowed = 7d7e542829b40f32364499b27438dba8, 437229f8e29598b2282b88bad5e44698
# An optional name for this node, included
# in announces.
# node_name = Anonymous Propagation Node
# Automatic announce interval in minutes.
# 6 hours by default.
@@ -456,19 +973,6 @@ autopeer = yes
autopeer_maxdepth = 4
# The maximum accepted transfer size per in-
# coming propagation transfer, in kilobytes.
# This also sets the upper limit for the size
# of single messages accepted onto this node.
#
# If a node wants to propagate a larger number
# of messages to this node than can fit
# within this limit, it will prioritise sending
# the smallest messages first, and try again
# with any remaining messages at a later point.
propagation_transfer_max_accepted_size = 256
# The maximum amount of storage to use for
# the LXMF Propagation Node message store,
# specified in megabytes. When this limit
@@ -477,9 +981,57 @@ propagation_transfer_max_accepted_size = 256
# LXMF prioritises keeping messages that are
# new and small. Large and old messages will
# be removed first. This setting is optional
# and defaults to 2 gigabytes.
# and defaults to 500 megabytes.
# message_storage_limit = 2000
# message_storage_limit = 500
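The eviction policy described above (keep new and small messages, remove large and old ones first) can be sketched as a weight-based sort. The weighting function below is hypothetical, not LXMF's actual formula:

```python
import time

def prune_message_store(messages, storage_limit_bytes):
    """messages: list of (size_bytes, received_timestamp) tuples.
    Keep small/new messages; evict large/old ones until under the limit."""
    now = time.time()

    # Hypothetical weight: larger and older messages score higher
    # and are therefore dropped first.
    def weight(message):
        size, received = message
        return size * max(now - received, 1)

    kept, total = [], 0
    for message in sorted(messages, key=weight):  # small & new first
        if total + message[0] <= storage_limit_bytes:
            kept.append(message)
            total += message[0]
    return kept
```

Under this weighting, a message that is both recent and small is almost never evicted, matching the behaviour the comment describes.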
# The maximum accepted transfer size per in-
# coming propagation message, in kilobytes.
# This sets the upper limit for the size of
# single messages accepted onto this node.
# propagation_message_max_accepted_size = 256
# The maximum accepted transfer size per in-
# coming propagation node sync.
#
# If a node wants to propagate a larger number
# of messages to this node than can fit
# within this limit, it will prioritise sending
# the smallest messages first, and try again
# with any remaining messages at a later point.
# propagation_sync_max_accepted_size = 10240
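The smallest-first behaviour the comment above describes amounts to a greedy selection against the sync limit. A hypothetical sketch (not lxmd's actual implementation):

```python
def select_for_sync(message_sizes_kb, sync_limit_kb):
    """Greedily pick the smallest messages that fit within the sync
    limit; anything left over is retried on a later sync attempt."""
    selected, remaining, used = [], [], 0.0
    for size in sorted(message_sizes_kb):
        if used + size <= sync_limit_kb:
            selected.append(size)
            used += size
        else:
            remaining.append(size)
    return selected, remaining

# With a 600 KB sync limit, the three smallest messages fit;
# the two largest wait for the next sync.
sel, rem = select_for_sync([512, 8, 64, 12000, 30], 600)
```

Sorting first guarantees the maximum number of messages per sync, at the cost of delaying large messages.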
# You can configure the target stamp cost
# required to deliver messages via this node.
# propagation_stamp_cost_target = 16
# If set higher than 0, the stamp cost flexi-
# bility option will make this node accept
# messages with a lower stamp cost than the
# target from other propagation nodes (but
# not from peers directly). This allows the
# network to gradually adjust stamp cost.
# propagation_stamp_cost_flexibility = 3
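The acceptance rule described above — a flexibility margin below the target cost for messages arriving from other propagation nodes, but not for direct submissions — can be sketched as (hypothetical function, not LXMF's API):

```python
def accepts_propagated_message(stamp_value, target_cost, flexibility,
                               from_propagation_node):
    """Sketch of the stamp acceptance rule: other propagation nodes
    get a flexibility margin below the target cost; messages submitted
    directly must meet the full target."""
    if from_propagation_node:
        return stamp_value >= target_cost - flexibility
    return stamp_value >= target_cost
```

With `target_cost=16` and `flexibility=3`, a stamp of value 14 is accepted from another propagation node but rejected on direct submission, which is what lets the network adjust stamp cost gradually.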
# The peering_cost option configures the target
# value required for a remote node to peer with
# and deliver messages to this node.
# peering_cost = 18
# You can configure the maximum peering cost
# of remote nodes that this node will peer with.
# Setting this to a higher number will allow
# this node to peer with other nodes requiring
# a higher peering key value, but will require
# more computation time during initial peering
# when generating the peering key.
# remote_peering_cost_max = 26
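The trade-off described above — a higher remote peering cost means more computation when generating the peering key — follows the usual proof-of-work shape. A hypothetical sketch (not LXMF's actual stamp algorithm):

```python
import hashlib
import os

def generate_peering_key(peer_hash: bytes, cost: int, max_cost: int):
    """Brute-force a nonce whose SHA-256 digest has `cost` leading
    zero bits. Refuse peers whose advertised cost exceeds our
    configured maximum, since work grows exponentially with cost."""
    if cost > max_cost:
        return None  # peer too expensive, skip peering
    while True:
        nonce = os.urandom(16)
        digest = hashlib.sha256(peer_hash + nonce).digest()
        if int.from_bytes(digest, "big") >> (256 - cost) == 0:
            return nonce
```

Each extra bit of cost doubles the expected number of attempts, which is why raising `remote_peering_cost_max` lengthens initial peering.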
# You can tell the LXMF message router to
# prioritise storage for one or more
@@ -491,6 +1043,25 @@ propagation_transfer_max_accepted_size = 256
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
# You can configure the maximum number of other
# propagation nodes that this node will peer
# with automatically. The default is 20.
# max_peers = 20
# You can configure a list of static propagation
# node peers, that this node will always be
# peered with, by specifying a list of
# destination hashes.
# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
# You can configure the propagation node to
# only accept incoming propagation messages
# from configured static peers.
# from_static_only = True
# By default, any destination is allowed to
# connect and download messages, but you can
# optionally restrict this. If you enable


@@ -1 +1 @@
__version__ = "0.6.0"
__version__ = "0.9.3"


@@ -12,6 +12,7 @@ User-facing clients built on LXMF include:
Community-provided tools and utilities for LXMF include:
- [LXMFy](https://lxmfy.quad4.io/)
- [LXMF-Bot](https://github.com/randogoth/lxmf-bot)
- [LXMF Messageboard](https://github.com/chengtripp/lxmf_messageboard)
- [LXMEvent](https://github.com/faragher/LXMEvent)


@@ -1,3 +1,2 @@
qrcode==7.4.2
rns==0.7.8
setuptools==70.0.0
qrcode>=7.4.2
rns>=1.0.0


@@ -15,9 +15,10 @@ setuptools.setup(
long_description_content_type="text/markdown",
url="https://github.com/markqvist/lxmf",
packages=["LXMF", "LXMF.Utilities"],
license="Reticulum License",
license_files = ("LICENSE",)
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
entry_points= {
@@ -25,6 +26,6 @@ setuptools.setup(
'lxmd=LXMF.Utilities.lxmd:main',
]
},
install_requires=['rns>=0.9.1'],
python_requires='>=3.7',
install_requires=["rns>=1.0.1"],
python_requires=">=3.7",
)