mirror of
https://github.com/markqvist/LXMF.git
synced 2026-01-04 17:55:32 -05:00
Compare commits
No commits in common. "master" and "0.5.1" have entirely different histories.
17 changed files with 689 additions and 2756 deletions
2
.github/ISSUE_TEMPLATE/🐛-bug-report.md
vendored
2
.github/ISSUE_TEMPLATE/🐛-bug-report.md
vendored
|
|
@ -12,7 +12,7 @@ Before creating a bug report on this issue tracker, you **must** read the [Contr
|
|||
|
||||
- The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**.
|
||||
- Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**.
|
||||
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**.
|
||||
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), delete this section from your bug report.
|
||||
|
||||
**Describe the Bug**
|
||||
A clear and concise description of what the bug is.
|
||||
|
|
|
|||
|
|
@ -1,3 +0,0 @@
|
|||
liberapay: Reticulum
|
||||
ko_fi: markqvist
|
||||
custom: "https://unsigned.io/donate"
|
||||
16
LICENSE
16
LICENSE
|
|
@ -1,6 +1,6 @@
|
|||
Reticulum License
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2020-2025 Mark Qvist
|
||||
Copyright (c) 2020 Mark Qvist / unsigned.io
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
|
@ -9,16 +9,8 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
- The Software shall not be used in any kind of system which includes amongst
|
||||
its functions the ability to purposefully do harm to human beings.
|
||||
|
||||
- The Software shall not be used, directly or indirectly, in the creation of
|
||||
an artificial intelligence, machine learning or language model training
|
||||
dataset, including but not limited to any use that contributes to the
|
||||
training or development of such a model or algorithm.
|
||||
|
||||
- The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
|
|
|
|||
|
|
@ -1,18 +1,28 @@
|
|||
import time
|
||||
import threading
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
from .LXMF import APP_NAME, stamp_cost_from_app_data, pn_announce_data_is_valid
|
||||
from .LXMF import APP_NAME, stamp_cost_from_app_data
|
||||
|
||||
from .LXMessage import LXMessage
|
||||
|
||||
class LXMFDeliveryAnnounceHandler:
|
||||
def __init__(self, lxmrouter):
|
||||
self.aspect_filter = APP_NAME+".delivery"
|
||||
self.aspect_filter = APP_NAME+".delivery"
|
||||
self.receive_path_responses = True
|
||||
self.lxmrouter = lxmrouter
|
||||
self.lxmrouter = lxmrouter
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data):
|
||||
for lxmessage in self.lxmrouter.pending_outbound:
|
||||
if destination_hash == lxmessage.destination_hash:
|
||||
if lxmessage.method == LXMessage.DIRECT:
|
||||
lxmessage.next_delivery_attempt = time.time()
|
||||
|
||||
while self.lxmrouter.processing_outbound:
|
||||
time.sleep(0.1)
|
||||
|
||||
self.lxmrouter.process_outbound()
|
||||
|
||||
try:
|
||||
stamp_cost = stamp_cost_from_app_data(app_data)
|
||||
self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost)
|
||||
|
|
@ -20,72 +30,34 @@ class LXMFDeliveryAnnounceHandler:
|
|||
except Exception as e:
|
||||
RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
|
||||
for lxmessage in self.lxmrouter.pending_outbound:
|
||||
if destination_hash == lxmessage.destination_hash:
|
||||
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
|
||||
lxmessage.next_delivery_attempt = time.time()
|
||||
|
||||
def outbound_trigger():
|
||||
while self.lxmrouter.outbound_processing_lock.locked(): time.sleep(0.1)
|
||||
self.lxmrouter.process_outbound()
|
||||
|
||||
threading.Thread(target=outbound_trigger, daemon=True).start()
|
||||
|
||||
|
||||
class LXMFPropagationAnnounceHandler:
|
||||
def __init__(self, lxmrouter):
|
||||
self.aspect_filter = APP_NAME+".propagation"
|
||||
self.receive_path_responses = True
|
||||
self.lxmrouter = lxmrouter
|
||||
self.aspect_filter = APP_NAME+".propagation"
|
||||
self.receive_path_responses = False
|
||||
self.lxmrouter = lxmrouter
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash, is_path_response):
|
||||
def received_announce(self, destination_hash, announced_identity, app_data):
|
||||
try:
|
||||
if type(app_data) == bytes:
|
||||
if self.lxmrouter.propagation_node:
|
||||
if pn_announce_data_is_valid(app_data):
|
||||
data = msgpack.unpackb(app_data)
|
||||
node_timebase = int(data[1])
|
||||
propagation_enabled = data[2]
|
||||
propagation_transfer_limit = int(data[3])
|
||||
propagation_sync_limit = int(data[4])
|
||||
propagation_stamp_cost = int(data[5][0])
|
||||
propagation_stamp_cost_flexibility = int(data[5][1])
|
||||
peering_cost = int(data[5][2])
|
||||
metadata = data[6]
|
||||
|
||||
if destination_hash in self.lxmrouter.static_peers:
|
||||
static_peer = self.lxmrouter.peers[destination_hash]
|
||||
if not is_path_response or static_peer.last_heard == 0:
|
||||
self.lxmrouter.peer(destination_hash=destination_hash,
|
||||
timestamp=node_timebase,
|
||||
propagation_transfer_limit=propagation_transfer_limit,
|
||||
propagation_sync_limit=propagation_sync_limit,
|
||||
propagation_stamp_cost=propagation_stamp_cost,
|
||||
propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
|
||||
peering_cost=peering_cost,
|
||||
metadata=metadata)
|
||||
data = msgpack.unpackb(app_data)
|
||||
|
||||
else:
|
||||
if self.lxmrouter.autopeer and not is_path_response:
|
||||
if propagation_enabled == True:
|
||||
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
|
||||
self.lxmrouter.peer(destination_hash=destination_hash,
|
||||
timestamp=node_timebase,
|
||||
propagation_transfer_limit=propagation_transfer_limit,
|
||||
propagation_sync_limit=propagation_sync_limit,
|
||||
propagation_stamp_cost=propagation_stamp_cost,
|
||||
propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
|
||||
peering_cost=peering_cost,
|
||||
metadata=metadata)
|
||||
if self.lxmrouter.propagation_node and self.lxmrouter.autopeer:
|
||||
node_timebase = data[1]
|
||||
propagation_transfer_limit = None
|
||||
if len(data) >= 3:
|
||||
try:
|
||||
propagation_transfer_limit = float(data[2])
|
||||
except:
|
||||
propagation_transfer_limit = None
|
||||
|
||||
else:
|
||||
if destination_hash in self.lxmrouter.peers:
|
||||
RNS.log(f"Peer {self.lxmrouter.peers[destination_hash]} moved outside auto-peering range, breaking peering...")
|
||||
self.lxmrouter.unpeer(destination_hash, node_timebase)
|
||||
if data[0] == True:
|
||||
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
|
||||
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
|
||||
|
||||
elif propagation_enabled == False:
|
||||
self.lxmrouter.unpeer(destination_hash, node_timebase)
|
||||
elif data[0] == False:
|
||||
self.lxmrouter.unpeer(destination_hash, node_timebase)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)
|
||||
RNS.log(f"The contained exception was: {str(e)}", RNS.LOG_DEBUG)
|
||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
|
||||
|
|
|
|||
103
LXMF/LXMF.py
103
LXMF/LXMF.py
|
|
@ -18,8 +18,6 @@ FIELD_RESULTS = 0x0A
|
|||
FIELD_GROUP = 0x0B
|
||||
FIELD_TICKET = 0x0C
|
||||
FIELD_EVENT = 0x0D
|
||||
FIELD_RNR_REFS = 0x0E
|
||||
FIELD_RENDERER = 0x0F
|
||||
|
||||
# For usecases such as including custom data structures,
|
||||
# embedding or encapsulating other data types or protocols
|
||||
|
|
@ -78,56 +76,35 @@ AM_OPUS_LOSSLESS = 0x19
|
|||
# determine it itself based on the included data.
|
||||
AM_CUSTOM = 0xFF
|
||||
|
||||
# Message renderer specifications for FIELD_RENDERER.
|
||||
# The renderer specification is completely optional,
|
||||
# and only serves as an indication to the receiving
|
||||
# client on how to render the message contents. It is
|
||||
# not mandatory to implement, either on sending or
|
||||
# receiving sides, but is the recommended way to
|
||||
# signal how to render a message, if non-plaintext
|
||||
# formatting is used.
|
||||
RENDERER_PLAIN = 0x00
|
||||
RENDERER_MICRON = 0x01
|
||||
RENDERER_MARKDOWN = 0x02
|
||||
RENDERER_BBCODE = 0x03
|
||||
|
||||
# Optional propagation node metadata fields. These
|
||||
# fields may be highly unstable in allocation and
|
||||
# availability until the version 1.0.0 release, so use
|
||||
# at your own risk until then, and expect changes!
|
||||
PN_META_VERSION = 0x00
|
||||
PN_META_NAME = 0x01
|
||||
PN_META_SYNC_STRATUM = 0x02
|
||||
PN_META_SYNC_THROTTLE = 0x03
|
||||
PN_META_AUTH_BAND = 0x04
|
||||
PN_META_UTIL_PRESSURE = 0x05
|
||||
PN_META_CUSTOM = 0xFF
|
||||
|
||||
##########################################################
|
||||
# The following helper functions makes it easier to #
|
||||
# handle and operate on LXMF data in client programs #
|
||||
##########################################################
|
||||
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
def display_name_from_app_data(app_data=None):
|
||||
if app_data == None: return None
|
||||
elif len(app_data) == 0: return None
|
||||
if app_data == None:
|
||||
return None
|
||||
elif len(app_data) == 0:
|
||||
return None
|
||||
else:
|
||||
# Version 0.5.0+ announce format
|
||||
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
|
||||
peer_data = msgpack.unpackb(app_data)
|
||||
if type(peer_data) == list:
|
||||
if len(peer_data) < 1: return None
|
||||
if len(peer_data) < 1:
|
||||
return None
|
||||
else:
|
||||
dn = peer_data[0]
|
||||
if dn == None: return None
|
||||
if dn == None:
|
||||
return None
|
||||
else:
|
||||
try:
|
||||
decoded = dn.decode("utf-8")
|
||||
return decoded
|
||||
except Exception as e:
|
||||
RNS.log(f"Could not decode display name in included announce data. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
except:
|
||||
RNS.log("Could not decode display name in included announce data. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
return None
|
||||
|
||||
# Original announce format
|
||||
|
|
@ -135,64 +112,18 @@ def display_name_from_app_data(app_data=None):
|
|||
return app_data.decode("utf-8")
|
||||
|
||||
def stamp_cost_from_app_data(app_data=None):
|
||||
if app_data == None or app_data == b"": return None
|
||||
if app_data == None:
|
||||
return None
|
||||
else:
|
||||
# Version 0.5.0+ announce format
|
||||
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
|
||||
peer_data = msgpack.unpackb(app_data)
|
||||
if type(peer_data) == list:
|
||||
if len(peer_data) < 2: return None
|
||||
else: return peer_data[1]
|
||||
if len(peer_data) < 2:
|
||||
return None
|
||||
else:
|
||||
return peer_data[1]
|
||||
|
||||
# Original announce format
|
||||
else: return None
|
||||
|
||||
def pn_name_from_app_data(app_data=None):
|
||||
if app_data == None: return None
|
||||
else:
|
||||
if pn_announce_data_is_valid(app_data):
|
||||
data = msgpack.unpackb(app_data)
|
||||
metadata = data[6]
|
||||
if not PN_META_NAME in metadata: return None
|
||||
else:
|
||||
try: return metadata[PN_META_NAME].decode("utf-8")
|
||||
except: return None
|
||||
|
||||
return None
|
||||
|
||||
def pn_stamp_cost_from_app_data(app_data=None):
|
||||
if app_data == None: return None
|
||||
else:
|
||||
if pn_announce_data_is_valid(app_data):
|
||||
data = msgpack.unpackb(app_data)
|
||||
return data[5][0]
|
||||
else:
|
||||
return None
|
||||
|
||||
def pn_announce_data_is_valid(data):
|
||||
try:
|
||||
if type(data) != bytes: return False
|
||||
else: data = msgpack.unpackb(data)
|
||||
if len(data) < 7: raise ValueError("Invalid announce data: Insufficient peer data, likely from deprecated LXMF version")
|
||||
else:
|
||||
try: int(data[1])
|
||||
except: raise ValueError("Invalid announce data: Could not decode timebase")
|
||||
if data[2] != True and data[2] != False: raise ValueError("Invalid announce data: Indeterminate propagation node status")
|
||||
try: int(data[3])
|
||||
except: raise ValueError("Invalid announce data: Could not decode propagation transfer limit")
|
||||
try: int(data[4])
|
||||
except: raise ValueError("Invalid announce data: Could not decode propagation sync limit")
|
||||
if type(data[5]) != list: raise ValueError("Invalid announce data: Could not decode stamp costs")
|
||||
try: int(data[5][0])
|
||||
except: raise ValueError("Invalid announce data: Could not decode target stamp cost")
|
||||
try: int(data[5][1])
|
||||
except: raise ValueError("Invalid announce data: Could not decode stamp cost flexibility")
|
||||
try: int(data[5][2])
|
||||
except: raise ValueError("Invalid announce data: Could not decode peering cost")
|
||||
if type(data[6]) != dict: raise ValueError("Invalid announce data: Could not decode metadata")
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG)
|
||||
return False
|
||||
|
||||
return True
|
||||
return None
|
||||
534
LXMF/LXMPeer.py
534
LXMF/LXMPeer.py
|
|
@ -1,38 +1,24 @@
|
|||
import os
|
||||
import time
|
||||
import threading
|
||||
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
import LXMF.LXStamper as LXStamper
|
||||
|
||||
from collections import deque
|
||||
from .LXMF import APP_NAME
|
||||
from .LXMF import PN_META_NAME
|
||||
|
||||
class LXMPeer:
|
||||
OFFER_REQUEST_PATH = "/offer"
|
||||
MESSAGE_GET_PATH = "/get"
|
||||
|
||||
IDLE = 0x00
|
||||
LINK_ESTABLISHING = 0x01
|
||||
LINK_READY = 0x02
|
||||
REQUEST_SENT = 0x03
|
||||
RESPONSE_RECEIVED = 0x04
|
||||
IDLE = 0x00
|
||||
LINK_ESTABLISHING = 0x01
|
||||
LINK_READY = 0x02
|
||||
REQUEST_SENT = 0x03
|
||||
RESPONSE_RECEIVED = 0x04
|
||||
RESOURCE_TRANSFERRING = 0x05
|
||||
|
||||
ERROR_NO_IDENTITY = 0xf0
|
||||
ERROR_NO_ACCESS = 0xf1
|
||||
ERROR_INVALID_KEY = 0xf3
|
||||
ERROR_INVALID_DATA = 0xf4
|
||||
ERROR_INVALID_STAMP = 0xf5
|
||||
ERROR_THROTTLED = 0xf6
|
||||
ERROR_NOT_FOUND = 0xfd
|
||||
ERROR_TIMEOUT = 0xfe
|
||||
|
||||
STRATEGY_LAZY = 0x01
|
||||
STRATEGY_PERSISTENT = 0x02
|
||||
DEFAULT_SYNC_STRATEGY = STRATEGY_PERSISTENT
|
||||
ERROR_NO_IDENTITY = 0xf0
|
||||
ERROR_NO_ACCESS = 0xf1
|
||||
|
||||
# Maximum amount of time a peer can
|
||||
# be unreachable before it is removed
|
||||
|
|
@ -52,111 +38,42 @@ class LXMPeer:
|
|||
@staticmethod
|
||||
def from_bytes(peer_bytes, router):
|
||||
dictionary = msgpack.unpackb(peer_bytes)
|
||||
peer_destination_hash = dictionary["destination_hash"]
|
||||
peer_peering_timebase = dictionary["peering_timebase"]
|
||||
peer_alive = dictionary["alive"]
|
||||
peer_last_heard = dictionary["last_heard"]
|
||||
|
||||
peer = LXMPeer(router, peer_destination_hash)
|
||||
peer.peering_timebase = peer_peering_timebase
|
||||
peer.alive = peer_alive
|
||||
peer.last_heard = peer_last_heard
|
||||
|
||||
if "link_establishment_rate" in dictionary: peer.link_establishment_rate = dictionary["link_establishment_rate"]
|
||||
else: peer.link_establishment_rate = 0
|
||||
|
||||
if "sync_transfer_rate" in dictionary: peer.sync_transfer_rate = dictionary["sync_transfer_rate"]
|
||||
else: peer.sync_transfer_rate = 0
|
||||
peer = LXMPeer(router, dictionary["destination_hash"])
|
||||
peer.peering_timebase = dictionary["peering_timebase"]
|
||||
peer.alive = dictionary["alive"]
|
||||
peer.last_heard = dictionary["last_heard"]
|
||||
if "link_establishment_rate" in dictionary:
|
||||
peer.link_establishment_rate = dictionary["link_establishment_rate"]
|
||||
else:
|
||||
peer.link_establishment_rate = 0
|
||||
|
||||
if "propagation_transfer_limit" in dictionary:
|
||||
try: peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"])
|
||||
except Exception as e: peer.propagation_transfer_limit = None
|
||||
else: peer.propagation_transfer_limit = None
|
||||
try:
|
||||
peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"])
|
||||
except Exception as e:
|
||||
peer.propagation_transfer_limit = None
|
||||
else:
|
||||
peer.propagation_transfer_limit = None
|
||||
|
||||
if "propagation_sync_limit" in dictionary:
|
||||
try: peer.propagation_sync_limit = int(dictionary["propagation_sync_limit"])
|
||||
except: peer.propagation_sync_limit = peer.propagation_transfer_limit
|
||||
else: peer.propagation_sync_limit = peer.propagation_transfer_limit
|
||||
|
||||
if "propagation_stamp_cost" in dictionary:
|
||||
try: peer.propagation_stamp_cost = int(dictionary["propagation_stamp_cost"])
|
||||
except: peer.propagation_stamp_cost = None
|
||||
else: peer.propagation_stamp_cost = None
|
||||
|
||||
if "propagation_stamp_cost_flexibility" in dictionary:
|
||||
try: peer.propagation_stamp_cost_flexibility = int(dictionary["propagation_stamp_cost_flexibility"])
|
||||
except: peer.propagation_stamp_cost_flexibility = None
|
||||
else: peer.propagation_stamp_cost_flexibility = None
|
||||
|
||||
if "peering_cost" in dictionary:
|
||||
try: peer.peering_cost = int(dictionary["peering_cost"])
|
||||
except: peer.peering_cost = None
|
||||
else: peer.peering_cost = None
|
||||
|
||||
if "sync_strategy" in dictionary:
|
||||
try: peer.sync_strategy = int(dictionary["sync_strategy"])
|
||||
except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY
|
||||
else: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY
|
||||
|
||||
if "offered" in dictionary: peer.offered = dictionary["offered"]
|
||||
else: peer.offered = 0
|
||||
if "outgoing" in dictionary: peer.outgoing = dictionary["outgoing"]
|
||||
else: peer.outgoing = 0
|
||||
if "incoming" in dictionary: peer.incoming = dictionary["incoming"]
|
||||
else: peer.incoming = 0
|
||||
if "rx_bytes" in dictionary: peer.rx_bytes = dictionary["rx_bytes"]
|
||||
else: peer.rx_bytes = 0
|
||||
if "tx_bytes" in dictionary: peer.tx_bytes = dictionary["tx_bytes"]
|
||||
else: peer.tx_bytes = 0
|
||||
if "last_sync_attempt" in dictionary: peer.last_sync_attempt = dictionary["last_sync_attempt"]
|
||||
else: peer.last_sync_attempt = 0
|
||||
if "peering_key" in dictionary: peer.peering_key = dictionary["peering_key"]
|
||||
else: peer.peering_key = None
|
||||
if "metadata" in dictionary: peer.metadata = dictionary["metadata"]
|
||||
else: peer.metadata = None
|
||||
|
||||
hm_count = 0
|
||||
for transient_id in dictionary["handled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.add_handled_message(transient_id)
|
||||
hm_count += 1
|
||||
peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
|
||||
um_count = 0
|
||||
for transient_id in dictionary["unhandled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.add_unhandled_message(transient_id)
|
||||
um_count += 1
|
||||
peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
|
||||
peer._hm_count = hm_count
|
||||
peer._um_count = um_count
|
||||
peer._hm_counts_synced = True
|
||||
peer._um_counts_synced = True
|
||||
|
||||
del dictionary
|
||||
return peer
|
||||
|
||||
def to_bytes(self):
|
||||
dictionary = {}
|
||||
dictionary["peering_timebase"] = self.peering_timebase
|
||||
dictionary["alive"] = self.alive
|
||||
dictionary["metadata"] = self.metadata
|
||||
dictionary["last_heard"] = self.last_heard
|
||||
dictionary["sync_strategy"] = self.sync_strategy
|
||||
dictionary["peering_key"] = self.peering_key
|
||||
dictionary["destination_hash"] = self.destination_hash
|
||||
dictionary["link_establishment_rate"] = self.link_establishment_rate
|
||||
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
|
||||
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
|
||||
dictionary["propagation_sync_limit"] = self.propagation_sync_limit
|
||||
dictionary["propagation_stamp_cost"] = self.propagation_stamp_cost
|
||||
dictionary["propagation_stamp_cost_flexibility"] = self.propagation_stamp_cost_flexibility
|
||||
dictionary["peering_cost"] = self.peering_cost
|
||||
dictionary["last_sync_attempt"] = self.last_sync_attempt
|
||||
dictionary["offered"] = self.offered
|
||||
dictionary["outgoing"] = self.outgoing
|
||||
dictionary["incoming"] = self.incoming
|
||||
dictionary["rx_bytes"] = self.rx_bytes
|
||||
dictionary["tx_bytes"] = self.tx_bytes
|
||||
|
||||
handled_ids = []
|
||||
for transient_id in self.handled_messages:
|
||||
|
|
@ -169,129 +86,35 @@ class LXMPeer:
|
|||
dictionary["handled_ids"] = handled_ids
|
||||
dictionary["unhandled_ids"] = unhandled_ids
|
||||
|
||||
peer_bytes = msgpack.packb(dictionary)
|
||||
del dictionary
|
||||
return msgpack.packb(dictionary)
|
||||
|
||||
return peer_bytes
|
||||
|
||||
def __init__(self, router, destination_hash, sync_strategy=DEFAULT_SYNC_STRATEGY):
|
||||
self.alive = False
|
||||
self.last_heard = 0
|
||||
self.sync_strategy = sync_strategy
|
||||
self.peering_key = None
|
||||
self.peering_cost = None
|
||||
self.metadata = None
|
||||
|
||||
self.next_sync_attempt = 0
|
||||
self.last_sync_attempt = 0
|
||||
self.sync_backoff = 0
|
||||
self.peering_timebase = 0
|
||||
def __init__(self, router, destination_hash):
|
||||
self.alive = False
|
||||
self.last_heard = 0
|
||||
self.next_sync_attempt = 0
|
||||
self.last_sync_attempt = 0
|
||||
self.sync_backoff = 0
|
||||
self.peering_timebase = 0
|
||||
self.link_establishment_rate = 0
|
||||
self.sync_transfer_rate = 0
|
||||
|
||||
self.propagation_transfer_limit = None
|
||||
self.propagation_sync_limit = None
|
||||
self.propagation_stamp_cost = None
|
||||
self.propagation_stamp_cost_flexibility = None
|
||||
self.currently_transferring_messages = None
|
||||
self.handled_messages_queue = deque()
|
||||
self.unhandled_messages_queue = deque()
|
||||
|
||||
self.offered = 0 # Messages offered to this peer
|
||||
self.outgoing = 0 # Messages transferred to this peer
|
||||
self.incoming = 0 # Messages received from this peer
|
||||
self.rx_bytes = 0 # Bytes received from this peer
|
||||
self.tx_bytes = 0 # Bytes sent to this peer
|
||||
|
||||
self._hm_count = 0
|
||||
self._um_count = 0
|
||||
self._hm_counts_synced = False
|
||||
self._um_counts_synced = False
|
||||
|
||||
self._peering_key_lock = threading.Lock()
|
||||
self.propagation_transfer_limit = None
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
self.unhandled_messages = {}
|
||||
self.handled_messages = {}
|
||||
self.last_offer = []
|
||||
|
||||
self.router = router
|
||||
self.destination_hash = destination_hash
|
||||
self.identity = RNS.Identity.recall(destination_hash)
|
||||
if self.identity != None:
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
else:
|
||||
self.destination = None
|
||||
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
|
||||
|
||||
def peering_key_ready(self):
|
||||
if not self.peering_cost: return False
|
||||
if type(self.peering_key) == list and len(self.peering_key) == 2:
|
||||
value = self.peering_key[1]
|
||||
if value >= self.peering_cost: return True
|
||||
else:
|
||||
RNS.log(f"Peering key value mismatch for {self}. Current value is {value}, but peer requires {self.peering_cost}. Scheduling regeneration...", RNS.LOG_WARNING)
|
||||
self.peering_key = None
|
||||
|
||||
return False
|
||||
|
||||
def peering_key_value(self):
|
||||
if type(self.peering_key) == list and len(self.peering_key) == 2: return self.peering_key[1]
|
||||
else: return None
|
||||
|
||||
def generate_peering_key(self):
|
||||
if self.peering_cost == None: return False
|
||||
with self._peering_key_lock:
|
||||
if self.peering_key != None: return True
|
||||
else:
|
||||
RNS.log(f"Generating peering key for {self}", RNS.LOG_NOTICE)
|
||||
if self.router.identity == None:
|
||||
RNS.log(f"Could not update peering key for {self} since the local LXMF router identity is not configured", RNS.LOG_ERROR)
|
||||
return False
|
||||
|
||||
if self.identity == None:
|
||||
self.identity = RNS.Identity.recall(destination_hash)
|
||||
if self.identity == None:
|
||||
RNS.log(f"Could not update peering key for {self} since its identity could not be recalled", RNS.LOG_ERROR)
|
||||
return False
|
||||
|
||||
key_material = self.identity.hash+self.router.identity.hash
|
||||
peering_key, value = LXStamper.generate_stamp(key_material, self.peering_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PEERING)
|
||||
if value >= self.peering_cost:
|
||||
self.peering_key = [peering_key, value]
|
||||
RNS.log(f"Peering key successfully generated for {self}", RNS.LOG_NOTICE)
|
||||
return True
|
||||
|
||||
return False
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
|
||||
def sync(self):
|
||||
RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
|
||||
self.last_sync_attempt = time.time()
|
||||
|
||||
sync_time_reached = time.time() > self.next_sync_attempt
|
||||
stamp_costs_known = self.propagation_stamp_cost != None and self.propagation_stamp_cost_flexibility != None and self.peering_cost != None
|
||||
peering_key_ready = self.peering_key_ready()
|
||||
sync_checks = sync_time_reached and stamp_costs_known and peering_key_ready
|
||||
|
||||
if not sync_checks:
|
||||
try:
|
||||
if not sync_time_reached:
|
||||
postpone_reason = " due to previous failures"
|
||||
if self.last_sync_attempt > self.last_heard: self.alive = False
|
||||
elif not stamp_costs_known:
|
||||
postpone_reason = " since its required stamp costs are not yet known"
|
||||
elif not peering_key_ready:
|
||||
postpone_reason = " since a peering key has not been generated yet"
|
||||
def job(): self.generate_peering_key()
|
||||
threading.Thread(target=job, daemon=True).start()
|
||||
|
||||
delay = self.next_sync_attempt-time.time()
|
||||
postpone_delay = f" for {RNS.prettytime(delay)}" if delay > 0 else ""
|
||||
RNS.log(f"Postponing sync with peer {RNS.prettyhexrep(self.destination_hash)}{postpone_delay}{postpone_reason}", RNS.LOG_DEBUG)
|
||||
except Exception as e:
|
||||
RNS.trace_exception(e)
|
||||
|
||||
else:
|
||||
if time.time() > self.next_sync_attempt:
|
||||
if not RNS.Transport.has_path(self.destination_hash):
|
||||
RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG)
|
||||
RNS.Transport.request_path(self.destination_hash)
|
||||
|
|
@ -303,19 +126,10 @@ class LXMPeer:
|
|||
else:
|
||||
if self.identity == None:
|
||||
self.identity = RNS.Identity.recall(destination_hash)
|
||||
if self.identity != None:
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
|
||||
if self.destination != None:
|
||||
if len(self.unhandled_messages) == 0:
|
||||
RNS.log(f"Sync requested for {self}, but no unhandled messages exist for peer. Sync complete.", RNS.LOG_DEBUG)
|
||||
return
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
|
||||
if self.identity != None:
|
||||
if len(self.unhandled_messages) > 0:
|
||||
if self.currently_transferring_messages != None:
|
||||
RNS.log(f"Sync requested for {self}, but current message transfer index was not clear. Aborting.", RNS.LOG_ERROR)
|
||||
return
|
||||
|
||||
if self.state == LXMPeer.IDLE:
|
||||
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
|
||||
self.sync_backoff += LXMPeer.SYNC_BACKOFF_STEP
|
||||
|
|
@ -328,69 +142,58 @@ class LXMPeer:
|
|||
self.alive = True
|
||||
self.last_heard = time.time()
|
||||
self.sync_backoff = 0
|
||||
min_accepted_cost = min(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
|
||||
|
||||
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing sync offer...", RNS.LOG_DEBUG)
|
||||
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
|
||||
unhandled_entries = []
|
||||
unhandled_ids = []
|
||||
purged_ids = []
|
||||
low_value_ids = []
|
||||
unhandled_ids = []
|
||||
purged_ids = []
|
||||
for transient_id in self.unhandled_messages:
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.router.get_stamp_value(transient_id) < min_accepted_cost: low_value_ids.append(transient_id)
|
||||
else:
|
||||
unhandled_entry = [ transient_id,
|
||||
self.router.get_weight(transient_id),
|
||||
self.router.get_size(transient_id) ]
|
||||
|
||||
unhandled_entries.append(unhandled_entry)
|
||||
|
||||
else: purged_ids.append(transient_id)
|
||||
unhandled_entry = [
|
||||
transient_id,
|
||||
self.router.get_weight(transient_id),
|
||||
self.router.get_size(transient_id),
|
||||
]
|
||||
unhandled_entries.append(unhandled_entry)
|
||||
else:
|
||||
purged_ids.append(transient_id)
|
||||
|
||||
for transient_id in purged_ids:
|
||||
RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since it no longer exists in the message store.", RNS.LOG_DEBUG)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
for transient_id in low_value_ids:
|
||||
RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since its stamp value is lower than peer requirement of {min_accepted_cost}.", RNS.LOG_DEBUG)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
|
||||
self.unhandled_messages.pop(transient_id)
|
||||
|
||||
unhandled_entries.sort(key=lambda e: e[1], reverse=False)
|
||||
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
|
||||
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
|
||||
RNS.log(f"Syncing to peer with per-message limit {RNS.prettysize(self.propagation_transfer_limit*1000)} and sync limit {RNS.prettysize(self.propagation_sync_limit*1000)}") # TODO: Remove debug
|
||||
|
||||
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
|
||||
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
|
||||
for unhandled_entry in unhandled_entries:
|
||||
transient_id = unhandled_entry[0]
|
||||
weight = unhandled_entry[1]
|
||||
lxm_size = unhandled_entry[2]
|
||||
lxm_transfer_size = lxm_size+per_message_overhead
|
||||
next_size = cumulative_size + lxm_transfer_size
|
||||
transient_id = unhandled_entry[0]
|
||||
weight = unhandled_entry[1]
|
||||
lxm_size = unhandled_entry[2]
|
||||
next_size = cumulative_size + (lxm_size+per_message_overhead)
|
||||
if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000):
|
||||
pass
|
||||
else:
|
||||
cumulative_size += (lxm_size+per_message_overhead)
|
||||
unhandled_ids.append(transient_id)
|
||||
|
||||
if self.propagation_transfer_limit != None and lxm_transfer_size > (self.propagation_transfer_limit*1000):
|
||||
self.remove_unhandled_message(transient_id)
|
||||
self.add_handled_message(transient_id)
|
||||
continue
|
||||
|
||||
if self.propagation_sync_limit != None and next_size >= (self.propagation_sync_limit*1000):
|
||||
continue
|
||||
|
||||
cumulative_size += lxm_transfer_size
|
||||
unhandled_ids.append(transient_id)
|
||||
|
||||
offer = [self.peering_key[0], unhandled_ids]
|
||||
|
||||
RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)} ({RNS.prettysize(len(msgpack.packb(unhandled_ids)))})", RNS.LOG_VERBOSE)
|
||||
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
|
||||
self.last_offer = unhandled_ids
|
||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, offer, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||
self.state = LXMPeer.REQUEST_SENT
|
||||
|
||||
else:
|
||||
RNS.log(f"Could not request sync to peer {RNS.prettyhexrep(self.destination_hash)} since its identity could not be recalled.", RNS.LOG_ERROR)
|
||||
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
|
||||
|
||||
else:
|
||||
RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG)
|
||||
if self.last_sync_attempt > self.last_heard:
|
||||
self.alive = False
|
||||
|
||||
def request_failed(self, request_receipt):
|
||||
RNS.log(f"Sync request to peer {self.destination} failed", RNS.LOG_DEBUG)
|
||||
if self.link != None: self.link.teardown()
|
||||
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def offer_response(self, request_receipt):
|
||||
|
|
@ -403,35 +206,22 @@ class LXMPeer:
|
|||
|
||||
if response == LXMPeer.ERROR_NO_IDENTITY:
|
||||
if self.link != None:
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE)
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG)
|
||||
self.link.identify()
|
||||
self.state = LXMPeer.LINK_READY
|
||||
self.sync()
|
||||
return
|
||||
|
||||
elif response == LXMPeer.ERROR_NO_ACCESS:
|
||||
RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE)
|
||||
self.router.unpeer(self.destination_hash)
|
||||
return
|
||||
|
||||
elif response == LXMPeer.ERROR_THROTTLED:
|
||||
throttle_time = self.router.PN_STAMP_THROTTLE
|
||||
RNS.log(f"Remote indicated that we're throttled, postponing sync for {RNS.prettytime(throttle_time)}", RNS.LOG_VERBOSE)
|
||||
self.next_sync_attempt = time.time()+throttle_time
|
||||
return
|
||||
|
||||
elif response == False:
|
||||
# Peer already has all advertised messages
|
||||
for transient_id in self.last_offer:
|
||||
if transient_id in self.unhandled_messages:
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
||||
|
||||
|
||||
elif response == True:
|
||||
# Peer wants all advertised messages
|
||||
for transient_id in self.last_offer:
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
else:
|
||||
|
|
@ -440,17 +230,18 @@ class LXMPeer:
|
|||
# If the peer did not want the message, it has
|
||||
# already received it from another peer.
|
||||
if not transient_id in response:
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
if transient_id in self.unhandled_messages:
|
||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
||||
|
||||
for transient_id in response:
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
if len(wanted_messages) > 0:
|
||||
RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} wanted {str(len(wanted_messages))} of the available messages", RNS.LOG_VERBOSE)
|
||||
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG)
|
||||
|
||||
lxm_list = []
|
||||
|
||||
for message_entry in wanted_messages:
|
||||
file_path = message_entry[1]
|
||||
if os.path.isfile(file_path):
|
||||
|
|
@ -460,15 +251,12 @@ class LXMPeer:
|
|||
lxm_list.append(lxmf_data)
|
||||
|
||||
data = msgpack.packb([time.time(), lxm_list])
|
||||
RNS.log(f"Total transfer size for this sync is {RNS.prettysize(len(data))}", RNS.LOG_VERBOSE)
|
||||
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
||||
self.currently_transferring_messages = wanted_message_ids
|
||||
self.current_sync_transfer_started = time.time()
|
||||
resource.transferred_messages = wanted_message_ids
|
||||
self.state = LXMPeer.RESOURCE_TRANSFERRING
|
||||
|
||||
else:
|
||||
RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} did not request any of the available messages, sync completed", RNS.LOG_VERBOSE)
|
||||
self.offered += len(self.last_offer)
|
||||
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
|
|
@ -487,45 +275,27 @@ class LXMPeer:
|
|||
|
||||
def resource_concluded(self, resource):
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
if self.currently_transferring_messages == None:
|
||||
RNS.log(f"Sync transfer completed on {self}, but transferred message index was unavailable. Aborting.", RNS.LOG_ERROR)
|
||||
if self.link != None: self.link.teardown()
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
for transient_id in self.currently_transferring_messages:
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
for transient_id in resource.transferred_messages:
|
||||
message = self.unhandled_messages.pop(transient_id)
|
||||
self.handled_messages[transient_id] = message
|
||||
|
||||
if self.link != None: self.link.teardown()
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
rate_str = ""
|
||||
if self.current_sync_transfer_started != None:
|
||||
self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-self.current_sync_transfer_started)
|
||||
rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
RNS.log(f"Syncing {len(self.currently_transferring_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
|
||||
self.alive = True
|
||||
RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed", RNS.LOG_DEBUG)
|
||||
self.alive = True
|
||||
self.last_heard = time.time()
|
||||
self.offered += len(self.last_offer)
|
||||
self.outgoing += len(self.currently_transferring_messages)
|
||||
self.tx_bytes += resource.get_data_size()
|
||||
|
||||
self.currently_transferring_messages = None
|
||||
self.current_sync_transfer_started = None
|
||||
|
||||
if self.sync_strategy == self.STRATEGY_PERSISTENT:
|
||||
if self.unhandled_message_count > 0: self.sync()
|
||||
|
||||
else:
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
|
||||
if self.link != None: self.link.teardown()
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
self.currently_transferring_messages = None
|
||||
self.current_sync_transfer_started = None
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def link_established(self, link):
|
||||
self.link.identify(self.router.identity)
|
||||
|
|
@ -538,105 +308,15 @@ class LXMPeer:
|
|||
self.sync()
|
||||
|
||||
def link_closed(self, link):
|
||||
self.link = None
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def queued_items(self):
|
||||
return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
|
||||
def handle_message(self, transient_id):
|
||||
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
|
||||
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
|
||||
|
||||
def queue_unhandled_message(self, transient_id):
|
||||
self.unhandled_messages_queue.append(transient_id)
|
||||
|
||||
def queue_handled_message(self, transient_id):
|
||||
self.handled_messages_queue.append(transient_id)
|
||||
|
||||
def process_queues(self):
|
||||
if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
|
||||
handled_messages = self.handled_messages
|
||||
unhandled_messages = self.unhandled_messages
|
||||
|
||||
while len(self.handled_messages_queue) > 0:
|
||||
transient_id = self.handled_messages_queue.pop()
|
||||
if not transient_id in handled_messages: self.add_handled_message(transient_id)
|
||||
if transient_id in unhandled_messages: self.remove_unhandled_message(transient_id)
|
||||
|
||||
while len(self.unhandled_messages_queue) > 0:
|
||||
transient_id = self.unhandled_messages_queue.pop()
|
||||
if not transient_id in handled_messages and not transient_id in unhandled_messages:
|
||||
self.add_unhandled_message(transient_id)
|
||||
|
||||
del handled_messages, unhandled_messages
|
||||
|
||||
@property
|
||||
def handled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
hm = list(filter(lambda tid: self.destination_hash in pes[tid][4], pes))
|
||||
self._hm_count = len(hm); del pes
|
||||
self._hm_counts_synced = True
|
||||
return hm
|
||||
|
||||
@property
|
||||
def unhandled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
um = list(filter(lambda tid: self.destination_hash in pes[tid][5], pes))
|
||||
self._um_count = len(um); del pes
|
||||
self._um_counts_synced = True
|
||||
return um
|
||||
|
||||
@property
|
||||
def handled_message_count(self):
|
||||
if not self._hm_counts_synced: self._update_counts()
|
||||
return self._hm_count
|
||||
|
||||
@property
|
||||
def unhandled_message_count(self):
|
||||
if not self._um_counts_synced: self._update_counts()
|
||||
return self._um_count
|
||||
|
||||
@property
|
||||
def acceptance_rate(self):
|
||||
return 0 if self.offered == 0 else (self.outgoing/self.offered)
|
||||
|
||||
def _update_counts(self):
|
||||
if not self._hm_counts_synced:
|
||||
hm = self.handled_messages; del hm
|
||||
|
||||
if not self._um_counts_synced:
|
||||
um = self.unhandled_messages; del um
|
||||
|
||||
def add_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].append(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def add_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].append(self.destination_hash)
|
||||
self._um_count += 1
|
||||
|
||||
def remove_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].remove(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def remove_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].remove(self.destination_hash)
|
||||
self._um_counts_synced = False
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
if type(self.metadata) != dict: return None
|
||||
else:
|
||||
if not PN_META_NAME in self.metadata: return None
|
||||
else:
|
||||
try: return self.metadata[PN_META_NAME].decode("utf-8")
|
||||
except: return None
|
||||
|
||||
def __str__(self):
|
||||
if self.destination_hash: return RNS.prettyhexrep(self.destination_hash)
|
||||
else: return "<Unknown>"
|
||||
if self.destination_hash:
|
||||
return RNS.prettyhexrep(self.destination_hash)
|
||||
else:
|
||||
return "<Unknown>"
|
||||
1605
LXMF/LXMRouter.py
1605
LXMF/LXMRouter.py
File diff suppressed because it is too large
Load diff
|
|
@ -16,10 +16,8 @@ class LXMessage:
|
|||
SENDING = 0x02
|
||||
SENT = 0x04
|
||||
DELIVERED = 0x08
|
||||
REJECTED = 0xFD
|
||||
CANCELLED = 0xFE
|
||||
FAILED = 0xFF
|
||||
states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, REJECTED, CANCELLED, FAILED]
|
||||
states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, FAILED]
|
||||
|
||||
UNKNOWN = 0x00
|
||||
PACKET = 0x01
|
||||
|
|
@ -51,20 +49,18 @@ class LXMessage:
|
|||
TICKET_INTERVAL = 1*24*60*60
|
||||
COST_TICKET = 0x100
|
||||
|
||||
# LXMF overhead is 112 bytes per message:
|
||||
# LXMF overhead is 111 bytes per message:
|
||||
# 16 bytes for destination hash
|
||||
# 16 bytes for source hash
|
||||
# 64 bytes for Ed25519 signature
|
||||
# 8 bytes for timestamp
|
||||
# 8 bytes for msgpack structure
|
||||
TIMESTAMP_SIZE = 8
|
||||
STRUCT_OVERHEAD = 8
|
||||
LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + TIMESTAMP_SIZE + STRUCT_OVERHEAD
|
||||
# 7 bytes for msgpack structure
|
||||
LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + 8 + 7
|
||||
|
||||
# With an MTU of 500, the maximum amount of data
|
||||
# we can send in a single encrypted packet is
|
||||
# 391 bytes.
|
||||
ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU + TIMESTAMP_SIZE
|
||||
# 383 bytes.
|
||||
ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU
|
||||
|
||||
# The max content length we can fit in LXMF message
|
||||
# inside a single RNS packet is the encrypted MDU, minus
|
||||
|
|
@ -73,7 +69,7 @@ class LXMessage:
|
|||
# field of the packet, therefore we also add the length
|
||||
# of a destination hash to the calculation. With default
|
||||
# RNS and LXMF parameters, the largest single-packet
|
||||
# LXMF message we can send is 295 bytes. If a message
|
||||
# LXMF message we can send is 288 bytes. If a message
|
||||
# is larger than that, a Reticulum link will be used.
|
||||
ENCRYPTED_PACKET_MAX_CONTENT = ENCRYPTED_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
|
||||
|
||||
|
|
@ -83,13 +79,13 @@ class LXMessage:
|
|||
LINK_PACKET_MDU = RNS.Link.MDU
|
||||
|
||||
# Which means that we can deliver single-packet LXMF
|
||||
# messages with content of up to 319 bytes over a link.
|
||||
# messages with content of up to 320 bytes over a link.
|
||||
# If a message is larger than that, LXMF will sequence
|
||||
# and transfer it as a RNS resource over the link instead.
|
||||
LINK_PACKET_MAX_CONTENT = LINK_PACKET_MDU - LXMF_OVERHEAD
|
||||
|
||||
# For plain packets without encryption, we can
|
||||
# fit up to 368 bytes of content.
|
||||
# fit up to 369 bytes of content.
|
||||
PLAIN_PACKET_MDU = RNS.Packet.PLAIN_MDU
|
||||
PLAIN_PACKET_MAX_CONTENT = PLAIN_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
|
||||
|
||||
|
|
@ -133,44 +129,30 @@ class LXMessage:
|
|||
if title == None:
|
||||
title = ""
|
||||
|
||||
if type(title) == bytes:
|
||||
self.set_title_from_bytes(title)
|
||||
else:
|
||||
self.set_title_from_string(title)
|
||||
|
||||
if type(content) == bytes:
|
||||
self.set_content_from_bytes(content)
|
||||
else:
|
||||
self.set_content_from_string(content)
|
||||
|
||||
self.set_title_from_string(title)
|
||||
self.set_content_from_string(content)
|
||||
self.set_fields(fields)
|
||||
|
||||
self.payload = None
|
||||
self.timestamp = None
|
||||
self.signature = None
|
||||
self.hash = None
|
||||
self.transient_id = None
|
||||
self.packed = None
|
||||
self.state = LXMessage.GENERATING
|
||||
self.method = LXMessage.UNKNOWN
|
||||
self.progress = 0.0
|
||||
self.rssi = None
|
||||
self.snr = None
|
||||
self.q = None
|
||||
self.payload = None
|
||||
self.timestamp = None
|
||||
self.signature = None
|
||||
self.hash = None
|
||||
self.packed = None
|
||||
self.state = LXMessage.GENERATING
|
||||
self.method = LXMessage.UNKNOWN
|
||||
self.progress = 0.0
|
||||
self.rssi = None
|
||||
self.snr = None
|
||||
self.q = None
|
||||
|
||||
self.stamp = None
|
||||
self.stamp_cost = stamp_cost
|
||||
self.stamp_value = None
|
||||
self.stamp_valid = False
|
||||
self.stamp_checked = False
|
||||
self.propagation_stamp = None
|
||||
self.propagation_stamp_value = None
|
||||
self.propagation_stamp_valid = False
|
||||
self.propagation_target_cost = None
|
||||
self.defer_stamp = True
|
||||
self.defer_propagation_stamp = True
|
||||
self.outbound_ticket = None
|
||||
self.include_ticket = include_ticket
|
||||
self.stamp = None
|
||||
self.stamp_cost = stamp_cost
|
||||
self.stamp_value = None
|
||||
self.stamp_valid = False
|
||||
self.stamp_checked = False
|
||||
self.defer_stamp = True
|
||||
self.outbound_ticket = None
|
||||
self.include_ticket = include_ticket
|
||||
|
||||
self.propagation_packed = None
|
||||
self.paper_packed = None
|
||||
|
|
@ -190,7 +172,6 @@ class LXMessage:
|
|||
self.resource_representation = None
|
||||
self.__delivery_destination = None
|
||||
self.__delivery_callback = None
|
||||
self.__pn_encrypted_data = None
|
||||
self.failed_callback = None
|
||||
|
||||
self.deferred_stamp_generating = False
|
||||
|
|
@ -211,11 +192,7 @@ class LXMessage:
|
|||
self.content = content_bytes
|
||||
|
||||
def content_as_string(self):
|
||||
try:
|
||||
return self.content.decode("utf-8")
|
||||
except Exception as e:
|
||||
RNS.log(f"{self} could not decode message content as string: {e}")
|
||||
return None
|
||||
return self.content.decode("utf-8")
|
||||
|
||||
def set_fields(self, fields):
|
||||
if isinstance(fields, dict) or fields == None:
|
||||
|
|
@ -275,6 +252,15 @@ class LXMessage:
|
|||
def register_failed_callback(self, callback):
|
||||
self.failed_callback = callback
|
||||
|
||||
@staticmethod
|
||||
def stamp_valid(stamp, target_cost, workblock):
|
||||
target = 0b1 << 256-target_cost
|
||||
result = RNS.Identity.full_hash(workblock+stamp)
|
||||
if int.from_bytes(result, byteorder="big") > target:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def validate_stamp(self, target_cost, tickets=None):
|
||||
if tickets != None:
|
||||
for ticket in tickets:
|
||||
|
|
@ -291,7 +277,7 @@ class LXMessage:
|
|||
return False
|
||||
else:
|
||||
workblock = LXStamper.stamp_workblock(self.message_id)
|
||||
if LXStamper.stamp_valid(self.stamp, target_cost, workblock):
|
||||
if LXMessage.stamp_valid(self.stamp, target_cost, workblock):
|
||||
RNS.log(f"Stamp on {self} validated", RNS.LOG_DEBUG) # TODO: Remove at some point
|
||||
self.stamp_value = LXStamper.stamp_value(workblock, self.stamp)
|
||||
return True
|
||||
|
|
@ -331,35 +317,10 @@ class LXMessage:
|
|||
else:
|
||||
return None
|
||||
|
||||
def get_propagation_stamp(self, target_cost, timeout=None):
|
||||
# If a stamp was already generated, return
|
||||
# it immediately.
|
||||
if self.propagation_stamp != None:
|
||||
return self.propagation_stamp
|
||||
|
||||
# Otherwise, we will need to generate a
|
||||
# valid stamp according to the cost that
|
||||
# the propagation node has specified.
|
||||
else:
|
||||
self.propagation_target_cost = target_cost
|
||||
if self.propagation_target_cost == None:
|
||||
raise ValueError("Cannot generate propagation stamp without configured target propagation cost")
|
||||
|
||||
|
||||
if not self.transient_id: self.pack()
|
||||
generated_stamp, value = LXStamper.generate_stamp(self.transient_id, target_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN)
|
||||
if generated_stamp:
|
||||
self.propagation_stamp = generated_stamp
|
||||
self.propagation_stamp_value = value
|
||||
self.propagation_stamp_valid = True
|
||||
return generated_stamp
|
||||
|
||||
else:
|
||||
return None
|
||||
|
||||
def pack(self, payload_updated=False):
|
||||
def pack(self):
|
||||
if not self.packed:
|
||||
if self.timestamp == None: self.timestamp = time.time()
|
||||
if self.timestamp == None:
|
||||
self.timestamp = time.time()
|
||||
|
||||
self.propagation_packed = None
|
||||
self.paper_packed = None
|
||||
|
|
@ -375,8 +336,9 @@ class LXMessage:
|
|||
|
||||
if not self.defer_stamp:
|
||||
self.stamp = self.get_stamp()
|
||||
if self.stamp != None: self.payload.append(self.stamp)
|
||||
|
||||
if self.stamp != None:
|
||||
self.payload.append(self.stamp)
|
||||
|
||||
signed_part = b""
|
||||
signed_part += hashed_part
|
||||
signed_part += self.hash
|
||||
|
|
@ -390,22 +352,14 @@ class LXMessage:
|
|||
self.packed += self.signature
|
||||
self.packed += packed_payload
|
||||
self.packed_size = len(self.packed)
|
||||
content_size = len(packed_payload)-LXMessage.TIMESTAMP_SIZE-LXMessage.STRUCT_OVERHEAD
|
||||
content_size = len(packed_payload)
|
||||
|
||||
# If no desired delivery method has been defined,
|
||||
# one will be chosen according to these rules:
|
||||
if self.desired_method == None:
|
||||
self.desired_method = LXMessage.DIRECT
|
||||
|
||||
# If opportunistic delivery was requested, check
|
||||
# that message will fit within packet size limits
|
||||
if self.desired_method == LXMessage.OPPORTUNISTIC:
|
||||
if self.__destination.type == RNS.Destination.SINGLE:
|
||||
if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT:
|
||||
RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
|
||||
self.desired_method = LXMessage.DIRECT
|
||||
# TODO: Expand rules to something more intelligent
|
||||
|
||||
# Set delivery parameters according to delivery method
|
||||
if self.desired_method == LXMessage.OPPORTUNISTIC:
|
||||
if self.__destination.type == RNS.Destination.SINGLE:
|
||||
single_packet_content_limit = LXMessage.ENCRYPTED_PACKET_MAX_CONTENT
|
||||
|
|
@ -413,7 +367,7 @@ class LXMessage:
|
|||
single_packet_content_limit = LXMessage.PLAIN_PACKET_MAX_CONTENT
|
||||
|
||||
if content_size > single_packet_content_limit:
|
||||
raise TypeError(f"LXMessage desired opportunistic delivery method, but content of length {content_size} exceeds single-packet content limit of {single_packet_content_limit}.")
|
||||
raise TypeError("LXMessage desired opportunistic delivery method, but content exceeds single-packet size.")
|
||||
else:
|
||||
self.method = LXMessage.OPPORTUNISTIC
|
||||
self.representation = LXMessage.PACKET
|
||||
|
|
@ -431,14 +385,9 @@ class LXMessage:
|
|||
elif self.desired_method == LXMessage.PROPAGATED:
|
||||
single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT
|
||||
|
||||
if self.__pn_encrypted_data == None or payload_updated:
|
||||
self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
|
||||
self.ratchet_id = self.__destination.latest_ratchet_id
|
||||
|
||||
lxmf_data = self.packed[:LXMessage.DESTINATION_LENGTH]+self.__pn_encrypted_data
|
||||
self.transient_id = RNS.Identity.full_hash(lxmf_data)
|
||||
if self.propagation_stamp != None: lxmf_data += self.propagation_stamp
|
||||
self.propagation_packed = msgpack.packb([time.time(), [lxmf_data]])
|
||||
encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
|
||||
self.ratchet_id = self.__destination.latest_ratchet_id
|
||||
self.propagation_packed = msgpack.packb([time.time(), [self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data]])
|
||||
|
||||
content_size = len(self.propagation_packed)
|
||||
if content_size <= single_packet_content_limit:
|
||||
|
|
@ -471,7 +420,6 @@ class LXMessage:
|
|||
if self.method == LXMessage.OPPORTUNISTIC:
|
||||
lxm_packet = self.__as_packet()
|
||||
lxm_packet.send().set_delivery_callback(self.__mark_delivered)
|
||||
self.progress = 0.50
|
||||
self.ratchet_id = lxm_packet.ratchet_id
|
||||
self.state = LXMessage.SENT
|
||||
|
||||
|
|
@ -593,27 +541,22 @@ class LXMessage:
|
|||
if resource.status == RNS.Resource.COMPLETE:
|
||||
self.__mark_delivered()
|
||||
else:
|
||||
if resource.status == RNS.Resource.REJECTED:
|
||||
self.state = LXMessage.REJECTED
|
||||
|
||||
elif self.state != LXMessage.CANCELLED:
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
def __propagation_resource_concluded(self, resource):
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
self.__mark_propagated()
|
||||
else:
|
||||
if self.state != LXMessage.CANCELLED:
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
def __link_packet_timed_out(self, packet_receipt):
|
||||
if self.state != LXMessage.CANCELLED:
|
||||
if packet_receipt:
|
||||
packet_receipt.destination.teardown()
|
||||
|
||||
self.state = LXMessage.OUTBOUND
|
||||
if packet_receipt:
|
||||
packet_receipt.destination.teardown()
|
||||
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
|
||||
def __update_transfer_progress(self, resource):
|
||||
self.progress = 0.10 + (resource.get_progress()*0.90)
|
||||
|
|
|
|||
|
|
@ -3,30 +3,23 @@ import RNS.vendor.umsgpack as msgpack
|
|||
|
||||
import os
|
||||
import time
|
||||
import math
|
||||
import itertools
|
||||
import multiprocessing
|
||||
|
||||
WORKBLOCK_EXPAND_ROUNDS = 3000
|
||||
WORKBLOCK_EXPAND_ROUNDS_PN = 1000
|
||||
WORKBLOCK_EXPAND_ROUNDS_PEERING = 25
|
||||
STAMP_SIZE = RNS.Identity.HASHLENGTH//8
|
||||
PN_VALIDATION_POOL_MIN_SIZE = 256
|
||||
WORKBLOCK_EXPAND_ROUNDS = 3000
|
||||
|
||||
active_jobs = {}
|
||||
|
||||
if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork")
|
||||
|
||||
def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
|
||||
def stamp_workblock(message_id):
|
||||
wb_st = time.time()
|
||||
expand_rounds = WORKBLOCK_EXPAND_ROUNDS
|
||||
workblock = b""
|
||||
for n in range(expand_rounds):
|
||||
workblock += RNS.Cryptography.hkdf(length=256,
|
||||
derive_from=material,
|
||||
salt=RNS.Identity.full_hash(material+msgpack.packb(n)),
|
||||
context=None)
|
||||
workblock += RNS.Cryptography.hkdf(
|
||||
length=256,
|
||||
derive_from=message_id,
|
||||
salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)),
|
||||
context=None,
|
||||
)
|
||||
wb_time = time.time() - wb_st
|
||||
# RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)
|
||||
RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)
|
||||
|
||||
return workblock
|
||||
|
||||
|
|
@ -34,115 +27,40 @@ def stamp_value(workblock, stamp):
|
|||
value = 0
|
||||
bits = 256
|
||||
material = RNS.Identity.full_hash(workblock+stamp)
|
||||
i = int.from_bytes(material, byteorder="big")
|
||||
i = int.from_bytes(material)
|
||||
while ((i & (1 << (bits - 1))) == 0):
|
||||
i = (i << 1)
|
||||
value += 1
|
||||
|
||||
return value
|
||||
|
||||
def stamp_valid(stamp, target_cost, workblock):
|
||||
target = 0b1 << 256-target_cost
|
||||
result = RNS.Identity.full_hash(workblock+stamp)
|
||||
if int.from_bytes(result, byteorder="big") > target: return False
|
||||
else: return True
|
||||
|
||||
def validate_peering_key(peering_id, peering_key, target_cost):
|
||||
workblock = stamp_workblock(peering_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING)
|
||||
if not stamp_valid(peering_key, target_cost, workblock): return False
|
||||
else: return True
|
||||
|
||||
def validate_pn_stamp(transient_data, target_cost):
|
||||
from .LXMessage import LXMessage
|
||||
if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return None, None, None, None
|
||||
else:
|
||||
lxm_data = transient_data[:-STAMP_SIZE]
|
||||
stamp = transient_data[-STAMP_SIZE:]
|
||||
transient_id = RNS.Identity.full_hash(lxm_data)
|
||||
workblock = stamp_workblock(transient_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN)
|
||||
|
||||
if not stamp_valid(stamp, target_cost, workblock): return None, None, None, None
|
||||
else:
|
||||
value = stamp_value(workblock, stamp)
|
||||
return transient_id, lxm_data, value, stamp
|
||||
|
||||
def validate_pn_stamps_job_simple(transient_list, target_cost):
|
||||
validated_messages = []
|
||||
for transient_data in transient_list:
|
||||
transient_id, lxm_data, value, stamp_data = validate_pn_stamp(transient_data, target_cost)
|
||||
if transient_id: validated_messages.append([transient_id, lxm_data, value, stamp_data])
|
||||
|
||||
return validated_messages
|
||||
|
||||
def validate_pn_stamps_job_multip(transient_list, target_cost):
|
||||
cores = multiprocessing.cpu_count()
|
||||
pool_count = min(cores, math.ceil(len(transient_list) / PN_VALIDATION_POOL_MIN_SIZE))
|
||||
|
||||
RNS.log(f"Validating {len(transient_list)} stamps using {pool_count} processes...", RNS.LOG_VERBOSE)
|
||||
with multiprocessing.Pool(pool_count) as p:
|
||||
validated_entries = p.starmap(validate_pn_stamp, zip(transient_list, itertools.repeat(target_cost)))
|
||||
|
||||
return [e for e in validated_entries if e[0] != None]
|
||||
|
||||
def validate_pn_stamps(transient_list, target_cost):
|
||||
non_mp_platform = RNS.vendor.platformutils.is_android()
|
||||
if len(transient_list) <= PN_VALIDATION_POOL_MIN_SIZE or non_mp_platform: return validate_pn_stamps_job_simple(transient_list, target_cost)
|
||||
else: return validate_pn_stamps_job_multip(transient_list, target_cost)
|
||||
|
||||
def generate_stamp(message_id, stamp_cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
|
||||
def generate_stamp(message_id, stamp_cost):
|
||||
RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG)
|
||||
workblock = stamp_workblock(message_id, expand_rounds=expand_rounds)
|
||||
workblock = stamp_workblock(message_id)
|
||||
|
||||
start_time = time.time()
|
||||
stamp = None
|
||||
rounds = 0
|
||||
value = 0
|
||||
|
||||
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin(): stamp, rounds = job_simple(stamp_cost, workblock, message_id)
|
||||
elif RNS.vendor.platformutils.is_android(): stamp, rounds = job_android(stamp_cost, workblock, message_id)
|
||||
else: stamp, rounds = job_linux(stamp_cost, workblock, message_id)
|
||||
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
|
||||
stamp, rounds = job_simple(stamp_cost, workblock)
|
||||
|
||||
elif RNS.vendor.platformutils.is_android():
|
||||
stamp, rounds = job_android(stamp_cost, workblock)
|
||||
|
||||
else:
|
||||
stamp, rounds = job_linux(stamp_cost, workblock)
|
||||
|
||||
duration = time.time() - start_time
|
||||
speed = rounds/duration
|
||||
if stamp != None: value = stamp_value(workblock, stamp)
|
||||
value = stamp_value(workblock, stamp)
|
||||
|
||||
RNS.log(f"Stamp with value {value} generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG)
|
||||
|
||||
return stamp, value
|
||||
|
||||
def cancel_work(message_id):
|
||||
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
|
||||
try:
|
||||
if message_id in active_jobs:
|
||||
active_jobs[message_id] = True
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
elif RNS.vendor.platformutils.is_android():
|
||||
try:
|
||||
if message_id in active_jobs:
|
||||
active_jobs[message_id] = True
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
else:
|
||||
try:
|
||||
if message_id in active_jobs:
|
||||
stop_event = active_jobs[message_id][0]
|
||||
result_queue = active_jobs[message_id][1]
|
||||
stop_event.set()
|
||||
result_queue.put(None)
|
||||
active_jobs.pop(message_id)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
def job_simple(stamp_cost, workblock, message_id):
|
||||
def job_simple(stamp_cost, workblock):
|
||||
# A simple, single-process stamp generator.
|
||||
# should work on any platform, and is used
|
||||
# as a fall-back, in case of limited multi-
|
||||
|
|
@ -155,33 +73,27 @@ def job_simple(stamp_cost, workblock, message_id):
|
|||
pstamp = os.urandom(256//8)
|
||||
st = time.time()
|
||||
|
||||
active_jobs[message_id] = False;
|
||||
|
||||
def sv(s, c, w):
|
||||
target = 0b1<<256-c; m = w+s
|
||||
result = RNS.Identity.full_hash(m)
|
||||
if int.from_bytes(result, byteorder="big") > target: return False
|
||||
else: return True
|
||||
if int.from_bytes(result, byteorder="big") > target:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
while not sv(pstamp, stamp_cost, workblock) and not active_jobs[message_id]:
|
||||
while not sv(pstamp, stamp_cost, workblock):
|
||||
pstamp = os.urandom(256//8); rounds += 1
|
||||
if rounds % 2500 == 0:
|
||||
speed = rounds / (time.time()-st)
|
||||
RNS.log(f"Stamp generation running. {rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG)
|
||||
|
||||
if active_jobs[message_id] == True:
|
||||
pstamp = None
|
||||
|
||||
active_jobs.pop(message_id)
|
||||
|
||||
return pstamp, rounds
|
||||
|
||||
def job_linux(stamp_cost, workblock, message_id):
|
||||
def job_linux(stamp_cost, workblock):
|
||||
allow_kill = True
|
||||
stamp = None
|
||||
total_rounds = 0
|
||||
cores = multiprocessing.cpu_count()
|
||||
jobs = cores if cores <= 12 else int(cores/2)
|
||||
jobs = multiprocessing.cpu_count()
|
||||
stop_event = multiprocessing.Event()
|
||||
result_queue = multiprocessing.Queue(1)
|
||||
rounds_queue = multiprocessing.Queue()
|
||||
|
|
@ -214,8 +126,6 @@ def job_linux(stamp_cost, workblock, message_id):
|
|||
job_procs.append(process)
|
||||
process.start()
|
||||
|
||||
active_jobs[message_id] = [stop_event, result_queue]
|
||||
|
||||
stamp = result_queue.get()
|
||||
RNS.log("Got stamp result from worker", RNS.LOG_DEBUG) # TODO: Remove
|
||||
|
||||
|
|
@ -260,7 +170,7 @@ def job_linux(stamp_cost, workblock, message_id):
|
|||
|
||||
return stamp, total_rounds
|
||||
|
||||
def job_android(stamp_cost, workblock, message_id):
|
||||
def job_android(stamp_cost, workblock):
|
||||
# Semaphore support is flaky to non-existent on
|
||||
# Android, so we need to manually dispatch and
|
||||
# manage workloads here, while periodically
|
||||
|
|
@ -320,12 +230,10 @@ def job_android(stamp_cost, workblock, message_id):
|
|||
RNS.log(f"Stamp generation worker error: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
active_jobs[message_id] = False;
|
||||
|
||||
RNS.log(f"Dispatching {jobs} workers for stamp generation...", RNS.LOG_DEBUG) # TODO: Remove
|
||||
|
||||
results_dict = wm.dict()
|
||||
while stamp == None and active_jobs[message_id] == False:
|
||||
while stamp == None:
|
||||
job_procs = []
|
||||
try:
|
||||
for pnum in range(jobs):
|
||||
|
|
@ -352,17 +260,8 @@ def job_android(stamp_cost, workblock, message_id):
|
|||
RNS.log(f"Stamp generation job error: {e}")
|
||||
RNS.trace_exception(e)
|
||||
|
||||
active_jobs.pop(message_id)
|
||||
|
||||
return stamp, total_rounds
|
||||
|
||||
# def stamp_value_linear(workblock, stamp):
|
||||
# value = 0
|
||||
# bits = 256
|
||||
# material = RNS.Identity.full_hash(workblock+stamp)
|
||||
# s = int.from_bytes(material, byteorder="big")
|
||||
# return s.bit_count()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
if len(sys.argv) < 2:
|
||||
|
|
@ -378,14 +277,4 @@ if __name__ == "__main__":
|
|||
RNS.loglevel = RNS.LOG_DEBUG
|
||||
RNS.log("Testing LXMF stamp generation", RNS.LOG_DEBUG)
|
||||
message_id = os.urandom(32)
|
||||
generate_stamp(message_id, cost)
|
||||
|
||||
RNS.log("", RNS.LOG_DEBUG)
|
||||
RNS.log("Testing propagation stamp generation", RNS.LOG_DEBUG)
|
||||
message_id = os.urandom(32)
|
||||
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN)
|
||||
|
||||
RNS.log("", RNS.LOG_DEBUG)
|
||||
RNS.log("Testing peering key generation", RNS.LOG_DEBUG)
|
||||
message_id = os.urandom(32)
|
||||
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING)
|
||||
generate_stamp(message_id, cost)
|
||||
|
|
@ -1,8 +1,8 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
# Reticulum License
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2020-2025 Mark Qvist
|
||||
# Copyright (c) 2016-2022 Mark Qvist / unsigned.io
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
|
|
@ -11,16 +11,8 @@
|
|||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# - The Software shall not be used in any kind of system which includes amongst
|
||||
# its functions the ability to purposefully do harm to human beings.
|
||||
#
|
||||
# - The Software shall not be used, directly or indirectly, in the creation of
|
||||
# an artificial intelligence, machine learning or language model training
|
||||
# dataset, including but not limited to any use that contributes to the
|
||||
# training or development of such a model or algorithm.
|
||||
#
|
||||
# - The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
|
|
@ -43,7 +35,6 @@ import time
|
|||
import os
|
||||
|
||||
from LXMF._version import __version__
|
||||
from LXMF import APP_NAME
|
||||
|
||||
from RNS.vendor.configobj import ConfigObj
|
||||
|
||||
|
|
@ -105,11 +96,6 @@ def apply_config():
|
|||
else:
|
||||
active_configuration["enable_propagation_node"] = False
|
||||
|
||||
if "propagation" in lxmd_config and "node_name" in lxmd_config["propagation"]:
|
||||
active_configuration["node_name"] = lxmd_config["propagation"].get("node_name")
|
||||
else:
|
||||
active_configuration["node_name"] = None
|
||||
|
||||
if "propagation" in lxmd_config and "auth_required" in lxmd_config["propagation"]:
|
||||
active_configuration["auth_required"] = lxmd_config["propagation"].as_bool("auth_required")
|
||||
else:
|
||||
|
|
@ -140,7 +126,7 @@ def apply_config():
|
|||
if active_configuration["message_storage_limit"] < 0.005:
|
||||
active_configuration["message_storage_limit"] = 0.005
|
||||
else:
|
||||
active_configuration["message_storage_limit"] = 500
|
||||
active_configuration["message_storage_limit"] = 2000
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
|
||||
|
|
@ -148,76 +134,11 @@ def apply_config():
|
|||
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
|
||||
else:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = 256
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_message_max_accepted_size" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_message_max_accepted_size")
|
||||
if active_configuration["propagation_transfer_max_accepted_size"] < 0.38:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
|
||||
else:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = 256
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_sync_max_accepted_size" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_sync_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_sync_max_accepted_size")
|
||||
if active_configuration["propagation_sync_max_accepted_size"] < 0.38:
|
||||
active_configuration["propagation_sync_max_accepted_size"] = 0.38
|
||||
else:
|
||||
active_configuration["propagation_sync_max_accepted_size"] = 256*40
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_stamp_cost_target" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_stamp_cost_target"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_target")
|
||||
if active_configuration["propagation_stamp_cost_target"] < LXMF.LXMRouter.PROPAGATION_COST_MIN:
|
||||
active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST_MIN
|
||||
else:
|
||||
active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_stamp_cost_flexibility" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_stamp_cost_flexibility"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_flexibility")
|
||||
if active_configuration["propagation_stamp_cost_flexibility"] < 0:
|
||||
active_configuration["propagation_stamp_cost_flexibility"] = 0
|
||||
else:
|
||||
active_configuration["propagation_stamp_cost_flexibility"] = LXMF.LXMRouter.PROPAGATION_COST_FLEX
|
||||
|
||||
if "propagation" in lxmd_config and "peering_cost" in lxmd_config["propagation"]:
|
||||
active_configuration["peering_cost"] = lxmd_config["propagation"].as_int("peering_cost")
|
||||
if active_configuration["peering_cost"] < 0:
|
||||
active_configuration["peering_cost"] = 0
|
||||
else:
|
||||
active_configuration["peering_cost"] = LXMF.LXMRouter.PEERING_COST
|
||||
|
||||
if "propagation" in lxmd_config and "remote_peering_cost_max" in lxmd_config["propagation"]:
|
||||
active_configuration["remote_peering_cost_max"] = lxmd_config["propagation"].as_int("remote_peering_cost_max")
|
||||
if active_configuration["remote_peering_cost_max"] < 0:
|
||||
active_configuration["remote_peering_cost_max"] = 0
|
||||
else:
|
||||
active_configuration["remote_peering_cost_max"] = LXMF.LXMRouter.MAX_PEERING_COST
|
||||
|
||||
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
|
||||
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
|
||||
else:
|
||||
active_configuration["prioritised_lxmf_destinations"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "control_allowed" in lxmd_config["propagation"]:
|
||||
active_configuration["control_allowed_identities"] = lxmd_config["propagation"].as_list("control_allowed")
|
||||
else:
|
||||
active_configuration["control_allowed_identities"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]:
|
||||
static_peers = lxmd_config["propagation"].as_list("static_peers")
|
||||
active_configuration["static_peers"] = []
|
||||
for static_peer in static_peers:
|
||||
active_configuration["static_peers"].append(bytes.fromhex(static_peer))
|
||||
else:
|
||||
active_configuration["static_peers"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]:
|
||||
active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers")
|
||||
else:
|
||||
active_configuration["max_peers"] = None
|
||||
|
||||
if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]:
|
||||
active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only")
|
||||
else:
|
||||
active_configuration["from_static_only"] = False
|
||||
|
||||
# Load various settings
|
||||
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
|
||||
|
|
@ -383,17 +304,8 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
|||
autopeer = active_configuration["autopeer"],
|
||||
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
|
||||
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
|
||||
propagation_cost = active_configuration["propagation_stamp_cost_target"],
|
||||
propagation_cost_flexibility = active_configuration["propagation_stamp_cost_flexibility"],
|
||||
peering_cost = active_configuration["peering_cost"],
|
||||
max_peering_cost = active_configuration["remote_peering_cost_max"],
|
||||
sync_limit = active_configuration["propagation_sync_max_accepted_size"],
|
||||
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
|
||||
max_peers = active_configuration["max_peers"],
|
||||
static_peers = active_configuration["static_peers"],
|
||||
from_static_only = active_configuration["from_static_only"],
|
||||
name = active_configuration["node_name"])
|
||||
|
||||
)
|
||||
message_router.register_delivery_callback(lxmf_delivery)
|
||||
|
||||
for destination_hash in active_configuration["ignored_lxmf_destinations"]:
|
||||
|
|
@ -425,16 +337,13 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
|||
for dest_str in active_configuration["prioritised_lxmf_destinations"]:
|
||||
try:
|
||||
dest_hash = bytes.fromhex(dest_str)
|
||||
if len(dest_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: message_router.prioritise(dest_hash)
|
||||
except Exception as e: RNS.log("Cannot prioritise "+str(dest_str)+", it is not a valid destination hash", RNS.LOG_ERROR)
|
||||
if len(dest_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
|
||||
message_router.prioritise(dest_hash)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Cannot prioritise "+str(dest_str)+", it is not a valid destination hash", RNS.LOG_ERROR)
|
||||
|
||||
message_router.enable_propagation()
|
||||
|
||||
for ident_str in active_configuration["control_allowed_identities"]:
|
||||
try:
|
||||
identity_hash = bytes.fromhex(ident_str)
|
||||
if len(identity_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: message_router.allow_control(identity_hash)
|
||||
except Exception as e: RNS.log(f"Cannot allow control from {ident_str}, it is not a valid identity hash", RNS.LOG_ERROR)
|
||||
|
||||
RNS.log("LXMF Propagation Node started on "+RNS.prettyhexrep(message_router.propagation_destination.hash))
|
||||
|
||||
|
|
@ -453,13 +362,13 @@ def jobs():
|
|||
try:
|
||||
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
|
||||
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE)
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
last_peer_announce = time.time()
|
||||
|
||||
if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None:
|
||||
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE)
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
|
||||
message_router.announce_propagation_node()
|
||||
last_node_announce = time.time()
|
||||
|
||||
|
|
@ -472,7 +381,7 @@ def deferred_start_jobs():
|
|||
global active_configuration, last_peer_announce, last_node_announce
|
||||
global message_router, lxmf_destination
|
||||
time.sleep(DEFFERED_JOBS_DELAY)
|
||||
RNS.log("Running deferred start jobs", RNS.LOG_DEBUG)
|
||||
RNS.log("Running deferred start jobs")
|
||||
if active_configuration["peer_announce_at_start"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
|
|
@ -485,379 +394,6 @@ def deferred_start_jobs():
|
|||
last_node_announce = time.time()
|
||||
threading.Thread(target=jobs, daemon=True).start()
|
||||
|
||||
def _request_sync(identity, destination_hash, remote_identity, timeout=15, exit_on_fail=False):
|
||||
control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
if exit_on_fail:
|
||||
print("Requesting lxmd peer sync timed out, exiting now")
|
||||
exit(200)
|
||||
else:
|
||||
return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
|
||||
if not RNS.Transport.has_path(control_destination.hash):
|
||||
RNS.Transport.request_path(control_destination.hash)
|
||||
while not RNS.Transport.has_path(control_destination.hash):
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link = RNS.Link(control_destination)
|
||||
while not link.status == RNS.Link.ACTIVE:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.identify(identity)
|
||||
request_receipt = link.request(LXMF.LXMRouter.SYNC_REQUEST_PATH, data=destination_hash, response_callback=None, failed_callback=None)
|
||||
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.teardown()
|
||||
return request_receipt.get_response()
|
||||
|
||||
|
||||
def request_sync(target, remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=15, identity_path=None):
|
||||
global configpath, identitypath, storagedir, lxmdir
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
|
||||
try:
|
||||
peer_destination_hash = bytes.fromhex(target)
|
||||
if len(peer_destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters")
|
||||
except Exception as e:
|
||||
print(f"Invalid peer destination hash: {e}")
|
||||
exit(203)
|
||||
remote
|
||||
_remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path)
|
||||
response = _request_sync(identity, peer_destination_hash, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True)
|
||||
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
|
||||
print("Remote received no identity")
|
||||
exit(203)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
|
||||
print("Access denied")
|
||||
exit(204)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_INVALID_DATA:
|
||||
print("Invalid data received by remote")
|
||||
exit(205)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NOT_FOUND:
|
||||
print("The requested peer was not found")
|
||||
exit(206)
|
||||
elif response == None:
|
||||
print("Empty response received")
|
||||
exit(207)
|
||||
else:
|
||||
print(f"Sync requested for peer {RNS.prettyhexrep(peer_destination_hash)}")
|
||||
exit(0)
|
||||
|
||||
def _request_unpeer(identity, destination_hash, remote_identity, timeout=15, exit_on_fail=False):
|
||||
control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
if exit_on_fail:
|
||||
print("Requesting lxmd peering break timed out, exiting now")
|
||||
exit(200)
|
||||
else: return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
|
||||
else: time.sleep(0.1)
|
||||
|
||||
if not RNS.Transport.has_path(control_destination.hash):
|
||||
RNS.Transport.request_path(control_destination.hash)
|
||||
while not RNS.Transport.has_path(control_destination.hash):
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link = RNS.Link(control_destination)
|
||||
while not link.status == RNS.Link.ACTIVE:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.identify(identity)
|
||||
request_receipt = link.request(LXMF.LXMRouter.UNPEER_REQUEST_PATH, data=destination_hash, response_callback=None, failed_callback=None)
|
||||
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.teardown()
|
||||
return request_receipt.get_response()
|
||||
|
||||
|
||||
def request_unpeer(target, remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=15, identity_path=None):
|
||||
global configpath, identitypath, storagedir, lxmdir
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
|
||||
try:
|
||||
peer_destination_hash = bytes.fromhex(target)
|
||||
if len(peer_destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters")
|
||||
except Exception as e:
|
||||
print(f"Invalid peer destination hash: {e}")
|
||||
exit(203)
|
||||
remote
|
||||
_remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path)
|
||||
response = _request_unpeer(identity, peer_destination_hash, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True)
|
||||
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
|
||||
print("Remote received no identity")
|
||||
exit(203)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
|
||||
print("Access denied")
|
||||
exit(204)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_INVALID_DATA:
|
||||
print("Invalid data received by remote")
|
||||
exit(205)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NOT_FOUND:
|
||||
print("The requested peer was not found")
|
||||
exit(206)
|
||||
elif response == None:
|
||||
print("Empty response received")
|
||||
exit(207)
|
||||
else:
|
||||
print(f"Broke peering with {RNS.prettyhexrep(peer_destination_hash)}")
|
||||
exit(0)
|
||||
|
||||
def query_status(identity, remote_identity=None, timeout=5, exit_on_fail=False):
|
||||
if remote_identity == None: remote_identity = identity
|
||||
control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
if exit_on_fail:
|
||||
print("Getting lxmd statistics timed out, exiting now")
|
||||
exit(200)
|
||||
else: return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
|
||||
else: time.sleep(0.1)
|
||||
|
||||
if not RNS.Transport.has_path(control_destination.hash):
|
||||
RNS.Transport.request_path(control_destination.hash)
|
||||
while not RNS.Transport.has_path(control_destination.hash):
|
||||
tc = check_timeout()
|
||||
if tc: return tc
|
||||
|
||||
link = RNS.Link(control_destination)
|
||||
while not link.status == RNS.Link.ACTIVE:
|
||||
tc = check_timeout()
|
||||
if tc: return tc
|
||||
|
||||
link.identify(identity)
|
||||
request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None)
|
||||
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
|
||||
tc = check_timeout()
|
||||
if tc: return tc
|
||||
|
||||
link.teardown()
|
||||
return request_receipt.get_response()
|
||||
|
||||
def get_status(remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=5,
|
||||
show_status=False, show_peers=False, identity_path=None):
|
||||
|
||||
global identity
|
||||
_remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path)
|
||||
response = query_status(identity, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True)
|
||||
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
|
||||
print("Remote received no identity")
|
||||
exit(203)
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
|
||||
print("Access denied")
|
||||
exit(204)
|
||||
elif response == None:
|
||||
print("Empty response received")
|
||||
exit(207)
|
||||
else:
|
||||
s = response
|
||||
mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2)
|
||||
ms_util = f"{mutil}%"
|
||||
if s["from_static_only"]:
|
||||
who_str = "static peers only"
|
||||
else:
|
||||
who_str = "all nodes"
|
||||
|
||||
available_peers = 0
|
||||
unreachable_peers = 0
|
||||
peered_incoming = 0
|
||||
peered_outgoing = 0
|
||||
peered_rx_bytes = 0
|
||||
peered_tx_bytes = 0
|
||||
for peer_id in s["peers"]:
|
||||
p = s["peers"][peer_id]
|
||||
pm = p["messages"]
|
||||
peered_incoming += pm["incoming"]
|
||||
peered_outgoing += pm["outgoing"]
|
||||
peered_rx_bytes += p["rx_bytes"]
|
||||
peered_tx_bytes += p["tx_bytes"]
|
||||
|
||||
if p["alive"]: available_peers += 1
|
||||
else: unreachable_peers += 1
|
||||
|
||||
total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"]
|
||||
total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"]
|
||||
if total_incoming != 0: df = round(peered_outgoing/total_incoming, 2)
|
||||
else: df = 0
|
||||
|
||||
dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"])
|
||||
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
|
||||
|
||||
if show_status:
|
||||
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
|
||||
ptl = RNS.prettysize(s["propagation_limit"]*1000); psl = RNS.prettysize(s["sync_limit"]*1000);
|
||||
uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
|
||||
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
|
||||
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
|
||||
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
|
||||
psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"]
|
||||
pc = s["peering_cost"]; pcm = s["max_peering_cost"]
|
||||
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
|
||||
print(f"Required propagation stamp cost is {psc}, flexibility is {scf}")
|
||||
print(f"Peering cost is {pc}, max remote peering cost is {pcm}")
|
||||
print(f"Accepting propagated messages from {who_str}")
|
||||
print(f"{ptl} message limit, {psl} sync limit")
|
||||
print(f"")
|
||||
print(f"Peers : {stp} total (peer limit is {smp})")
|
||||
print(f" {sdp} discovered, {ssp} static")
|
||||
print(f" {available_peers} available, {unreachable_peers} unreachable")
|
||||
print(f"")
|
||||
print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})")
|
||||
print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})")
|
||||
print(f" {upi} messages received from unpeered nodes ({uprx})")
|
||||
print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})")
|
||||
print(f" {cprr} propagation messages received directly from clients")
|
||||
print(f" {cprs} propagation messages served to clients")
|
||||
print(f" Distribution factor is {df}")
|
||||
print(f"")
|
||||
|
||||
if show_peers:
|
||||
if not show_status:
|
||||
print("")
|
||||
|
||||
for peer_id in s["peers"]:
|
||||
ind = " "
|
||||
p = s["peers"][peer_id]
|
||||
if p["type"] == "static":
|
||||
t = "Static peer "
|
||||
elif p["type"] == "discovered":
|
||||
t = "Discovered peer "
|
||||
else:
|
||||
t = "Unknown peer "
|
||||
a = "Available" if p["alive"] == True else "Unreachable"
|
||||
h = max(time.time()-p["last_heard"], 0)
|
||||
hops = p["network_distance"]
|
||||
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
|
||||
pm = p["messages"]; pk = p["peering_key"]
|
||||
pc = p["peering_cost"]; psc = p["target_stamp_cost"]; psf = p["stamp_cost_flexibility"]
|
||||
if pc == None: pc = "unknown"
|
||||
if psc == None: psc = "unknown"
|
||||
if psf == None: psf = "unknown"
|
||||
if pk == None: pk = "Not generated"
|
||||
else: pk = f"Generated, value is {pk}"
|
||||
if p["last_sync_attempt"] != 0:
|
||||
lsa = p["last_sync_attempt"]
|
||||
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
|
||||
else:
|
||||
ls = "never synced"
|
||||
|
||||
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"])
|
||||
stl = RNS.prettysize(p["transfer_limit"]*1000) if p["transfer_limit"] else "Unknown"
|
||||
ssl = RNS.prettysize(p["sync_limit"]*1000) if p["sync_limit"] else "unknown"
|
||||
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
|
||||
pmi = pm["incoming"]; pmuh = pm["unhandled"]; ar = round(p["acceptance_rate"]*100, 2)
|
||||
if p["name"] == None: nn = ""
|
||||
else: nn = p["name"].strip().replace("\n", "").replace("\r", "")
|
||||
if len(nn) > 45: nn = f"{nn[:45]}..."
|
||||
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
|
||||
if len(nn): print(f"{ind*2}Name : {nn}")
|
||||
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
|
||||
print(f"{ind*2}Costs : Propagation {psc} (flex {psf}), peering {pc}")
|
||||
print(f"{ind*2}Sync key : {pk}")
|
||||
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER")
|
||||
print(f"{ind*2}Limits : {stl} message limit, {ssl} sync limit")
|
||||
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming, {ar}% acceptance rate")
|
||||
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
|
||||
ms = "" if pm["unhandled"] == 1 else "s"
|
||||
print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}")
|
||||
print("")
|
||||
|
||||
def _get_target_identity(remote=None, timeout=5):
|
||||
global identity
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
print("Resolving remote identity timed out, exiting now")
|
||||
exit(200)
|
||||
else: time.sleep(0.1)
|
||||
|
||||
if remote == None: return identity
|
||||
else:
|
||||
try:
|
||||
destination_hash = bytes.fromhex(remote)
|
||||
if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters")
|
||||
except Exception as e:
|
||||
print(f"Invalid remote destination hash: {e}")
|
||||
exit(203)
|
||||
|
||||
remote_identity = RNS.Identity.recall(destination_hash)
|
||||
if remote_identity: return remote_identity
|
||||
else:
|
||||
if not RNS.Transport.has_path(destination_hash):
|
||||
RNS.Transport.request_path(destination_hash)
|
||||
while not RNS.Transport.has_path(destination_hash):
|
||||
tc = check_timeout()
|
||||
if tc: return tc
|
||||
|
||||
return RNS.Identity.recall(destination_hash)
|
||||
|
||||
def _remote_init(configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, identity_path=None):
|
||||
global configpath, identitypath, storagedir, lxmdir, identity
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
targetlogdest = RNS.LOG_STDOUT
|
||||
|
||||
if identity_path == None:
|
||||
if configdir == None:
|
||||
if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"): configdir = "/etc/lxmd"
|
||||
elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(Reticulum.userdir+"/.config/lxmd/config"): configdir = RNS.Reticulum.userdir+"/.config/lxmd"
|
||||
else: configdir = RNS.Reticulum.userdir+"/.lxmd"
|
||||
|
||||
configpath = configdir+"/config"
|
||||
identitypath = configdir+"/identity"
|
||||
identity = None
|
||||
|
||||
if not os.path.isdir(configdir):
|
||||
RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR)
|
||||
exit(201)
|
||||
if not os.path.isfile(identitypath):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identitypath)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
else:
|
||||
if not os.path.isfile(identity_path):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identity_path)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
if targetloglevel == None: targetloglevel = 3
|
||||
if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness
|
||||
|
||||
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
|
||||
|
||||
def main():
|
||||
try:
|
||||
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
|
||||
|
|
@ -868,13 +404,6 @@ def main():
|
|||
parser.add_argument("-v", "--verbose", action="count", default=0)
|
||||
parser.add_argument("-q", "--quiet", action="count", default=0)
|
||||
parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file")
|
||||
parser.add_argument("--status", action="store_true", default=False, help="display node status")
|
||||
parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes")
|
||||
parser.add_argument("--sync", action="store", default=None, help="request a sync with the specified peer", type=str)
|
||||
parser.add_argument("-b", "--break", dest="unpeer", action="store", default=None, help="break peering with the specified peer", type=str)
|
||||
parser.add_argument("--timeout", action="store", default=None, help="timeout in seconds for query operations", type=float)
|
||||
parser.add_argument("-r", "--remote", action="store", default=None, help="remote propagation node destination hash", type=str)
|
||||
parser.add_argument("--identity", action="store", default=None, help="path to identity used for remote requests", type=str)
|
||||
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
|
||||
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
|
||||
|
||||
|
|
@ -884,50 +413,15 @@ def main():
|
|||
print(__default_lxmd_config__)
|
||||
exit()
|
||||
|
||||
if args.status or args.peers:
|
||||
if not args.timeout: args.timeout = 5
|
||||
get_status(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
timeout=args.timeout,
|
||||
show_status=args.status,
|
||||
show_peers=args.peers,
|
||||
identity_path=args.identity,
|
||||
remote=args.remote)
|
||||
exit()
|
||||
|
||||
if args.sync:
|
||||
if not args.timeout: args.timeout = 10
|
||||
request_sync(target=args.sync,
|
||||
configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
timeout=args.timeout,
|
||||
identity_path=args.identity,
|
||||
remote=args.remote)
|
||||
exit()
|
||||
|
||||
if args.unpeer:
|
||||
if not args.timeout: args.timeout = 10
|
||||
request_unpeer(target=args.unpeer,
|
||||
configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
timeout=args.timeout,
|
||||
identity_path=args.identity,
|
||||
remote=args.remote)
|
||||
exit()
|
||||
|
||||
program_setup(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service)
|
||||
program_setup(
|
||||
configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service
|
||||
)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
|
|
@ -943,17 +437,6 @@ __default_lxmd_config__ = """# This is an example LXM Daemon config file.
|
|||
|
||||
enable_node = no
|
||||
|
||||
# You can specify identity hashes for remotes
|
||||
# that are allowed to control and query status
|
||||
# for this propagation node.
|
||||
|
||||
# control_allowed = 7d7e542829b40f32364499b27438dba8, 437229f8e29598b2282b88bad5e44698
|
||||
|
||||
# An optional name for this node, included
|
||||
# in announces.
|
||||
|
||||
# node_name = Anonymous Propagation Node
|
||||
|
||||
# Automatic announce interval in minutes.
|
||||
# 6 hours by default.
|
||||
|
||||
|
|
@ -973,6 +456,19 @@ autopeer = yes
|
|||
|
||||
autopeer_maxdepth = 4
|
||||
|
||||
# The maximum accepted transfer size per in-
|
||||
# coming propagation transfer, in kilobytes.
|
||||
# This also sets the upper limit for the size
|
||||
# of single messages accepted onto this node.
|
||||
#
|
||||
# If a node wants to propagate a larger number
|
||||
# of messages to this node, than what can fit
|
||||
# within this limit, it will prioritise sending
|
||||
# the smallest messages first, and try again
|
||||
# with any remaining messages at a later point.
|
||||
|
||||
propagation_transfer_max_accepted_size = 256
|
||||
|
||||
# The maximum amount of storage to use for
|
||||
# the LXMF Propagation Node message store,
|
||||
# specified in megabytes. When this limit
|
||||
|
|
@ -981,57 +477,9 @@ autopeer_maxdepth = 4
|
|||
# LXMF prioritises keeping messages that are
|
||||
# new and small. Large and old messages will
|
||||
# be removed first. This setting is optional
|
||||
# and defaults to 500 megabytes.
|
||||
# and defaults to 2 gigabytes.
|
||||
|
||||
# message_storage_limit = 500
|
||||
|
||||
# The maximum accepted transfer size per in-
|
||||
# coming propagation message, in kilobytes.
|
||||
# This sets the upper limit for the size of
|
||||
# single messages accepted onto this node.
|
||||
|
||||
# propagation_message_max_accepted_size = 256
|
||||
|
||||
# The maximum accepted transfer size per in-
|
||||
# coming propagation node sync.
|
||||
#
|
||||
# If a node wants to propagate a larger number
|
||||
# of messages to this node, than what can fit
|
||||
# within this limit, it will prioritise sending
|
||||
# the smallest messages first, and try again
|
||||
# with any remaining messages at a later point.
|
||||
|
||||
# propagation_sync_max_accepted_size = 10240
|
||||
|
||||
# You can configure the target stamp cost
|
||||
# required to deliver messages via this node.
|
||||
|
||||
# propagation_stamp_cost_target = 16
|
||||
|
||||
# If set higher than 0, the stamp cost flexi-
|
||||
# bility option will make this node accept
|
||||
# messages with a lower stamp cost than the
|
||||
# target from other propagation nodes (but
|
||||
# not from peers directly). This allows the
|
||||
# network to gradually adjust stamp cost.
|
||||
|
||||
# propagation_stamp_cost_flexibility = 3
|
||||
|
||||
# The peering_cost option configures the target
|
||||
# value required for a remote node to peer with
|
||||
# and deliver messages to this node.
|
||||
|
||||
# peering_cost = 18
|
||||
|
||||
# You can configure the maximum peering cost
|
||||
# of remote nodes that this node will peer with.
|
||||
# Setting this to a higher number will allow
|
||||
# this node to peer with other nodes requiring
|
||||
# a higher peering key value, but will require
|
||||
# more computation time during initial peering
|
||||
# when generating the peering key.
|
||||
|
||||
# remote_peering_cost_max = 26
|
||||
# message_storage_limit = 2000
|
||||
|
||||
# You can tell the LXMF message router to
|
||||
# prioritise storage for one or more
|
||||
|
|
@ -1043,25 +491,6 @@ autopeer_maxdepth = 4
|
|||
|
||||
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
|
||||
|
||||
# You can configure the maximum number of other
|
||||
# propagation nodes that this node will peer
|
||||
# with automatically. The default is 20.
|
||||
|
||||
# max_peers = 20
|
||||
|
||||
# You can configure a list of static propagation
|
||||
# node peers, that this node will always be
|
||||
# peered with, by specifying a list of
|
||||
# destination hashes.
|
||||
|
||||
# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
|
||||
|
||||
# You can configure the propagation node to
|
||||
# only accept incoming propagation messages
|
||||
# from configured static peers.
|
||||
|
||||
# from_static_only = True
|
||||
|
||||
# By default, any destination is allowed to
|
||||
# connect and download messages, but you can
|
||||
# optionally restrict this. If you enable
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
__version__ = "0.9.4"
|
||||
__version__ = "0.5.1"
|
||||
|
|
|
|||
33
MIRROR.md
33
MIRROR.md
|
|
@ -1,33 +0,0 @@
|
|||
This repository is a public mirror. All potential future development is happening elsewhere.
|
||||
|
||||
I am stepping back from all public-facing interaction with this project. Reticulum has always been primarily my work, and continuing in the current public, internet-facing model is no longer sustainable.
|
||||
|
||||
The software remains available for use as-is. Occasional updates may appear at unpredictable intervals, but there will be no support, no responses to issues, no discussions, and no community management in this or any other public venue. If it doesn't work for you, it doesn't work. That is the entire extent of available troubleshooting assistance I can offer you.
|
||||
|
||||
If you've followed this project for a while, you already know what this means. You know who designed, wrote and tested this, and you know how many years of my life it took. You'll also know about both my particular challenges and strengths, and how I believe anything worth building needs to be built and maintained with our own hands.
|
||||
|
||||
Seven months ago, I said I needed to step back, that I was exhausted, and that I needed to recover. I believed a public resolve would be enough to effectuate that, but while striving to get just a few more useful features and protocols out, the unproductive requests and demands also ramped up, and I got pulled back into the same patterns and draining interactions that I'd explicitly said I couldn't sustain anymore.
|
||||
|
||||
So here's what you might have already guessed: I'm done playing the game by rules I can't win at.
|
||||
|
||||
Everything you need is right here, and by any sensible measure, it's done. Anyone who wants to invest the time, skill and persistence can build on it, or completely re-imagine it with different priorities. That was always the point.
|
||||
|
||||
The people who actually contributed - you know who you are, and you know I mean it when I say: Thank you. All of you who've used this to build something real - that was the goal, and you did it without needing me to hold your hand.
|
||||
|
||||
The rest of you: You have what you need. Use it or don't. I am not going to be the person who explains it to you anymore.
|
||||
|
||||
This is not a temporary break. It's not "see you after some rest", but a recognition that the current model is fundamentally incompatible with my life, my health, and my reality.
|
||||
|
||||
If you want to support continued work, you can do so at the donation links listed in this repository. But please understand, that this is not purchasing support or guaranteeing updates. It is support for work that happens on my timeline, according to my capacity, which at the moment is not what it was.
|
||||
|
||||
If you want Reticulum to continue evolving, you have the power to make that happen. The protocol is public domain. The code is open source. Everything you need is right here. I've provided the tools, but building what comes next is not my responsibility anymore. It's yours.
|
||||
|
||||
To the small group of people who has actually been here, and understood what this work was and what it cost - you already know where to find me if it actually matters.
|
||||
|
||||
To everyone else: This is where we part ways. No hard feelings. It's just time.
|
||||
|
||||
---
|
||||
|
||||
असतो मा सद्गमय
|
||||
तमसो मा ज्योतिर्गमय
|
||||
मृत्योर्मा अमृतं गमय
|
||||
7
Makefile
7
Makefile
|
|
@ -16,12 +16,7 @@ create_symlinks:
|
|||
-ln -s ../../LXMF ./LXMF/Utilities/LXMF
|
||||
|
||||
build_wheel:
|
||||
python3 setup.py bdist_wheel
|
||||
|
||||
build_sdist:
|
||||
python3 setup.py sdist
|
||||
|
||||
build_spkg: remove_symlinks build_sdist create_symlinks
|
||||
python3 setup.py sdist bdist_wheel
|
||||
|
||||
release: remove_symlinks build_wheel create_symlinks
|
||||
|
||||
|
|
|
|||
23
README.md
23
README.md
|
|
@ -1,7 +1,5 @@
|
|||
# Lightweight Extensible Message Format
|
||||
|
||||
*This repository is [a public mirror](./MIRROR.md). All development is happening elsewhere.*
|
||||
|
||||
LXMF is a simple and flexible messaging format and delivery protocol that allows a wide variety of implementations, while using as little bandwidth as possible. It is built on top of [Reticulum](https://reticulum.network) and offers zero-conf message routing, end-to-end encryption and Forward Secrecy, and can be transported over any kind of medium that Reticulum supports.
|
||||
|
||||
LXMF is efficient enough that it can deliver messages over extremely low-bandwidth systems such as packet radio or LoRa. Encrypted LXMF messages can also be encoded as QR-codes or text-based URIs, allowing completely analog *paper message* transport.
|
||||
|
|
@ -14,7 +12,6 @@ User-facing clients built on LXMF include:
|
|||
|
||||
Community-provided tools and utilities for LXMF include:
|
||||
|
||||
- [LXMFy](https://lxmfy.quad4.io/)
|
||||
- [LXMF-Bot](https://github.com/randogoth/lxmf-bot)
|
||||
- [LXMF Messageboard](https://github.com/chengtripp/lxmf_messageboard)
|
||||
- [LXMEvent](https://github.com/faragher/LXMEvent)
|
||||
|
|
@ -185,26 +182,6 @@ options:
|
|||
|
||||
Or run `lxmd --exampleconfig` to generate a commented example configuration documenting all the available configuration directives.
|
||||
|
||||
## Support LXMF Development
|
||||
You can help support the continued development of open, free and private communications systems by donating via one of the following channels:
|
||||
|
||||
- Monero:
|
||||
```
|
||||
84FpY1QbxHcgdseePYNmhTHcrgMX4nFfBYtz2GKYToqHVVhJp8Eaw1Z1EedRnKD19b3B8NiLCGVxzKV17UMmmeEsCrPyA5w
|
||||
```
|
||||
- Bitcoin
|
||||
```
|
||||
bc1pgqgu8h8xvj4jtafslq396v7ju7hkgymyrzyqft4llfslz5vp99psqfk3a6
|
||||
```
|
||||
- Ethereum
|
||||
```
|
||||
0x91C421DdfB8a30a49A71d63447ddb54cEBe3465E
|
||||
```
|
||||
- Liberapay: https://liberapay.com/Reticulum/
|
||||
|
||||
- Ko-Fi: https://ko-fi.com/markqvist
|
||||
|
||||
|
||||
## Caveat Emptor
|
||||
|
||||
LXMF is beta software, and should be considered experimental. While it has been built with cryptography best practices very foremost in mind, it _has not_ been externally security audited, and there could very well be privacy-breaking bugs. If you want to help out, or help sponsor an audit, please do get in touch.
|
||||
|
|
|
|||
|
|
@ -29,8 +29,8 @@ def delivery_callback(message):
|
|||
RNS.log("\t| Destination instance : "+str(message.get_destination()))
|
||||
RNS.log("\t| Transport Encryption : "+str(message.transport_encryption))
|
||||
RNS.log("\t| Timestamp : "+time_string)
|
||||
RNS.log("\t| Title : "+str(message.title_as_string()))
|
||||
RNS.log("\t| Content : "+str(message.content_as_string()))
|
||||
RNS.log("\t| Title : "+message.title_as_string())
|
||||
RNS.log("\t| Content : "+message.content_as_string())
|
||||
RNS.log("\t| Fields : "+str(message.fields))
|
||||
if message.ratchet_id:
|
||||
RNS.log("\t| Ratchet : "+str(RNS.Identity._get_ratchet_id(message.ratchet_id)))
|
||||
|
|
@ -69,4 +69,4 @@ while True:
|
|||
|
||||
# input()
|
||||
# RNS.log("Requesting messages from propagation node...")
|
||||
# router.request_messages_from_propagation_node(identity)
|
||||
# router.request_messages_from_propagation_node(identity)
|
||||
|
|
@ -1,2 +1,3 @@
|
|||
qrcode>=7.4.2
|
||||
rns>=1.0.0
|
||||
qrcode==7.4.2
|
||||
rns==0.7.7
|
||||
setuptools==70.0.0
|
||||
|
|
|
|||
7
setup.py
7
setup.py
|
|
@ -15,10 +15,9 @@ setuptools.setup(
|
|||
long_description_content_type="text/markdown",
|
||||
url="https://github.com/markqvist/lxmf",
|
||||
packages=["LXMF", "LXMF.Utilities"],
|
||||
license="Reticulum License",
|
||||
license_files = ("LICENSE"),
|
||||
classifiers=[
|
||||
"Programming Language :: Python :: 3",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Operating System :: OS Independent",
|
||||
],
|
||||
entry_points= {
|
||||
|
|
@ -26,6 +25,6 @@ setuptools.setup(
|
|||
'lxmd=LXMF.Utilities.lxmd:main',
|
||||
]
|
||||
},
|
||||
install_requires=["rns>=1.1.0"],
|
||||
python_requires=">=3.7",
|
||||
install_requires=['rns>=0.7.7'],
|
||||
python_requires='>=3.7',
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue