mirror of
https://github.com/markqvist/LXMF.git
synced 2025-04-06 14:03:40 -04:00
Compare commits
191 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
326c0eed8f | ||
![]() |
336792c07a | ||
![]() |
570d2c6846 | ||
![]() |
1ef4665073 | ||
![]() |
d5540b927f | ||
![]() |
a6cf585109 | ||
![]() |
c0a8f3be49 | ||
![]() |
7b4780cfb7 | ||
![]() |
b94a712bb6 | ||
![]() |
f42ccfc4e9 | ||
![]() |
9eca747757 | ||
![]() |
b7b6753640 | ||
![]() |
40d0b9a5de | ||
![]() |
40fc75f559 | ||
![]() |
f1d060a92e | ||
![]() |
e0e901291e | ||
![]() |
886ac69a82 | ||
![]() |
e0163e100a | ||
![]() |
26a10cce8f | ||
![]() |
cec903a4dc | ||
![]() |
962d9c90d1 | ||
![]() |
6d2eb4f973 | ||
![]() |
a8cc5f41cf | ||
![]() |
aa57b16cf5 | ||
![]() |
cdea838a6c | ||
![]() |
fb4bf9b0b9 | ||
![]() |
a3e3868f92 | ||
![]() |
70186cf8d9 | ||
![]() |
fe59b265c5 | ||
![]() |
a87458d25f | ||
![]() |
35dd70c59e | ||
![]() |
a198e96064 | ||
![]() |
e3be7e0cfd | ||
![]() |
460645cea2 | ||
![]() |
f683e03891 | ||
![]() |
2c71cea7a0 | ||
![]() |
61b1ecce27 | ||
![]() |
68257a441f | ||
![]() |
e69da2ed2a | ||
![]() |
c2a08ef355 | ||
![]() |
1430b1ce90 | ||
![]() |
1c9c744107 | ||
![]() |
bfed126a7c | ||
![]() |
44d1d992f8 | ||
![]() |
7701f326d9 | ||
![]() |
356cb6412f | ||
![]() |
7bd3cf986d | ||
![]() |
3948c9a187 | ||
![]() |
d6b1b9c94d | ||
![]() |
a676954116 | ||
![]() |
d97c4f292e | ||
![]() |
2d175a331f | ||
![]() |
976305b791 | ||
![]() |
a6a42eff80 | ||
![]() |
96dddf1b3a | ||
![]() |
c426c93cc5 | ||
![]() |
1a43d93da2 | ||
![]() |
575fbc9ffe | ||
![]() |
c21da895b6 | ||
![]() |
b172c7fcd4 | ||
![]() |
61331b58d7 | ||
![]() |
9ff76c0473 | ||
![]() |
c9272c9218 | ||
![]() |
36f0c17c8b | ||
![]() |
aa406d1552 | ||
![]() |
0cb771439f | ||
![]() |
672d754238 | ||
![]() |
0178fb0d4f | ||
![]() |
19d8909b10 | ||
![]() |
19f0fa7724 | ||
![]() |
eefb1c8349 | ||
![]() |
0e0d01a0b2 | ||
![]() |
ac2c9c3a9b | ||
![]() |
05f144ae50 | ||
![]() |
a461fd415a | ||
![]() |
ef1c3331ad | ||
![]() |
e1a56be5c1 | ||
![]() |
cf6fc4a510 | ||
![]() |
67d21deff9 | ||
![]() |
efc15bde26 | ||
![]() |
b5d6ed3d9b | ||
![]() |
7789e0bc26 | ||
![]() |
0e2f0fb090 | ||
![]() |
19696d206d | ||
![]() |
9193aa5e02 | ||
![]() |
bbf1eda3b0 | ||
![]() |
d8e2e2a45f | ||
![]() |
0fc15e6054 | ||
![]() |
717240c8d4 | ||
![]() |
453772859d | ||
![]() |
98eea1171e | ||
![]() |
ea49d22bce | ||
![]() |
058186dfdc | ||
![]() |
8134672436 | ||
![]() |
ae0952455b | ||
![]() |
a20f380284 | ||
![]() |
6d83b019e1 | ||
![]() |
40eb014c91 | ||
![]() |
62f5a9eead | ||
![]() |
74cbd11473 | ||
![]() |
0d76eee6cd | ||
![]() |
4b5e27a5e2 | ||
![]() |
dcb0a18cd7 | ||
![]() |
015dcc5631 | ||
![]() |
537f1823b6 | ||
![]() |
fc99010a3d | ||
![]() |
fe14f8744d | ||
![]() |
4dca031441 | ||
![]() |
5366f895b2 | ||
![]() |
95ba8cba60 | ||
![]() |
775ac7ff68 | ||
![]() |
a9fe086bcf | ||
![]() |
e0f763caee | ||
![]() |
c24e99a92b | ||
![]() |
c1a327c5b2 | ||
![]() |
68892091ec | ||
![]() |
e87c6c6d28 | ||
![]() |
0be569ccd6 | ||
![]() |
8e686b10e8 | ||
![]() |
ae203291b9 | ||
![]() |
8b8008478d | ||
![]() |
b4ba9d628b | ||
![]() |
4520507869 | ||
![]() |
e5a960b2df | ||
![]() |
a35e522e75 | ||
![]() |
a810be316f | ||
![]() |
741b66aab2 | ||
![]() |
d767d40c5f | ||
![]() |
2b3fa796a1 | ||
![]() |
f6cc9fd1cf | ||
![]() |
4b13d7188c | ||
![]() |
45e39917b5 | ||
![]() |
27ffea3ea9 | ||
![]() |
d2b2ef54e8 | ||
![]() |
3fbe2e94da | ||
![]() |
21af6a4e5c | ||
![]() |
739349c2a1 | ||
![]() |
994bb07efd | ||
![]() |
1e9fe969fd | ||
![]() |
1d651a9b53 | ||
![]() |
22493005dc | ||
![]() |
7ecd3c0d5e | ||
![]() |
64050d39bf | ||
![]() |
c7489dc0fa | ||
![]() |
bb1b9e4163 | ||
![]() |
b3bc8e684e | ||
![]() |
696c78ecea | ||
![]() |
7aea4ea209 | ||
![]() |
35dc771528 | ||
![]() |
c369c6fc26 | ||
![]() |
af2dd56707 | ||
![]() |
1287cb9f11 | ||
![]() |
f0e9fa6659 | ||
![]() |
7a1bcc134b | ||
![]() |
87413b9355 | ||
![]() |
6ed16916d6 | ||
![]() |
754ae969e1 | ||
![]() |
1ee0c83168 | ||
![]() |
2812a07165 | ||
![]() |
10be1383e5 | ||
![]() |
4d356bcda8 | ||
![]() |
da6fb3d0f3 | ||
![]() |
56cb637bfa | ||
![]() |
2872c94394 | ||
![]() |
f307f33bf8 | ||
![]() |
a095ba7e40 | ||
![]() |
0dc1e13f9d | ||
![]() |
f83600430a | ||
![]() |
537123896b | ||
![]() |
bc8863c3b1 | ||
![]() |
b894c5a2a6 | ||
![]() |
0ac4660510 | ||
![]() |
672054177c | ||
![]() |
820c92d38b | ||
![]() |
9f051aea17 | ||
![]() |
55cf934475 | ||
![]() |
b690880332 | ||
![]() |
7220a4d6f7 | ||
![]() |
5abf01a88f | ||
![]() |
51a4d19595 | ||
![]() |
ad8fb74d9e | ||
![]() |
447a8a920c | ||
![]() |
0ba3ea98cb | ||
![]() |
fc0b086cb1 | ||
![]() |
7a40bd44e8 | ||
![]() |
afe6815637 | ||
![]() |
a64f3a58f7 | ||
![]() |
2bb34176b7 | ||
![]() |
6f80c6941d | ||
![]() |
02d42747b7 | ||
![]() |
5b81ecce04 |
11
.github/ISSUE_TEMPLATE/config.yml
vendored
Normal file
11
.github/ISSUE_TEMPLATE/config.yml
vendored
Normal file
@ -0,0 +1,11 @@
|
||||
blank_issues_enabled: false
|
||||
contact_links:
|
||||
- name: ✨ Feature Request or Idea
|
||||
url: https://github.com/markqvist/Reticulum/discussions/new?category=ideas
|
||||
about: Propose and discuss features and ideas
|
||||
- name: 💬 Questions, Help & Discussion
|
||||
about: Ask anything, or get help
|
||||
url: https://github.com/markqvist/Reticulum/discussions/new/choose
|
||||
- name: 📖 Read the Reticulum Manual
|
||||
url: https://markqvist.github.io/Reticulum/manual/
|
||||
about: The complete documentation for Reticulum
|
35
.github/ISSUE_TEMPLATE/🐛-bug-report.md
vendored
Normal file
35
.github/ISSUE_TEMPLATE/🐛-bug-report.md
vendored
Normal file
@ -0,0 +1,35 @@
|
||||
---
|
||||
name: "\U0001F41B Bug Report"
|
||||
about: Report a reproducible bug
|
||||
title: ''
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Read the Contribution Guidelines**
|
||||
Before creating a bug report on this issue tracker, you **must** read the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md). Issues that do not follow the contribution guidelines **will be deleted without comment**.
|
||||
|
||||
- The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**.
|
||||
- Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**.
|
||||
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**.
|
||||
|
||||
**Describe the Bug**
|
||||
A clear and concise description of what the bug is.
|
||||
|
||||
**To Reproduce**
|
||||
Describe in detail how to reproduce the bug.
|
||||
|
||||
**Expected Behavior**
|
||||
A clear and concise description of what you expected to happen.
|
||||
|
||||
**Logs & Screenshots**
|
||||
Please include any relevant log output. If applicable, also add screenshots to help explain your problem.
|
||||
|
||||
**System Information**
|
||||
- OS and version
|
||||
- Python version
|
||||
- Program version
|
||||
|
||||
**Additional context**
|
||||
Add any other context about the problem here.
|
31
Dockerfile
Normal file
31
Dockerfile
Normal file
@ -0,0 +1,31 @@
|
||||
FROM python:alpine
|
||||
LABEL authors="Petr Blaha petr.blaha@cleverdata.cz"
|
||||
USER root
|
||||
RUN apk update
|
||||
RUN apk add sdl2_ttf sdl2 build-base libc-dev pkgconfig gstreamer sdl2_mixer sdl2_image sdl2_pango linux-headers mesa-dev py3-virtualenv
|
||||
|
||||
RUN addgroup -S myuser && adduser -S -G myuser myuser
|
||||
USER myuser
|
||||
WORKDIR /home/myuser
|
||||
|
||||
RUN pip install --upgrade pip
|
||||
|
||||
|
||||
ENV PATH="/home/myuser/.local/bin:${PATH}"
|
||||
|
||||
################### BEGIN LXMF ###########################################
|
||||
|
||||
COPY --chown=myuser:myuser requirements.txt requirements.txt
|
||||
|
||||
RUN pip install --user -r requirements.txt
|
||||
|
||||
|
||||
COPY --chown=myuser:myuser . .
|
||||
|
||||
#Python create virtual environment
|
||||
RUN virtualenv /home/myuser/LXMF/venv
|
||||
RUN source /home/myuser/LXMF/venv/bin/activate
|
||||
|
||||
RUN make all
|
||||
|
||||
################### END LXMF ###########################################
|
6
Dockerfile.howto
Normal file
6
Dockerfile.howto
Normal file
@ -0,0 +1,6 @@
|
||||
# Run docker command one by one(all four), it will build LXMF artifact and copy to dist directory.
|
||||
# No need to build locally and install dependencies
|
||||
docker build -t lxmfdockerimage .
|
||||
docker run -d -it --name lxmfdockercontainer lxmfdockerimage /bin/sh
|
||||
docker cp lxmfdockercontainer:/home/myuser/dist .
|
||||
docker rm -f lxmfdockercontainer
|
@ -2,19 +2,19 @@ import time
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
from .LXMF import APP_NAME
|
||||
|
||||
from .LXMF import APP_NAME, stamp_cost_from_app_data, pn_announce_data_is_valid
|
||||
from .LXMessage import LXMessage
|
||||
|
||||
class LXMFDeliveryAnnounceHandler:
|
||||
def __init__(self, lxmrouter):
|
||||
self.aspect_filter = APP_NAME+".delivery"
|
||||
self.receive_path_responses = True
|
||||
self.lxmrouter = lxmrouter
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data):
|
||||
for lxmessage in self.lxmrouter.pending_outbound:
|
||||
if destination_hash == lxmessage.destination_hash:
|
||||
if lxmessage.method == LXMessage.DIRECT:
|
||||
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
|
||||
lxmessage.next_delivery_attempt = time.time()
|
||||
|
||||
while self.lxmrouter.processing_outbound:
|
||||
@ -22,23 +22,54 @@ class LXMFDeliveryAnnounceHandler:
|
||||
|
||||
self.lxmrouter.process_outbound()
|
||||
|
||||
try:
|
||||
stamp_cost = stamp_cost_from_app_data(app_data)
|
||||
self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
|
||||
|
||||
class LXMFPropagationAnnounceHandler:
|
||||
def __init__(self, lxmrouter):
|
||||
self.aspect_filter = APP_NAME+".propagation"
|
||||
self.receive_path_responses = False
|
||||
self.lxmrouter = lxmrouter
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data):
|
||||
try:
|
||||
if type(app_data) == bytes:
|
||||
data = msgpack.unpackb(app_data)
|
||||
|
||||
if self.lxmrouter.propagation_node and self.lxmrouter.autopeer:
|
||||
if data[0] == True:
|
||||
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
|
||||
self.lxmrouter.peer(destination_hash, data[1])
|
||||
elif data[0] == False:
|
||||
self.lxmrouter.unpeer(destination_hash, data[1])
|
||||
data = msgpack.unpackb(app_data)
|
||||
|
||||
if pn_announce_data_is_valid(data):
|
||||
node_timebase = data[1]
|
||||
propagation_transfer_limit = None
|
||||
wanted_inbound_peers = None
|
||||
if len(data) >= 4:
|
||||
# TODO: Rethink, probably not necessary anymore
|
||||
# try:
|
||||
# wanted_inbound_peers = int(data[3])
|
||||
# except:
|
||||
# wanted_inbound_peers = None
|
||||
pass
|
||||
|
||||
if len(data) >= 3:
|
||||
try:
|
||||
propagation_transfer_limit = float(data[2])
|
||||
except:
|
||||
propagation_transfer_limit = None
|
||||
|
||||
if destination_hash in self.lxmrouter.static_peers:
|
||||
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
|
||||
|
||||
else:
|
||||
if data[0] == True:
|
||||
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
|
||||
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
|
||||
|
||||
elif data[0] == False:
|
||||
self.lxmrouter.unpeer(destination_hash, node_timebase)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)
|
||||
|
166
LXMF/LXMF.py
166
LXMF/LXMF.py
@ -1 +1,165 @@
|
||||
APP_NAME = "lxmf"
|
||||
APP_NAME = "lxmf"
|
||||
|
||||
##########################################################
|
||||
# The following core fields are provided to facilitate #
|
||||
# interoperability in data exchange between various LXMF #
|
||||
# clients and systems. #
|
||||
##########################################################
|
||||
FIELD_EMBEDDED_LXMS = 0x01
|
||||
FIELD_TELEMETRY = 0x02
|
||||
FIELD_TELEMETRY_STREAM = 0x03
|
||||
FIELD_ICON_APPEARANCE = 0x04
|
||||
FIELD_FILE_ATTACHMENTS = 0x05
|
||||
FIELD_IMAGE = 0x06
|
||||
FIELD_AUDIO = 0x07
|
||||
FIELD_THREAD = 0x08
|
||||
FIELD_COMMANDS = 0x09
|
||||
FIELD_RESULTS = 0x0A
|
||||
FIELD_GROUP = 0x0B
|
||||
FIELD_TICKET = 0x0C
|
||||
FIELD_EVENT = 0x0D
|
||||
FIELD_RNR_REFS = 0x0E
|
||||
FIELD_RENDERER = 0x0F
|
||||
|
||||
# For usecases such as including custom data structures,
|
||||
# embedding or encapsulating other data types or protocols
|
||||
# that are not native to LXMF, or bridging/tunneling
|
||||
# external protocols or services over LXMF, the following
|
||||
# fields are available. A format/type/protocol (or other)
|
||||
# identifier can be included in the CUSTOM_TYPE field, and
|
||||
# the embedded payload can be included in the CUSTOM_DATA
|
||||
# field. It is up to the client application to correctly
|
||||
# discern and potentially utilise any data embedded using
|
||||
# this mechanism.
|
||||
FIELD_CUSTOM_TYPE = 0xFB
|
||||
FIELD_CUSTOM_DATA = 0xFC
|
||||
FIELD_CUSTOM_META = 0xFD
|
||||
|
||||
# The non-specific and debug fields are intended for
|
||||
# development, testing and debugging use.
|
||||
FIELD_NON_SPECIFIC = 0xFE
|
||||
FIELD_DEBUG = 0xFF
|
||||
|
||||
##########################################################
|
||||
# The following section lists field-specific specifiers, #
|
||||
# modes and identifiers that are native to LXMF. It is #
|
||||
# optional for any client or system to support any of #
|
||||
# these, and they are provided as template for easing #
|
||||
# interoperability without sacrificing expandability #
|
||||
# and flexibility of the format. #
|
||||
##########################################################
|
||||
|
||||
# Audio modes for the data structure in FIELD_AUDIO
|
||||
|
||||
# Codec2 Audio Modes
|
||||
AM_CODEC2_450PWB = 0x01
|
||||
AM_CODEC2_450 = 0x02
|
||||
AM_CODEC2_700C = 0x03
|
||||
AM_CODEC2_1200 = 0x04
|
||||
AM_CODEC2_1300 = 0x05
|
||||
AM_CODEC2_1400 = 0x06
|
||||
AM_CODEC2_1600 = 0x07
|
||||
AM_CODEC2_2400 = 0x08
|
||||
AM_CODEC2_3200 = 0x09
|
||||
|
||||
# Opus Audio Modes
|
||||
AM_OPUS_OGG = 0x10
|
||||
AM_OPUS_LBW = 0x11
|
||||
AM_OPUS_MBW = 0x12
|
||||
AM_OPUS_PTT = 0x13
|
||||
AM_OPUS_RT_HDX = 0x14
|
||||
AM_OPUS_RT_FDX = 0x15
|
||||
AM_OPUS_STANDARD = 0x16
|
||||
AM_OPUS_HQ = 0x17
|
||||
AM_OPUS_BROADCAST = 0x18
|
||||
AM_OPUS_LOSSLESS = 0x19
|
||||
|
||||
# Custom, unspecified audio mode, the client must
|
||||
# determine it itself based on the included data.
|
||||
AM_CUSTOM = 0xFF
|
||||
|
||||
# Message renderer specifications for FIELD_RENDERER.
|
||||
# The renderer specification is completely optional,
|
||||
# and only serves as an indication to the receiving
|
||||
# client on how to render the message contents. It is
|
||||
# not mandatory to implement, either on sending or
|
||||
# receiving sides, but is the recommended way to
|
||||
# signal how to render a message, if non-plaintext
|
||||
# formatting is used.
|
||||
RENDERER_PLAIN = 0x00
|
||||
RENDERER_MICRON = 0x01
|
||||
RENDERER_MARKDOWN = 0x02
|
||||
RENDERER_BBCODE = 0x03
|
||||
|
||||
##########################################################
|
||||
# The following helper functions makes it easier to #
|
||||
# handle and operate on LXMF data in client programs #
|
||||
##########################################################
|
||||
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
def display_name_from_app_data(app_data=None):
|
||||
if app_data == None:
|
||||
return None
|
||||
elif len(app_data) == 0:
|
||||
return None
|
||||
else:
|
||||
# Version 0.5.0+ announce format
|
||||
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
|
||||
peer_data = msgpack.unpackb(app_data)
|
||||
if type(peer_data) == list:
|
||||
if len(peer_data) < 1:
|
||||
return None
|
||||
else:
|
||||
dn = peer_data[0]
|
||||
if dn == None:
|
||||
return None
|
||||
else:
|
||||
try:
|
||||
decoded = dn.decode("utf-8")
|
||||
return decoded
|
||||
except Exception as e:
|
||||
RNS.log(f"Could not decode display name in included announce data. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
return None
|
||||
|
||||
# Original announce format
|
||||
else:
|
||||
return app_data.decode("utf-8")
|
||||
|
||||
def stamp_cost_from_app_data(app_data=None):
|
||||
if app_data == None or app_data == b"":
|
||||
return None
|
||||
else:
|
||||
# Version 0.5.0+ announce format
|
||||
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
|
||||
peer_data = msgpack.unpackb(app_data)
|
||||
if type(peer_data) == list:
|
||||
if len(peer_data) < 2:
|
||||
return None
|
||||
else:
|
||||
return peer_data[1]
|
||||
|
||||
# Original announce format
|
||||
else:
|
||||
return None
|
||||
|
||||
def pn_announce_data_is_valid(data):
|
||||
try:
|
||||
if type(data) == bytes:
|
||||
data = msgpack.unpackb(data)
|
||||
|
||||
if len(data) < 3:
|
||||
raise ValueError("Invalid announce data: Insufficient peer data")
|
||||
else:
|
||||
if data[0] != True and data[0] != False:
|
||||
raise ValueError("Invalid announce data: Indeterminate propagation node status")
|
||||
try:
|
||||
int(data[1])
|
||||
except:
|
||||
raise ValueError("Invalid announce data: Could not decode peer timebase")
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG)
|
||||
return False
|
||||
|
||||
return True
|
346
LXMF/LXMPeer.py
346
LXMF/LXMPeer.py
@ -4,6 +4,7 @@ import time
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
from collections import deque
|
||||
from .LXMF import APP_NAME
|
||||
|
||||
class LXMPeer:
|
||||
@ -19,10 +20,11 @@ class LXMPeer:
|
||||
|
||||
ERROR_NO_IDENTITY = 0xf0
|
||||
ERROR_NO_ACCESS = 0xf1
|
||||
ERROR_TIMEOUT = 0xfe
|
||||
|
||||
# Maximum amount of time a peer can
|
||||
# be unreachable before it is removed
|
||||
MAX_UNREACHABLE = 4*24*60*60
|
||||
MAX_UNREACHABLE = 14*24*60*60
|
||||
|
||||
# Everytime consecutive time a sync
|
||||
# link fails to establish, add this
|
||||
@ -38,20 +40,82 @@ class LXMPeer:
|
||||
@staticmethod
|
||||
def from_bytes(peer_bytes, router):
|
||||
dictionary = msgpack.unpackb(peer_bytes)
|
||||
peer_destination_hash = dictionary["destination_hash"]
|
||||
peer_peering_timebase = dictionary["peering_timebase"]
|
||||
peer_alive = dictionary["alive"]
|
||||
peer_last_heard = dictionary["last_heard"]
|
||||
|
||||
peer = LXMPeer(router, dictionary["destination_hash"])
|
||||
peer.peering_timebase = dictionary["peering_timebase"]
|
||||
peer.alive = dictionary["alive"]
|
||||
peer.last_heard = dictionary["last_heard"]
|
||||
peer = LXMPeer(router, peer_destination_hash)
|
||||
peer.peering_timebase = peer_peering_timebase
|
||||
peer.alive = peer_alive
|
||||
peer.last_heard = peer_last_heard
|
||||
|
||||
if "link_establishment_rate" in dictionary:
|
||||
peer.link_establishment_rate = dictionary["link_establishment_rate"]
|
||||
else:
|
||||
peer.link_establishment_rate = 0
|
||||
|
||||
if "sync_transfer_rate" in dictionary:
|
||||
peer.sync_transfer_rate = dictionary["sync_transfer_rate"]
|
||||
else:
|
||||
peer.sync_transfer_rate = 0
|
||||
|
||||
if "propagation_transfer_limit" in dictionary:
|
||||
try:
|
||||
peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"])
|
||||
except Exception as e:
|
||||
peer.propagation_transfer_limit = None
|
||||
else:
|
||||
peer.propagation_transfer_limit = None
|
||||
|
||||
if "offered" in dictionary:
|
||||
peer.offered = dictionary["offered"]
|
||||
else:
|
||||
peer.offered = 0
|
||||
|
||||
if "outgoing" in dictionary:
|
||||
peer.outgoing = dictionary["outgoing"]
|
||||
else:
|
||||
peer.outgoing = 0
|
||||
|
||||
if "incoming" in dictionary:
|
||||
peer.incoming = dictionary["incoming"]
|
||||
else:
|
||||
peer.incoming = 0
|
||||
|
||||
if "rx_bytes" in dictionary:
|
||||
peer.rx_bytes = dictionary["rx_bytes"]
|
||||
else:
|
||||
peer.rx_bytes = 0
|
||||
|
||||
if "tx_bytes" in dictionary:
|
||||
peer.tx_bytes = dictionary["tx_bytes"]
|
||||
else:
|
||||
peer.tx_bytes = 0
|
||||
|
||||
if "last_sync_attempt" in dictionary:
|
||||
peer.last_sync_attempt = dictionary["last_sync_attempt"]
|
||||
else:
|
||||
peer.last_sync_attempt = 0
|
||||
|
||||
hm_count = 0
|
||||
for transient_id in dictionary["handled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
peer.add_handled_message(transient_id)
|
||||
hm_count += 1
|
||||
|
||||
um_count = 0
|
||||
for transient_id in dictionary["unhandled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
peer.add_unhandled_message(transient_id)
|
||||
um_count += 1
|
||||
|
||||
peer._hm_count = hm_count
|
||||
peer._um_count = um_count
|
||||
peer._hm_counts_synced = True
|
||||
peer._um_counts_synced = True
|
||||
|
||||
del dictionary
|
||||
return peer
|
||||
|
||||
def to_bytes(self):
|
||||
@ -60,6 +124,15 @@ class LXMPeer:
|
||||
dictionary["alive"] = self.alive
|
||||
dictionary["last_heard"] = self.last_heard
|
||||
dictionary["destination_hash"] = self.destination_hash
|
||||
dictionary["link_establishment_rate"] = self.link_establishment_rate
|
||||
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
|
||||
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
|
||||
dictionary["last_sync_attempt"] = self.last_sync_attempt
|
||||
dictionary["offered"] = self.offered
|
||||
dictionary["outgoing"] = self.outgoing
|
||||
dictionary["incoming"] = self.incoming
|
||||
dictionary["rx_bytes"] = self.rx_bytes
|
||||
dictionary["tx_bytes"] = self.tx_bytes
|
||||
|
||||
handled_ids = []
|
||||
for transient_id in self.handled_messages:
|
||||
@ -72,7 +145,10 @@ class LXMPeer:
|
||||
dictionary["handled_ids"] = handled_ids
|
||||
dictionary["unhandled_ids"] = unhandled_ids
|
||||
|
||||
return msgpack.packb(dictionary)
|
||||
peer_bytes = msgpack.packb(dictionary)
|
||||
del dictionary
|
||||
|
||||
return peer_bytes
|
||||
|
||||
def __init__(self, router, destination_hash):
|
||||
self.alive = False
|
||||
@ -81,20 +157,38 @@ class LXMPeer:
|
||||
self.last_sync_attempt = 0
|
||||
self.sync_backoff = 0
|
||||
self.peering_timebase = 0
|
||||
self.link_establishment_rate = 0
|
||||
self.sync_transfer_rate = 0
|
||||
self.propagation_transfer_limit = None
|
||||
self.handled_messages_queue = deque()
|
||||
self.unhandled_messages_queue = deque()
|
||||
|
||||
self.offered = 0 # Messages offered to this peer
|
||||
self.outgoing = 0 # Messages transferred to this peer
|
||||
self.incoming = 0 # Messages received from this peer
|
||||
self.rx_bytes = 0 # Bytes received from this peer
|
||||
self.tx_bytes = 0 # Bytes sent to this peer
|
||||
|
||||
self._hm_count = 0
|
||||
self._um_count = 0
|
||||
self._hm_counts_synced = False
|
||||
self._um_counts_synced = False
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
self.unhandled_messages = {}
|
||||
self.handled_messages = {}
|
||||
self.last_offer = []
|
||||
|
||||
self.router = router
|
||||
self.destination_hash = destination_hash
|
||||
self.identity = RNS.Identity.recall(destination_hash)
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
if self.identity != None:
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
else:
|
||||
self.destination = None
|
||||
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
|
||||
|
||||
def sync(self):
|
||||
|
||||
RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
|
||||
self.last_sync_attempt = time.time()
|
||||
|
||||
@ -110,9 +204,10 @@ class LXMPeer:
|
||||
else:
|
||||
if self.identity == None:
|
||||
self.identity = RNS.Identity.recall(destination_hash)
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
if self.identity != None:
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
|
||||
if self.identity != None:
|
||||
if self.destination != None:
|
||||
if len(self.unhandled_messages) > 0:
|
||||
if self.state == LXMPeer.IDLE:
|
||||
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
|
||||
@ -128,23 +223,49 @@ class LXMPeer:
|
||||
self.sync_backoff = 0
|
||||
|
||||
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
|
||||
unhandled_entries = []
|
||||
unhandled_ids = []
|
||||
purged_ids = []
|
||||
for transient_id in self.unhandled_messages:
|
||||
if transient_id in self.router.propagation_entries:
|
||||
unhandled_ids.append(transient_id)
|
||||
unhandled_entry = [
|
||||
transient_id,
|
||||
self.router.get_weight(transient_id),
|
||||
self.router.get_size(transient_id),
|
||||
]
|
||||
unhandled_entries.append(unhandled_entry)
|
||||
else:
|
||||
purged_ids.append(transient_id)
|
||||
|
||||
for transient_id in purged_ids:
|
||||
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
|
||||
self.unhandled_messages.pop(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
|
||||
unhandled_entries.sort(key=lambda e: e[1], reverse=False)
|
||||
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
|
||||
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
|
||||
for unhandled_entry in unhandled_entries:
|
||||
transient_id = unhandled_entry[0]
|
||||
weight = unhandled_entry[1]
|
||||
lxm_size = unhandled_entry[2]
|
||||
next_size = cumulative_size + (lxm_size+per_message_overhead)
|
||||
if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000):
|
||||
if lxm_size+per_message_overhead > (self.propagation_transfer_limit*1000):
|
||||
self.remove_unhandled_message(transient_id)
|
||||
self.add_handled_message(transient_id)
|
||||
RNS.log(f"Message {RNS.prettyhexrep(transient_id)} exceeds transfer limit for {self}, considering handled", RNS.LOG_DEBUG)
|
||||
else:
|
||||
cumulative_size += (lxm_size+per_message_overhead)
|
||||
unhandled_ids.append(transient_id)
|
||||
|
||||
RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_VERBOSE)
|
||||
self.last_offer = unhandled_ids
|
||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||
self.state = LXMPeer.REQUEST_SENT
|
||||
|
||||
else:
|
||||
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
|
||||
|
||||
else:
|
||||
RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG)
|
||||
if self.last_sync_attempt > self.last_heard:
|
||||
@ -154,8 +275,8 @@ class LXMPeer:
|
||||
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
else:
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def offer_response(self, request_receipt):
|
||||
try:
|
||||
@ -167,44 +288,48 @@ class LXMPeer:
|
||||
|
||||
if response == LXMPeer.ERROR_NO_IDENTITY:
|
||||
if self.link != None:
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG)
|
||||
self.link.indentify()
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE)
|
||||
self.link.identify()
|
||||
self.state = LXMPeer.LINK_READY
|
||||
self.sync()
|
||||
return
|
||||
|
||||
elif response == LXMPeer.ERROR_NO_ACCESS:
|
||||
RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE)
|
||||
self.router.unpeer(self.destination_hash)
|
||||
return
|
||||
|
||||
elif response == False:
|
||||
# Peer already has all advertised messages
|
||||
for transient_id in self.unhandled_messages:
|
||||
message_entry = self.unhandled_messages[transient_id]
|
||||
self.handled_messages[transient_id] = message_entry
|
||||
|
||||
self.unhandled_messages = {}
|
||||
for transient_id in self.last_offer:
|
||||
if transient_id in self.unhandled_messages:
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
|
||||
elif response == True:
|
||||
# Peer wants all advertised messages
|
||||
for transient_id in self.unhandled_messages:
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
for transient_id in self.last_offer:
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
else:
|
||||
# Peer wants some advertised messages
|
||||
peer_had_messages = []
|
||||
for transient_id in self.unhandled_messages.copy():
|
||||
for transient_id in self.last_offer.copy():
|
||||
# If the peer did not want the message, it has
|
||||
# already received it from another peer.
|
||||
if not transient_id in response:
|
||||
message_entry = self.unhandled_messages.pop(transient_id)
|
||||
self.handled_messages[transient_id] = message_entry
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
for transient_id in response:
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
if len(wanted_messages) > 0:
|
||||
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG)
|
||||
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_VERBOSE)
|
||||
|
||||
lxm_list = []
|
||||
|
||||
for message_entry in wanted_messages:
|
||||
file_path = message_entry[1]
|
||||
if os.path.isfile(file_path):
|
||||
@ -216,9 +341,16 @@ class LXMPeer:
|
||||
data = msgpack.packb([time.time(), lxm_list])
|
||||
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
||||
resource.transferred_messages = wanted_message_ids
|
||||
resource.sync_transfer_started = time.time()
|
||||
self.state = LXMPeer.RESOURCE_TRANSFERRING
|
||||
|
||||
else:
|
||||
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
|
||||
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_VERBOSE)
|
||||
self.offered += len(self.last_offer)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
except Exception as e:
|
||||
@ -231,28 +363,44 @@ class LXMPeer:
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
|
||||
def resource_concluded(self, resource):
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
for transient_id in resource.transferred_messages:
|
||||
message = self.unhandled_messages.pop(transient_id)
|
||||
self.handled_messages[transient_id] = message
|
||||
self.state = LXMPeer.IDLE
|
||||
self.link.teardown()
|
||||
RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed", RNS.LOG_DEBUG)
|
||||
self.alive = True
|
||||
self.last_heard = time.time()
|
||||
else:
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
else:
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
rate_str = ""
|
||||
if hasattr(resource, "sync_transfer_started") and resource.sync_transfer_started:
|
||||
self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started)
|
||||
rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
|
||||
|
||||
RNS.log(f"Syncing {len(resource.transferred_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
|
||||
self.alive = True
|
||||
self.last_heard = time.time()
|
||||
self.offered += len(self.last_offer)
|
||||
self.outgoing += len(resource.transferred_messages)
|
||||
self.tx_bytes += resource.get_data_size()
|
||||
|
||||
else:
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def link_established(self, link):
|
||||
self.link.identify(self.router.identity)
|
||||
link_establishment_rate = link.get_establishment_rate()
|
||||
if link_establishment_rate != None:
|
||||
self.link_establishment_rate = link_establishment_rate
|
||||
|
||||
self.state = LXMPeer.LINK_READY
|
||||
self.next_sync_attempt = 0
|
||||
self.sync()
|
||||
@ -261,11 +409,103 @@ class LXMPeer:
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def handle_message(self, transient_id):
|
||||
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
|
||||
# TODO: Remove at some point
|
||||
RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was added to distribution queue for "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_EXTREME)
|
||||
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
|
||||
def queued_items(self):
|
||||
return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
|
||||
|
||||
def queue_unhandled_message(self, transient_id):
|
||||
self.unhandled_messages_queue.append(transient_id)
|
||||
|
||||
def queue_handled_message(self, transient_id):
|
||||
self.handled_messages_queue.append(transient_id)
|
||||
|
||||
def process_queues(self):
|
||||
if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
|
||||
# TODO: Remove debug
|
||||
# st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue)
|
||||
|
||||
handled_messages = self.handled_messages
|
||||
unhandled_messages = self.unhandled_messages
|
||||
|
||||
while len(self.handled_messages_queue) > 0:
|
||||
transient_id = self.handled_messages_queue.pop()
|
||||
if not transient_id in handled_messages:
|
||||
self.add_handled_message(transient_id)
|
||||
if transient_id in unhandled_messages:
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
while len(self.unhandled_messages_queue) > 0:
|
||||
transient_id = self.unhandled_messages_queue.pop()
|
||||
if not transient_id in handled_messages and not transient_id in unhandled_messages:
|
||||
self.add_unhandled_message(transient_id)
|
||||
|
||||
del handled_messages, unhandled_messages
|
||||
# TODO: Remove debug
|
||||
# RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}")
|
||||
|
||||
@property
|
||||
def handled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
hm = list(filter(lambda tid: self.destination_hash in pes[tid][4], pes))
|
||||
self._hm_count = len(hm); del pes
|
||||
self._hm_counts_synced = True
|
||||
return hm
|
||||
|
||||
@property
|
||||
def unhandled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
um = list(filter(lambda tid: self.destination_hash in pes[tid][5], pes))
|
||||
self._um_count = len(um); del pes
|
||||
self._um_counts_synced = True
|
||||
return um
|
||||
|
||||
@property
|
||||
def handled_message_count(self):
|
||||
if not self._hm_counts_synced:
|
||||
self._update_counts()
|
||||
|
||||
return self._hm_count
|
||||
|
||||
@property
|
||||
def unhandled_message_count(self):
|
||||
if not self._um_counts_synced:
|
||||
self._update_counts()
|
||||
|
||||
return self._um_count
|
||||
|
||||
@property
|
||||
def acceptance_rate(self):
|
||||
return 0 if self.offered == 0 else (self.outgoing/self.offered)
|
||||
|
||||
def _update_counts(self):
|
||||
if not self._hm_counts_synced:
|
||||
hm = self.handled_messages; del hm
|
||||
|
||||
if not self._um_counts_synced:
|
||||
um = self.unhandled_messages; del um
|
||||
|
||||
def add_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].append(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def add_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].append(self.destination_hash)
|
||||
self._um_count += 1
|
||||
|
||||
def remove_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].remove(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def remove_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].remove(self.destination_hash)
|
||||
self._um_counts_synced = False
|
||||
|
||||
def __str__(self):
|
||||
if self.destination_hash:
|
||||
|
1525
LXMF/LXMRouter.py
1525
LXMF/LXMRouter.py
File diff suppressed because it is too large
Load Diff
@ -1,19 +1,25 @@
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
import os
|
||||
import time
|
||||
import base64
|
||||
import multiprocessing
|
||||
|
||||
import LXMF.LXStamper as LXStamper
|
||||
from .LXMF import APP_NAME
|
||||
|
||||
|
||||
class LXMessage:
|
||||
DRAFT = 0x00
|
||||
GENERATING = 0x00
|
||||
OUTBOUND = 0x01
|
||||
SENDING = 0x02
|
||||
SENT = 0x04
|
||||
DELIVERED = 0x08
|
||||
REJECTED = 0xFD
|
||||
CANCELLED = 0xFE
|
||||
FAILED = 0xFF
|
||||
states = [DRAFT, OUTBOUND, SENDING, SENT, DELIVERED, FAILED]
|
||||
states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, REJECTED, CANCELLED, FAILED]
|
||||
|
||||
UNKNOWN = 0x00
|
||||
PACKET = 0x01
|
||||
@ -32,19 +38,33 @@ class LXMessage:
|
||||
|
||||
DESTINATION_LENGTH = RNS.Identity.TRUNCATED_HASHLENGTH//8
|
||||
SIGNATURE_LENGTH = RNS.Identity.SIGLENGTH//8
|
||||
TICKET_LENGTH = RNS.Identity.TRUNCATED_HASHLENGTH//8
|
||||
|
||||
# LXMF overhead is 99 bytes per message:
|
||||
# 10 bytes for destination hash
|
||||
# 10 bytes for source hash
|
||||
# Default ticket expiry is 3 weeks, with an
|
||||
# additional grace period of 5 days, allowing
|
||||
# for timekeeping inaccuracies. Tickets will
|
||||
# automatically renew when there is less than
|
||||
# 14 days to expiry.
|
||||
TICKET_EXPIRY = 21*24*60*60
|
||||
TICKET_GRACE = 5*24*60*60
|
||||
TICKET_RENEW = 14*24*60*60
|
||||
TICKET_INTERVAL = 1*24*60*60
|
||||
COST_TICKET = 0x100
|
||||
|
||||
# LXMF overhead is 112 bytes per message:
|
||||
# 16 bytes for destination hash
|
||||
# 16 bytes for source hash
|
||||
# 64 bytes for Ed25519 signature
|
||||
# 8 bytes for timestamp
|
||||
# 7 bytes for msgpack structure
|
||||
LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + 8 + 7
|
||||
# 8 bytes for msgpack structure
|
||||
TIMESTAMP_SIZE = 8
|
||||
STRUCT_OVERHEAD = 8
|
||||
LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + TIMESTAMP_SIZE + STRUCT_OVERHEAD
|
||||
|
||||
# With an MTU of 500, the maximum amount of data
|
||||
# we can send in a single encrypted packet is
|
||||
# 383 bytes.
|
||||
ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU
|
||||
# 391 bytes.
|
||||
ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU + TIMESTAMP_SIZE
|
||||
|
||||
# The max content length we can fit in LXMF message
|
||||
# inside a single RNS packet is the encrypted MDU, minus
|
||||
@ -53,7 +73,7 @@ class LXMessage:
|
||||
# field of the packet, therefore we also add the length
|
||||
# of a destination hash to the calculation. With default
|
||||
# RNS and LXMF parameters, the largest single-packet
|
||||
# LXMF message we can send is 294 bytes. If a message
|
||||
# LXMF message we can send is 295 bytes. If a message
|
||||
# is larger than that, a Reticulum link will be used.
|
||||
ENCRYPTED_PACKET_MAX_CONTENT = ENCRYPTED_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
|
||||
|
||||
@ -63,13 +83,13 @@ class LXMessage:
|
||||
LINK_PACKET_MDU = RNS.Link.MDU
|
||||
|
||||
# Which means that we can deliver single-packet LXMF
|
||||
# messages with content of up to 332 bytes over a link.
|
||||
# messages with content of up to 319 bytes over a link.
|
||||
# If a message is larger than that, LXMF will sequence
|
||||
# and transfer it as a RNS resource over the link instead.
|
||||
LINK_PACKET_MAX_CONTENT = LINK_PACKET_MDU - LXMF_OVERHEAD
|
||||
|
||||
# For plain packets without encryption, we can
|
||||
# fit up to 388 bytes of content.
|
||||
# fit up to 368 bytes of content.
|
||||
PLAIN_PACKET_MDU = RNS.Packet.PLAIN_MDU
|
||||
PLAIN_PACKET_MAX_CONTENT = PLAIN_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
|
||||
|
||||
@ -90,7 +110,8 @@ class LXMessage:
|
||||
else:
|
||||
return "<LXMessage>"
|
||||
|
||||
def __init__(self, destination, source, content = "", title = "", fields = None, desired_method = None, destination_hash = None, source_hash = None):
|
||||
def __init__(self, destination, source, content = "", title = "", fields = None, desired_method = None, destination_hash = None, source_hash = None, stamp_cost=None, include_ticket=False):
|
||||
|
||||
if isinstance(destination, RNS.Destination) or destination == None:
|
||||
self.__destination = destination
|
||||
if destination != None:
|
||||
@ -109,19 +130,41 @@ class LXMessage:
|
||||
else:
|
||||
raise ValueError("LXMessage initialised with invalid source")
|
||||
|
||||
self.set_title_from_string(title)
|
||||
self.set_content_from_string(content)
|
||||
if title == None:
|
||||
title = ""
|
||||
|
||||
if type(title) == bytes:
|
||||
self.set_title_from_bytes(title)
|
||||
else:
|
||||
self.set_title_from_string(title)
|
||||
|
||||
if type(content) == bytes:
|
||||
self.set_content_from_bytes(content)
|
||||
else:
|
||||
self.set_content_from_string(content)
|
||||
|
||||
self.set_fields(fields)
|
||||
|
||||
self.payload = None
|
||||
self.timestamp = None
|
||||
self.signature = None
|
||||
self.hash = None
|
||||
self.packed = None
|
||||
self.progress = None
|
||||
self.state = LXMessage.DRAFT
|
||||
self.method = LXMessage.UNKNOWN
|
||||
self.payload = None
|
||||
self.timestamp = None
|
||||
self.signature = None
|
||||
self.hash = None
|
||||
self.packed = None
|
||||
self.state = LXMessage.GENERATING
|
||||
self.method = LXMessage.UNKNOWN
|
||||
self.progress = 0.0
|
||||
self.rssi = None
|
||||
self.snr = None
|
||||
self.q = None
|
||||
|
||||
self.stamp = None
|
||||
self.stamp_cost = stamp_cost
|
||||
self.stamp_value = None
|
||||
self.stamp_valid = False
|
||||
self.stamp_checked = False
|
||||
self.defer_stamp = True
|
||||
self.outbound_ticket = None
|
||||
self.include_ticket = include_ticket
|
||||
|
||||
self.propagation_packed = None
|
||||
self.paper_packed = None
|
||||
@ -129,17 +172,21 @@ class LXMessage:
|
||||
self.incoming = False
|
||||
self.signature_validated = False
|
||||
self.unverified_reason = None
|
||||
self.ratchet_id = None
|
||||
|
||||
self.representation = LXMessage.UNKNOWN
|
||||
self.desired_method = desired_method
|
||||
self.delivery_attempts = 0
|
||||
self.transport_encrypted = False
|
||||
self.transport_encryption = None
|
||||
self.ratchet_id = None
|
||||
self.packet_representation = None
|
||||
self.resource_representation = None
|
||||
self.__delivery_destination = None
|
||||
self.__delivery_callback = None
|
||||
self.failed_callback = None
|
||||
self.failed_callback = None
|
||||
|
||||
self.deferred_stamp_generating = False
|
||||
|
||||
def set_title_from_string(self, title_string):
|
||||
self.title = title_string.encode("utf-8")
|
||||
@ -157,16 +204,31 @@ class LXMessage:
|
||||
self.content = content_bytes
|
||||
|
||||
def content_as_string(self):
|
||||
return self.content.decode("utf-8")
|
||||
try:
|
||||
return self.content.decode("utf-8")
|
||||
except Exception as e:
|
||||
RNS.log(f"{self} could not decode message content as string: {e}")
|
||||
return None
|
||||
|
||||
def set_fields(self, fields):
|
||||
if isinstance(fields, dict) or fields == None:
|
||||
self.fields = fields
|
||||
self.fields = fields or {}
|
||||
else:
|
||||
raise ValueError("LXMessage property \"fields\" can only be dict or None")
|
||||
|
||||
def get_fields(self):
|
||||
return self.__fields
|
||||
return self.fields
|
||||
|
||||
@property
|
||||
def destination(self):
|
||||
return self.__destination
|
||||
|
||||
@destination.setter
|
||||
def destination(self, destination):
|
||||
self.set_destination(destination)
|
||||
|
||||
def get_destination(self):
|
||||
return self.destination
|
||||
|
||||
def set_destination(self, destination):
|
||||
if self.destination == None:
|
||||
@ -177,8 +239,16 @@ class LXMessage:
|
||||
else:
|
||||
raise ValueError("Cannot reassign destination on LXMessage")
|
||||
|
||||
def get_destination(self):
|
||||
return self.__destination
|
||||
@property
|
||||
def source(self):
|
||||
return self.__source
|
||||
|
||||
@source.setter
|
||||
def source(self, source):
|
||||
self.set_source(source)
|
||||
|
||||
def get_source(self):
|
||||
return self.source
|
||||
|
||||
def set_source(self, source):
|
||||
if self.source == None:
|
||||
@ -189,9 +259,6 @@ class LXMessage:
|
||||
else:
|
||||
raise ValueError("Cannot reassign source on LXMessage")
|
||||
|
||||
def get_source(self):
|
||||
return self.__source
|
||||
|
||||
def set_delivery_destination(self, delivery_destination):
|
||||
self.__delivery_destination = delivery_destination
|
||||
|
||||
@ -201,6 +268,71 @@ class LXMessage:
|
||||
def register_failed_callback(self, callback):
|
||||
self.failed_callback = callback
|
||||
|
||||
@staticmethod
|
||||
def stamp_valid(stamp, target_cost, workblock):
|
||||
target = 0b1 << 256-target_cost
|
||||
result = RNS.Identity.full_hash(workblock+stamp)
|
||||
if int.from_bytes(result, byteorder="big") > target:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def validate_stamp(self, target_cost, tickets=None):
|
||||
if tickets != None:
|
||||
for ticket in tickets:
|
||||
try:
|
||||
if self.stamp == RNS.Identity.truncated_hash(ticket+self.message_id):
|
||||
RNS.log(f"Stamp on {self} validated by inbound ticket", RNS.LOG_DEBUG) # TODO: Remove at some point
|
||||
self.stamp_value = LXMessage.COST_TICKET
|
||||
return True
|
||||
except Exception as e:
|
||||
RNS.log(f"Error while validating ticket: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
if self.stamp == None:
|
||||
return False
|
||||
else:
|
||||
workblock = LXStamper.stamp_workblock(self.message_id)
|
||||
if LXMessage.stamp_valid(self.stamp, target_cost, workblock):
|
||||
RNS.log(f"Stamp on {self} validated", RNS.LOG_DEBUG) # TODO: Remove at some point
|
||||
self.stamp_value = LXStamper.stamp_value(workblock, self.stamp)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_stamp(self, timeout=None):
|
||||
# If an outbound ticket exists, use this for
|
||||
# generating a valid stamp.
|
||||
if self.outbound_ticket != None and type(self.outbound_ticket) == bytes and len(self.outbound_ticket) == LXMessage.TICKET_LENGTH:
|
||||
generated_stamp = RNS.Identity.truncated_hash(self.outbound_ticket+self.message_id)
|
||||
self.stamp_value = LXMessage.COST_TICKET
|
||||
RNS.log(f"Generated stamp with outbound ticket {RNS.hexrep(self.outbound_ticket)} for {self}", RNS.LOG_DEBUG) # TODO: Remove at some point
|
||||
return generated_stamp
|
||||
|
||||
# If no stamp cost is required, we can just
|
||||
# return immediately.
|
||||
elif self.stamp_cost == None:
|
||||
self.stamp_value = None
|
||||
return None
|
||||
|
||||
# If a stamp was already generated, return
|
||||
# it immediately.
|
||||
elif self.stamp != None:
|
||||
return self.stamp
|
||||
|
||||
# Otherwise, we will need to generate a
|
||||
# valid stamp according to the cost that
|
||||
# the receiver has specified.
|
||||
else:
|
||||
generated_stamp, value = LXStamper.generate_stamp(self.message_id, self.stamp_cost)
|
||||
if generated_stamp:
|
||||
self.stamp_value = value
|
||||
self.stamp_valid = True
|
||||
return generated_stamp
|
||||
|
||||
else:
|
||||
return None
|
||||
|
||||
def pack(self):
|
||||
if not self.packed:
|
||||
if self.timestamp == None:
|
||||
@ -217,6 +349,11 @@ class LXMessage:
|
||||
hashed_part += msgpack.packb(self.payload)
|
||||
self.hash = RNS.Identity.full_hash(hashed_part)
|
||||
self.message_id = self.hash
|
||||
|
||||
if not self.defer_stamp:
|
||||
self.stamp = self.get_stamp()
|
||||
if self.stamp != None:
|
||||
self.payload.append(self.stamp)
|
||||
|
||||
signed_part = b""
|
||||
signed_part += hashed_part
|
||||
@ -224,21 +361,29 @@ class LXMessage:
|
||||
self.signature = self.__source.sign(signed_part)
|
||||
self.signature_validated = True
|
||||
|
||||
packed_payload = msgpack.packb(self.payload)
|
||||
self.packed = b""
|
||||
self.packed += self.__destination.hash
|
||||
self.packed += self.__source.hash
|
||||
self.packed += self.signature
|
||||
packed_payload = msgpack.packb(self.payload)
|
||||
self.packed += packed_payload
|
||||
self.packed_size = len(self.packed)
|
||||
content_size = len(packed_payload)
|
||||
content_size = len(packed_payload)-LXMessage.TIMESTAMP_SIZE-LXMessage.STRUCT_OVERHEAD
|
||||
|
||||
# If no desired delivery method has been defined,
|
||||
# one will be chosen according to these rules:
|
||||
if self.desired_method == None:
|
||||
self.desired_method = LXMessage.DIRECT
|
||||
# TODO: Expand rules to something more intelligent
|
||||
|
||||
# If opportunistic delivery was requested, check
|
||||
# that message will fit within packet size limits
|
||||
if self.desired_method == LXMessage.OPPORTUNISTIC:
|
||||
if self.__destination.type == RNS.Destination.SINGLE:
|
||||
if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT:
|
||||
RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
|
||||
self.desired_method = LXMessage.DIRECT
|
||||
|
||||
# Set delivery parameters according to delivery method
|
||||
if self.desired_method == LXMessage.OPPORTUNISTIC:
|
||||
if self.__destination.type == RNS.Destination.SINGLE:
|
||||
single_packet_content_limit = LXMessage.ENCRYPTED_PACKET_MAX_CONTENT
|
||||
@ -246,7 +391,7 @@ class LXMessage:
|
||||
single_packet_content_limit = LXMessage.PLAIN_PACKET_MAX_CONTENT
|
||||
|
||||
if content_size > single_packet_content_limit:
|
||||
raise TypeError("LXMessage desired opportunistic delivery method, but content exceeds single-packet size.")
|
||||
raise TypeError(f"LXMessage desired opportunistic delivery method, but content of length {content_size} exceeds single-packet content limit of {single_packet_content_limit}.")
|
||||
else:
|
||||
self.method = LXMessage.OPPORTUNISTIC
|
||||
self.representation = LXMessage.PACKET
|
||||
@ -265,6 +410,7 @@ class LXMessage:
|
||||
single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT
|
||||
|
||||
encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
|
||||
self.ratchet_id = self.__destination.latest_ratchet_id
|
||||
self.propagation_packed = msgpack.packb([time.time(), [self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data]])
|
||||
|
||||
content_size = len(self.propagation_packed)
|
||||
@ -279,6 +425,7 @@ class LXMessage:
|
||||
paper_content_limit = LXMessage.PAPER_MDU
|
||||
|
||||
encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
|
||||
self.ratchet_id = self.__destination.latest_ratchet_id
|
||||
self.paper_packed = self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data
|
||||
|
||||
content_size = len(self.paper_packed)
|
||||
@ -295,23 +442,31 @@ class LXMessage:
|
||||
self.determine_transport_encryption()
|
||||
|
||||
if self.method == LXMessage.OPPORTUNISTIC:
|
||||
self.__as_packet().send().set_delivery_callback(self.__mark_delivered)
|
||||
lxm_packet = self.__as_packet()
|
||||
lxm_packet.send().set_delivery_callback(self.__mark_delivered)
|
||||
self.progress = 0.50
|
||||
self.ratchet_id = lxm_packet.ratchet_id
|
||||
self.state = LXMessage.SENT
|
||||
|
||||
elif self.method == LXMessage.DIRECT:
|
||||
self.state = LXMessage.SENDING
|
||||
|
||||
if self.representation == LXMessage.PACKET:
|
||||
receipt = self.__as_packet().send()
|
||||
lxm_packet = self.__as_packet()
|
||||
receipt = lxm_packet.send()
|
||||
self.ratchet_id = self.__delivery_destination.link_id
|
||||
if receipt:
|
||||
receipt.set_delivery_callback(self.__mark_delivered)
|
||||
receipt.set_timeout_callback(self.__link_packet_timed_out)
|
||||
self.progress = 0.50
|
||||
else:
|
||||
if self.__delivery_destination:
|
||||
self.__delivery_destination.teardown()
|
||||
|
||||
elif self.representation == LXMessage.RESOURCE:
|
||||
self.resource_representation = self.__as_resource()
|
||||
self.ratchet_id = self.__delivery_destination.link_id
|
||||
self.progress = 0.10
|
||||
|
||||
elif self.method == LXMessage.PROPAGATED:
|
||||
self.state = LXMessage.SENDING
|
||||
@ -321,14 +476,19 @@ class LXMessage:
|
||||
if receipt:
|
||||
receipt.set_delivery_callback(self.__mark_propagated)
|
||||
receipt.set_timeout_callback(self.__link_packet_timed_out)
|
||||
self.progress = 0.50
|
||||
else:
|
||||
self.__delivery_destination.teardown()
|
||||
|
||||
elif self.representation == LXMessage.RESOURCE:
|
||||
self.resource_representation = self.__as_resource()
|
||||
self.progress = 0.10
|
||||
|
||||
|
||||
def determine_transport_encryption(self):
|
||||
# TODO: These descriptions are old and outdated.
|
||||
# Update the transport encryption descriptions to
|
||||
# account for ratchets and other changes.
|
||||
if self.method == LXMessage.OPPORTUNISTIC:
|
||||
if self.__destination.type == RNS.Destination.SINGLE:
|
||||
self.transport_encrypted = True
|
||||
@ -369,56 +529,67 @@ class LXMessage:
|
||||
def __mark_delivered(self, receipt = None):
|
||||
RNS.log("Received delivery notification for "+str(self), RNS.LOG_DEBUG)
|
||||
self.state = LXMessage.DELIVERED
|
||||
self.progress = 1.0
|
||||
|
||||
if self.__delivery_callback != None and callable(self.__delivery_callback):
|
||||
try:
|
||||
self.__delivery_callback(self)
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
|
||||
RNS.log("An error occurred in the external delivery callback for "+str(self), RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
def __mark_propagated(self, receipt = None):
|
||||
RNS.log("Received propagation success notification for "+str(self), RNS.LOG_DEBUG)
|
||||
self.state = LXMessage.SENT
|
||||
self.progress = 1.0
|
||||
|
||||
if self.__delivery_callback != None and callable(self.__delivery_callback):
|
||||
try:
|
||||
self.__delivery_callback(self)
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
|
||||
RNS.log("An error occurred in the external delivery callback for "+str(self), RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
def __mark_paper_generated(self, receipt = None):
|
||||
RNS.log("Paper message generation succeeded for "+str(self), RNS.LOG_DEBUG)
|
||||
self.state = LXMessage.PAPER
|
||||
self.progress = 1.0
|
||||
|
||||
if self.__delivery_callback != None and callable(self.__delivery_callback):
|
||||
try:
|
||||
self.__delivery_callback(self)
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
|
||||
RNS.log("An error occurred in the external delivery callback for "+str(self), RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
def __resource_concluded(self, resource):
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
self.__mark_delivered()
|
||||
else:
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
if resource.status == RNS.Resource.REJECTED:
|
||||
self.state = LXMessage.REJECTED
|
||||
|
||||
elif self.state != LXMessage.CANCELLED:
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
def __propagation_resource_concluded(self, resource):
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
self.__mark_propagated()
|
||||
else:
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
if self.state != LXMessage.CANCELLED:
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
def __link_packet_timed_out(self, packet_receipt):
|
||||
if packet_receipt:
|
||||
packet_receipt.destination.teardown()
|
||||
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
if self.state != LXMessage.CANCELLED:
|
||||
if packet_receipt:
|
||||
packet_receipt.destination.teardown()
|
||||
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
def __update_transfer_progress(self, resource):
|
||||
self.progress = resource.get_progress()
|
||||
self.progress = 0.10 + (resource.get_progress()*0.90)
|
||||
|
||||
def __as_packet(self):
|
||||
if not self.packed:
|
||||
@ -447,8 +618,6 @@ class LXMessage:
|
||||
if not self.__delivery_destination.status == RNS.Link.ACTIVE:
|
||||
raise ConnectionError("Tried to synthesize resource for LXMF message on a link that was not active")
|
||||
|
||||
self.progress = 0.0
|
||||
|
||||
if self.method == LXMessage.DIRECT:
|
||||
return RNS.Resource(self.packed, self.__delivery_destination, callback = self.__resource_concluded, progress_callback = self.__update_transfer_progress)
|
||||
elif self.method == LXMessage.PROPAGATED:
|
||||
@ -487,7 +656,6 @@ class LXMessage:
|
||||
return None
|
||||
|
||||
def as_uri(self, finalise=True):
|
||||
self.determine_transport_encryption()
|
||||
if not self.packed:
|
||||
self.pack()
|
||||
|
||||
@ -499,6 +667,7 @@ class LXMessage:
|
||||
lxm_uri = LXMessage.URI_SCHEMA+"://"+encoded_bytes.decode("utf-8").replace("=","")
|
||||
|
||||
if finalise:
|
||||
self.determine_transport_encryption()
|
||||
self.__mark_paper_generated()
|
||||
|
||||
return lxm_uri
|
||||
@ -507,7 +676,6 @@ class LXMessage:
|
||||
raise TypeError("Attempt to represent LXM with non-paper delivery method as URI")
|
||||
|
||||
def as_qr(self):
|
||||
self.determine_transport_encryption()
|
||||
if not self.packed:
|
||||
self.pack()
|
||||
|
||||
@ -522,6 +690,7 @@ class LXMessage:
|
||||
data = self.as_uri(finalise=False),
|
||||
)
|
||||
|
||||
self.determine_transport_encryption()
|
||||
self.__mark_paper_generated()
|
||||
|
||||
return qr
|
||||
@ -540,10 +709,19 @@ class LXMessage:
|
||||
source_hash = lxmf_bytes[LXMessage.DESTINATION_LENGTH:2*LXMessage.DESTINATION_LENGTH]
|
||||
signature = lxmf_bytes[2*LXMessage.DESTINATION_LENGTH:2*LXMessage.DESTINATION_LENGTH+LXMessage.SIGNATURE_LENGTH]
|
||||
packed_payload = lxmf_bytes[2*LXMessage.DESTINATION_LENGTH+LXMessage.SIGNATURE_LENGTH:]
|
||||
unpacked_payload = msgpack.unpackb(packed_payload)
|
||||
|
||||
# Extract stamp from payload if included
|
||||
if len(unpacked_payload) > 4:
|
||||
stamp = unpacked_payload[4]
|
||||
unpacked_payload = unpacked_payload[:4]
|
||||
packed_payload = msgpack.packb(unpacked_payload)
|
||||
else:
|
||||
stamp = None
|
||||
|
||||
hashed_part = b"" + destination_hash + source_hash + packed_payload
|
||||
message_hash = RNS.Identity.full_hash(hashed_part)
|
||||
signed_part = b"" + hashed_part + message_hash
|
||||
unpacked_payload = msgpack.unpackb(packed_payload)
|
||||
timestamp = unpacked_payload[0]
|
||||
title_bytes = unpacked_payload[1]
|
||||
content_bytes = unpacked_payload[2]
|
||||
@ -572,7 +750,9 @@ class LXMessage:
|
||||
desired_method = original_method)
|
||||
|
||||
message.hash = message_hash
|
||||
message.message_id = message.hash
|
||||
message.signature = signature
|
||||
message.stamp = stamp
|
||||
message.incoming = True
|
||||
message.timestamp = timestamp
|
||||
message.packed = lxmf_bytes
|
||||
|
328
LXMF/LXStamper.py
Normal file
328
LXMF/LXStamper.py
Normal file
@ -0,0 +1,328 @@
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
import os
|
||||
import time
|
||||
import multiprocessing
|
||||
|
||||
WORKBLOCK_EXPAND_ROUNDS = 3000
|
||||
|
||||
active_jobs = {}
|
||||
|
||||
def stamp_workblock(message_id):
|
||||
wb_st = time.time()
|
||||
expand_rounds = WORKBLOCK_EXPAND_ROUNDS
|
||||
workblock = b""
|
||||
for n in range(expand_rounds):
|
||||
workblock += RNS.Cryptography.hkdf(
|
||||
length=256,
|
||||
derive_from=message_id,
|
||||
salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)),
|
||||
context=None,
|
||||
)
|
||||
wb_time = time.time() - wb_st
|
||||
RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)
|
||||
|
||||
return workblock
|
||||
|
||||
def stamp_value(workblock, stamp):
|
||||
value = 0
|
||||
bits = 256
|
||||
material = RNS.Identity.full_hash(workblock+stamp)
|
||||
i = int.from_bytes(material, byteorder="big")
|
||||
while ((i & (1 << (bits - 1))) == 0):
|
||||
i = (i << 1)
|
||||
value += 1
|
||||
|
||||
return value
|
||||
|
||||
def generate_stamp(message_id, stamp_cost):
|
||||
RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG)
|
||||
workblock = stamp_workblock(message_id)
|
||||
|
||||
start_time = time.time()
|
||||
stamp = None
|
||||
rounds = 0
|
||||
value = 0
|
||||
|
||||
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
|
||||
stamp, rounds = job_simple(stamp_cost, workblock, message_id)
|
||||
|
||||
elif RNS.vendor.platformutils.is_android():
|
||||
stamp, rounds = job_android(stamp_cost, workblock, message_id)
|
||||
|
||||
else:
|
||||
stamp, rounds = job_linux(stamp_cost, workblock, message_id)
|
||||
|
||||
duration = time.time() - start_time
|
||||
speed = rounds/duration
|
||||
if stamp != None:
|
||||
value = stamp_value(workblock, stamp)
|
||||
|
||||
RNS.log(f"Stamp with value {value} generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG)
|
||||
|
||||
return stamp, value
|
||||
|
||||
def cancel_work(message_id):
|
||||
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
|
||||
try:
|
||||
if message_id in active_jobs:
|
||||
active_jobs[message_id] = True
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
elif RNS.vendor.platformutils.is_android():
|
||||
try:
|
||||
if message_id in active_jobs:
|
||||
active_jobs[message_id] = True
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
else:
|
||||
try:
|
||||
if message_id in active_jobs:
|
||||
stop_event = active_jobs[message_id][0]
|
||||
result_queue = active_jobs[message_id][1]
|
||||
stop_event.set()
|
||||
result_queue.put(None)
|
||||
active_jobs.pop(message_id)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
def job_simple(stamp_cost, workblock, message_id):
|
||||
# A simple, single-process stamp generator.
|
||||
# should work on any platform, and is used
|
||||
# as a fall-back, in case of limited multi-
|
||||
# processing and/or acceleration support.
|
||||
|
||||
platform = RNS.vendor.platformutils.get_platform()
|
||||
RNS.log(f"Running stamp generation on {platform}, work limited to single CPU core. This will be slower than ideal.", RNS.LOG_WARNING)
|
||||
|
||||
rounds = 0
|
||||
pstamp = os.urandom(256//8)
|
||||
st = time.time()
|
||||
|
||||
active_jobs[message_id] = False;
|
||||
|
||||
def sv(s, c, w):
|
||||
target = 0b1<<256-c; m = w+s
|
||||
result = RNS.Identity.full_hash(m)
|
||||
if int.from_bytes(result, byteorder="big") > target:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
while not sv(pstamp, stamp_cost, workblock) and not active_jobs[message_id]:
|
||||
pstamp = os.urandom(256//8); rounds += 1
|
||||
if rounds % 2500 == 0:
|
||||
speed = rounds / (time.time()-st)
|
||||
RNS.log(f"Stamp generation running. {rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG)
|
||||
|
||||
if active_jobs[message_id] == True:
|
||||
pstamp = None
|
||||
|
||||
active_jobs.pop(message_id)
|
||||
|
||||
return pstamp, rounds
|
||||
|
||||
def job_linux(stamp_cost, workblock, message_id):
|
||||
allow_kill = True
|
||||
stamp = None
|
||||
total_rounds = 0
|
||||
jobs = multiprocessing.cpu_count()
|
||||
stop_event = multiprocessing.Event()
|
||||
result_queue = multiprocessing.Queue(1)
|
||||
rounds_queue = multiprocessing.Queue()
|
||||
|
||||
def job(stop_event, pn, sc, wb):
|
||||
terminated = False
|
||||
rounds = 0
|
||||
pstamp = os.urandom(256//8)
|
||||
|
||||
def sv(s, c, w):
|
||||
target = 0b1<<256-c; m = w+s
|
||||
result = RNS.Identity.full_hash(m)
|
||||
if int.from_bytes(result, byteorder="big") > target:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
while not stop_event.is_set() and not sv(pstamp, sc, wb):
|
||||
pstamp = os.urandom(256//8); rounds += 1
|
||||
|
||||
if not stop_event.is_set():
|
||||
stop_event.set()
|
||||
result_queue.put(pstamp)
|
||||
rounds_queue.put(rounds)
|
||||
|
||||
job_procs = []
|
||||
RNS.log(f"Starting {jobs} stamp generation workers", RNS.LOG_DEBUG)
|
||||
for jpn in range(jobs):
|
||||
process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event, "pn": jpn, "sc": stamp_cost, "wb": workblock}, daemon=True)
|
||||
job_procs.append(process)
|
||||
process.start()
|
||||
|
||||
active_jobs[message_id] = [stop_event, result_queue]
|
||||
|
||||
stamp = result_queue.get()
|
||||
RNS.log("Got stamp result from worker", RNS.LOG_DEBUG) # TODO: Remove
|
||||
|
||||
# Collect any potential spurious
|
||||
# results from worker queue.
|
||||
try:
|
||||
while True:
|
||||
result_queue.get_nowait()
|
||||
except:
|
||||
pass
|
||||
|
||||
for j in range(jobs):
|
||||
nrounds = 0
|
||||
try:
|
||||
nrounds = rounds_queue.get(timeout=2)
|
||||
except Exception as e:
|
||||
RNS.log(f"Failed to get round stats part {j}: {e}", RNS.LOG_ERROR)
|
||||
total_rounds += nrounds
|
||||
|
||||
all_exited = False
|
||||
exit_timeout = time.time() + 5
|
||||
while time.time() < exit_timeout:
|
||||
if not any(p.is_alive() for p in job_procs):
|
||||
all_exited = True
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
if not all_exited:
|
||||
RNS.log("Stamp generation IPC timeout, possible worker deadlock. Terminating remaining processes.", RNS.LOG_ERROR)
|
||||
if allow_kill:
|
||||
for j in range(jobs):
|
||||
process = job_procs[j]
|
||||
process.kill()
|
||||
else:
|
||||
return None
|
||||
|
||||
else:
|
||||
for j in range(jobs):
|
||||
process = job_procs[j]
|
||||
process.join()
|
||||
# RNS.log(f"Joined {j} / {process}", RNS.LOG_DEBUG) # TODO: Remove
|
||||
|
||||
return stamp, total_rounds
|
||||
|
||||
def job_android(stamp_cost, workblock, message_id):
|
||||
# Semaphore support is flaky to non-existent on
|
||||
# Android, so we need to manually dispatch and
|
||||
# manage workloads here, while periodically
|
||||
# checking in on the progress.
|
||||
|
||||
stamp = None
|
||||
start_time = time.time()
|
||||
total_rounds = 0
|
||||
rounds_per_worker = 1000
|
||||
|
||||
use_nacl = False
|
||||
try:
|
||||
import nacl.encoding
|
||||
import nacl.hash
|
||||
use_nacl = True
|
||||
except:
|
||||
pass
|
||||
|
||||
if use_nacl:
|
||||
def full_hash(m):
|
||||
return nacl.hash.sha256(m, encoder=nacl.encoding.RawEncoder)
|
||||
else:
|
||||
def full_hash(m):
|
||||
return RNS.Identity.full_hash(m)
|
||||
|
||||
def sv(s, c, w):
|
||||
target = 0b1<<256-c
|
||||
m = w+s
|
||||
result = full_hash(m)
|
||||
if int.from_bytes(result, byteorder="big") > target:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
wm = multiprocessing.Manager()
|
||||
jobs = multiprocessing.cpu_count()
|
||||
|
||||
def job(procnum=None, results_dict=None, wb=None, sc=None, jr=None):
|
||||
# RNS.log(f"Worker {procnum} starting for {jr} rounds...") # TODO: Remove
|
||||
try:
|
||||
rounds = 0
|
||||
found_stamp = None
|
||||
|
||||
while True:
|
||||
pstamp = os.urandom(256//8)
|
||||
rounds += 1
|
||||
if sv(pstamp, sc, wb):
|
||||
found_stamp = pstamp
|
||||
break
|
||||
|
||||
if rounds >= jr:
|
||||
# RNS.log(f"Worker {procnum} found no result in {rounds} rounds") # TODO: Remove
|
||||
break
|
||||
|
||||
results_dict[procnum] = [found_stamp, rounds]
|
||||
except Exception as e:
|
||||
RNS.log(f"Stamp generation worker error: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
active_jobs[message_id] = False;
|
||||
|
||||
RNS.log(f"Dispatching {jobs} workers for stamp generation...", RNS.LOG_DEBUG) # TODO: Remove
|
||||
|
||||
results_dict = wm.dict()
|
||||
while stamp == None and active_jobs[message_id] == False:
|
||||
job_procs = []
|
||||
try:
|
||||
for pnum in range(jobs):
|
||||
pargs = {"procnum":pnum, "results_dict": results_dict, "wb": workblock, "sc":stamp_cost, "jr":rounds_per_worker}
|
||||
process = multiprocessing.Process(target=job, kwargs=pargs)
|
||||
job_procs.append(process)
|
||||
process.start()
|
||||
|
||||
for process in job_procs:
|
||||
process.join()
|
||||
|
||||
for j in results_dict:
|
||||
r = results_dict[j]
|
||||
total_rounds += r[1]
|
||||
if r[0] != None:
|
||||
stamp = r[0]
|
||||
|
||||
if stamp == None:
|
||||
elapsed = time.time() - start_time
|
||||
speed = total_rounds/elapsed
|
||||
RNS.log(f"Stamp generation running. {total_rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Stamp generation job error: {e}")
|
||||
RNS.trace_exception(e)
|
||||
|
||||
active_jobs.pop(message_id)
|
||||
|
||||
return stamp, total_rounds
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
if len(sys.argv) < 2:
|
||||
RNS.log("No cost argument provided", RNS.LOG_ERROR)
|
||||
exit(1)
|
||||
else:
|
||||
try:
|
||||
cost = int(sys.argv[1])
|
||||
except Exception as e:
|
||||
RNS.log(f"Invalid cost argument provided: {e}", RNS.LOG_ERROR)
|
||||
exit(1)
|
||||
|
||||
RNS.loglevel = RNS.LOG_DEBUG
|
||||
RNS.log("Testing LXMF stamp generation", RNS.LOG_DEBUG)
|
||||
message_id = os.urandom(32)
|
||||
generate_stamp(message_id, cost)
|
@ -35,6 +35,7 @@ import time
|
||||
import os
|
||||
|
||||
from LXMF._version import __version__
|
||||
from LXMF import APP_NAME
|
||||
|
||||
from RNS.vendor.configobj import ConfigObj
|
||||
|
||||
@ -77,6 +78,13 @@ def apply_config():
|
||||
active_configuration["peer_announce_interval"] = lxmd_config["lxmf"].as_int("announce_interval")*60
|
||||
else:
|
||||
active_configuration["peer_announce_interval"] = None
|
||||
|
||||
if "lxmf" in lxmd_config and "delivery_transfer_max_accepted_size" in lxmd_config["lxmf"]:
|
||||
active_configuration["delivery_transfer_max_accepted_size"] = lxmd_config["lxmf"].as_float("delivery_transfer_max_accepted_size")
|
||||
if active_configuration["delivery_transfer_max_accepted_size"] < 0.38:
|
||||
active_configuration["delivery_transfer_max_accepted_size"] = 0.38
|
||||
else:
|
||||
active_configuration["delivery_transfer_max_accepted_size"] = 1000
|
||||
|
||||
if "lxmf" in lxmd_config and "on_inbound" in lxmd_config["lxmf"]:
|
||||
active_configuration["on_inbound"] = lxmd_config["lxmf"]["on_inbound"]
|
||||
@ -119,13 +127,38 @@ def apply_config():
|
||||
if active_configuration["message_storage_limit"] < 0.005:
|
||||
active_configuration["message_storage_limit"] = 0.005
|
||||
else:
|
||||
active_configuration["message_storage_limit"] = 2000
|
||||
active_configuration["message_storage_limit"] = 500
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
|
||||
if active_configuration["propagation_transfer_max_accepted_size"] < 0.38:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
|
||||
else:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = 256
|
||||
|
||||
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
|
||||
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
|
||||
else:
|
||||
active_configuration["prioritised_lxmf_destinations"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]:
|
||||
static_peers = lxmd_config["propagation"].as_list("static_peers")
|
||||
active_configuration["static_peers"] = []
|
||||
for static_peer in static_peers:
|
||||
active_configuration["static_peers"].append(bytes.fromhex(static_peer))
|
||||
else:
|
||||
active_configuration["static_peers"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]:
|
||||
active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers")
|
||||
else:
|
||||
active_configuration["max_peers"] = None
|
||||
|
||||
if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]:
|
||||
active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only")
|
||||
else:
|
||||
active_configuration["from_static_only"] = False
|
||||
|
||||
# Load various settings
|
||||
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
|
||||
targetloglevel = lxmd_config["logging"].as_int("loglevel")
|
||||
@ -204,8 +237,6 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
global message_router, lxmf_destination
|
||||
|
||||
targetloglevel = 3+verbosity-quietness
|
||||
|
||||
if service:
|
||||
targetlogdest = RNS.LOG_FILE
|
||||
targetloglevel = None
|
||||
@ -234,6 +265,12 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
||||
if not os.path.isdir(lxmdir):
|
||||
os.makedirs(lxmdir)
|
||||
|
||||
if not os.path.isfile(configpath):
|
||||
RNS.log("Could not load config file, creating default configuration file...")
|
||||
create_default_config(configpath)
|
||||
RNS.log("Default config file created. Make any necessary changes in "+configpath+" and restart lxmd if needed.")
|
||||
time.sleep(1.5)
|
||||
|
||||
if os.path.isfile(configpath):
|
||||
try:
|
||||
lxmd_config = ConfigObj(configpath)
|
||||
@ -241,15 +278,16 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
||||
RNS.log("Could not parse the configuration at "+configpath, RNS.LOG_ERROR)
|
||||
RNS.log("Check your configuration file for errors!", RNS.LOG_ERROR)
|
||||
RNS.panic()
|
||||
else:
|
||||
RNS.log("Could not load config file, creating default configuration file...")
|
||||
create_default_config(configpath)
|
||||
RNS.log("Default config file created. Make any necessary changes in "+configpath+" and restart Reticulum if needed.")
|
||||
time.sleep(1.5)
|
||||
|
||||
|
||||
apply_config()
|
||||
RNS.log("Configuration loaded from "+configpath, RNS.LOG_VERBOSE)
|
||||
|
||||
if targetloglevel == None:
|
||||
targetloglevel = 3
|
||||
|
||||
if verbosity != 0 or quietness != 0:
|
||||
targetloglevel = targetloglevel+verbosity-quietness
|
||||
|
||||
# Start Reticulum
|
||||
RNS.log("Substantiating Reticulum...")
|
||||
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
|
||||
@ -284,7 +322,12 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
||||
storagepath = storagedir,
|
||||
autopeer = active_configuration["autopeer"],
|
||||
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
|
||||
)
|
||||
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
|
||||
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
|
||||
max_peers = active_configuration["max_peers"],
|
||||
static_peers = active_configuration["static_peers"],
|
||||
from_static_only = active_configuration["from_static_only"])
|
||||
|
||||
message_router.register_delivery_callback(lxmf_delivery)
|
||||
|
||||
for destination_hash in active_configuration["ignored_lxmf_destinations"]:
|
||||
@ -302,6 +345,10 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
||||
# Set up authentication
|
||||
if active_configuration["auth_required"]:
|
||||
message_router.set_authentication(required=True)
|
||||
|
||||
if len(active_configuration["allowed_identities"]) == 0:
|
||||
RNS.log("Clint authentication was enabled, but no identity hashes could be loaded from "+str(allowedpath)+". Nobody will be able to sync messages from this propagation node.", RNS.LOG_WARNING)
|
||||
|
||||
for identity_hash in active_configuration["allowed_identities"]:
|
||||
message_router.allow(identity_hash)
|
||||
|
||||
@ -322,9 +369,6 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
||||
|
||||
RNS.log("LXMF Propagation Node started on "+RNS.prettyhexrep(message_router.propagation_destination.hash))
|
||||
|
||||
if len(active_configuration["allowed_identities"]) == 0:
|
||||
RNS.log("Clint authentication was enabled, but no identity hashes could be loaded from "+str(allowedpath)+". Nobody will be able to sync messages from this propagation node.", RNS.LOG_WARNING)
|
||||
|
||||
RNS.log("Started lxmd version {version}".format(version=__version__), RNS.LOG_NOTICE)
|
||||
|
||||
threading.Thread(target=deferred_start_jobs, daemon=True).start()
|
||||
@ -338,15 +382,17 @@ def jobs():
|
||||
|
||||
while True:
|
||||
try:
|
||||
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
last_peer_announce = time.time()
|
||||
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
|
||||
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
last_peer_announce = time.time()
|
||||
|
||||
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
|
||||
message_router.announce_propagation_node()
|
||||
last_node_announce = time.time()
|
||||
if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None:
|
||||
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE)
|
||||
message_router.announce_propagation_node()
|
||||
last_node_announce = time.time()
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred while running periodic jobs. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
@ -357,7 +403,7 @@ def deferred_start_jobs():
|
||||
global active_configuration, last_peer_announce, last_node_announce
|
||||
global message_router, lxmf_destination
|
||||
time.sleep(DEFFERED_JOBS_DELAY)
|
||||
RNS.log("Running deferred start jobs")
|
||||
RNS.log("Running deferred start jobs", RNS.LOG_DEBUG)
|
||||
if active_configuration["peer_announce_at_start"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
@ -370,6 +416,190 @@ def deferred_start_jobs():
|
||||
last_node_announce = time.time()
|
||||
threading.Thread(target=jobs, daemon=True).start()
|
||||
|
||||
def query_status(identity, timeout=5, exit_on_fail=False):
|
||||
control_destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
if exit_on_fail:
|
||||
RNS.log("Getting lxmd statistics timed out, exiting now", RNS.LOG_ERROR)
|
||||
exit(200)
|
||||
else:
|
||||
return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
|
||||
if not RNS.Transport.has_path(control_destination.hash):
|
||||
RNS.Transport.request_path(control_destination.hash)
|
||||
while not RNS.Transport.has_path(control_destination.hash):
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link = RNS.Link(control_destination)
|
||||
while not link.status == RNS.Link.ACTIVE:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.identify(identity)
|
||||
request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None)
|
||||
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.teardown()
|
||||
return request_receipt.get_response()
|
||||
|
||||
def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = 0, timeout=5, show_status=False, show_peers=False, identity_path=None):
|
||||
global configpath, identitypath, storagedir, lxmdir
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
targetlogdest = RNS.LOG_STDOUT
|
||||
|
||||
if identity_path == None:
|
||||
if configdir == None:
|
||||
if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"):
|
||||
configdir = "/etc/lxmd"
|
||||
elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(Reticulum.userdir+"/.config/lxmd/config"):
|
||||
configdir = RNS.Reticulum.userdir+"/.config/lxmd"
|
||||
else:
|
||||
configdir = RNS.Reticulum.userdir+"/.lxmd"
|
||||
|
||||
configpath = configdir+"/config"
|
||||
identitypath = configdir+"/identity"
|
||||
identity = None
|
||||
|
||||
if not os.path.isdir(configdir):
|
||||
RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR)
|
||||
exit(201)
|
||||
if not os.path.isfile(identitypath):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identitypath)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
else:
|
||||
if not os.path.isfile(identity_path):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identity_path)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
if targetloglevel == None:
|
||||
targetloglevel = 3
|
||||
if verbosity != 0 or quietness != 0:
|
||||
targetloglevel = targetloglevel+verbosity-quietness
|
||||
|
||||
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
|
||||
response = query_status(identity, timeout=timeout, exit_on_fail=True)
|
||||
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
|
||||
RNS.log("Remote received no identity")
|
||||
exit(203)
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
|
||||
RNS.log("Access denied")
|
||||
exit(204)
|
||||
else:
|
||||
s = response
|
||||
mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2)
|
||||
ms_util = f"{mutil}%"
|
||||
if s["from_static_only"]:
|
||||
who_str = "static peers only"
|
||||
else:
|
||||
who_str = "all nodes"
|
||||
|
||||
available_peers = 0
|
||||
unreachable_peers = 0
|
||||
peered_incoming = 0
|
||||
peered_outgoing = 0
|
||||
peered_rx_bytes = 0
|
||||
peered_tx_bytes = 0
|
||||
for peer_id in s["peers"]:
|
||||
p = s["peers"][peer_id]
|
||||
pm = p["messages"]
|
||||
peered_incoming += pm["incoming"]
|
||||
peered_outgoing += pm["outgoing"]
|
||||
peered_rx_bytes += p["rx_bytes"]
|
||||
peered_tx_bytes += p["tx_bytes"]
|
||||
if p["alive"]:
|
||||
available_peers += 1
|
||||
else:
|
||||
unreachable_peers += 1
|
||||
|
||||
total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"]
|
||||
total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"]
|
||||
df = round(peered_outgoing/total_incoming, 2)
|
||||
|
||||
dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"])
|
||||
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
|
||||
|
||||
if show_status:
|
||||
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
|
||||
ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
|
||||
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
|
||||
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
|
||||
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
|
||||
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
|
||||
print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit")
|
||||
print(f"")
|
||||
print(f"Peers : {stp} total (peer limit is {smp})")
|
||||
print(f" {sdp} discovered, {ssp} static")
|
||||
print(f" {available_peers} available, {unreachable_peers} unreachable")
|
||||
print(f"")
|
||||
print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})")
|
||||
print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})")
|
||||
print(f" {upi} messages received from unpeered nodes ({uprx})")
|
||||
print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})")
|
||||
print(f" {cprr} propagation messages received directly from clients")
|
||||
print(f" {cprs} propagation messages served to clients")
|
||||
print(f" Distribution factor is {df}")
|
||||
print(f"")
|
||||
|
||||
if show_peers:
|
||||
if not show_status:
|
||||
print("")
|
||||
|
||||
for peer_id in s["peers"]:
|
||||
ind = " "
|
||||
p = s["peers"][peer_id]
|
||||
if p["type"] == "static":
|
||||
t = "Static peer "
|
||||
elif p["type"] == "discovered":
|
||||
t = "Discovered peer "
|
||||
else:
|
||||
t = "Unknown peer "
|
||||
a = "Available" if p["alive"] == True else "Unreachable"
|
||||
h = max(time.time()-p["last_heard"], 0)
|
||||
hops = p["network_distance"]
|
||||
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
|
||||
pm = p["messages"]
|
||||
if p["last_sync_attempt"] != 0:
|
||||
lsa = p["last_sync_attempt"]
|
||||
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
|
||||
else:
|
||||
ls = "never synced"
|
||||
|
||||
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000)
|
||||
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
|
||||
pmi = pm["incoming"]; pmuh = pm["unhandled"]
|
||||
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
|
||||
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
|
||||
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit")
|
||||
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming")
|
||||
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
|
||||
ms = "" if pm["unhandled"] == 1 else "s"
|
||||
print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}")
|
||||
print("")
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
|
||||
@ -380,6 +610,10 @@ def main():
|
||||
parser.add_argument("-v", "--verbose", action="count", default=0)
|
||||
parser.add_argument("-q", "--quiet", action="count", default=0)
|
||||
parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file")
|
||||
parser.add_argument("--status", action="store_true", default=False, help="display node status")
|
||||
parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes")
|
||||
parser.add_argument("--timeout", action="store", default=5, help="timeout in seconds for query operations", type=float)
|
||||
parser.add_argument("--identity", action="store", default=None, help="path to identity used for query request", type=str)
|
||||
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
|
||||
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
|
||||
|
||||
@ -389,15 +623,24 @@ def main():
|
||||
print(__default_lxmd_config__)
|
||||
exit()
|
||||
|
||||
program_setup(
|
||||
configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service
|
||||
)
|
||||
if args.status or args.peers:
|
||||
get_status(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
timeout=args.timeout,
|
||||
show_status=args.status,
|
||||
show_peers=args.peers,
|
||||
identity_path=args.identity)
|
||||
exit()
|
||||
|
||||
program_setup(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
@ -410,23 +653,41 @@ __default_lxmd_config__ = """# This is an example LXM Daemon config file.
|
||||
[propagation]
|
||||
|
||||
# Whether to enable propagation node
|
||||
|
||||
enable_node = no
|
||||
|
||||
# Automatic announce interval in minutes.
|
||||
# 6 hours by default.
|
||||
|
||||
announce_interval = 360
|
||||
|
||||
# Whether to announce when the node starts.
|
||||
|
||||
announce_at_start = yes
|
||||
|
||||
# Wheter to automatically peer with other
|
||||
# propagation nodes on the network.
|
||||
|
||||
autopeer = yes
|
||||
|
||||
# The maximum peering depth (in hops) for
|
||||
# automatically peered nodes.
|
||||
|
||||
autopeer_maxdepth = 4
|
||||
|
||||
# The maximum accepted transfer size per in-
|
||||
# coming propagation transfer, in kilobytes.
|
||||
# This also sets the upper limit for the size
|
||||
# of single messages accepted onto this node.
|
||||
#
|
||||
# If a node wants to propagate a larger number
|
||||
# of messages to this node, than what can fit
|
||||
# within this limit, it will prioritise sending
|
||||
# the smallest messages first, and try again
|
||||
# with any remaining messages at a later point.
|
||||
|
||||
propagation_transfer_max_accepted_size = 256
|
||||
|
||||
# The maximum amount of storage to use for
|
||||
# the LXMF Propagation Node message store,
|
||||
# specified in megabytes. When this limit
|
||||
@ -435,8 +696,9 @@ autopeer_maxdepth = 4
|
||||
# LXMF prioritises keeping messages that are
|
||||
# new and small. Large and old messages will
|
||||
# be removed first. This setting is optional
|
||||
# and defaults to 2 gigabytes.
|
||||
# message_storage_limit = 2000
|
||||
# and defaults to 500 megabytes.
|
||||
|
||||
# message_storage_limit = 500
|
||||
|
||||
# You can tell the LXMF message router to
|
||||
# prioritise storage for one or more
|
||||
@ -445,14 +707,35 @@ autopeer_maxdepth = 4
|
||||
# keeping messages for destinations specified
|
||||
# with this option. This setting is optional,
|
||||
# and generally you do not need to use it.
|
||||
|
||||
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
|
||||
|
||||
# You can configure the maximum number of other
|
||||
# propagation nodes that this node will peer
|
||||
# with automatically. The default is 50.
|
||||
|
||||
# max_peers = 25
|
||||
|
||||
# You can configure a list of static propagation
|
||||
# node peers, that this node will always be
|
||||
# peered with, by specifying a list of
|
||||
# destination hashes.
|
||||
|
||||
# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
|
||||
|
||||
# You can configure the propagation node to
|
||||
# only accept incoming propagation messages
|
||||
# from configured static peers.
|
||||
|
||||
# from_static_only = True
|
||||
|
||||
# By default, any destination is allowed to
|
||||
# connect and download messages, but you can
|
||||
# optionally restrict this. If you enable
|
||||
# authentication, you must provide a list of
|
||||
# allowed identity hashes in the a file named
|
||||
# "allowed" in the lxmd config directory.
|
||||
|
||||
auth_required = no
|
||||
|
||||
|
||||
@ -461,23 +744,35 @@ auth_required = no
|
||||
# The LXM Daemon will create an LXMF destination
|
||||
# that it can receive messages on. This option sets
|
||||
# the announced display name for this destination.
|
||||
|
||||
display_name = Anonymous Peer
|
||||
|
||||
# It is possible to announce the internal LXMF
|
||||
# destination when the LXM Daemon starts up.
|
||||
|
||||
announce_at_start = no
|
||||
|
||||
# You can also announce the delivery destination
|
||||
# at a specified interval. This is not enabled by
|
||||
# default.
|
||||
|
||||
# announce_interval = 360
|
||||
|
||||
# The maximum accepted unpacked size for mes-
|
||||
# sages received directly from other peers,
|
||||
# specified in kilobytes. Messages larger than
|
||||
# this will be rejected before the transfer
|
||||
# begins.
|
||||
|
||||
delivery_transfer_max_accepted_size = 1000
|
||||
|
||||
# You can configure an external program to be run
|
||||
# every time a message is received. The program
|
||||
# will receive as an argument the full path to the
|
||||
# message saved as a file. The example below will
|
||||
# simply result in the message getting deleted as
|
||||
# soon as it has been received.
|
||||
|
||||
# on_inbound = rm
|
||||
|
||||
|
||||
@ -491,6 +786,7 @@ announce_at_start = no
|
||||
# 5: Verbose logging
|
||||
# 6: Debug logging
|
||||
# 7: Extreme logging
|
||||
|
||||
loglevel = 4
|
||||
|
||||
"""
|
||||
|
@ -2,6 +2,7 @@ import os
|
||||
import glob
|
||||
from .LXMessage import LXMessage
|
||||
from .LXMRouter import LXMRouter
|
||||
from .LXMF import *
|
||||
|
||||
from ._version import __version__
|
||||
|
||||
|
@ -1 +1 @@
|
||||
__version__ = "0.2.9"
|
||||
__version__ = "0.6.3"
|
||||
|
96
README.md
96
README.md
@ -7,8 +7,17 @@ LXMF is efficient enough that it can deliver messages over extremely low-bandwid
|
||||
User-facing clients built on LXMF include:
|
||||
|
||||
- [Sideband](https://unsigned.io/sideband)
|
||||
- [MeshChat](https://github.com/liamcottle/reticulum-meshchat)
|
||||
- [Nomad Network](https://unsigned.io/nomadnet)
|
||||
- [Nexus Messenger](https://github.com/HarlekinSimplex/nexus_messenger)
|
||||
|
||||
Community-provided tools and utilities for LXMF include:
|
||||
|
||||
- [LXMFy](https://lxmfy.quad4.io/)
|
||||
- [LXMF-Bot](https://github.com/randogoth/lxmf-bot)
|
||||
- [LXMF Messageboard](https://github.com/chengtripp/lxmf_messageboard)
|
||||
- [LXMEvent](https://github.com/faragher/LXMEvent)
|
||||
- [RangeMap](https://github.com/faragher/RangeMap)
|
||||
- [LXMF Tools](https://github.com/SebastianObi/LXMF-Tools)
|
||||
|
||||
## Structure
|
||||
|
||||
@ -29,10 +38,10 @@ LXMF messages are stored in a simple and efficient format, that's easy to parse
|
||||
|
||||
1. A LXMF message is identified by its __message-id__, which is a SHA-256 hash of the __Destination__, __Source__ and __Payload__. The message-id is never included directly in the message, since it can always be inferred from the message itself.
|
||||
|
||||
In some cases the actual message-id cannot be inferred, for example when a Propagation Node is storing an encrypted message for an offline user. In theses cases a _transient-id_ is used to identify the message while in storage or transit.
|
||||
In some cases the actual message-id cannot be inferred, for example when a Propagation Node is storing an encrypted message for an offline user. In these cases a _transient-id_ is used to identify the message while in storage or transit.
|
||||
|
||||
2. __Destination__, __Source__, __Signature__ and __Payload__ parts are mandatory, as is the __Timestamp__ part of the payload.
|
||||
- The __Destination__ and __Source__ fields are 10-byte Reticulum destination hashes
|
||||
- The __Destination__ and __Source__ fields are 16-byte Reticulum destination hashes
|
||||
- The __Signature__ field is a 64-byte Ed25519 signature of the __Destination__, __Source__, __Payload__ and __message-id__
|
||||
- The __Payload__ part is a [msgpacked](https://msgpack.org) list containing four items:
|
||||
1. The __Timestamp__ is a double-precision floating point number representing the number of seconds since the UNIX epoch.
|
||||
@ -46,7 +55,7 @@ LXMF messages are stored in a simple and efficient format, that's easy to parse
|
||||
|
||||
## Usage Examples
|
||||
|
||||
LXMF offers flexibility to implement many different messaging schemes, ranging from human communication to machine control and sensor monitoring. Here's a few examples:
|
||||
LXMF offers flexibility to implement many different messaging schemes, ranging from human communication to machine control and sensor monitoring. Here are a few examples:
|
||||
|
||||
- A messaging system for passing short, simple messages between human users, akin to SMS can be implemented using only the __Content__ field, and leaving all other optional fields empty.
|
||||
|
||||
@ -85,11 +94,11 @@ The LXM Router then handles the heavy lifting, such as message packing, encrypti
|
||||
|
||||
LXMF uses encryption provided by [Reticulum](https://reticulum.network), and thus uses end-to-end encryption by default. The delivery method of a message will influence which transport encryption scheme is used.
|
||||
|
||||
- A message can be delivered opportunistically, embedded in a single Reticulum packet. In this cases the message will be opportunistically routed through the network, and will be encrypted with _ephemeral_ keys derived with _ECDH_ on _Curve25519_. This mode offers Perfect Forward Secrecy.
|
||||
- If a message is delivered over a Reticulum link (which is the default method), the message will be encrypted with ephemeral AES-128 keys derived with ECDH on Curve25519. This mode offers forward secrecy.
|
||||
|
||||
- If a message is delivered to the Reticulum GROUP destination type, the message will be transported using _AES-128_ encryption.
|
||||
- A message can be delivered opportunistically, embedded in a single Reticulum packet. In this cases the message will be opportunistically routed through the network, and will be encrypted with per-packet AES-128 keys derived with ECDH on Curve25519.
|
||||
|
||||
- If a message is delivered over a Reticulum link (which is the default method), the message will be encrypted with _ephemeral_ keys derived with _ECDH_ on _Curve25519_. This mode offers Perfect Forward Secrecy.
|
||||
- If a message is delivered to the Reticulum GROUP destination type, the message will be encrypted using the symmetric AES-128 key of the GROUP destination.
|
||||
|
||||
## Wire Format & Overhead
|
||||
|
||||
@ -100,15 +109,19 @@ Assuming the default Reticulum configuration, the binary wire-format is as follo
|
||||
- 64 bytes Ed25519 signature
|
||||
- Remaining bytes of [msgpack](https://msgpack.org) payload data, in accordance with the structure defined above
|
||||
|
||||
The complete message overhead for LXMF is only 99 bytes, which in return gives you timestamped, digitally signed, infinitely extensible, end-to-end encrypted, zero-conf routed, minimal-infrastructure messaging that's easy to use and build applications with.
|
||||
The complete message overhead for LXMF is only 111 bytes, which in return gives you timestamped, digitally signed, infinitely extensible, end-to-end encrypted, zero-conf routed, minimal-infrastructure messaging that's easy to use and build applications with.
|
||||
|
||||
## Code Examples
|
||||
|
||||
Before writing your own programs using LXMF, you need to have a basic understanding of how the [Reticulum](https://reticulum.network) protocol and API works. Please see the [Reticulum Manual](https://reticulum.network/manual/). For a few simple examples of how to send and receive messages with LXMF, please see the [receiver example](./docs/example_receiver.py) and the [sender example](./docs/example_sender.py) included in this repository.
|
||||
|
||||
## Example Paper Message
|
||||
|
||||
You can try out the paper messaging functionality by using the following QR-code. It is a paper message sent to the LXMF address `6b3362bd2c1dbf87b66a85f79a8d8c75`. To be able to decrypt and read the message, you will need to import the following Reticulum Identity to an LXMF messaging app:
|
||||
You can try out the paper messaging functionality by using the following QR code. It is a paper message sent to the LXMF address `6b3362bd2c1dbf87b66a85f79a8d8c75`. To be able to decrypt and read the message, you will need to import the following Reticulum Identity to an LXMF messaging app:
|
||||
|
||||
`3BPTDTQCRZPKJT3TXAJCMQFMOYWIM3OCLKPWMG4HCF2T4CH3YZHVNHNRDU6QAZWV2KBHMWBNT2C62TQEVC5GLFM4MN25VLZFSK3ADRQ=`
|
||||
|
||||
The [Sideband](https://unsigned.io/sideband) application allows you to do this easily. After the you have imported the identity into an app of your choice, you can scan the following QR-code and open it in the app, where it will be decrypted and added as a message.
|
||||
The [Sideband](https://unsigned.io/sideband) application allows you to do this easily. After you have imported the identity into an app of your choice, you can scan the following QR code and open it in the app, where it will be decrypted and added as a message.
|
||||
|
||||
<p align="center"><img width="50%" src="./docs/paper_msg_test.png"/></p>
|
||||
|
||||
@ -118,26 +131,69 @@ You can also find the entire message in <a href="lxm://azNivSwdv4e2aoX3mo2MdTAoz
|
||||
|
||||
On operating systems that allow for registering custom URI-handlers, you can click the link, and it will be decoded directly in your LXMF client. This works with Sideband on Android.
|
||||
|
||||
## Caveat Emptor
|
||||
|
||||
LXMF is beta software, and should be considered experimental. While it has been built with cryptography best-practices very foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch.
|
||||
|
||||
## Installation
|
||||
|
||||
If you want to try out LXMF, you can install it with pip:
|
||||
|
||||
```bash
|
||||
pip3 install lxmf
|
||||
pip install lxmf
|
||||
```
|
||||
|
||||
If you are using an operating system that blocks normal user package installation via `pip`,
|
||||
you can return `pip` to normal behaviour by editing the `~/.config/pip/pip.conf` file,
|
||||
and adding the following directive in the `[global]` section:
|
||||
|
||||
```text
|
||||
[global]
|
||||
break-system-packages = true
|
||||
```
|
||||
|
||||
Alternatively, you can use the `pipx` tool to install Reticulum in an isolated environment:
|
||||
|
||||
```bash
|
||||
pipx install lxmf
|
||||
```
|
||||
|
||||
## Daemon Included
|
||||
|
||||
The `lxmf` package comes with the `lxmd` program, a fully functional (but lightweight) LXMF message router and propagation node daemon. After installing the `lxmf` package, you can run `lxmd --help` to learn more about the command-line options:
|
||||
|
||||
```text
|
||||
$ lxmd --help
|
||||
|
||||
usage: lxmd [-h] [--config CONFIG] [--rnsconfig RNSCONFIG] [-p] [-i PATH] [-v] [-q] [-s] [--exampleconfig] [--version]
|
||||
|
||||
Lightweight Extensible Messaging Daemon
|
||||
|
||||
options:
|
||||
-h, --help show this help message and exit
|
||||
--config CONFIG path to alternative lxmd config directory
|
||||
--rnsconfig RNSCONFIG
|
||||
path to alternative Reticulum config directory
|
||||
-p, --propagation-node
|
||||
run an LXMF Propagation Node
|
||||
-i PATH, --on-inbound PATH
|
||||
executable to run when a message is received
|
||||
-v, --verbose
|
||||
-q, --quiet
|
||||
-s, --service lxmd is running as a service and should log to file
|
||||
--exampleconfig print verbose configuration example to stdout and exit
|
||||
--version show program's version number and exit
|
||||
```
|
||||
|
||||
Or run `lxmd --exampleconfig` to generate a commented example configuration documenting all the available configuration directives.
|
||||
|
||||
## Caveat Emptor
|
||||
|
||||
LXMF is beta software, and should be considered experimental. While it has been built with cryptography best practices very foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch.
|
||||
|
||||
## Development Roadmap
|
||||
|
||||
LXMF is actively being developed, and the following improvements and features are currently planned for implementation:
|
||||
|
||||
- Write and release full API and protocol documentation
|
||||
- Update examples in readme to actually work
|
||||
- Content Destinations, and easy to use API for group messaging and discussion threads
|
||||
- ~~Update examples in readme to actually work~~
|
||||
- ~~Sync affinity based on link speeds and distances, for more intelligently choosing peer sync order~~
|
||||
- Sneakernet and physical transport functionality
|
||||
- Content Destinations, and easy to use API for group messaging and discussion threads
|
||||
- Write and release full API and protocol documentation
|
||||
- Documenting and possibly expanding LXMF limits and priorities
|
||||
- Sync affinity based on link speeds and distances, for more intelligently choosing peer sync order
|
||||
- Markets on LXMF
|
||||
|
72
docs/example_receiver.py
Normal file
72
docs/example_receiver.py
Normal file
@ -0,0 +1,72 @@
|
||||
import RNS
|
||||
import LXMF
|
||||
import time
|
||||
|
||||
required_stamp_cost = 8
|
||||
enforce_stamps = False
|
||||
|
||||
def delivery_callback(message):
|
||||
global my_lxmf_destination, router
|
||||
time_string = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.timestamp))
|
||||
signature_string = "Signature is invalid, reason undetermined"
|
||||
if message.signature_validated:
|
||||
signature_string = "Validated"
|
||||
else:
|
||||
if message.unverified_reason == LXMF.LXMessage.SIGNATURE_INVALID:
|
||||
signature_string = "Invalid signature"
|
||||
if message.unverified_reason == LXMF.LXMessage.SOURCE_UNKNOWN:
|
||||
signature_string = "Cannot verify, source is unknown"
|
||||
|
||||
if message.stamp_valid:
|
||||
stamp_string = "Validated"
|
||||
else:
|
||||
stamp_string = "Invalid"
|
||||
|
||||
RNS.log("\t+--- LXMF Delivery ---------------------------------------------")
|
||||
RNS.log("\t| Source hash : "+RNS.prettyhexrep(message.source_hash))
|
||||
RNS.log("\t| Source instance : "+str(message.get_source()))
|
||||
RNS.log("\t| Destination hash : "+RNS.prettyhexrep(message.destination_hash))
|
||||
RNS.log("\t| Destination instance : "+str(message.get_destination()))
|
||||
RNS.log("\t| Transport Encryption : "+str(message.transport_encryption))
|
||||
RNS.log("\t| Timestamp : "+time_string)
|
||||
RNS.log("\t| Title : "+str(message.title_as_string()))
|
||||
RNS.log("\t| Content : "+str(message.content_as_string()))
|
||||
RNS.log("\t| Fields : "+str(message.fields))
|
||||
if message.ratchet_id:
|
||||
RNS.log("\t| Ratchet : "+str(RNS.Identity._get_ratchet_id(message.ratchet_id)))
|
||||
RNS.log("\t| Message signature : "+signature_string)
|
||||
RNS.log("\t| Stamp : "+stamp_string)
|
||||
RNS.log("\t+---------------------------------------------------------------")
|
||||
|
||||
# Optionally, send a reply
|
||||
# source = my_lxmf_destination
|
||||
# dest = message.source
|
||||
# lxm = LXMF.LXMessage(dest, source, "Reply", None, desired_method=LXMF.LXMessage.DIRECT, include_ticket=True)
|
||||
# router.handle_outbound(lxm)
|
||||
|
||||
r = RNS.Reticulum()
|
||||
|
||||
router = LXMF.LXMRouter(storagepath="./tmp1", enforce_stamps=enforce_stamps)
|
||||
identity = RNS.Identity()
|
||||
my_lxmf_destination = router.register_delivery_identity(identity, display_name="Anonymous Peer", stamp_cost=required_stamp_cost)
|
||||
router.register_delivery_callback(delivery_callback)
|
||||
|
||||
RNS.log("Ready to receive on: "+RNS.prettyhexrep(my_lxmf_destination.hash))
|
||||
|
||||
|
||||
# You can set a propagation node address to test receiving
|
||||
# messages from a propagation node, instead of directly
|
||||
|
||||
# router.set_outbound_propagation_node(bytes.fromhex("e75d9b6a69f82b48b6077cf2242d7499"))
|
||||
|
||||
|
||||
# This loop allows you to execute various actions for testing
|
||||
# and experimenting with the example scripts.
|
||||
while True:
|
||||
input()
|
||||
RNS.log("Announcing lxmf.delivery destination...")
|
||||
router.announce(my_lxmf_destination.hash)
|
||||
|
||||
# input()
|
||||
# RNS.log("Requesting messages from propagation node...")
|
||||
# router.request_messages_from_propagation_node(identity)
|
69
docs/example_sender.py
Normal file
69
docs/example_sender.py
Normal file
@ -0,0 +1,69 @@
|
||||
import LXMF
|
||||
import RNS
|
||||
import time
|
||||
import random
|
||||
|
||||
random_names = ["Tom", "Delilah", "Nancey", "Williams", "Neomi", "Curtis", "Alexa", "Theodora", "Ted", "Dinorah", "Nicol", "Drusilla", "Annalisa", "Verlene", "Latesha", "Tina", "Mia", "Brock", "Timothy", "Philip", "Willian", "Reyna", "Simona", "Mimi", "Stanford", "Ferne", "Catalina", "Lucie", "Jaye", "Natasha", "Willetta", "Isabel", "Esperanza", "Ciara", "Eusebio", "William", "Elma", "Angelica", "Coreen", "Melani", "Jonathan", "Maryland", "Caroline", "Gregg", "Ora", "Jacqui", "Letty", "Roselle", "Oralee", "Angla"]
|
||||
random_titles = ["Long time", "Hi again", "Hi there", "Test message", "", "", "Something different"]
|
||||
random_msgs = ["If wishes were horses then beggars might fly. Stuff like that. It's enough to drive you crazy.", "'My ident cards were stolen,' Jason said. 'That fivehundred-dollar bill is yours if you can get me to someone who can replace them. If you're going to do it, do it right now; I'm not going to wait.' Wait to be picked up by a pol or a nat, he thought. Caught here in this rundown dingy hotel.", "A six, no matter what the external circumstances, will always prevail. Because that's the way they genetically defined us.", "'Should be there in an hour,' he called back over his shoulder to Chuck. Then he added, in an afterthought, 'Wonder if the computer’s finished its run. It was due about now.'. Chuck didn’t reply, so George swung round in his saddle. He could just see Chuck’s face, a white oval turned toward the sky."]
|
||||
|
||||
def delivery_callback(message):
|
||||
pass
|
||||
|
||||
r = RNS.Reticulum()
|
||||
router = LXMF.LXMRouter(storagepath="./tmp2")
|
||||
router.register_delivery_callback(delivery_callback)
|
||||
ident = RNS.Identity()
|
||||
source = router.register_delivery_identity(ident, display_name=random_names[random.randint(0,len(random_names)-1)], stamp_cost=8)
|
||||
router.announce(source.hash)
|
||||
RNS.log("Source announced")
|
||||
|
||||
print("Recipient: ", end=" ")
|
||||
recipient_hexhash = input()
|
||||
recipient_hash = bytes.fromhex(recipient_hexhash)
|
||||
|
||||
if not RNS.Transport.has_path(recipient_hash):
|
||||
RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...")
|
||||
RNS.Transport.request_path(recipient_hash)
|
||||
while not RNS.Transport.has_path(recipient_hash):
|
||||
time.sleep(0.1)
|
||||
|
||||
# Recall the server identity
|
||||
recipient_identity = RNS.Identity.recall(recipient_hash)
|
||||
|
||||
dest = RNS.Destination(recipient_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
|
||||
|
||||
# This loop allows you to execute various actions for testing
|
||||
# and experimenting with the example scripts.
|
||||
while True:
|
||||
|
||||
# Create a message that will be sent directly to the
|
||||
# destination over a Reticulum link:
|
||||
|
||||
lxm = LXMF.LXMessage(dest, source, random_msgs[random.randint(0,len(random_msgs)-1)],
|
||||
random_titles[random.randint(0,len(random_titles)-1)],
|
||||
desired_method=LXMF.LXMessage.DIRECT, include_ticket=True)
|
||||
|
||||
# Or, create an oppertunistic, single-packet message
|
||||
# for sending without first establishing a link:
|
||||
|
||||
# lxm = LXMF.LXMessage(dest, source, "This is a test",
|
||||
# random_titles[random.randint(0,len(random_titles)-1)],
|
||||
# desired_method=LXMF.LXMessage.OPPORTUNISTIC, include_ticket=True)
|
||||
|
||||
|
||||
# Or, try sending the message via a propagation node:
|
||||
|
||||
# router.set_outbound_propagation_node(bytes.fromhex("e75d9b6a69f82b48b6077cf2242d7499"))
|
||||
# lxm = LXMF.LXMessage(dest, source, random_msgs[random.randint(0,len(random_msgs)-1)],
|
||||
# random_titles[random.randint(0,len(random_titles)-1)],
|
||||
# desired_method=LXMF.LXMessage.PROPAGATED)
|
||||
|
||||
# Finally dispatch the message to the LXMF message
|
||||
# router, which will handle the delivery according
|
||||
# to the specified message parameters and options:
|
||||
|
||||
router.handle_outbound(lxm)
|
||||
|
||||
# Wait for user input before starting over
|
||||
input()
|
2
requirements.txt
Normal file
2
requirements.txt
Normal file
@ -0,0 +1,2 @@
|
||||
qrcode>=7.4.2
|
||||
rns>=0.9.1
|
4
setup.py
4
setup.py
@ -25,6 +25,6 @@ setuptools.setup(
|
||||
'lxmd=LXMF.Utilities.lxmd:main',
|
||||
]
|
||||
},
|
||||
install_requires=['rns>=0.4.7'],
|
||||
python_requires='>=3.6',
|
||||
install_requires=["rns>=0.9.3"],
|
||||
python_requires=">=3.7",
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user