Compare commits

..

No commits in common. "master" and "0.6.1" have entirely different histories.

10 changed files with 54 additions and 521 deletions

View File

@ -12,7 +12,7 @@ Before creating a bug report on this issue tracker, you **must** read the [Contr
- The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**.
- Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**.
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**.
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), delete this section from your bug report.
**Describe the Bug**
A clear and concise description of what the bug is.

16
LICENSE
View File

@ -1,6 +1,6 @@
Reticulum License
MIT License
Copyright (c) 2020-2025 Mark Qvist
Copyright (c) 2020 Mark Qvist / unsigned.io
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@ -9,16 +9,8 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
- The Software shall not be used in any kind of system which includes amongst
its functions the ability to purposefully do harm to human beings.
- The Software shall not be used, directly or indirectly, in the creation of
an artificial intelligence, machine learning or language model training
dataset, including but not limited to any use that contributes to the
training or development of such a model or algorithm.
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

View File

@ -45,15 +45,6 @@ class LXMFPropagationAnnounceHandler:
if pn_announce_data_is_valid(data):
node_timebase = data[1]
propagation_transfer_limit = None
wanted_inbound_peers = None
if len(data) >= 4:
# TODO: Rethink, probably not necessary anymore
# try:
# wanted_inbound_peers = int(data[3])
# except:
# wanted_inbound_peers = None
pass
if len(data) >= 3:
try:
propagation_transfer_limit = float(data[2])
@ -61,12 +52,12 @@ class LXMFPropagationAnnounceHandler:
propagation_transfer_limit = None
if destination_hash in self.lxmrouter.static_peers:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
else:
if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
elif data[0] == False:
self.lxmrouter.unpeer(destination_hash, node_timebase)

View File

@ -20,7 +20,6 @@ class LXMPeer:
ERROR_NO_IDENTITY = 0xf0
ERROR_NO_ACCESS = 0xf1
ERROR_TIMEOUT = 0xfe
# Maximum amount of time a peer can
# be unreachable before it is removed
@ -92,11 +91,6 @@ class LXMPeer:
peer.tx_bytes = dictionary["tx_bytes"]
else:
peer.tx_bytes = 0
if "last_sync_attempt" in dictionary:
peer.last_sync_attempt = dictionary["last_sync_attempt"]
else:
peer.last_sync_attempt = 0
hm_count = 0
for transient_id in dictionary["handled_ids"]:
@ -127,7 +121,6 @@ class LXMPeer:
dictionary["link_establishment_rate"] = self.link_establishment_rate
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
dictionary["last_sync_attempt"] = self.last_sync_attempt
dictionary["offered"] = self.offered
dictionary["outgoing"] = self.outgoing
dictionary["incoming"] = self.incoming
@ -250,10 +243,7 @@ class LXMPeer:
lxm_size = unhandled_entry[2]
next_size = cumulative_size + (lxm_size+per_message_overhead)
if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000):
if lxm_size+per_message_overhead > (self.propagation_transfer_limit*1000):
self.remove_unhandled_message(transient_id)
self.add_handled_message(transient_id)
RNS.log(f"Message {RNS.prettyhexrep(transient_id)} exceeds transfer limit for {self}, considering handled", RNS.LOG_DEBUG)
pass
else:
cumulative_size += (lxm_size+per_message_overhead)
unhandled_ids.append(transient_id)
@ -472,10 +462,6 @@ class LXMPeer:
return self._um_count
@property
def acceptance_rate(self):
return 0 if self.offered == 0 else (self.outgoing/self.offered)
def _update_counts(self):
if not self._hm_counts_synced:
hm = self.handled_messages; del hm

View File

@ -41,8 +41,6 @@ class LXMRouter:
AUTOPEER = True
AUTOPEER_MAXDEPTH = 4
FASTEST_N_RANDOM_POOL = 2
ROTATION_HEADROOM_PCT = 10
ROTATION_AR_MAX = 0.5
PROPAGATION_LIMIT = 256
DELIVERY_LIMIT = 1000
@ -66,8 +64,6 @@ class LXMRouter:
PR_ALL_MESSAGES = 0x00
STATS_GET_PATH = "/pn/get/stats"
### Developer-facing API ##############################
#######################################################
@ -96,7 +92,6 @@ class LXMRouter:
self.processing_count = 0
self.propagation_node = False
self.propagation_node_start_time = None
if storagepath == None:
raise ValueError("LXMF cannot be initialised without a storage path")
@ -124,7 +119,6 @@ class LXMRouter:
self.propagation_transfer_progress = 0.0
self.propagation_transfer_last_result = None
self.propagation_transfer_max_messages = None
self.prioritise_rotating_unreachable_peers = False
self.active_propagation_links = []
self.locally_delivered_transient_ids = {}
self.locally_processed_transient_ids = {}
@ -141,11 +135,6 @@ class LXMRouter:
self.identity = identity
self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.control_destination = None
self.client_propagation_messages_received = 0
self.client_propagation_messages_served = 0
self.unpeered_propagation_incoming = 0
self.unpeered_propagation_rx_bytes = 0
if autopeer != None:
self.autopeer = autopeer
@ -286,7 +275,6 @@ class LXMRouter:
node_state, # Boolean flag signalling propagation node state
int(time.time()), # Current node timebase
self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes
None, # How many more inbound peers this node wants
]
data = msgpack.packb(announce_data)
@ -507,10 +495,8 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st))
RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE)
RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
st = time.time();
et = time.time(); RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {math.floor(len(self.propagation_entries)/(et-st))} msgs/s", RNS.LOG_NOTICE)
st = time.time(); RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
if os.path.isfile(self.storagepath+"/peers"):
peers_file = open(self.storagepath+"/peers", "rb")
@ -555,35 +541,13 @@ class LXMRouter:
RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
try:
if os.path.isfile(self.storagepath+"/node_stats"):
node_stats_file = open(self.storagepath+"/node_stats", "rb")
data = node_stats_file.read()
node_stats_file.close()
node_stats = msgpack.unpackb(data)
if not type(node_stats) == dict:
RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR)
else:
self.client_propagation_messages_received = node_stats["client_propagation_messages_received"]
self.client_propagation_messages_served = node_stats["client_propagation_messages_served"]
self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"]
self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"]
except Exception as e:
RNS.log("Could not load local node stats. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.propagation_node = True
self.propagation_node_start_time = time.time()
self.propagation_destination.set_link_established_callback(self.propagation_link_established)
self.propagation_destination.set_packet_callback(self.propagation_packet)
self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)
self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash])
if self.message_storage_limit != None:
limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit)
else:
@ -686,76 +650,6 @@ class LXMRouter:
return False
### Propagation Node Control ##########################
#######################################################
def compile_stats(self):
if not self.propagation_node:
return None
else:
peer_stats = {}
for peer_id in self.peers.copy():
peer = self.peers[peer_id]
peer_stats[peer_id] = {
"type": "static" if peer_id in self.static_peers else "discovered",
"state": peer.state,
"alive": peer.alive,
"last_heard": int(peer.last_heard),
"next_sync_attempt": peer.next_sync_attempt,
"last_sync_attempt": peer.last_sync_attempt,
"sync_backoff": peer.sync_backoff,
"peering_timebase": peer.peering_timebase,
"ler": int(peer.link_establishment_rate),
"str": int(peer.sync_transfer_rate),
"transfer_limit": peer.propagation_transfer_limit,
"network_distance": RNS.Transport.hops_to(peer_id),
"rx_bytes": peer.rx_bytes,
"tx_bytes": peer.tx_bytes,
"messages": {
"offered": peer.offered,
"outgoing": peer.outgoing,
"incoming": peer.incoming,
"unhandled": peer.unhandled_message_count
},
}
node_stats = {
"identity_hash": self.identity.hash,
"destination_hash": self.propagation_destination.hash,
"uptime": time.time()-self.propagation_node_start_time,
"delivery_limit": self.delivery_per_transfer_limit,
"propagation_limit": self.propagation_per_transfer_limit,
"autopeer_maxdepth": self.autopeer_maxdepth,
"from_static_only": self.from_static_only,
"messagestore": {
"count": len(self.propagation_entries),
"bytes": self.message_storage_size(),
"limit": self.message_storage_limit,
},
"clients" : {
"client_propagation_messages_received": self.client_propagation_messages_received,
"client_propagation_messages_served": self.client_propagation_messages_served,
},
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
"static_peers": len(self.static_peers),
"discovered_peers": len(self.peers)-len(self.static_peers),
"total_peers": len(self.peers),
"max_peers": self.max_peers,
"peers": peer_stats,
}
return node_stats
def stats_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
elif remote_identity.hash != self.identity.hash:
return LXMPeer.ERROR_NO_ACCESS
else:
return self.compile_stats()
### Utility & Maintenance #############################
#######################################################
@ -766,7 +660,6 @@ class LXMRouter:
JOB_STORE_INTERVAL = 120
JOB_PEERSYNC_INTERVAL = 12
JOB_PEERINGEST_INTERVAL= JOB_PEERSYNC_INTERVAL
JOB_ROTATE_INTERVAL = 56*JOB_PEERINGEST_INTERVAL
def jobs(self):
if not self.exit_handler_running:
self.processing_count += 1
@ -784,20 +677,18 @@ class LXMRouter:
self.clean_transient_id_caches()
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
if self.propagation_node == True:
self.clean_message_store()
self.clean_message_store()
if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0:
if self.propagation_node == True:
self.flush_queues()
if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0:
if self.propagation_node == True:
self.rotate_peers()
self.flush_queues()
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
if self.propagation_node == True:
self.sync_peers()
self.sync_peers()
# def syncstats(self):
# for peer_id in self.peers:
# p = self.peers[peer_id]
# RNS.log(f"{RNS.prettyhexrep(peer_id)} O={p.offered} S={p.outgoing} I={p.incoming} TX={RNS.prettysize(p.tx_bytes)} RX={RNS.prettysize(p.rx_bytes)}")
def jobloop(self):
while (True):
@ -894,11 +785,6 @@ class LXMRouter:
self.save_outbound_stamp_costs()
threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start()
def get_wanted_inbound_peers(self):
# TODO: Implement/rethink.
# Probably not necessary anymore.
return None
def get_announce_app_data(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
@ -1000,12 +886,12 @@ class LXMRouter:
lxm_size = self.propagation_entries[transient_id][3]
return lxm_size
def clean_message_store(self):
RNS.log("Cleaning message store", RNS.LOG_VERBOSE)
# Check and remove expired messages
now = time.time()
removed_entries = {}
for transient_id in self.propagation_entries.copy():
for transient_id in self.propagation_entries:
entry = self.propagation_entries[transient_id]
filepath = entry[1]
components = filepath.split("_")
@ -1013,7 +899,7 @@ class LXMRouter:
if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2:
timestamp = float(components[1])
if now > timestamp+LXMRouter.MESSAGE_EXPIRY:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_EXTREME)
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG)
removed_entries[transient_id] = filepath
else:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
@ -1031,7 +917,7 @@ class LXMRouter:
RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
if removed_count > 0:
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_VERBOSE)
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG)
# Check size of message store and cull if needed
try:
@ -1043,7 +929,7 @@ class LXMRouter:
bytes_cleaned = 0
weighted_entries = []
for transient_id in self.propagation_entries.copy():
for transient_id in self.propagation_entries:
weighted_entries.append([
self.propagation_entries[transient_id],
self.get_weight(transient_id),
@ -1084,7 +970,7 @@ class LXMRouter:
try:
if len(self.locally_delivered_transient_ids) > 0:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
os.makedirs(self.storagepath)
with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file:
locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))
@ -1096,7 +982,7 @@ class LXMRouter:
try:
if len(self.locally_processed_transient_ids) > 0:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
os.makedirs(self.storagepath)
with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file:
locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids))
@ -1104,24 +990,6 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def save_node_stats(self):
try:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
with open(self.storagepath+"/node_stats", "wb") as stats_file:
node_stats = {
"client_propagation_messages_received": self.client_propagation_messages_received,
"client_propagation_messages_served": self.client_propagation_messages_served,
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
}
stats_file.write(msgpack.packb(node_stats))
except Exception as e:
RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def clean_outbound_stamp_costs(self):
try:
expired = []
@ -1238,7 +1106,6 @@ class LXMRouter:
self.propagation_destination.set_packet_callback(None)
self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH)
self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH)
self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH)
for link in self.active_propagation_links:
try:
if link.status == RNS.Link.ACTIVE:
@ -1268,7 +1135,6 @@ class LXMRouter:
self.save_locally_delivered_transient_ids()
self.save_locally_processed_transient_ids()
self.save_node_stats()
def sigint_handler(self, signal, frame):
if not self.exit_handler_running:
@ -1397,7 +1263,6 @@ class LXMRouter:
except Exception as e:
RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
self.client_propagation_messages_served += len(response_messages)
return response_messages
@ -1618,7 +1483,7 @@ class LXMRouter:
### Message Routing & Delivery ########################
#######################################################
def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False, allow_duplicate=False):
def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False):
try:
message = LXMessage.unpack_from_bytes(lxmf_data)
if ratchet_id and not message.ratchet_id:
@ -1685,7 +1550,7 @@ class LXMRouter:
RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
if not allow_duplicate and self.has_message(message.hash):
if self.has_message(message.hash):
RNS.log(str(self)+" ignored already received message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
else:
@ -1777,7 +1642,7 @@ class LXMRouter:
### Peer Sync & Propagation ###########################
#######################################################
def peer(self, destination_hash, timestamp, propagation_transfer_limit, wanted_inbound_peers = None):
def peer(self, destination_hash, timestamp, propagation_transfer_limit):
if destination_hash in self.peers:
peer = self.peers[destination_hash]
if timestamp > peer.peering_timebase:
@ -1811,82 +1676,6 @@ class LXMRouter:
self.peers.pop(destination_hash)
RNS.log("Broke peering with "+str(peer.destination))
def rotate_peers(self):
try:
rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0)))
required_drops = len(self.peers) - (self.max_peers - rotation_headroom)
if required_drops > 0 and len(self.peers) - required_drops > 1:
peers = self.peers.copy()
untested_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
if peer.last_sync_attempt == 0:
untested_peers.append(peer)
if len(untested_peers) >= rotation_headroom:
RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG)
return
fully_synced_peers = {}
for peer_id in peers:
peer = peers[peer_id]
if peer.unhandled_message_count == 0:
fully_synced_peers[peer_id] = peer
if len(fully_synced_peers) > 0:
peers = fully_synced_peers
ms = "" if len(fully_synced_peers) == 1 else "s"
RNS.log(f"Found {len(fully_synced_peers)} fully synced peer{ms}, using as peer rotation pool basis", RNS.LOG_DEBUG)
culled_peers = []
waiting_peers = []
unresponsive_peers = []
for peer_id in peers:
peer = peers[peer_id]
if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE:
if peer.alive:
if peer.offered == 0:
# Don't consider for unpeering until at
# least one message has been offered
pass
else:
waiting_peers.append(peer)
else:
unresponsive_peers.append(peer)
drop_pool = []
if len(unresponsive_peers) > 0:
drop_pool.extend(unresponsive_peers)
if not self.prioritise_rotating_unreachable_peers:
drop_pool.extend(waiting_peers)
else:
drop_pool.extend(waiting_peers)
if len(drop_pool) > 0:
drop_count = min(required_drops, len(drop_pool))
low_acceptance_rate_peers = sorted(
drop_pool,
key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ),
reverse=False
)[0:drop_count]
dropped_peers = 0
for peer in low_acceptance_rate_peers:
ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2)
if ar < LXMRouter.ROTATION_AR_MAX*100:
reachable_str = "reachable" if peer.alive else "unreachable"
RNS.log(f"Acceptance rate for {reachable_str} peer {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing}/{peer.offered}, {peer.unhandled_message_count} unhandled messages)", RNS.LOG_DEBUG)
self.unpeer(peer.destination_hash)
dropped_peers += 1
ms = "" if dropped_peers == 1 else "s"
RNS.log(f"Dropped {dropped_peers} low acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG)
except Exception as e:
RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def sync_peers(self):
culled_peers = []
waiting_peers = []
@ -1988,7 +1777,6 @@ class LXMRouter:
messages = data[1]
for lxmf_data in messages:
self.lxmf_propagation(lxmf_data)
self.client_propagation_messages_received += 1
packet.prove()
@ -2056,14 +1844,12 @@ class LXMRouter:
# sane default value, and wait for an announce to arrive
# that will update the peering config to the actual limit.
propagation_transfer_limit = LXMRouter.PROPAGATION_LIMIT//4
wanted_inbound_peers = None
self.peer(remote_hash, remote_timebase, propagation_transfer_limit, wanted_inbound_peers)
self.peer(remote_hash, remote_timebase, propagation_transfer_limit)
else:
remote_str = f"peer {remote_str}"
messages = data[1]
ms = "" if len(messages) == 1 else "s"
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE)
RNS.log(f"Received {len(messages)} messages from {remote_str}", RNS.LOG_VERBOSE)
for lxmf_data in messages:
peer = None
transient_id = RNS.Identity.full_hash(lxmf_data)
@ -2071,12 +1857,6 @@ class LXMRouter:
peer = self.peers[remote_hash]
peer.incoming += 1
peer.rx_bytes += len(lxmf_data)
else:
if remote_identity != None:
self.unpeered_propagation_incoming += 1
self.unpeered_propagation_rx_bytes += len(lxmf_data)
else:
self.client_propagation_messages_received += 1
self.lxmf_propagation(lxmf_data, from_peer=peer)
if peer != None:
@ -2107,7 +1887,7 @@ class LXMRouter:
if peer != from_peer:
peer.queue_unhandled_message(transient_id)
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None):
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False, from_peer=None):
no_stamp_enforcement = False
if is_paper_message:
no_stamp_enforcement = True
@ -2116,7 +1896,7 @@ class LXMRouter:
if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
transient_id = RNS.Identity.full_hash(lxmf_data)
if (not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids) or allow_duplicate == True:
if not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids:
received = time.time()
destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH]
@ -2128,7 +1908,7 @@ class LXMRouter:
decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data)
if decrypted_lxmf_data != None:
delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data
self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement, allow_duplicate=allow_duplicate)
self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement)
self.locally_delivered_transient_ids[transient_id] = time.time()
if signal_local_delivery != None:
@ -2166,7 +1946,7 @@ class LXMRouter:
RNS.trace_exception(e)
return False
def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False):
def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None):
try:
if not uri.lower().startswith(LXMessage.URI_SCHEMA+"://"):
RNS.log("Cannot ingest LXM, invalid URI provided.", RNS.LOG_ERROR)
@ -2176,7 +1956,7 @@ class LXMRouter:
lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==")
transient_id = RNS.Identity.full_hash(lxmf_data)
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True)
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, is_paper_message=True)
if router_propagation_result != False:
RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG)
return router_propagation_result

View File

@ -380,7 +380,7 @@ class LXMessage:
if self.desired_method == LXMessage.OPPORTUNISTIC:
if self.__destination.type == RNS.Destination.SINGLE:
if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT:
RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
RNS.log(f"Opportunistic delivery was requested for {self}, but content exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
self.desired_method = LXMessage.DIRECT
# Set delivery parameters according to delivery method

View File

@ -35,7 +35,6 @@ import time
import os
from LXMF._version import __version__
from LXMF import APP_NAME
from RNS.vendor.configobj import ConfigObj
@ -127,7 +126,7 @@ def apply_config():
if active_configuration["message_storage_limit"] < 0.005:
active_configuration["message_storage_limit"] = 0.005
else:
active_configuration["message_storage_limit"] = 500
active_configuration["message_storage_limit"] = 2000
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
@ -416,190 +415,6 @@ def deferred_start_jobs():
last_node_announce = time.time()
threading.Thread(target=jobs, daemon=True).start()
def query_status(identity, timeout=5, exit_on_fail=False):
control_destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
timeout = time.time()+timeout
def check_timeout():
if time.time() > timeout:
if exit_on_fail:
RNS.log("Getting lxmd statistics timed out, exiting now", RNS.LOG_ERROR)
exit(200)
else:
return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
else:
time.sleep(0.1)
if not RNS.Transport.has_path(control_destination.hash):
RNS.Transport.request_path(control_destination.hash)
while not RNS.Transport.has_path(control_destination.hash):
tc = check_timeout()
if tc:
return tc
link = RNS.Link(control_destination)
while not link.status == RNS.Link.ACTIVE:
tc = check_timeout()
if tc:
return tc
link.identify(identity)
request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None)
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
tc = check_timeout()
if tc:
return tc
link.teardown()
return request_receipt.get_response()
def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = 0, timeout=5, show_status=False, show_peers=False, identity_path=None):
global configpath, identitypath, storagedir, lxmdir
global lxmd_config, active_configuration, targetloglevel
targetlogdest = RNS.LOG_STDOUT
if identity_path == None:
if configdir == None:
if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"):
configdir = "/etc/lxmd"
elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(Reticulum.userdir+"/.config/lxmd/config"):
configdir = RNS.Reticulum.userdir+"/.config/lxmd"
else:
configdir = RNS.Reticulum.userdir+"/.lxmd"
configpath = configdir+"/config"
identitypath = configdir+"/identity"
identity = None
if not os.path.isdir(configdir):
RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR)
exit(201)
if not os.path.isfile(identitypath):
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
exit(202)
else:
identity = RNS.Identity.from_file(identitypath)
if identity == None:
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
exit(4)
else:
if not os.path.isfile(identity_path):
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
exit(202)
else:
identity = RNS.Identity.from_file(identity_path)
if identity == None:
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
exit(4)
if targetloglevel == None:
targetloglevel = 3
if verbosity != 0 or quietness != 0:
targetloglevel = targetloglevel+verbosity-quietness
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
response = query_status(identity, timeout=timeout, exit_on_fail=True)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Remote received no identity")
exit(203)
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
RNS.log("Access denied")
exit(204)
else:
s = response
mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2)
ms_util = f"{mutil}%"
if s["from_static_only"]:
who_str = "static peers only"
else:
who_str = "all nodes"
available_peers = 0
unreachable_peers = 0
peered_incoming = 0
peered_outgoing = 0
peered_rx_bytes = 0
peered_tx_bytes = 0
for peer_id in s["peers"]:
p = s["peers"][peer_id]
pm = p["messages"]
peered_incoming += pm["incoming"]
peered_outgoing += pm["outgoing"]
peered_rx_bytes += p["rx_bytes"]
peered_tx_bytes += p["tx_bytes"]
if p["alive"]:
available_peers += 1
else:
unreachable_peers += 1
total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"]
total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"]
df = round(peered_outgoing/total_incoming, 2)
dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"])
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
if show_status:
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit")
print(f"")
print(f"Peers : {stp} total (peer limit is {smp})")
print(f" {sdp} discovered, {ssp} static")
print(f" {available_peers} available, {unreachable_peers} unreachable")
print(f"")
print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})")
print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})")
print(f" {upi} messages received from unpeered nodes ({uprx})")
print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})")
print(f" {cprr} propagation messages received directly from clients")
print(f" {cprs} propagation messages served to clients")
print(f" Distribution factor is {df}")
print(f"")
if show_peers:
if not show_status:
print("")
for peer_id in s["peers"]:
ind = " "
p = s["peers"][peer_id]
if p["type"] == "static":
t = "Static peer "
elif p["type"] == "discovered":
t = "Discovered peer "
else:
t = "Unknown peer "
a = "Available" if p["alive"] == True else "Unreachable"
h = max(time.time()-p["last_heard"], 0)
hops = p["network_distance"]
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
pm = p["messages"]
if p["last_sync_attempt"] != 0:
lsa = p["last_sync_attempt"]
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
else:
ls = "never synced"
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000)
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
pmi = pm["incoming"]; pmuh = pm["unhandled"]
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit")
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming")
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
ms = "" if pm["unhandled"] == 1 else "s"
print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}")
print("")
def main():
try:
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
@ -610,10 +425,6 @@ def main():
parser.add_argument("-v", "--verbose", action="count", default=0)
parser.add_argument("-q", "--quiet", action="count", default=0)
parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file")
parser.add_argument("--status", action="store_true", default=False, help="display node status")
parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes")
parser.add_argument("--timeout", action="store", default=5, help="timeout in seconds for query operations", type=float)
parser.add_argument("--identity", action="store", default=None, help="path to identity used for query request", type=str)
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
@ -623,24 +434,15 @@ def main():
print(__default_lxmd_config__)
exit()
if args.status or args.peers:
get_status(configdir = args.config,
rnsconfigdir=args.rnsconfig,
verbosity=args.verbose,
quietness=args.quiet,
timeout=args.timeout,
show_status=args.status,
show_peers=args.peers,
identity_path=args.identity)
exit()
program_setup(configdir = args.config,
rnsconfigdir=args.rnsconfig,
run_pn=args.propagation_node,
on_inbound=args.on_inbound,
verbosity=args.verbose,
quietness=args.quiet,
service=args.service)
program_setup(
configdir = args.config,
rnsconfigdir=args.rnsconfig,
run_pn=args.propagation_node,
on_inbound=args.on_inbound,
verbosity=args.verbose,
quietness=args.quiet,
service=args.service
)
except KeyboardInterrupt:
print("")
@ -696,9 +498,9 @@ propagation_transfer_max_accepted_size = 256
# LXMF prioritises keeping messages that are
# new and small. Large and old messages will
# be removed first. This setting is optional
# and defaults to 500 megabytes.
# and defaults to 2 gigabytes.
# message_storage_limit = 500
# message_storage_limit = 2000
# You can tell the LXMF message router to
# prioritise storage for one or more
@ -710,25 +512,6 @@ propagation_transfer_max_accepted_size = 256
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
# You can configure the maximum number of other
# propagation nodes that this node will peer
# with automatically. The default is 50.
# max_peers = 25
# You can configure a list of static propagation
# node peers, that this node will always be
# peered with, by specifying a list of
# destination hashes.
# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
# You can configure the propagation node to
# only accept incoming propagation messages
# from configured static peers.
# from_static_only = True
# By default, any destination is allowed to
# connect and download messages, but you can
# optionally restrict this. If you enable

View File

@ -1 +1 @@
__version__ = "0.6.3"
__version__ = "0.6.1"

View File

@ -1,2 +1,3 @@
qrcode>=7.4.2
rns>=0.9.1
qrcode==7.4.2
rns==0.7.8
setuptools==70.0.0

View File

@ -25,6 +25,6 @@ setuptools.setup(
'lxmd=LXMF.Utilities.lxmd:main',
]
},
install_requires=["rns>=0.9.3"],
python_requires=">=3.7",
install_requires=['rns>=0.9.1'],
python_requires='>=3.7',
)