Compare commits


No commits in common. "master" and "0.2.9" have entirely different histories.

19 changed files with 348 additions and 3053 deletions


@ -1,11 +0,0 @@
blank_issues_enabled: false
contact_links:
- name: ✨ Feature Request or Idea
url: https://github.com/markqvist/Reticulum/discussions/new?category=ideas
about: Propose and discuss features and ideas
- name: 💬 Questions, Help & Discussion
about: Ask anything, or get help
url: https://github.com/markqvist/Reticulum/discussions/new/choose
- name: 📖 Read the Reticulum Manual
url: https://markqvist.github.io/Reticulum/manual/
about: The complete documentation for Reticulum


@ -1,35 +0,0 @@
---
name: "\U0001F41B Bug Report"
about: Report a reproducible bug
title: ''
labels: ''
assignees: ''
---
**Read the Contribution Guidelines**
Before creating a bug report on this issue tracker, you **must** read the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md). Issues that do not follow the contribution guidelines **will be deleted without comment**.
- The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**.
- Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**.
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**.
**Describe the Bug**
A clear and concise description of what the bug is.
**To Reproduce**
Describe in detail how to reproduce the bug.
**Expected Behavior**
A clear and concise description of what you expected to happen.
**Logs & Screenshots**
Please include any relevant log output. If applicable, also add screenshots to help explain your problem.
**System Information**
- OS and version
- Python version
- Program version
**Additional context**
Add any other context about the problem here.


@ -1,31 +0,0 @@
FROM python:alpine
LABEL authors="Petr Blaha petr.blaha@cleverdata.cz"
USER root
RUN apk update
RUN apk add sdl2_ttf sdl2 build-base libc-dev pkgconfig gstreamer sdl2_mixer sdl2_image sdl2_pango linux-headers mesa-dev py3-virtualenv
RUN addgroup -S myuser && adduser -S -G myuser myuser
USER myuser
WORKDIR /home/myuser
RUN pip install --upgrade pip
ENV PATH="/home/myuser/.local/bin:${PATH}"
################### BEGIN LXMF ###########################################
COPY --chown=myuser:myuser requirements.txt requirements.txt
RUN pip install --user -r requirements.txt
COPY --chown=myuser:myuser . .
#Python create virtual environment
RUN virtualenv /home/myuser/LXMF/venv
RUN source /home/myuser/LXMF/venv/bin/activate
RUN make all
################### END LXMF ###########################################


@ -1,6 +0,0 @@
# Run docker command one by one(all four), it will build LXMF artifact and copy to dist directory.
# No need to build locally and install dependencies
docker build -t lxmfdockerimage .
docker run -d -it --name lxmfdockercontainer lxmfdockerimage /bin/sh
docker cp lxmfdockercontainer:/home/myuser/dist .
docker rm -f lxmfdockercontainer

LICENSE

@ -1,6 +1,6 @@
- Reticulum License
+ MIT License
- Copyright (c) 2020-2025 Mark Qvist
+ Copyright (c) 2020 Mark Qvist / unsigned.io
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@ -9,16 +9,8 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
- - The Software shall not be used in any kind of system which includes amongst
- its functions the ability to purposefully do harm to human beings.
+ The above copyright notice and this permission notice shall be included in all
+ copies or substantial portions of the Software.
- The Software shall not be used, directly or indirectly, in the creation of
an artificial intelligence, machine learning or language model training
dataset, including but not limited to any use that contributes to the
training or development of such a model or algorithm.
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,


@ -2,19 +2,19 @@ import time
import RNS
import RNS.vendor.umsgpack as msgpack
- from .LXMF import APP_NAME, stamp_cost_from_app_data, pn_announce_data_is_valid
+ from .LXMF import APP_NAME
from .LXMessage import LXMessage
class LXMFDeliveryAnnounceHandler:
def __init__(self, lxmrouter):
self.aspect_filter = APP_NAME+".delivery"
self.receive_path_responses = True
self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data):
for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash:
- if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
+ if lxmessage.method == LXMessage.DIRECT:
lxmessage.next_delivery_attempt = time.time()
while self.lxmrouter.processing_outbound:
@ -22,54 +22,23 @@ class LXMFDeliveryAnnounceHandler:
self.lxmrouter.process_outbound()
try:
stamp_cost = stamp_cost_from_app_data(app_data)
self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost)
except Exception as e:
RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR)
class LXMFPropagationAnnounceHandler:
def __init__(self, lxmrouter):
self.aspect_filter = APP_NAME+".propagation"
self.receive_path_responses = False
self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data):
try:
if type(app_data) == bytes:
if self.lxmrouter.propagation_node and self.lxmrouter.autopeer:
data = msgpack.unpackb(app_data)
- if pn_announce_data_is_valid(data):
+ if self.lxmrouter.propagation_node and self.lxmrouter.autopeer:
node_timebase = data[1]
propagation_transfer_limit = None
wanted_inbound_peers = None
if len(data) >= 4:
# TODO: Rethink, probably not necessary anymore
# try:
# wanted_inbound_peers = int(data[3])
# except:
# wanted_inbound_peers = None
pass
if len(data) >= 3:
try:
propagation_transfer_limit = float(data[2])
except:
propagation_transfer_limit = None
if destination_hash in self.lxmrouter.static_peers:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
else:
if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
- self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
+ self.lxmrouter.peer(destination_hash, data[1])
elif data[0] == False:
- self.lxmrouter.unpeer(destination_hash, node_timebase)
+ self.lxmrouter.unpeer(destination_hash, data[1])
except Exception as e:
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)


@ -1,165 +1 @@
APP_NAME = "lxmf" APP_NAME = "lxmf"
##########################################################
# The following core fields are provided to facilitate #
# interoperability in data exchange between various LXMF #
# clients and systems. #
##########################################################
FIELD_EMBEDDED_LXMS = 0x01
FIELD_TELEMETRY = 0x02
FIELD_TELEMETRY_STREAM = 0x03
FIELD_ICON_APPEARANCE = 0x04
FIELD_FILE_ATTACHMENTS = 0x05
FIELD_IMAGE = 0x06
FIELD_AUDIO = 0x07
FIELD_THREAD = 0x08
FIELD_COMMANDS = 0x09
FIELD_RESULTS = 0x0A
FIELD_GROUP = 0x0B
FIELD_TICKET = 0x0C
FIELD_EVENT = 0x0D
FIELD_RNR_REFS = 0x0E
FIELD_RENDERER = 0x0F
# For usecases such as including custom data structures,
# embedding or encapsulating other data types or protocols
# that are not native to LXMF, or bridging/tunneling
# external protocols or services over LXMF, the following
# fields are available. A format/type/protocol (or other)
# identifier can be included in the CUSTOM_TYPE field, and
# the embedded payload can be included in the CUSTOM_DATA
# field. It is up to the client application to correctly
# discern and potentially utilise any data embedded using
# this mechanism.
FIELD_CUSTOM_TYPE = 0xFB
FIELD_CUSTOM_DATA = 0xFC
FIELD_CUSTOM_META = 0xFD
# The non-specific and debug fields are intended for
# development, testing and debugging use.
FIELD_NON_SPECIFIC = 0xFE
FIELD_DEBUG = 0xFF
##########################################################
# The following section lists field-specific specifiers, #
# modes and identifiers that are native to LXMF. It is #
# optional for any client or system to support any of #
# these, and they are provided as template for easing #
# interoperability without sacrificing expandability #
# and flexibility of the format. #
##########################################################
# Audio modes for the data structure in FIELD_AUDIO
# Codec2 Audio Modes
AM_CODEC2_450PWB = 0x01
AM_CODEC2_450 = 0x02
AM_CODEC2_700C = 0x03
AM_CODEC2_1200 = 0x04
AM_CODEC2_1300 = 0x05
AM_CODEC2_1400 = 0x06
AM_CODEC2_1600 = 0x07
AM_CODEC2_2400 = 0x08
AM_CODEC2_3200 = 0x09
# Opus Audio Modes
AM_OPUS_OGG = 0x10
AM_OPUS_LBW = 0x11
AM_OPUS_MBW = 0x12
AM_OPUS_PTT = 0x13
AM_OPUS_RT_HDX = 0x14
AM_OPUS_RT_FDX = 0x15
AM_OPUS_STANDARD = 0x16
AM_OPUS_HQ = 0x17
AM_OPUS_BROADCAST = 0x18
AM_OPUS_LOSSLESS = 0x19
# Custom, unspecified audio mode, the client must
# determine it itself based on the included data.
AM_CUSTOM = 0xFF
# Message renderer specifications for FIELD_RENDERER.
# The renderer specification is completely optional,
# and only serves as an indication to the receiving
# client on how to render the message contents. It is
# not mandatory to implement, either on sending or
# receiving sides, but is the recommended way to
# signal how to render a message, if non-plaintext
# formatting is used.
RENDERER_PLAIN = 0x00
RENDERER_MICRON = 0x01
RENDERER_MARKDOWN = 0x02
RENDERER_BBCODE = 0x03
##########################################################
# The following helper functions makes it easier to #
# handle and operate on LXMF data in client programs #
##########################################################
import RNS
import RNS.vendor.umsgpack as msgpack
def display_name_from_app_data(app_data=None):
if app_data == None:
return None
elif len(app_data) == 0:
return None
else:
# Version 0.5.0+ announce format
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
peer_data = msgpack.unpackb(app_data)
if type(peer_data) == list:
if len(peer_data) < 1:
return None
else:
dn = peer_data[0]
if dn == None:
return None
else:
try:
decoded = dn.decode("utf-8")
return decoded
except Exception as e:
RNS.log(f"Could not decode display name in included announce data. The contained exception was: {e}", RNS.LOG_ERROR)
return None
# Original announce format
else:
return app_data.decode("utf-8")
def stamp_cost_from_app_data(app_data=None):
if app_data == None or app_data == b"":
return None
else:
# Version 0.5.0+ announce format
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
peer_data = msgpack.unpackb(app_data)
if type(peer_data) == list:
if len(peer_data) < 2:
return None
else:
return peer_data[1]
# Original announce format
else:
return None
def pn_announce_data_is_valid(data):
try:
if type(data) == bytes:
data = msgpack.unpackb(data)
if len(data) < 3:
raise ValueError("Invalid announce data: Insufficient peer data")
else:
if data[0] != True and data[0] != False:
raise ValueError("Invalid announce data: Indeterminate propagation node status")
try:
int(data[1])
except:
raise ValueError("Invalid announce data: Could not decode peer timebase")
except Exception as e:
RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG)
return False
return True
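To illustrate how the field identifiers and helper functions above are intended to be used by a client, here is a hedged sketch. It assumes the constants and helpers are re-exported at the package level (LXMF.FIELD_*, LXMF.display_name_from_app_data, and so on), and that the fields dict is passed as the fields argument when constructing an LXMessage; the custom type tag and payload bytes are purely illustrative:

import LXMF

# Hypothetical fields dict for an outbound message: a renderer hint plus a
# custom payload tagged with an application-defined type identifier.
message_fields = {
    LXMF.FIELD_RENDERER: LXMF.RENDERER_MARKDOWN,
    LXMF.FIELD_CUSTOM_TYPE: b"example/custom-type",  # illustrative tag
    LXMF.FIELD_CUSTOM_DATA: b"\x01\x02\x03",          # opaque embedded payload
}

# The helpers above decode announce app_data; both return None when the
# data is missing or cannot be interpreted.
announce_app_data = None  # would normally come from a received announce
display_name = LXMF.display_name_from_app_data(announce_app_data)  # -> None here
stamp_cost = LXMF.stamp_cost_from_app_data(announce_app_data)      # -> None here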


@ -4,7 +4,6 @@ import time
import RNS
import RNS.vendor.umsgpack as msgpack
from collections import deque
from .LXMF import APP_NAME
class LXMPeer:
@ -20,11 +19,10 @@ class LXMPeer:
ERROR_NO_IDENTITY = 0xf0
ERROR_NO_ACCESS = 0xf1
ERROR_TIMEOUT = 0xfe
# Maximum amount of time a peer can
# be unreachable before it is removed
- MAX_UNREACHABLE = 14*24*60*60
+ MAX_UNREACHABLE = 4*24*60*60
# Everytime consecutive time a sync
# link fails to establish, add this
@ -40,82 +38,20 @@ class LXMPeer:
@staticmethod
def from_bytes(peer_bytes, router):
dictionary = msgpack.unpackb(peer_bytes)
peer_destination_hash = dictionary["destination_hash"]
peer_peering_timebase = dictionary["peering_timebase"]
peer_alive = dictionary["alive"]
peer_last_heard = dictionary["last_heard"]
- peer = LXMPeer(router, peer_destination_hash)
+ peer = LXMPeer(router, dictionary["destination_hash"])
- peer.peering_timebase = peer_peering_timebase
+ peer.peering_timebase = dictionary["peering_timebase"]
- peer.alive = peer_alive
+ peer.alive = dictionary["alive"]
- peer.last_heard = peer_last_heard
+ peer.last_heard = dictionary["last_heard"]
if "link_establishment_rate" in dictionary:
peer.link_establishment_rate = dictionary["link_establishment_rate"]
else:
peer.link_establishment_rate = 0
if "sync_transfer_rate" in dictionary:
peer.sync_transfer_rate = dictionary["sync_transfer_rate"]
else:
peer.sync_transfer_rate = 0
if "propagation_transfer_limit" in dictionary:
try:
peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"])
except Exception as e:
peer.propagation_transfer_limit = None
else:
peer.propagation_transfer_limit = None
if "offered" in dictionary:
peer.offered = dictionary["offered"]
else:
peer.offered = 0
if "outgoing" in dictionary:
peer.outgoing = dictionary["outgoing"]
else:
peer.outgoing = 0
if "incoming" in dictionary:
peer.incoming = dictionary["incoming"]
else:
peer.incoming = 0
if "rx_bytes" in dictionary:
peer.rx_bytes = dictionary["rx_bytes"]
else:
peer.rx_bytes = 0
if "tx_bytes" in dictionary:
peer.tx_bytes = dictionary["tx_bytes"]
else:
peer.tx_bytes = 0
if "last_sync_attempt" in dictionary:
peer.last_sync_attempt = dictionary["last_sync_attempt"]
else:
peer.last_sync_attempt = 0
hm_count = 0
for transient_id in dictionary["handled_ids"]:
if transient_id in router.propagation_entries:
- peer.add_handled_message(transient_id)
+ peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
hm_count += 1
um_count = 0
for transient_id in dictionary["unhandled_ids"]:
if transient_id in router.propagation_entries:
- peer.add_unhandled_message(transient_id)
+ peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
um_count += 1
peer._hm_count = hm_count
peer._um_count = um_count
peer._hm_counts_synced = True
peer._um_counts_synced = True
del dictionary
return peer
def to_bytes(self):
@ -124,15 +60,6 @@ class LXMPeer:
dictionary["alive"] = self.alive dictionary["alive"] = self.alive
dictionary["last_heard"] = self.last_heard dictionary["last_heard"] = self.last_heard
dictionary["destination_hash"] = self.destination_hash dictionary["destination_hash"] = self.destination_hash
dictionary["link_establishment_rate"] = self.link_establishment_rate
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
dictionary["last_sync_attempt"] = self.last_sync_attempt
dictionary["offered"] = self.offered
dictionary["outgoing"] = self.outgoing
dictionary["incoming"] = self.incoming
dictionary["rx_bytes"] = self.rx_bytes
dictionary["tx_bytes"] = self.tx_bytes
handled_ids = []
for transient_id in self.handled_messages:
@ -145,10 +72,7 @@ class LXMPeer:
dictionary["handled_ids"] = handled_ids dictionary["handled_ids"] = handled_ids
dictionary["unhandled_ids"] = unhandled_ids dictionary["unhandled_ids"] = unhandled_ids
peer_bytes = msgpack.packb(dictionary) return msgpack.packb(dictionary)
del dictionary
return peer_bytes
def __init__(self, router, destination_hash):
self.alive = False
@ -157,38 +81,20 @@ class LXMPeer:
self.last_sync_attempt = 0
self.sync_backoff = 0
self.peering_timebase = 0
self.link_establishment_rate = 0
self.sync_transfer_rate = 0
self.propagation_transfer_limit = None
self.handled_messages_queue = deque()
self.unhandled_messages_queue = deque()
self.offered = 0 # Messages offered to this peer
self.outgoing = 0 # Messages transferred to this peer
self.incoming = 0 # Messages received from this peer
self.rx_bytes = 0 # Bytes received from this peer
self.tx_bytes = 0 # Bytes sent to this peer
self._hm_count = 0
self._um_count = 0
self._hm_counts_synced = False
self._um_counts_synced = False
self.link = None
self.state = LXMPeer.IDLE
- self.last_offer = []
+ self.unhandled_messages = {}
self.handled_messages = {}
self.router = router
self.destination_hash = destination_hash
self.identity = RNS.Identity.recall(destination_hash)
if self.identity != None:
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
else:
self.destination = None
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
def sync(self):
RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
self.last_sync_attempt = time.time()
@ -204,10 +110,9 @@ class LXMPeer:
else:
if self.identity == None:
self.identity = RNS.Identity.recall(destination_hash)
if self.identity != None:
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
- if self.destination != None:
+ if self.identity != None:
if len(self.unhandled_messages) > 0:
if self.state == LXMPeer.IDLE:
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
@ -223,49 +128,23 @@ class LXMPeer:
self.sync_backoff = 0
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
unhandled_entries = []
unhandled_ids = []
purged_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
- unhandled_entry = [
+ unhandled_ids.append(transient_id)
transient_id,
self.router.get_weight(transient_id),
self.router.get_size(transient_id),
]
unhandled_entries.append(unhandled_entry)
else:
purged_ids.append(transient_id)
for transient_id in purged_ids:
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
- self.remove_unhandled_message(transient_id)
+ self.unhandled_messages.pop(transient_id)
- unhandled_entries.sort(key=lambda e: e[1], reverse=False)
+ RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
for unhandled_entry in unhandled_entries:
transient_id = unhandled_entry[0]
weight = unhandled_entry[1]
lxm_size = unhandled_entry[2]
next_size = cumulative_size + (lxm_size+per_message_overhead)
if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000):
if lxm_size+per_message_overhead > (self.propagation_transfer_limit*1000):
self.remove_unhandled_message(transient_id)
self.add_handled_message(transient_id)
RNS.log(f"Message {RNS.prettyhexrep(transient_id)} exceeds transfer limit for {self}, considering handled", RNS.LOG_DEBUG)
else:
cumulative_size += (lxm_size+per_message_overhead)
unhandled_ids.append(transient_id)
RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_VERBOSE)
self.last_offer = unhandled_ids
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT
else:
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
else:
RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG)
if self.last_sync_attempt > self.last_heard:
@ -275,7 +154,7 @@ class LXMPeer:
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
if self.link != None: if self.link != None:
self.link.teardown() self.link.teardown()
else:
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
def offer_response(self, request_receipt): def offer_response(self, request_receipt):
@ -288,48 +167,44 @@ class LXMPeer:
if response == LXMPeer.ERROR_NO_IDENTITY:
if self.link != None:
- RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE)
+ RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG)
- self.link.identify()
+ self.link.indentify()
self.state = LXMPeer.LINK_READY
self.sync()
return
elif response == LXMPeer.ERROR_NO_ACCESS:
RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE)
self.router.unpeer(self.destination_hash)
return
elif response == False:
# Peer already has all advertised messages
- for transient_id in self.last_offer:
+ for transient_id in self.unhandled_messages:
- if transient_id in self.unhandled_messages:
+ message_entry = self.unhandled_messages[transient_id]
- self.add_handled_message(transient_id)
+ self.handled_messages[transient_id] = message_entry
self.remove_unhandled_message(transient_id)
self.unhandled_messages = {}
elif response == True:
# Peer wants all advertised messages
- for transient_id in self.last_offer:
+ for transient_id in self.unhandled_messages:
- wanted_messages.append(self.router.propagation_entries[transient_id])
+ wanted_messages.append(self.unhandled_messages[transient_id])
wanted_message_ids.append(transient_id)
else:
# Peer wants some advertised messages
- for transient_id in self.last_offer.copy():
+ peer_had_messages = []
for transient_id in self.unhandled_messages.copy():
# If the peer did not want the message, it has
# already received it from another peer.
if not transient_id in response:
- self.add_handled_message(transient_id)
+ message_entry = self.unhandled_messages.pop(transient_id)
- self.remove_unhandled_message(transient_id)
+ self.handled_messages[transient_id] = message_entry
for transient_id in response:
- wanted_messages.append(self.router.propagation_entries[transient_id])
+ wanted_messages.append(self.unhandled_messages[transient_id])
wanted_message_ids.append(transient_id)
if len(wanted_messages) > 0:
- RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_VERBOSE)
+ RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG)
lxm_list = []
for message_entry in wanted_messages:
file_path = message_entry[1]
if os.path.isfile(file_path):
@ -341,16 +216,9 @@ class LXMPeer:
data = msgpack.packb([time.time(), lxm_list])
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
resource.transferred_messages = wanted_message_ids
resource.sync_transfer_started = time.time()
self.state = LXMPeer.RESOURCE_TRANSFERRING
else:
- RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_VERBOSE)
+ RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
self.offered += len(self.last_offer)
if self.link != None:
self.link.teardown()
self.link = None
self.state = LXMPeer.IDLE
except Exception as e:
@ -363,44 +231,28 @@ class LXMPeer:
self.link = None
self.state = LXMPeer.IDLE
def resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
for transient_id in resource.transferred_messages:
- self.add_handled_message(transient_id)
+ message = self.unhandled_messages.pop(transient_id)
- self.remove_unhandled_message(transient_id)
+ self.handled_messages[transient_id] = message
if self.link != None:
self.link.teardown()
self.link = None
self.state = LXMPeer.IDLE
self.link.teardown()
- rate_str = ""
+ RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed", RNS.LOG_DEBUG)
if hasattr(resource, "sync_transfer_started") and resource.sync_transfer_started:
self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started)
rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
RNS.log(f"Syncing {len(resource.transferred_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
self.alive = True
self.last_heard = time.time()
self.offered += len(self.last_offer)
self.outgoing += len(resource.transferred_messages)
self.tx_bytes += resource.get_data_size()
else:
- RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
+ RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
if self.link != None:
self.link.teardown()
else:
self.link = None
self.state = LXMPeer.IDLE
def link_established(self, link):
self.link.identify(self.router.identity)
link_establishment_rate = link.get_establishment_rate()
if link_establishment_rate != None:
self.link_establishment_rate = link_establishment_rate
self.state = LXMPeer.LINK_READY
self.next_sync_attempt = 0
self.sync()
@ -409,103 +261,11 @@ class LXMPeer:
self.link = None
self.state = LXMPeer.IDLE
- def queued_items(self):
+ def handle_message(self, transient_id):
- return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
+ if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
# TODO: Remove at some point
- def queue_unhandled_message(self, transient_id):
+ RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was added to distribution queue for "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_EXTREME)
- self.unhandled_messages_queue.append(transient_id)
+ self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
def queue_handled_message(self, transient_id):
self.handled_messages_queue.append(transient_id)
def process_queues(self):
if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
# TODO: Remove debug
# st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue)
handled_messages = self.handled_messages
unhandled_messages = self.unhandled_messages
while len(self.handled_messages_queue) > 0:
transient_id = self.handled_messages_queue.pop()
if not transient_id in handled_messages:
self.add_handled_message(transient_id)
if transient_id in unhandled_messages:
self.remove_unhandled_message(transient_id)
while len(self.unhandled_messages_queue) > 0:
transient_id = self.unhandled_messages_queue.pop()
if not transient_id in handled_messages and not transient_id in unhandled_messages:
self.add_unhandled_message(transient_id)
del handled_messages, unhandled_messages
# TODO: Remove debug
# RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}")
@property
def handled_messages(self):
pes = self.router.propagation_entries.copy()
hm = list(filter(lambda tid: self.destination_hash in pes[tid][4], pes))
self._hm_count = len(hm); del pes
self._hm_counts_synced = True
return hm
@property
def unhandled_messages(self):
pes = self.router.propagation_entries.copy()
um = list(filter(lambda tid: self.destination_hash in pes[tid][5], pes))
self._um_count = len(um); del pes
self._um_counts_synced = True
return um
@property
def handled_message_count(self):
if not self._hm_counts_synced:
self._update_counts()
return self._hm_count
@property
def unhandled_message_count(self):
if not self._um_counts_synced:
self._update_counts()
return self._um_count
@property
def acceptance_rate(self):
return 0 if self.offered == 0 else (self.outgoing/self.offered)
def _update_counts(self):
if not self._hm_counts_synced:
hm = self.handled_messages; del hm
if not self._um_counts_synced:
um = self.unhandled_messages; del um
def add_handled_message(self, transient_id):
if transient_id in self.router.propagation_entries:
if not self.destination_hash in self.router.propagation_entries[transient_id][4]:
self.router.propagation_entries[transient_id][4].append(self.destination_hash)
self._hm_counts_synced = False
def add_unhandled_message(self, transient_id):
if transient_id in self.router.propagation_entries:
if not self.destination_hash in self.router.propagation_entries[transient_id][5]:
self.router.propagation_entries[transient_id][5].append(self.destination_hash)
self._um_count += 1
def remove_handled_message(self, transient_id):
if transient_id in self.router.propagation_entries:
if self.destination_hash in self.router.propagation_entries[transient_id][4]:
self.router.propagation_entries[transient_id][4].remove(self.destination_hash)
self._hm_counts_synced = False
def remove_unhandled_message(self, transient_id):
if transient_id in self.router.propagation_entries:
if self.destination_hash in self.router.propagation_entries[transient_id][5]:
self.router.propagation_entries[transient_id][5].remove(self.destination_hash)
self._um_counts_synced = False
def __str__(self):
if self.destination_hash:
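The newer (master-side) sync path above budgets the messages it offers against the peer's propagation_transfer_limit before sending the offer request. Below is a simplified, standalone sketch of that selection logic; the function name and return values are illustrative, and the limit is expressed in kilobytes, matching propagation_transfer_limit in the code above:

# Simplified sketch of the size-budgeted offer selection in the newer LXMPeer.sync().
# entries: list of (transient_id, weight, size_in_bytes), as built before offering.
def select_offer(entries, transfer_limit_kb, per_message_overhead=16, base_overhead=24):
    offered, dropped = [], []
    cumulative = base_overhead
    for transient_id, weight, size in sorted(entries, key=lambda e: e[1]):
        needed = size + per_message_overhead
        if transfer_limit_kb is not None and cumulative + needed > transfer_limit_kb * 1000:
            if needed > transfer_limit_kb * 1000:
                # Can never fit within this peer's limit: treat it as handled
                dropped.append(transient_id)
            # Otherwise leave it unhandled and retry on a later sync
        else:
            cumulative += needed
            offered.append(transient_id)
    return offered, dropped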

File diff suppressed because it is too large.


@ -1,25 +1,19 @@
import RNS
import RNS.vendor.umsgpack as msgpack
import os
import time
import base64
import multiprocessing
import LXMF.LXStamper as LXStamper
from .LXMF import APP_NAME
class LXMessage:
- GENERATING = 0x00
+ DRAFT = 0x00
OUTBOUND = 0x01
SENDING = 0x02
SENT = 0x04
DELIVERED = 0x08
REJECTED = 0xFD
CANCELLED = 0xFE
FAILED = 0xFF
- states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, REJECTED, CANCELLED, FAILED]
+ states = [DRAFT, OUTBOUND, SENDING, SENT, DELIVERED, FAILED]
UNKNOWN = 0x00
PACKET = 0x01
@ -38,33 +32,19 @@ class LXMessage:
DESTINATION_LENGTH = RNS.Identity.TRUNCATED_HASHLENGTH//8
SIGNATURE_LENGTH = RNS.Identity.SIGLENGTH//8
TICKET_LENGTH = RNS.Identity.TRUNCATED_HASHLENGTH//8
- # Default ticket expiry is 3 weeks, with an
- # additional grace period of 5 days, allowing
- # for timekeeping inaccuracies. Tickets will
+ # LXMF overhead is 99 bytes per message:
+ # 10 bytes for destination hash
+ # 10 bytes for source hash
# automatically renew when there is less than
# 14 days to expiry.
TICKET_EXPIRY = 21*24*60*60
TICKET_GRACE = 5*24*60*60
TICKET_RENEW = 14*24*60*60
TICKET_INTERVAL = 1*24*60*60
COST_TICKET = 0x100
# LXMF overhead is 112 bytes per message:
# 16 bytes for destination hash
# 16 bytes for source hash
# 64 bytes for Ed25519 signature
# 8 bytes for timestamp
- # 8 bytes for msgpack structure
+ # 7 bytes for msgpack structure
- TIMESTAMP_SIZE = 8
+ LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + 8 + 7
STRUCT_OVERHEAD = 8
LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + TIMESTAMP_SIZE + STRUCT_OVERHEAD
# With an MTU of 500, the maximum amount of data
# we can send in a single encrypted packet is
- # 391 bytes.
+ # 383 bytes.
- ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU + TIMESTAMP_SIZE
+ ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU
# The max content length we can fit in LXMF message
# inside a single RNS packet is the encrypted MDU, minus
@ -73,7 +53,7 @@ class LXMessage:
# field of the packet, therefore we also add the length
# of a destination hash to the calculation. With default
# RNS and LXMF parameters, the largest single-packet
- # LXMF message we can send is 295 bytes. If a message
+ # LXMF message we can send is 294 bytes. If a message
# is larger than that, a Reticulum link will be used.
ENCRYPTED_PACKET_MAX_CONTENT = ENCRYPTED_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
@ -83,13 +63,13 @@ class LXMessage:
LINK_PACKET_MDU = RNS.Link.MDU
# Which means that we can deliver single-packet LXMF
- # messages with content of up to 319 bytes over a link.
+ # messages with content of up to 332 bytes over a link.
# If a message is larger than that, LXMF will sequence
# and transfer it as a RNS resource over the link instead.
LINK_PACKET_MAX_CONTENT = LINK_PACKET_MDU - LXMF_OVERHEAD
# For plain packets without encryption, we can
- # fit up to 368 bytes of content.
+ # fit up to 388 bytes of content.
PLAIN_PACKET_MDU = RNS.Packet.PLAIN_MDU
PLAIN_PACKET_MAX_CONTENT = PLAIN_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
@ -110,8 +90,7 @@ class LXMessage:
else:
return "<LXMessage>"
- def __init__(self, destination, source, content = "", title = "", fields = None, desired_method = None, destination_hash = None, source_hash = None, stamp_cost=None, include_ticket=False):
+ def __init__(self, destination, source, content = "", title = "", fields = None, desired_method = None, destination_hash = None, source_hash = None):
if isinstance(destination, RNS.Destination) or destination == None:
self.__destination = destination
if destination != None:
@ -130,17 +109,7 @@ class LXMessage:
else:
raise ValueError("LXMessage initialised with invalid source")
if title == None:
title = ""
if type(title) == bytes:
self.set_title_from_bytes(title)
else:
self.set_title_from_string(title)
if type(content) == bytes:
self.set_content_from_bytes(content)
else:
self.set_content_from_string(content)
self.set_fields(fields)
@ -150,21 +119,9 @@ class LXMessage:
self.signature = None
self.hash = None
self.packed = None
- self.state = LXMessage.GENERATING
+ self.progress = None
self.state = LXMessage.DRAFT
self.method = LXMessage.UNKNOWN
self.progress = 0.0
self.rssi = None
self.snr = None
self.q = None
self.stamp = None
self.stamp_cost = stamp_cost
self.stamp_value = None
self.stamp_valid = False
self.stamp_checked = False
self.defer_stamp = True
self.outbound_ticket = None
self.include_ticket = include_ticket
self.propagation_packed = None
self.paper_packed = None
@ -172,22 +129,18 @@ class LXMessage:
self.incoming = False
self.signature_validated = False
self.unverified_reason = None
self.ratchet_id = None
self.representation = LXMessage.UNKNOWN
self.desired_method = desired_method
self.delivery_attempts = 0
self.transport_encrypted = False
self.transport_encryption = None
self.ratchet_id = None
self.packet_representation = None
self.resource_representation = None
self.__delivery_destination = None
self.__delivery_callback = None
self.failed_callback = None
self.deferred_stamp_generating = False
def set_title_from_string(self, title_string):
self.title = title_string.encode("utf-8")
@ -204,31 +157,16 @@ class LXMessage:
self.content = content_bytes
def content_as_string(self):
try:
return self.content.decode("utf-8")
except Exception as e:
RNS.log(f"{self} could not decode message content as string: {e}")
return None
def set_fields(self, fields):
if isinstance(fields, dict) or fields == None:
- self.fields = fields or {}
+ self.fields = fields
else:
raise ValueError("LXMessage property \"fields\" can only be dict or None")
def get_fields(self):
- return self.fields
+ return self.__fields
@property
def destination(self):
return self.__destination
@destination.setter
def destination(self, destination):
self.set_destination(destination)
def get_destination(self):
return self.destination
def set_destination(self, destination):
if self.destination == None:
@ -239,16 +177,8 @@ class LXMessage:
else:
raise ValueError("Cannot reassign destination on LXMessage")
- @property
- def source(self):
+ def get_destination(self):
+ return self.__destination
return self.__source
@source.setter
def source(self, source):
self.set_source(source)
def get_source(self):
return self.source
def set_source(self, source):
if self.source == None:
@ -259,6 +189,9 @@ class LXMessage:
else:
raise ValueError("Cannot reassign source on LXMessage")
def get_source(self):
return self.__source
def set_delivery_destination(self, delivery_destination):
self.__delivery_destination = delivery_destination
@ -268,71 +201,6 @@
def register_failed_callback(self, callback):
self.failed_callback = callback
@staticmethod
def stamp_valid(stamp, target_cost, workblock):
target = 0b1 << 256-target_cost
result = RNS.Identity.full_hash(workblock+stamp)
if int.from_bytes(result, byteorder="big") > target:
return False
else:
return True
def validate_stamp(self, target_cost, tickets=None):
if tickets != None:
for ticket in tickets:
try:
if self.stamp == RNS.Identity.truncated_hash(ticket+self.message_id):
RNS.log(f"Stamp on {self} validated by inbound ticket", RNS.LOG_DEBUG) # TODO: Remove at some point
self.stamp_value = LXMessage.COST_TICKET
return True
except Exception as e:
RNS.log(f"Error while validating ticket: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
if self.stamp == None:
return False
else:
workblock = LXStamper.stamp_workblock(self.message_id)
if LXMessage.stamp_valid(self.stamp, target_cost, workblock):
RNS.log(f"Stamp on {self} validated", RNS.LOG_DEBUG) # TODO: Remove at some point
self.stamp_value = LXStamper.stamp_value(workblock, self.stamp)
return True
else:
return False
def get_stamp(self, timeout=None):
# If an outbound ticket exists, use this for
# generating a valid stamp.
if self.outbound_ticket != None and type(self.outbound_ticket) == bytes and len(self.outbound_ticket) == LXMessage.TICKET_LENGTH:
generated_stamp = RNS.Identity.truncated_hash(self.outbound_ticket+self.message_id)
self.stamp_value = LXMessage.COST_TICKET
RNS.log(f"Generated stamp with outbound ticket {RNS.hexrep(self.outbound_ticket)} for {self}", RNS.LOG_DEBUG) # TODO: Remove at some point
return generated_stamp
# If no stamp cost is required, we can just
# return immediately.
elif self.stamp_cost == None:
self.stamp_value = None
return None
# If a stamp was already generated, return
# it immediately.
elif self.stamp != None:
return self.stamp
# Otherwise, we will need to generate a
# valid stamp according to the cost that
# the receiver has specified.
else:
generated_stamp, value = LXStamper.generate_stamp(self.message_id, self.stamp_cost)
if generated_stamp:
self.stamp_value = value
self.stamp_valid = True
return generated_stamp
else:
return None
def pack(self):
if not self.packed:
if self.timestamp == None:
@ -350,40 +218,27 @@ class LXMessage:
self.hash = RNS.Identity.full_hash(hashed_part)
self.message_id = self.hash
if not self.defer_stamp:
self.stamp = self.get_stamp()
if self.stamp != None:
self.payload.append(self.stamp)
signed_part = b"" signed_part = b""
signed_part += hashed_part signed_part += hashed_part
signed_part += self.hash signed_part += self.hash
self.signature = self.__source.sign(signed_part) self.signature = self.__source.sign(signed_part)
self.signature_validated = True self.signature_validated = True
packed_payload = msgpack.packb(self.payload)
self.packed = b"" self.packed = b""
self.packed += self.__destination.hash self.packed += self.__destination.hash
self.packed += self.__source.hash self.packed += self.__source.hash
self.packed += self.signature self.packed += self.signature
packed_payload = msgpack.packb(self.payload)
self.packed += packed_payload self.packed += packed_payload
self.packed_size = len(self.packed) self.packed_size = len(self.packed)
content_size = len(packed_payload)-LXMessage.TIMESTAMP_SIZE-LXMessage.STRUCT_OVERHEAD content_size = len(packed_payload)
# If no desired delivery method has been defined,
# one will be chosen according to these rules:
if self.desired_method == None:
self.desired_method = LXMessage.DIRECT
# TODO: Expand rules to something more intelligent
# If opportunistic delivery was requested, check
# that message will fit within packet size limits
if self.desired_method == LXMessage.OPPORTUNISTIC:
if self.__destination.type == RNS.Destination.SINGLE:
if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT:
RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
self.desired_method = LXMessage.DIRECT
# Set delivery parameters according to delivery method
if self.desired_method == LXMessage.OPPORTUNISTIC:
if self.__destination.type == RNS.Destination.SINGLE:
single_packet_content_limit = LXMessage.ENCRYPTED_PACKET_MAX_CONTENT
@ -391,7 +246,7 @@ class LXMessage:
single_packet_content_limit = LXMessage.PLAIN_PACKET_MAX_CONTENT
if content_size > single_packet_content_limit:
- raise TypeError(f"LXMessage desired opportunistic delivery method, but content of length {content_size} exceeds single-packet content limit of {single_packet_content_limit}.")
+ raise TypeError("LXMessage desired opportunistic delivery method, but content exceeds single-packet size.")
else:
self.method = LXMessage.OPPORTUNISTIC
self.representation = LXMessage.PACKET
@ -410,7 +265,6 @@ class LXMessage:
single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT
encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
self.ratchet_id = self.__destination.latest_ratchet_id
self.propagation_packed = msgpack.packb([time.time(), [self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data]])
content_size = len(self.propagation_packed)
@ -425,7 +279,6 @@ class LXMessage:
paper_content_limit = LXMessage.PAPER_MDU
encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
self.ratchet_id = self.__destination.latest_ratchet_id
self.paper_packed = self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data
content_size = len(self.paper_packed)
@ -442,31 +295,23 @@ class LXMessage:
self.determine_transport_encryption()
if self.method == LXMessage.OPPORTUNISTIC:
- lxm_packet = self.__as_packet()
+ self.__as_packet().send().set_delivery_callback(self.__mark_delivered)
lxm_packet.send().set_delivery_callback(self.__mark_delivered)
self.progress = 0.50
self.ratchet_id = lxm_packet.ratchet_id
self.state = LXMessage.SENT
elif self.method == LXMessage.DIRECT:
self.state = LXMessage.SENDING
if self.representation == LXMessage.PACKET:
- lxm_packet = self.__as_packet()
+ receipt = self.__as_packet().send()
receipt = lxm_packet.send()
self.ratchet_id = self.__delivery_destination.link_id
if receipt:
receipt.set_delivery_callback(self.__mark_delivered)
receipt.set_timeout_callback(self.__link_packet_timed_out)
self.progress = 0.50
else:
if self.__delivery_destination:
self.__delivery_destination.teardown()
elif self.representation == LXMessage.RESOURCE:
self.resource_representation = self.__as_resource()
self.ratchet_id = self.__delivery_destination.link_id
self.progress = 0.10
elif self.method == LXMessage.PROPAGATED:
self.state = LXMessage.SENDING
@ -476,19 +321,14 @@ class LXMessage:
if receipt:
receipt.set_delivery_callback(self.__mark_propagated)
receipt.set_timeout_callback(self.__link_packet_timed_out)
self.progress = 0.50
else:
self.__delivery_destination.teardown()
elif self.representation == LXMessage.RESOURCE:
self.resource_representation = self.__as_resource()
self.progress = 0.10
def determine_transport_encryption(self):
# TODO: These descriptions are old and outdated.
# Update the transport encryption descriptions to
# account for ratchets and other changes.
if self.method == LXMessage.OPPORTUNISTIC:
if self.__destination.type == RNS.Destination.SINGLE:
self.transport_encrypted = True
@ -529,47 +369,37 @@ class LXMessage:
def __mark_delivered(self, receipt = None):
RNS.log("Received delivery notification for "+str(self), RNS.LOG_DEBUG)
self.state = LXMessage.DELIVERED
self.progress = 1.0
if self.__delivery_callback != None and callable(self.__delivery_callback):
try:
self.__delivery_callback(self)
except Exception as e:
- RNS.log("An error occurred in the external delivery callback for "+str(self), RNS.LOG_ERROR)
+ RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
RNS.trace_exception(e)
def __mark_propagated(self, receipt = None):
RNS.log("Received propagation success notification for "+str(self), RNS.LOG_DEBUG)
self.state = LXMessage.SENT
self.progress = 1.0
if self.__delivery_callback != None and callable(self.__delivery_callback):
try:
self.__delivery_callback(self)
except Exception as e:
- RNS.log("An error occurred in the external delivery callback for "+str(self), RNS.LOG_ERROR)
+ RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
RNS.trace_exception(e)
def __mark_paper_generated(self, receipt = None):
RNS.log("Paper message generation succeeded for "+str(self), RNS.LOG_DEBUG)
self.state = LXMessage.PAPER
self.progress = 1.0
if self.__delivery_callback != None and callable(self.__delivery_callback):
try:
self.__delivery_callback(self)
except Exception as e:
- RNS.log("An error occurred in the external delivery callback for "+str(self), RNS.LOG_ERROR)
+ RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
RNS.trace_exception(e)
def __resource_concluded(self, resource): def __resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE: if resource.status == RNS.Resource.COMPLETE:
self.__mark_delivered() self.__mark_delivered()
else: else:
if resource.status == RNS.Resource.REJECTED:
self.state = LXMessage.REJECTED
elif self.state != LXMessage.CANCELLED:
resource.link.teardown() resource.link.teardown()
self.state = LXMessage.OUTBOUND self.state = LXMessage.OUTBOUND
@ -577,19 +407,18 @@ class LXMessage:
if resource.status == RNS.Resource.COMPLETE: if resource.status == RNS.Resource.COMPLETE:
self.__mark_propagated() self.__mark_propagated()
else: else:
if self.state != LXMessage.CANCELLED:
resource.link.teardown() resource.link.teardown()
self.state = LXMessage.OUTBOUND self.state = LXMessage.OUTBOUND
def __link_packet_timed_out(self, packet_receipt): def __link_packet_timed_out(self, packet_receipt):
if self.state != LXMessage.CANCELLED:
if packet_receipt: if packet_receipt:
packet_receipt.destination.teardown() packet_receipt.destination.teardown()
self.state = LXMessage.OUTBOUND self.state = LXMessage.OUTBOUND
def __update_transfer_progress(self, resource): def __update_transfer_progress(self, resource):
self.progress = 0.10 + (resource.get_progress()*0.90) self.progress = resource.get_progress()
def __as_packet(self): def __as_packet(self):
if not self.packed: if not self.packed:
@ -618,6 +447,8 @@ class LXMessage:
if not self.__delivery_destination.status == RNS.Link.ACTIVE: if not self.__delivery_destination.status == RNS.Link.ACTIVE:
raise ConnectionError("Tried to synthesize resource for LXMF message on a link that was not active") raise ConnectionError("Tried to synthesize resource for LXMF message on a link that was not active")
self.progress = 0.0
if self.method == LXMessage.DIRECT: if self.method == LXMessage.DIRECT:
return RNS.Resource(self.packed, self.__delivery_destination, callback = self.__resource_concluded, progress_callback = self.__update_transfer_progress) return RNS.Resource(self.packed, self.__delivery_destination, callback = self.__resource_concluded, progress_callback = self.__update_transfer_progress)
elif self.method == LXMessage.PROPAGATED: elif self.method == LXMessage.PROPAGATED:
@ -656,6 +487,7 @@ class LXMessage:
return None return None
def as_uri(self, finalise=True): def as_uri(self, finalise=True):
self.determine_transport_encryption()
if not self.packed: if not self.packed:
self.pack() self.pack()
@ -667,7 +499,6 @@ class LXMessage:
lxm_uri = LXMessage.URI_SCHEMA+"://"+encoded_bytes.decode("utf-8").replace("=","") lxm_uri = LXMessage.URI_SCHEMA+"://"+encoded_bytes.decode("utf-8").replace("=","")
if finalise: if finalise:
self.determine_transport_encryption()
self.__mark_paper_generated() self.__mark_paper_generated()
return lxm_uri return lxm_uri
@ -676,6 +507,7 @@ class LXMessage:
raise TypeError("Attempt to represent LXM with non-paper delivery method as URI") raise TypeError("Attempt to represent LXM with non-paper delivery method as URI")
def as_qr(self): def as_qr(self):
self.determine_transport_encryption()
if not self.packed: if not self.packed:
self.pack() self.pack()
@ -690,7 +522,6 @@ class LXMessage:
data = self.as_uri(finalise=False), data = self.as_uri(finalise=False),
) )
self.determine_transport_encryption()
self.__mark_paper_generated() self.__mark_paper_generated()
return qr return qr
@ -709,19 +540,10 @@ class LXMessage:
source_hash = lxmf_bytes[LXMessage.DESTINATION_LENGTH:2*LXMessage.DESTINATION_LENGTH] source_hash = lxmf_bytes[LXMessage.DESTINATION_LENGTH:2*LXMessage.DESTINATION_LENGTH]
signature = lxmf_bytes[2*LXMessage.DESTINATION_LENGTH:2*LXMessage.DESTINATION_LENGTH+LXMessage.SIGNATURE_LENGTH] signature = lxmf_bytes[2*LXMessage.DESTINATION_LENGTH:2*LXMessage.DESTINATION_LENGTH+LXMessage.SIGNATURE_LENGTH]
packed_payload = lxmf_bytes[2*LXMessage.DESTINATION_LENGTH+LXMessage.SIGNATURE_LENGTH:] packed_payload = lxmf_bytes[2*LXMessage.DESTINATION_LENGTH+LXMessage.SIGNATURE_LENGTH:]
unpacked_payload = msgpack.unpackb(packed_payload)
# Extract stamp from payload if included
if len(unpacked_payload) > 4:
stamp = unpacked_payload[4]
unpacked_payload = unpacked_payload[:4]
packed_payload = msgpack.packb(unpacked_payload)
else:
stamp = None
hashed_part = b"" + destination_hash + source_hash + packed_payload hashed_part = b"" + destination_hash + source_hash + packed_payload
message_hash = RNS.Identity.full_hash(hashed_part) message_hash = RNS.Identity.full_hash(hashed_part)
signed_part = b"" + hashed_part + message_hash signed_part = b"" + hashed_part + message_hash
unpacked_payload = msgpack.unpackb(packed_payload)
timestamp = unpacked_payload[0] timestamp = unpacked_payload[0]
title_bytes = unpacked_payload[1] title_bytes = unpacked_payload[1]
content_bytes = unpacked_payload[2] content_bytes = unpacked_payload[2]
@ -750,9 +572,7 @@ class LXMessage:
desired_method = original_method) desired_method = original_method)
message.hash = message_hash message.hash = message_hash
message.message_id = message.hash
message.signature = signature message.signature = signature
message.stamp = stamp
message.incoming = True message.incoming = True
message.timestamp = timestamp message.timestamp = timestamp
message.packed = lxmf_bytes message.packed = lxmf_bytes

View File

@ -1,328 +0,0 @@
import RNS
import RNS.vendor.umsgpack as msgpack
import os
import time
import multiprocessing
WORKBLOCK_EXPAND_ROUNDS = 3000
active_jobs = {}
def stamp_workblock(message_id):
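# Deterministically expand the message-id into a large
# workblock via repeated HKDF rounds; validators can
# recompute the same workblock from the message-id alone.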
wb_st = time.time()
expand_rounds = WORKBLOCK_EXPAND_ROUNDS
workblock = b""
for n in range(expand_rounds):
workblock += RNS.Cryptography.hkdf(
length=256,
derive_from=message_id,
salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)),
context=None,
)
wb_time = time.time() - wb_st
RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)
return workblock
def stamp_value(workblock, stamp):
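# The value of a stamp is the number of leading zero bits
# in the full hash of workblock+stamp.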
value = 0
bits = 256
material = RNS.Identity.full_hash(workblock+stamp)
i = int.from_bytes(material, byteorder="big")
while ((i & (1 << (bits - 1))) == 0):
i = (i << 1)
value += 1
return value
def generate_stamp(message_id, stamp_cost):
RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG)
workblock = stamp_workblock(message_id)
start_time = time.time()
stamp = None
rounds = 0
value = 0
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
stamp, rounds = job_simple(stamp_cost, workblock, message_id)
elif RNS.vendor.platformutils.is_android():
stamp, rounds = job_android(stamp_cost, workblock, message_id)
else:
stamp, rounds = job_linux(stamp_cost, workblock, message_id)
duration = time.time() - start_time
speed = rounds/duration
if stamp != None:
value = stamp_value(workblock, stamp)
RNS.log(f"Stamp with value {value} generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG)
return stamp, value
def cancel_work(message_id):
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
try:
if message_id in active_jobs:
active_jobs[message_id] = True
except Exception as e:
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
elif RNS.vendor.platformutils.is_android():
try:
if message_id in active_jobs:
active_jobs[message_id] = True
except Exception as e:
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
else:
try:
if message_id in active_jobs:
stop_event = active_jobs[message_id][0]
result_queue = active_jobs[message_id][1]
stop_event.set()
result_queue.put(None)
active_jobs.pop(message_id)
except Exception as e:
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def job_simple(stamp_cost, workblock, message_id):
# A simple, single-process stamp generator.
# It should work on any platform, and is used
# as a fall-back in case of limited multi-
# processing and/or acceleration support.
platform = RNS.vendor.platformutils.get_platform()
RNS.log(f"Running stamp generation on {platform}, work limited to single CPU core. This will be slower than ideal.", RNS.LOG_WARNING)
rounds = 0
pstamp = os.urandom(256//8)
st = time.time()
active_jobs[message_id] = False
def sv(s, c, w):
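# A stamp s is valid for cost c when the hash of
# workblock w + stamp s, read as a 256-bit integer,
# does not exceed 2**(256-c).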
target = 0b1<<256-c; m = w+s
result = RNS.Identity.full_hash(m)
if int.from_bytes(result, byteorder="big") > target:
return False
else:
return True
while not sv(pstamp, stamp_cost, workblock) and not active_jobs[message_id]:
pstamp = os.urandom(256//8); rounds += 1
if rounds % 2500 == 0:
speed = rounds / (time.time()-st)
RNS.log(f"Stamp generation running. {rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG)
if active_jobs[message_id] == True:
pstamp = None
active_jobs.pop(message_id)
return pstamp, rounds
def job_linux(stamp_cost, workblock, message_id):
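# Multi-process stamp generator: one worker per CPU core,
# the first worker to find a valid stamp signals the others
# to stop via a shared event.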
allow_kill = True
stamp = None
total_rounds = 0
jobs = multiprocessing.cpu_count()
stop_event = multiprocessing.Event()
result_queue = multiprocessing.Queue(1)
rounds_queue = multiprocessing.Queue()
def job(stop_event, pn, sc, wb):
terminated = False
rounds = 0
pstamp = os.urandom(256//8)
def sv(s, c, w):
target = 0b1<<256-c; m = w+s
result = RNS.Identity.full_hash(m)
if int.from_bytes(result, byteorder="big") > target:
return False
else:
return True
while not stop_event.is_set() and not sv(pstamp, sc, wb):
pstamp = os.urandom(256//8); rounds += 1
if not stop_event.is_set():
stop_event.set()
result_queue.put(pstamp)
rounds_queue.put(rounds)
job_procs = []
RNS.log(f"Starting {jobs} stamp generation workers", RNS.LOG_DEBUG)
for jpn in range(jobs):
process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event, "pn": jpn, "sc": stamp_cost, "wb": workblock}, daemon=True)
job_procs.append(process)
process.start()
active_jobs[message_id] = [stop_event, result_queue]
stamp = result_queue.get()
RNS.log("Got stamp result from worker", RNS.LOG_DEBUG) # TODO: Remove
# Collect any potential spurious
# results from worker queue.
try:
while True:
result_queue.get_nowait()
except:
pass
for j in range(jobs):
nrounds = 0
try:
nrounds = rounds_queue.get(timeout=2)
except Exception as e:
RNS.log(f"Failed to get round stats part {j}: {e}", RNS.LOG_ERROR)
total_rounds += nrounds
all_exited = False
exit_timeout = time.time() + 5
while time.time() < exit_timeout:
if not any(p.is_alive() for p in job_procs):
all_exited = True
break
time.sleep(0.1)
if not all_exited:
RNS.log("Stamp generation IPC timeout, possible worker deadlock. Terminating remaining processes.", RNS.LOG_ERROR)
if allow_kill:
for j in range(jobs):
process = job_procs[j]
process.kill()
else:
return None
else:
for j in range(jobs):
process = job_procs[j]
process.join()
# RNS.log(f"Joined {j} / {process}", RNS.LOG_DEBUG) # TODO: Remove
return stamp, total_rounds
def job_android(stamp_cost, workblock, message_id):
# Semaphore support is flaky to non-existent on
# Android, so we need to manually dispatch and
# manage workloads here, while periodically
# checking in on the progress.
stamp = None
start_time = time.time()
total_rounds = 0
rounds_per_worker = 1000
use_nacl = False
try:
import nacl.encoding
import nacl.hash
use_nacl = True
except:
pass
if use_nacl:
def full_hash(m):
return nacl.hash.sha256(m, encoder=nacl.encoding.RawEncoder)
else:
def full_hash(m):
return RNS.Identity.full_hash(m)
def sv(s, c, w):
target = 0b1<<256-c
m = w+s
result = full_hash(m)
if int.from_bytes(result, byteorder="big") > target:
return False
else:
return True
wm = multiprocessing.Manager()
jobs = multiprocessing.cpu_count()
def job(procnum=None, results_dict=None, wb=None, sc=None, jr=None):
# RNS.log(f"Worker {procnum} starting for {jr} rounds...") # TODO: Remove
try:
rounds = 0
found_stamp = None
while True:
pstamp = os.urandom(256//8)
rounds += 1
if sv(pstamp, sc, wb):
found_stamp = pstamp
break
if rounds >= jr:
# RNS.log(f"Worker {procnum} found no result in {rounds} rounds") # TODO: Remove
break
results_dict[procnum] = [found_stamp, rounds]
except Exception as e:
RNS.log(f"Stamp generation worker error: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
active_jobs[message_id] = False
RNS.log(f"Dispatching {jobs} workers for stamp generation...", RNS.LOG_DEBUG) # TODO: Remove
results_dict = wm.dict()
while stamp == None and active_jobs[message_id] == False:
job_procs = []
try:
for pnum in range(jobs):
pargs = {"procnum":pnum, "results_dict": results_dict, "wb": workblock, "sc":stamp_cost, "jr":rounds_per_worker}
process = multiprocessing.Process(target=job, kwargs=pargs)
job_procs.append(process)
process.start()
for process in job_procs:
process.join()
for j in results_dict:
r = results_dict[j]
total_rounds += r[1]
if r[0] != None:
stamp = r[0]
if stamp == None:
elapsed = time.time() - start_time
speed = total_rounds/elapsed
RNS.log(f"Stamp generation running. {total_rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG)
except Exception as e:
RNS.log(f"Stamp generation job error: {e}")
RNS.trace_exception(e)
active_jobs.pop(message_id)
return stamp, total_rounds
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
RNS.log("No cost argument provided", RNS.LOG_ERROR)
exit(1)
else:
try:
cost = int(sys.argv[1])
except Exception as e:
RNS.log(f"Invalid cost argument provided: {e}", RNS.LOG_ERROR)
exit(1)
RNS.loglevel = RNS.LOG_DEBUG
RNS.log("Testing LXMF stamp generation", RNS.LOG_DEBUG)
message_id = os.urandom(32)
generate_stamp(message_id, cost)

View File

@ -35,7 +35,6 @@ import time
import os import os
from LXMF._version import __version__ from LXMF._version import __version__
from LXMF import APP_NAME
from RNS.vendor.configobj import ConfigObj from RNS.vendor.configobj import ConfigObj
@ -79,13 +78,6 @@ def apply_config():
else: else:
active_configuration["peer_announce_interval"] = None active_configuration["peer_announce_interval"] = None
if "lxmf" in lxmd_config and "delivery_transfer_max_accepted_size" in lxmd_config["lxmf"]:
active_configuration["delivery_transfer_max_accepted_size"] = lxmd_config["lxmf"].as_float("delivery_transfer_max_accepted_size")
if active_configuration["delivery_transfer_max_accepted_size"] < 0.38:
active_configuration["delivery_transfer_max_accepted_size"] = 0.38
else:
active_configuration["delivery_transfer_max_accepted_size"] = 1000
if "lxmf" in lxmd_config and "on_inbound" in lxmd_config["lxmf"]: if "lxmf" in lxmd_config and "on_inbound" in lxmd_config["lxmf"]:
active_configuration["on_inbound"] = lxmd_config["lxmf"]["on_inbound"] active_configuration["on_inbound"] = lxmd_config["lxmf"]["on_inbound"]
else: else:
@ -127,38 +119,13 @@ def apply_config():
if active_configuration["message_storage_limit"] < 0.005: if active_configuration["message_storage_limit"] < 0.005:
active_configuration["message_storage_limit"] = 0.005 active_configuration["message_storage_limit"] = 0.005
else: else:
active_configuration["message_storage_limit"] = 500 active_configuration["message_storage_limit"] = 2000
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
if active_configuration["propagation_transfer_max_accepted_size"] < 0.38:
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
else:
active_configuration["propagation_transfer_max_accepted_size"] = 256
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
else: else:
active_configuration["prioritised_lxmf_destinations"] = [] active_configuration["prioritised_lxmf_destinations"] = []
if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]:
static_peers = lxmd_config["propagation"].as_list("static_peers")
active_configuration["static_peers"] = []
for static_peer in static_peers:
active_configuration["static_peers"].append(bytes.fromhex(static_peer))
else:
active_configuration["static_peers"] = []
if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]:
active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers")
else:
active_configuration["max_peers"] = None
if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]:
active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only")
else:
active_configuration["from_static_only"] = False
# Load various settings # Load various settings
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]: if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
targetloglevel = lxmd_config["logging"].as_int("loglevel") targetloglevel = lxmd_config["logging"].as_int("loglevel")
@ -237,6 +204,8 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
global lxmd_config, active_configuration, targetloglevel global lxmd_config, active_configuration, targetloglevel
global message_router, lxmf_destination global message_router, lxmf_destination
targetloglevel = 3+verbosity-quietness
if service: if service:
targetlogdest = RNS.LOG_FILE targetlogdest = RNS.LOG_FILE
targetloglevel = None targetloglevel = None
@ -265,12 +234,6 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
if not os.path.isdir(lxmdir): if not os.path.isdir(lxmdir):
os.makedirs(lxmdir) os.makedirs(lxmdir)
if not os.path.isfile(configpath):
RNS.log("Could not load config file, creating default configuration file...")
create_default_config(configpath)
RNS.log("Default config file created. Make any necessary changes in "+configpath+" and restart lxmd if needed.")
time.sleep(1.5)
if os.path.isfile(configpath): if os.path.isfile(configpath):
try: try:
lxmd_config = ConfigObj(configpath) lxmd_config = ConfigObj(configpath)
@ -278,16 +241,15 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
RNS.log("Could not parse the configuration at "+configpath, RNS.LOG_ERROR) RNS.log("Could not parse the configuration at "+configpath, RNS.LOG_ERROR)
RNS.log("Check your configuration file for errors!", RNS.LOG_ERROR) RNS.log("Check your configuration file for errors!", RNS.LOG_ERROR)
RNS.panic() RNS.panic()
else:
RNS.log("Could not load config file, creating default configuration file...")
create_default_config(configpath)
RNS.log("Default config file created. Make any necessary changes in "+configpath+" and restart Reticulum if needed.")
time.sleep(1.5)
apply_config() apply_config()
RNS.log("Configuration loaded from "+configpath, RNS.LOG_VERBOSE) RNS.log("Configuration loaded from "+configpath, RNS.LOG_VERBOSE)
if targetloglevel == None:
targetloglevel = 3
if verbosity != 0 or quietness != 0:
targetloglevel = targetloglevel+verbosity-quietness
# Start Reticulum # Start Reticulum
RNS.log("Substantiating Reticulum...") RNS.log("Substantiating Reticulum...")
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest) reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
@ -322,12 +284,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
storagepath = storagedir, storagepath = storagedir,
autopeer = active_configuration["autopeer"], autopeer = active_configuration["autopeer"],
autopeer_maxdepth = active_configuration["autopeer_maxdepth"], autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"], )
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
max_peers = active_configuration["max_peers"],
static_peers = active_configuration["static_peers"],
from_static_only = active_configuration["from_static_only"])
message_router.register_delivery_callback(lxmf_delivery) message_router.register_delivery_callback(lxmf_delivery)
for destination_hash in active_configuration["ignored_lxmf_destinations"]: for destination_hash in active_configuration["ignored_lxmf_destinations"]:
@ -345,10 +302,6 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
# Set up authentication # Set up authentication
if active_configuration["auth_required"]: if active_configuration["auth_required"]:
message_router.set_authentication(required=True) message_router.set_authentication(required=True)
if len(active_configuration["allowed_identities"]) == 0:
RNS.log("Clint authentication was enabled, but no identity hashes could be loaded from "+str(allowedpath)+". Nobody will be able to sync messages from this propagation node.", RNS.LOG_WARNING)
for identity_hash in active_configuration["allowed_identities"]: for identity_hash in active_configuration["allowed_identities"]:
message_router.allow(identity_hash) message_router.allow(identity_hash)
@ -369,6 +322,9 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
RNS.log("LXMF Propagation Node started on "+RNS.prettyhexrep(message_router.propagation_destination.hash)) RNS.log("LXMF Propagation Node started on "+RNS.prettyhexrep(message_router.propagation_destination.hash))
if len(active_configuration["allowed_identities"]) == 0:
RNS.log("Clint authentication was enabled, but no identity hashes could be loaded from "+str(allowedpath)+". Nobody will be able to sync messages from this propagation node.", RNS.LOG_WARNING)
RNS.log("Started lxmd version {version}".format(version=__version__), RNS.LOG_NOTICE) RNS.log("Started lxmd version {version}".format(version=__version__), RNS.LOG_NOTICE)
threading.Thread(target=deferred_start_jobs, daemon=True).start() threading.Thread(target=deferred_start_jobs, daemon=True).start()
@ -382,15 +338,13 @@ def jobs():
while True: while True:
try: try:
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]: if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE) RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
message_router.announce(lxmf_destination.hash) message_router.announce(lxmf_destination.hash)
last_peer_announce = time.time() last_peer_announce = time.time()
if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None:
if time.time() > last_node_announce + active_configuration["node_announce_interval"]: if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE) RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
message_router.announce_propagation_node() message_router.announce_propagation_node()
last_node_announce = time.time() last_node_announce = time.time()
@ -403,7 +357,7 @@ def deferred_start_jobs():
global active_configuration, last_peer_announce, last_node_announce global active_configuration, last_peer_announce, last_node_announce
global message_router, lxmf_destination global message_router, lxmf_destination
time.sleep(DEFFERED_JOBS_DELAY) time.sleep(DEFFERED_JOBS_DELAY)
RNS.log("Running deferred start jobs", RNS.LOG_DEBUG) RNS.log("Running deferred start jobs")
if active_configuration["peer_announce_at_start"]: if active_configuration["peer_announce_at_start"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME) RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
message_router.announce(lxmf_destination.hash) message_router.announce(lxmf_destination.hash)
@ -416,190 +370,6 @@ def deferred_start_jobs():
last_node_announce = time.time() last_node_announce = time.time()
threading.Thread(target=jobs, daemon=True).start() threading.Thread(target=jobs, daemon=True).start()
def query_status(identity, timeout=5, exit_on_fail=False):
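# Establish an identified link to the propagation node's control
# destination and request its statistics, returning the response,
# or an error constant on timeout.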
control_destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
timeout = time.time()+timeout
def check_timeout():
if time.time() > timeout:
if exit_on_fail:
RNS.log("Getting lxmd statistics timed out, exiting now", RNS.LOG_ERROR)
exit(200)
else:
return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
else:
time.sleep(0.1)
if not RNS.Transport.has_path(control_destination.hash):
RNS.Transport.request_path(control_destination.hash)
while not RNS.Transport.has_path(control_destination.hash):
tc = check_timeout()
if tc:
return tc
link = RNS.Link(control_destination)
while not link.status == RNS.Link.ACTIVE:
tc = check_timeout()
if tc:
return tc
link.identify(identity)
request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None)
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
tc = check_timeout()
if tc:
return tc
link.teardown()
return request_receipt.get_response()
def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = 0, timeout=5, show_status=False, show_peers=False, identity_path=None):
global configpath, identitypath, storagedir, lxmdir
global lxmd_config, active_configuration, targetloglevel
targetlogdest = RNS.LOG_STDOUT
if identity_path == None:
if configdir == None:
if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"):
configdir = "/etc/lxmd"
elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(RNS.Reticulum.userdir+"/.config/lxmd/config"):
configdir = RNS.Reticulum.userdir+"/.config/lxmd"
else:
configdir = RNS.Reticulum.userdir+"/.lxmd"
configpath = configdir+"/config"
identitypath = configdir+"/identity"
identity = None
if not os.path.isdir(configdir):
RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR)
exit(201)
if not os.path.isfile(identitypath):
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
exit(202)
else:
identity = RNS.Identity.from_file(identitypath)
if identity == None:
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
exit(4)
else:
if not os.path.isfile(identity_path):
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
exit(202)
else:
identity = RNS.Identity.from_file(identity_path)
if identity == None:
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
exit(4)
if targetloglevel == None:
targetloglevel = 3
if verbosity != 0 or quietness != 0:
targetloglevel = targetloglevel+verbosity-quietness
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
response = query_status(identity, timeout=timeout, exit_on_fail=True)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Remote received no identity")
exit(203)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
RNS.log("Access denied")
exit(204)
else:
s = response
mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2)
ms_util = f"{mutil}%"
if s["from_static_only"]:
who_str = "static peers only"
else:
who_str = "all nodes"
available_peers = 0
unreachable_peers = 0
peered_incoming = 0
peered_outgoing = 0
peered_rx_bytes = 0
peered_tx_bytes = 0
for peer_id in s["peers"]:
p = s["peers"][peer_id]
pm = p["messages"]
peered_incoming += pm["incoming"]
peered_outgoing += pm["outgoing"]
peered_rx_bytes += p["rx_bytes"]
peered_tx_bytes += p["tx_bytes"]
if p["alive"]:
available_peers += 1
else:
unreachable_peers += 1
total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"]
total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"]
df = round(peered_outgoing/total_incoming, 2) if total_incoming > 0 else 0
dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"])
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
if show_status:
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit")
print(f"")
print(f"Peers : {stp} total (peer limit is {smp})")
print(f" {sdp} discovered, {ssp} static")
print(f" {available_peers} available, {unreachable_peers} unreachable")
print(f"")
print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})")
print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})")
print(f" {upi} messages received from unpeered nodes ({uprx})")
print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})")
print(f" {cprr} propagation messages received directly from clients")
print(f" {cprs} propagation messages served to clients")
print(f" Distribution factor is {df}")
print(f"")
if show_peers:
if not show_status:
print("")
for peer_id in s["peers"]:
ind = " "
p = s["peers"][peer_id]
if p["type"] == "static":
t = "Static peer "
elif p["type"] == "discovered":
t = "Discovered peer "
else:
t = "Unknown peer "
a = "Available" if p["alive"] == True else "Unreachable"
h = max(time.time()-p["last_heard"], 0)
hops = p["network_distance"]
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
pm = p["messages"]
if p["last_sync_attempt"] != 0:
lsa = p["last_sync_attempt"]
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
else:
ls = "never synced"
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000)
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
pmi = pm["incoming"]; pmuh = pm["unhandled"]
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit")
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming")
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
ms = "" if pm["unhandled"] == 1 else "s"
print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}")
print("")
def main(): def main():
try: try:
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon") parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
@ -610,10 +380,6 @@ def main():
parser.add_argument("-v", "--verbose", action="count", default=0) parser.add_argument("-v", "--verbose", action="count", default=0)
parser.add_argument("-q", "--quiet", action="count", default=0) parser.add_argument("-q", "--quiet", action="count", default=0)
parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file") parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file")
parser.add_argument("--status", action="store_true", default=False, help="display node status")
parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes")
parser.add_argument("--timeout", action="store", default=5, help="timeout in seconds for query operations", type=float)
parser.add_argument("--identity", action="store", default=None, help="path to identity used for query request", type=str)
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit") parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__)) parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
@ -623,24 +389,15 @@ def main():
print(__default_lxmd_config__) print(__default_lxmd_config__)
exit() exit()
if args.status or args.peers: program_setup(
get_status(configdir = args.config, configdir = args.config,
rnsconfigdir=args.rnsconfig,
verbosity=args.verbose,
quietness=args.quiet,
timeout=args.timeout,
show_status=args.status,
show_peers=args.peers,
identity_path=args.identity)
exit()
program_setup(configdir = args.config,
rnsconfigdir=args.rnsconfig, rnsconfigdir=args.rnsconfig,
run_pn=args.propagation_node, run_pn=args.propagation_node,
on_inbound=args.on_inbound, on_inbound=args.on_inbound,
verbosity=args.verbose, verbosity=args.verbose,
quietness=args.quiet, quietness=args.quiet,
service=args.service) service=args.service
)
except KeyboardInterrupt: except KeyboardInterrupt:
print("") print("")
@ -653,41 +410,23 @@ __default_lxmd_config__ = """# This is an example LXM Daemon config file.
[propagation] [propagation]
# Whether to enable propagation node # Whether to enable propagation node
enable_node = no enable_node = no
# Automatic announce interval in minutes. # Automatic announce interval in minutes.
# 6 hours by default. # 6 hours by default.
announce_interval = 360 announce_interval = 360
# Whether to announce when the node starts. # Whether to announce when the node starts.
announce_at_start = yes announce_at_start = yes
# Whether to automatically peer with other # Whether to automatically peer with other
# propagation nodes on the network. # propagation nodes on the network.
autopeer = yes autopeer = yes
# The maximum peering depth (in hops) for # The maximum peering depth (in hops) for
# automatically peered nodes. # automatically peered nodes.
autopeer_maxdepth = 4 autopeer_maxdepth = 4
# The maximum accepted transfer size per in-
# coming propagation transfer, in kilobytes.
# This also sets the upper limit for the size
# of single messages accepted onto this node.
#
# If a node wants to propagate a larger number
# of messages to this node than what can fit # of messages to this node than what can fit
# within this limit, it will prioritise sending
# the smallest messages first, and try again
# with any remaining messages at a later point.
propagation_transfer_max_accepted_size = 256
# The maximum amount of storage to use for # The maximum amount of storage to use for
# the LXMF Propagation Node message store, # the LXMF Propagation Node message store,
# specified in megabytes. When this limit # specified in megabytes. When this limit
@ -696,9 +435,8 @@ propagation_transfer_max_accepted_size = 256
# LXMF prioritises keeping messages that are # LXMF prioritises keeping messages that are
# new and small. Large and old messages will # new and small. Large and old messages will
# be removed first. This setting is optional # be removed first. This setting is optional
# and defaults to 500 megabytes. # and defaults to 2 gigabytes.
# message_storage_limit = 2000
# message_storage_limit = 500
# You can tell the LXMF message router to # You can tell the LXMF message router to
# prioritise storage for one or more # prioritise storage for one or more
@ -707,35 +445,14 @@ propagation_transfer_max_accepted_size = 256
# keeping messages for destinations specified # keeping messages for destinations specified
# with this option. This setting is optional, # with this option. This setting is optional,
# and generally you do not need to use it. # and generally you do not need to use it.
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf # prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
# You can configure the maximum number of other
# propagation nodes that this node will peer
# with automatically. The default is 50.
# max_peers = 25
# You can configure a list of static propagation
# node peers, that this node will always be
# peered with, by specifying a list of
# destination hashes.
# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
# You can configure the propagation node to
# only accept incoming propagation messages
# from configured static peers.
# from_static_only = True
# By default, any destination is allowed to # By default, any destination is allowed to
# connect and download messages, but you can # connect and download messages, but you can
# optionally restrict this. If you enable # optionally restrict this. If you enable
# authentication, you must provide a list of # authentication, you must provide a list of
# allowed identity hashes in a file named # allowed identity hashes in a file named
# "allowed" in the lxmd config directory. # "allowed" in the lxmd config directory.
auth_required = no auth_required = no
@ -744,35 +461,23 @@ auth_required = no
# The LXM Daemon will create an LXMF destination # The LXM Daemon will create an LXMF destination
# that it can receive messages on. This option sets # that it can receive messages on. This option sets
# the announced display name for this destination. # the announced display name for this destination.
display_name = Anonymous Peer display_name = Anonymous Peer
# It is possible to announce the internal LXMF # It is possible to announce the internal LXMF
# destination when the LXM Daemon starts up. # destination when the LXM Daemon starts up.
announce_at_start = no announce_at_start = no
# You can also announce the delivery destination # You can also announce the delivery destination
# at a specified interval. This is not enabled by # at a specified interval. This is not enabled by
# default. # default.
# announce_interval = 360 # announce_interval = 360
# The maximum accepted unpacked size for mes-
# sages received directly from other peers,
# specified in kilobytes. Messages larger than
# this will be rejected before the transfer
# begins.
delivery_transfer_max_accepted_size = 1000
# You can configure an external program to be run # You can configure an external program to be run
# every time a message is received. The program # every time a message is received. The program
# will receive as an argument the full path to the # will receive as an argument the full path to the
# message saved as a file. The example below will # message saved as a file. The example below will
# simply result in the message getting deleted as # simply result in the message getting deleted as
# soon as it has been received. # soon as it has been received.
# on_inbound = rm # on_inbound = rm
@ -786,7 +491,6 @@ delivery_transfer_max_accepted_size = 1000
# 5: Verbose logging # 5: Verbose logging
# 6: Debug logging # 6: Debug logging
# 7: Extreme logging # 7: Extreme logging
loglevel = 4 loglevel = 4
""" """

View File

@ -2,7 +2,6 @@ import os
import glob import glob
from .LXMessage import LXMessage from .LXMessage import LXMessage
from .LXMRouter import LXMRouter from .LXMRouter import LXMRouter
from .LXMF import *
from ._version import __version__ from ._version import __version__

View File

@ -1 +1 @@
__version__ = "0.6.3" __version__ = "0.2.9"

View File

@ -7,17 +7,8 @@ LXMF is efficient enough that it can deliver messages over extremely low-bandwid
User-facing clients built on LXMF include: User-facing clients built on LXMF include:
- [Sideband](https://unsigned.io/sideband) - [Sideband](https://unsigned.io/sideband)
- [MeshChat](https://github.com/liamcottle/reticulum-meshchat)
- [Nomad Network](https://unsigned.io/nomadnet) - [Nomad Network](https://unsigned.io/nomadnet)
- [Nexus Messenger](https://github.com/HarlekinSimplex/nexus_messenger)
Community-provided tools and utilities for LXMF include:
- [LXMFy](https://lxmfy.quad4.io/)
- [LXMF-Bot](https://github.com/randogoth/lxmf-bot)
- [LXMF Messageboard](https://github.com/chengtripp/lxmf_messageboard)
- [LXMEvent](https://github.com/faragher/LXMEvent)
- [RangeMap](https://github.com/faragher/RangeMap)
- [LXMF Tools](https://github.com/SebastianObi/LXMF-Tools)
## Structure ## Structure
@ -38,10 +29,10 @@ LXMF messages are stored in a simple and efficient format, that's easy to parse
1. A LXMF message is identified by its __message-id__, which is a SHA-256 hash of the __Destination__, __Source__ and __Payload__. The message-id is never included directly in the message, since it can always be inferred from the message itself. 1. A LXMF message is identified by its __message-id__, which is a SHA-256 hash of the __Destination__, __Source__ and __Payload__. The message-id is never included directly in the message, since it can always be inferred from the message itself.
In some cases the actual message-id cannot be inferred, for example when a Propagation Node is storing an encrypted message for an offline user. In these cases a _transient-id_ is used to identify the message while in storage or transit. In some cases the actual message-id cannot be inferred, for example when a Propagation Node is storing an encrypted message for an offline user. In theses cases a _transient-id_ is used to identify the message while in storage or transit.
2. __Destination__, __Source__, __Signature__ and __Payload__ parts are mandatory, as is the __Timestamp__ part of the payload. 2. __Destination__, __Source__, __Signature__ and __Payload__ parts are mandatory, as is the __Timestamp__ part of the payload.
- The __Destination__ and __Source__ fields are 16-byte Reticulum destination hashes - The __Destination__ and __Source__ fields are 10-byte Reticulum destination hashes
- The __Signature__ field is a 64-byte Ed25519 signature of the __Destination__, __Source__, __Payload__ and __message-id__ - The __Signature__ field is a 64-byte Ed25519 signature of the __Destination__, __Source__, __Payload__ and __message-id__
- The __Payload__ part is a [msgpacked](https://msgpack.org) list containing four items: - The __Payload__ part is a [msgpacked](https://msgpack.org) list containing four items:
1. The __Timestamp__ is a double-precision floating point number representing the number of seconds since the UNIX epoch. 1. The __Timestamp__ is a double-precision floating point number representing the number of seconds since the UNIX epoch.
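
As a rough illustration of the structure above, the payload can be packed and the message-id derived with a few lines of Python. This is a sketch only; the destination and source hashes below are illustrative, and the LXMF library performs these steps internally:

```python
import time
import RNS
import RNS.vendor.umsgpack as msgpack

destination_hash = bytes.fromhex("6b3362bd2c1dbf87b66a85f79a8d8c75")  # 16-byte destination (illustrative)
source_hash      = bytes.fromhex("d924b81822ca24e68e2effea99bcb8cf")  # 16-byte source (illustrative)

# Payload: [ Timestamp, Title, Content, Fields ]
packed_payload = msgpack.packb([time.time(), "".encode("utf-8"), "Hello".encode("utf-8"), {}])

# The message-id is the SHA-256 hash of Destination, Source and Payload
hashed_part = destination_hash + source_hash + packed_payload
message_id  = RNS.Identity.full_hash(hashed_part)

# The signature is then made over the hashed part plus the message-id
signed_part = hashed_part + message_id
```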
@ -55,7 +46,7 @@ LXMF messages are stored in a simple and efficient format, that's easy to parse
## Usage Examples ## Usage Examples
LXMF offers flexibility to implement many different messaging schemes, ranging from human communication to machine control and sensor monitoring. Here are a few examples: LXMF offers flexibility to implement many different messaging schemes, ranging from human communication to machine control and sensor monitoring. Here's a few examples:
- A messaging system for passing short, simple messages between human users, akin to SMS can be implemented using only the __Content__ field, and leaving all other optional fields empty. - A messaging system for passing short, simple messages between human users, akin to SMS can be implemented using only the __Content__ field, and leaving all other optional fields empty.
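
A minimal sketch of the SMS-style case above, based on the sender example in this repository. The recipient address is illustrative, and is assumed to already be known to Reticulum:

```python
import RNS
import LXMF

reticulum = RNS.Reticulum()
router = LXMF.LXMRouter(storagepath="./tmp")
source = router.register_delivery_identity(RNS.Identity(), display_name="Example Sender")

# Assumes an announce from the recipient has already been received
recipient_hash = bytes.fromhex("6b3362bd2c1dbf87b66a85f79a8d8c75")  # illustrative address
dest = RNS.Destination(RNS.Identity.recall(recipient_hash), RNS.Destination.OUT,
                       RNS.Destination.SINGLE, "lxmf", "delivery")

# Only the Content field is used; Title and Fields are left empty
lxm = LXMF.LXMessage(dest, source, "Hello over LXMF", "", desired_method=LXMF.LXMessage.DIRECT)
router.handle_outbound(lxm)
```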
@ -94,11 +85,11 @@ The LXM Router then handles the heavy lifting, such as message packing, encrypti
LXMF uses encryption provided by [Reticulum](https://reticulum.network), and thus uses end-to-end encryption by default. The delivery method of a message will influence which transport encryption scheme is used. LXMF uses encryption provided by [Reticulum](https://reticulum.network), and thus uses end-to-end encryption by default. The delivery method of a message will influence which transport encryption scheme is used.
- If a message is delivered over a Reticulum link (which is the default method), the message will be encrypted with ephemeral AES-128 keys derived with ECDH on Curve25519. This mode offers forward secrecy. - A message can be delivered opportunistically, embedded in a single Reticulum packet. In this cases the message will be opportunistically routed through the network, and will be encrypted with _ephemeral_ keys derived with _ECDH_ on _Curve25519_. This mode offers Perfect Forward Secrecy.
- A message can be delivered opportunistically, embedded in a single Reticulum packet. In this cases the message will be opportunistically routed through the network, and will be encrypted with per-packet AES-128 keys derived with ECDH on Curve25519. - If a message is delivered to the Reticulum GROUP destination type, the message will be transported using _AES-128_ encryption.
- If a message is delivered to the Reticulum GROUP destination type, the message will be encrypted using the symmetric AES-128 key of the GROUP destination. - If a message is delivered over a Reticulum link (which is the default method), the message will be encrypted with _ephemeral_ keys derived with _ECDH_ on _Curve25519_. This mode offers Perfect Forward Secrecy.
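
Which of these modes applies is controlled by the desired delivery method set on the outgoing message, for example (a rough illustration only, with `dest` and `source` as in the sender sketch above):

```python
# Delivered over an established Reticulum link (the default method)
over_link = LXMF.LXMessage(dest, source, "Sent over a link", "",
                           desired_method=LXMF.LXMessage.DIRECT)

# Delivered opportunistically in a single Reticulum packet
single_packet = LXMF.LXMessage(dest, source, "Sent in one packet", "",
                               desired_method=LXMF.LXMessage.OPPORTUNISTIC)
```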
## Wire Format & Overhead ## Wire Format & Overhead
@ -109,19 +100,15 @@ Assuming the default Reticulum configuration, the binary wire-format is as follo
- 64 bytes Ed25519 signature - 64 bytes Ed25519 signature
- Remaining bytes of [msgpack](https://msgpack.org) payload data, in accordance with the structure defined above - Remaining bytes of [msgpack](https://msgpack.org) payload data, in accordance with the structure defined above
The complete message overhead for LXMF is only 111 bytes, which in return gives you timestamped, digitally signed, infinitely extensible, end-to-end encrypted, zero-conf routed, minimal-infrastructure messaging that's easy to use and build applications with. The complete message overhead for LXMF is only 99 bytes, which in return gives you timestamped, digitally signed, infinitely extensible, end-to-end encrypted, zero-conf routed, minimal-infrastructure messaging that's easy to use and build applications with.
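
As a rough sanity check of the overhead figure, the fixed header fields plus a minimal msgpacked payload (empty title, content and fields) can be measured directly. This is only a sketch, assuming the current 16-byte address fields:

```python
import time
import RNS.vendor.umsgpack as msgpack

fixed_fields    = 16 + 16 + 64                                      # destination + source + signature
minimal_payload = len(msgpack.packb([time.time(), b"", b"", {}]))   # timestamp + empty title/content/fields
print(fixed_fields + minimal_payload)                               # 96 + 15 = 111 bytes
```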
## Code Examples
Before writing your own programs using LXMF, you need to have a basic understanding of how the [Reticulum](https://reticulum.network) protocol and API work. Please see the [Reticulum Manual](https://reticulum.network/manual/). For a few simple examples of how to send and receive messages with LXMF, please see the [receiver example](./docs/example_receiver.py) and the [sender example](./docs/example_sender.py) included in this repository.
## Example Paper Message ## Example Paper Message
You can try out the paper messaging functionality by using the following QR code. It is a paper message sent to the LXMF address `6b3362bd2c1dbf87b66a85f79a8d8c75`. To be able to decrypt and read the message, you will need to import the following Reticulum Identity to an LXMF messaging app: You can try out the paper messaging functionality by using the following QR-code. It is a paper message sent to the LXMF address `6b3362bd2c1dbf87b66a85f79a8d8c75`. To be able to decrypt and read the message, you will need to import the following Reticulum Identity to an LXMF messaging app:
`3BPTDTQCRZPKJT3TXAJCMQFMOYWIM3OCLKPWMG4HCF2T4CH3YZHVNHNRDU6QAZWV2KBHMWBNT2C62TQEVC5GLFM4MN25VLZFSK3ADRQ=` `3BPTDTQCRZPKJT3TXAJCMQFMOYWIM3OCLKPWMG4HCF2T4CH3YZHVNHNRDU6QAZWV2KBHMWBNT2C62TQEVC5GLFM4MN25VLZFSK3ADRQ=`
The [Sideband](https://unsigned.io/sideband) application allows you to do this easily. After you have imported the identity into an app of your choice, you can scan the following QR code and open it in the app, where it will be decrypted and added as a message. The [Sideband](https://unsigned.io/sideband) application allows you to do this easily. After the you have imported the identity into an app of your choice, you can scan the following QR-code and open it in the app, where it will be decrypted and added as a message.
<p align="center"><img width="50%" src="./docs/paper_msg_test.png"/></p> <p align="center"><img width="50%" src="./docs/paper_msg_test.png"/></p>
@ -131,69 +118,26 @@ You can also find the entire message in <a href="lxm://azNivSwdv4e2aoX3mo2MdTAoz
On operating systems that allow for registering custom URI-handlers, you can click the link, and it will be decoded directly in your LXMF client. This works with Sideband on Android. On operating systems that allow for registering custom URI-handlers, you can click the link, and it will be decoded directly in your LXMF client. This works with Sideband on Android.
## Caveat Emptor
LXMF is beta software, and should be considered experimental. While it has been built with cryptography best practices foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch.
## Installation ## Installation
If you want to try out LXMF, you can install it with pip: If you want to try out LXMF, you can install it with pip:
```bash ```bash
pip install lxmf pip3 install lxmf
``` ```
If you are using an operating system that blocks normal user package installation via `pip`,
you can return `pip` to normal behaviour by editing the `~/.config/pip/pip.conf` file,
and adding the following directive in the `[global]` section:
```text
[global]
break-system-packages = true
```
Alternatively, you can use the `pipx` tool to install Reticulum in an isolated environment:
```bash
pipx install lxmf
```
## Daemon Included
The `lxmf` package comes with the `lxmd` program, a fully functional (but lightweight) LXMF message router and propagation node daemon. After installing the `lxmf` package, you can run `lxmd --help` to learn more about the command-line options:
```text
$ lxmd --help
usage: lxmd [-h] [--config CONFIG] [--rnsconfig RNSCONFIG] [-p] [-i PATH] [-v] [-q] [-s] [--exampleconfig] [--version]
Lightweight Extensible Messaging Daemon
options:
-h, --help show this help message and exit
--config CONFIG path to alternative lxmd config directory
--rnsconfig RNSCONFIG
path to alternative Reticulum config directory
-p, --propagation-node
run an LXMF Propagation Node
-i PATH, --on-inbound PATH
executable to run when a message is received
-v, --verbose
-q, --quiet
-s, --service lxmd is running as a service and should log to file
--exampleconfig print verbose configuration example to stdout and exit
--version show program's version number and exit
```
Or run `lxmd --exampleconfig` to generate a commented example configuration documenting all the available configuration directives.
## Caveat Emptor
LXMF is beta software, and should be considered experimental. While it has been built with cryptography best practices foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch.
## Development Roadmap ## Development Roadmap
LXMF is actively being developed, and the following improvements and features are currently planned for implementation: LXMF is actively being developed, and the following improvements and features are currently planned for implementation:
- ~~Update examples in readme to actually work~~
- ~~Sync affinity based on link speeds and distances, for more intelligently choosing peer sync order~~
- Sneakernet and physical transport functionality
- Content Destinations, and easy to use API for group messaging and discussion threads
- Write and release full API and protocol documentation - Write and release full API and protocol documentation
- Update examples in readme to actually work
- Content Destinations, and easy to use API for group messaging and discussion threads
- Sneakernet and physical transport functionality
- Documenting and possibly expanding LXMF limits and priorities - Documenting and possibly expanding LXMF limits and priorities
- Sync affinity based on link speeds and distances, for more intelligently choosing peer sync order
- Markets on LXMF

View File

@ -1,72 +0,0 @@
import RNS
import LXMF
import time
required_stamp_cost = 8
enforce_stamps = False
def delivery_callback(message):
global my_lxmf_destination, router
time_string = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.timestamp))
signature_string = "Signature is invalid, reason undetermined"
if message.signature_validated:
signature_string = "Validated"
else:
if message.unverified_reason == LXMF.LXMessage.SIGNATURE_INVALID:
signature_string = "Invalid signature"
if message.unverified_reason == LXMF.LXMessage.SOURCE_UNKNOWN:
signature_string = "Cannot verify, source is unknown"
if message.stamp_valid:
stamp_string = "Validated"
else:
stamp_string = "Invalid"
RNS.log("\t+--- LXMF Delivery ---------------------------------------------")
RNS.log("\t| Source hash : "+RNS.prettyhexrep(message.source_hash))
RNS.log("\t| Source instance : "+str(message.get_source()))
RNS.log("\t| Destination hash : "+RNS.prettyhexrep(message.destination_hash))
RNS.log("\t| Destination instance : "+str(message.get_destination()))
RNS.log("\t| Transport Encryption : "+str(message.transport_encryption))
RNS.log("\t| Timestamp : "+time_string)
RNS.log("\t| Title : "+str(message.title_as_string()))
RNS.log("\t| Content : "+str(message.content_as_string()))
RNS.log("\t| Fields : "+str(message.fields))
if message.ratchet_id:
RNS.log("\t| Ratchet : "+str(RNS.Identity._get_ratchet_id(message.ratchet_id)))
RNS.log("\t| Message signature : "+signature_string)
RNS.log("\t| Stamp : "+stamp_string)
RNS.log("\t+---------------------------------------------------------------")
# Optionally, send a reply
# source = my_lxmf_destination
# dest = message.source
# lxm = LXMF.LXMessage(dest, source, "Reply", None, desired_method=LXMF.LXMessage.DIRECT, include_ticket=True)
# router.handle_outbound(lxm)
r = RNS.Reticulum()
router = LXMF.LXMRouter(storagepath="./tmp1", enforce_stamps=enforce_stamps)
identity = RNS.Identity()
my_lxmf_destination = router.register_delivery_identity(identity, display_name="Anonymous Peer", stamp_cost=required_stamp_cost)
router.register_delivery_callback(delivery_callback)
RNS.log("Ready to receive on: "+RNS.prettyhexrep(my_lxmf_destination.hash))
# You can set a propagation node address to test receiving
# messages from a propagation node, instead of directly
# router.set_outbound_propagation_node(bytes.fromhex("e75d9b6a69f82b48b6077cf2242d7499"))
# This loop allows you to execute various actions for testing
# and experimenting with the example scripts.
while True:
input()
RNS.log("Announcing lxmf.delivery destination...")
router.announce(my_lxmf_destination.hash)
# input()
# RNS.log("Requesting messages from propagation node...")
# router.request_messages_from_propagation_node(identity)

View File

@ -1,69 +0,0 @@
import LXMF
import RNS
import time
import random
random_names = ["Tom", "Delilah", "Nancey", "Williams", "Neomi", "Curtis", "Alexa", "Theodora", "Ted", "Dinorah", "Nicol", "Drusilla", "Annalisa", "Verlene", "Latesha", "Tina", "Mia", "Brock", "Timothy", "Philip", "Willian", "Reyna", "Simona", "Mimi", "Stanford", "Ferne", "Catalina", "Lucie", "Jaye", "Natasha", "Willetta", "Isabel", "Esperanza", "Ciara", "Eusebio", "William", "Elma", "Angelica", "Coreen", "Melani", "Jonathan", "Maryland", "Caroline", "Gregg", "Ora", "Jacqui", "Letty", "Roselle", "Oralee", "Angla"]
random_titles = ["Long time", "Hi again", "Hi there", "Test message", "", "", "Something different"]
random_msgs = ["If wishes were horses then beggars might fly. Stuff like that. It's enough to drive you crazy.", "'My ident cards were stolen,' Jason said. 'That fivehundred-dollar bill is yours if you can get me to someone who can replace them. If you're going to do it, do it right now; I'm not going to wait.' Wait to be picked up by a pol or a nat, he thought. Caught here in this rundown dingy hotel.", "A six, no matter what the external circumstances, will always prevail. Because that's the way they genetically defined us.", "'Should be there in an hour,' he called back over his shoulder to Chuck. Then he added, in an afterthought, 'Wonder if the computers finished its run. It was due about now.'. Chuck didnt reply, so George swung round in his saddle. He could just see Chucks face, a white oval turned toward the sky."]
def delivery_callback(message):
pass
r = RNS.Reticulum()
router = LXMF.LXMRouter(storagepath="./tmp2")
router.register_delivery_callback(delivery_callback)
ident = RNS.Identity()
source = router.register_delivery_identity(ident, display_name=random_names[random.randint(0,len(random_names)-1)], stamp_cost=8)
router.announce(source.hash)
RNS.log("Source announced")
print("Recipient: ", end=" ")
recipient_hexhash = input()
recipient_hash = bytes.fromhex(recipient_hexhash)
if not RNS.Transport.has_path(recipient_hash):
RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...")
RNS.Transport.request_path(recipient_hash)
while not RNS.Transport.has_path(recipient_hash):
time.sleep(0.1)
# Recall the server identity
recipient_identity = RNS.Identity.recall(recipient_hash)
dest = RNS.Destination(recipient_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
# This loop allows you to execute various actions for testing
# and experimenting with the example scripts.
while True:
# Create a message that will be sent directly to the
# destination over a Reticulum link:
lxm = LXMF.LXMessage(dest, source, random_msgs[random.randint(0,len(random_msgs)-1)],
random_titles[random.randint(0,len(random_titles)-1)],
desired_method=LXMF.LXMessage.DIRECT, include_ticket=True)
# Or, create an opportunistic, single-packet message
# for sending without first establishing a link:
# lxm = LXMF.LXMessage(dest, source, "This is a test",
# random_titles[random.randint(0,len(random_titles)-1)],
# desired_method=LXMF.LXMessage.OPPORTUNISTIC, include_ticket=True)
# Or, try sending the message via a propagation node:
# router.set_outbound_propagation_node(bytes.fromhex("e75d9b6a69f82b48b6077cf2242d7499"))
# lxm = LXMF.LXMessage(dest, source, random_msgs[random.randint(0,len(random_msgs)-1)],
# random_titles[random.randint(0,len(random_titles)-1)],
# desired_method=LXMF.LXMessage.PROPAGATED)
# Finally dispatch the message to the LXMF message
# router, which will handle the delivery according
# to the specified message parameters and options:
router.handle_outbound(lxm)
# Wait for user input before starting over
input()

View File

@ -1,2 +0,0 @@
qrcode>=7.4.2
rns>=0.9.1

View File

@ -25,6 +25,6 @@ setuptools.setup(
'lxmd=LXMF.Utilities.lxmd:main', 'lxmd=LXMF.Utilities.lxmd:main',
] ]
}, },
install_requires=["rns>=0.9.3"], install_requires=['rns>=0.4.7'],
python_requires=">=3.7", python_requires='>=3.6',
) )