mirror of
https://github.com/markqvist/LXMF.git
synced 2025-04-19 14:55:52 -04:00
Compare commits
48 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
1bdcf6ad53 | ||
![]() |
e6021b8fed | ||
![]() |
326c0eed8f | ||
![]() |
336792c07a | ||
![]() |
570d2c6846 | ||
![]() |
1ef4665073 | ||
![]() |
d5540b927f | ||
![]() |
a6cf585109 | ||
![]() |
c0a8f3be49 | ||
![]() |
7b4780cfb7 | ||
![]() |
b94a712bb6 | ||
![]() |
f42ccfc4e9 | ||
![]() |
9eca747757 | ||
![]() |
b7b6753640 | ||
![]() |
40d0b9a5de | ||
![]() |
40fc75f559 | ||
![]() |
f1d060a92e | ||
![]() |
e0e901291e | ||
![]() |
886ac69a82 | ||
![]() |
e0163e100a | ||
![]() |
26a10cce8f | ||
![]() |
cec903a4dc | ||
![]() |
962d9c90d1 | ||
![]() |
6d2eb4f973 | ||
![]() |
a8cc5f41cf | ||
![]() |
aa57b16cf5 | ||
![]() |
cdea838a6c | ||
![]() |
fb4bf9b0b9 | ||
![]() |
a3e3868f92 | ||
![]() |
70186cf8d9 | ||
![]() |
fe59b265c5 | ||
![]() |
a87458d25f | ||
![]() |
35dd70c59e | ||
![]() |
a198e96064 | ||
![]() |
e3be7e0cfd | ||
![]() |
460645cea2 | ||
![]() |
f683e03891 | ||
![]() |
2c71cea7a0 | ||
![]() |
61b1ecce27 | ||
![]() |
68257a441f | ||
![]() |
e69da2ed2a | ||
![]() |
c2a08ef355 | ||
![]() |
1430b1ce90 | ||
![]() |
1c9c744107 | ||
![]() |
bfed126a7c | ||
![]() |
44d1d992f8 | ||
![]() |
7701f326d9 | ||
![]() |
356cb6412f |
2
.github/ISSUE_TEMPLATE/🐛-bug-report.md
vendored
2
.github/ISSUE_TEMPLATE/🐛-bug-report.md
vendored
@ -12,7 +12,7 @@ Before creating a bug report on this issue tracker, you **must** read the [Contr
|
||||
|
||||
- The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**.
|
||||
- Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**.
|
||||
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), delete this section from your bug report.
|
||||
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**.
|
||||
|
||||
**Describe the Bug**
|
||||
A clear and concise description of what the bug is.
|
||||
|
16
LICENSE
16
LICENSE
@ -1,6 +1,6 @@
|
||||
MIT License
|
||||
Reticulum License
|
||||
|
||||
Copyright (c) 2020 Mark Qvist / unsigned.io
|
||||
Copyright (c) 2020-2025 Mark Qvist
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@ -9,8 +9,16 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
- The Software shall not be used in any kind of system which includes amongst
|
||||
its functions the ability to purposefully do harm to human beings.
|
||||
|
||||
- The Software shall not be used, directly or indirectly, in the creation of
|
||||
an artificial intelligence, machine learning or language model training
|
||||
dataset, including but not limited to any use that contributes to the
|
||||
training or development of such a model or algorithm.
|
||||
|
||||
- The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
|
@ -45,18 +45,31 @@ class LXMFPropagationAnnounceHandler:
|
||||
if pn_announce_data_is_valid(data):
|
||||
node_timebase = data[1]
|
||||
propagation_transfer_limit = None
|
||||
wanted_inbound_peers = None
|
||||
if len(data) >= 4:
|
||||
# TODO: Rethink, probably not necessary anymore
|
||||
# try:
|
||||
# wanted_inbound_peers = int(data[3])
|
||||
# except:
|
||||
# wanted_inbound_peers = None
|
||||
pass
|
||||
|
||||
if len(data) >= 3:
|
||||
try:
|
||||
propagation_transfer_limit = float(data[2])
|
||||
except:
|
||||
propagation_transfer_limit = None
|
||||
|
||||
if data[0] == True:
|
||||
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
|
||||
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
|
||||
if destination_hash in self.lxmrouter.static_peers:
|
||||
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
|
||||
|
||||
elif data[0] == False:
|
||||
self.lxmrouter.unpeer(destination_hash, node_timebase)
|
||||
else:
|
||||
if data[0] == True:
|
||||
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
|
||||
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)
|
||||
|
||||
elif data[0] == False:
|
||||
self.lxmrouter.unpeer(destination_hash, node_timebase)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)
|
||||
|
233
LXMF/LXMPeer.py
233
LXMF/LXMPeer.py
@ -4,6 +4,7 @@ import time
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
from collections import deque
|
||||
from .LXMF import APP_NAME
|
||||
|
||||
class LXMPeer:
|
||||
@ -19,6 +20,7 @@ class LXMPeer:
|
||||
|
||||
ERROR_NO_IDENTITY = 0xf0
|
||||
ERROR_NO_ACCESS = 0xf1
|
||||
ERROR_TIMEOUT = 0xfe
|
||||
|
||||
# Maximum amount of time a peer can
|
||||
# be unreachable before it is removed
|
||||
@ -38,11 +40,16 @@ class LXMPeer:
|
||||
@staticmethod
|
||||
def from_bytes(peer_bytes, router):
|
||||
dictionary = msgpack.unpackb(peer_bytes)
|
||||
peer_destination_hash = dictionary["destination_hash"]
|
||||
peer_peering_timebase = dictionary["peering_timebase"]
|
||||
peer_alive = dictionary["alive"]
|
||||
peer_last_heard = dictionary["last_heard"]
|
||||
|
||||
peer = LXMPeer(router, peer_destination_hash)
|
||||
peer.peering_timebase = peer_peering_timebase
|
||||
peer.alive = peer_alive
|
||||
peer.last_heard = peer_last_heard
|
||||
|
||||
peer = LXMPeer(router, dictionary["destination_hash"])
|
||||
peer.peering_timebase = dictionary["peering_timebase"]
|
||||
peer.alive = dictionary["alive"]
|
||||
peer.last_heard = dictionary["last_heard"]
|
||||
if "link_establishment_rate" in dictionary:
|
||||
peer.link_establishment_rate = dictionary["link_establishment_rate"]
|
||||
else:
|
||||
@ -60,15 +67,55 @@ class LXMPeer:
|
||||
peer.propagation_transfer_limit = None
|
||||
else:
|
||||
peer.propagation_transfer_limit = None
|
||||
|
||||
if "offered" in dictionary:
|
||||
peer.offered = dictionary["offered"]
|
||||
else:
|
||||
peer.offered = 0
|
||||
|
||||
if "outgoing" in dictionary:
|
||||
peer.outgoing = dictionary["outgoing"]
|
||||
else:
|
||||
peer.outgoing = 0
|
||||
|
||||
if "incoming" in dictionary:
|
||||
peer.incoming = dictionary["incoming"]
|
||||
else:
|
||||
peer.incoming = 0
|
||||
|
||||
if "rx_bytes" in dictionary:
|
||||
peer.rx_bytes = dictionary["rx_bytes"]
|
||||
else:
|
||||
peer.rx_bytes = 0
|
||||
|
||||
if "tx_bytes" in dictionary:
|
||||
peer.tx_bytes = dictionary["tx_bytes"]
|
||||
else:
|
||||
peer.tx_bytes = 0
|
||||
|
||||
if "last_sync_attempt" in dictionary:
|
||||
peer.last_sync_attempt = dictionary["last_sync_attempt"]
|
||||
else:
|
||||
peer.last_sync_attempt = 0
|
||||
|
||||
hm_count = 0
|
||||
for transient_id in dictionary["handled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
peer.add_handled_message(transient_id)
|
||||
hm_count += 1
|
||||
|
||||
um_count = 0
|
||||
for transient_id in dictionary["unhandled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
peer.add_unhandled_message(transient_id)
|
||||
um_count += 1
|
||||
|
||||
peer._hm_count = hm_count
|
||||
peer._um_count = um_count
|
||||
peer._hm_counts_synced = True
|
||||
peer._um_counts_synced = True
|
||||
|
||||
del dictionary
|
||||
return peer
|
||||
|
||||
def to_bytes(self):
|
||||
@ -80,6 +127,12 @@ class LXMPeer:
|
||||
dictionary["link_establishment_rate"] = self.link_establishment_rate
|
||||
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
|
||||
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
|
||||
dictionary["last_sync_attempt"] = self.last_sync_attempt
|
||||
dictionary["offered"] = self.offered
|
||||
dictionary["outgoing"] = self.outgoing
|
||||
dictionary["incoming"] = self.incoming
|
||||
dictionary["rx_bytes"] = self.rx_bytes
|
||||
dictionary["tx_bytes"] = self.tx_bytes
|
||||
|
||||
handled_ids = []
|
||||
for transient_id in self.handled_messages:
|
||||
@ -92,7 +145,10 @@ class LXMPeer:
|
||||
dictionary["handled_ids"] = handled_ids
|
||||
dictionary["unhandled_ids"] = unhandled_ids
|
||||
|
||||
return msgpack.packb(dictionary)
|
||||
peer_bytes = msgpack.packb(dictionary)
|
||||
del dictionary
|
||||
|
||||
return peer_bytes
|
||||
|
||||
def __init__(self, router, destination_hash):
|
||||
self.alive = False
|
||||
@ -104,12 +160,23 @@ class LXMPeer:
|
||||
self.link_establishment_rate = 0
|
||||
self.sync_transfer_rate = 0
|
||||
self.propagation_transfer_limit = None
|
||||
self.handled_messages_queue = deque()
|
||||
self.unhandled_messages_queue = deque()
|
||||
|
||||
self.offered = 0 # Messages offered to this peer
|
||||
self.outgoing = 0 # Messages transferred to this peer
|
||||
self.incoming = 0 # Messages received from this peer
|
||||
self.rx_bytes = 0 # Bytes received from this peer
|
||||
self.tx_bytes = 0 # Bytes sent to this peer
|
||||
|
||||
self._hm_count = 0
|
||||
self._um_count = 0
|
||||
self._hm_counts_synced = False
|
||||
self._um_counts_synced = False
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
self.unhandled_messages = {}
|
||||
self.handled_messages = {}
|
||||
self.last_offer = []
|
||||
|
||||
self.router = router
|
||||
@ -118,6 +185,7 @@ class LXMPeer:
|
||||
if self.identity != None:
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
else:
|
||||
self.destination = None
|
||||
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
|
||||
|
||||
def sync(self):
|
||||
@ -171,7 +239,7 @@ class LXMPeer:
|
||||
|
||||
for transient_id in purged_ids:
|
||||
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
|
||||
self.unhandled_messages.pop(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
unhandled_entries.sort(key=lambda e: e[1], reverse=False)
|
||||
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
|
||||
@ -182,14 +250,17 @@ class LXMPeer:
|
||||
lxm_size = unhandled_entry[2]
|
||||
next_size = cumulative_size + (lxm_size+per_message_overhead)
|
||||
if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000):
|
||||
pass
|
||||
if lxm_size+per_message_overhead > (self.propagation_transfer_limit*1000):
|
||||
self.remove_unhandled_message(transient_id)
|
||||
self.add_handled_message(transient_id)
|
||||
RNS.log(f"Message {RNS.prettyhexrep(transient_id)} exceeds transfer limit for {self}, considering handled", RNS.LOG_DEBUG)
|
||||
else:
|
||||
cumulative_size += (lxm_size+per_message_overhead)
|
||||
unhandled_ids.append(transient_id)
|
||||
|
||||
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
|
||||
RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_VERBOSE)
|
||||
self.last_offer = unhandled_ids
|
||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||
self.state = LXMPeer.REQUEST_SENT
|
||||
|
||||
else:
|
||||
@ -217,22 +288,29 @@ class LXMPeer:
|
||||
|
||||
if response == LXMPeer.ERROR_NO_IDENTITY:
|
||||
if self.link != None:
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG)
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE)
|
||||
self.link.identify()
|
||||
self.state = LXMPeer.LINK_READY
|
||||
self.sync()
|
||||
return
|
||||
|
||||
elif response == LXMPeer.ERROR_NO_ACCESS:
|
||||
RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE)
|
||||
self.router.unpeer(self.destination_hash)
|
||||
return
|
||||
|
||||
elif response == False:
|
||||
# Peer already has all advertised messages
|
||||
for transient_id in self.last_offer:
|
||||
if transient_id in self.unhandled_messages:
|
||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
|
||||
elif response == True:
|
||||
# Peer wants all advertised messages
|
||||
for transient_id in self.last_offer:
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
else:
|
||||
@ -241,18 +319,17 @@ class LXMPeer:
|
||||
# If the peer did not want the message, it has
|
||||
# already received it from another peer.
|
||||
if not transient_id in response:
|
||||
if transient_id in self.unhandled_messages:
|
||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
for transient_id in response:
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
if len(wanted_messages) > 0:
|
||||
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG)
|
||||
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_VERBOSE)
|
||||
|
||||
lxm_list = []
|
||||
|
||||
for message_entry in wanted_messages:
|
||||
file_path = message_entry[1]
|
||||
if os.path.isfile(file_path):
|
||||
@ -268,7 +345,8 @@ class LXMPeer:
|
||||
self.state = LXMPeer.RESOURCE_TRANSFERRING
|
||||
|
||||
else:
|
||||
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
|
||||
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_VERBOSE)
|
||||
self.offered += len(self.last_offer)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
@ -288,8 +366,8 @@ class LXMPeer:
|
||||
def resource_concluded(self, resource):
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
for transient_id in resource.transferred_messages:
|
||||
message = self.unhandled_messages.pop(transient_id)
|
||||
self.handled_messages[transient_id] = message
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
@ -302,12 +380,15 @@ class LXMPeer:
|
||||
self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started)
|
||||
rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
|
||||
|
||||
RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed"+rate_str, RNS.LOG_DEBUG)
|
||||
RNS.log(f"Syncing {len(resource.transferred_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
|
||||
self.alive = True
|
||||
self.last_heard = time.time()
|
||||
self.offered += len(self.last_offer)
|
||||
self.outgoing += len(resource.transferred_messages)
|
||||
self.tx_bytes += resource.get_data_size()
|
||||
|
||||
else:
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
@ -328,9 +409,103 @@ class LXMPeer:
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def handle_message(self, transient_id):
|
||||
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
|
||||
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
|
||||
def queued_items(self):
|
||||
return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
|
||||
|
||||
def queue_unhandled_message(self, transient_id):
|
||||
self.unhandled_messages_queue.append(transient_id)
|
||||
|
||||
def queue_handled_message(self, transient_id):
|
||||
self.handled_messages_queue.append(transient_id)
|
||||
|
||||
def process_queues(self):
|
||||
if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
|
||||
# TODO: Remove debug
|
||||
# st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue)
|
||||
|
||||
handled_messages = self.handled_messages
|
||||
unhandled_messages = self.unhandled_messages
|
||||
|
||||
while len(self.handled_messages_queue) > 0:
|
||||
transient_id = self.handled_messages_queue.pop()
|
||||
if not transient_id in handled_messages:
|
||||
self.add_handled_message(transient_id)
|
||||
if transient_id in unhandled_messages:
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
while len(self.unhandled_messages_queue) > 0:
|
||||
transient_id = self.unhandled_messages_queue.pop()
|
||||
if not transient_id in handled_messages and not transient_id in unhandled_messages:
|
||||
self.add_unhandled_message(transient_id)
|
||||
|
||||
del handled_messages, unhandled_messages
|
||||
# TODO: Remove debug
|
||||
# RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}")
|
||||
|
||||
@property
|
||||
def handled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
hm = list(filter(lambda tid: self.destination_hash in pes[tid][4], pes))
|
||||
self._hm_count = len(hm); del pes
|
||||
self._hm_counts_synced = True
|
||||
return hm
|
||||
|
||||
@property
|
||||
def unhandled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
um = list(filter(lambda tid: self.destination_hash in pes[tid][5], pes))
|
||||
self._um_count = len(um); del pes
|
||||
self._um_counts_synced = True
|
||||
return um
|
||||
|
||||
@property
|
||||
def handled_message_count(self):
|
||||
if not self._hm_counts_synced:
|
||||
self._update_counts()
|
||||
|
||||
return self._hm_count
|
||||
|
||||
@property
|
||||
def unhandled_message_count(self):
|
||||
if not self._um_counts_synced:
|
||||
self._update_counts()
|
||||
|
||||
return self._um_count
|
||||
|
||||
@property
|
||||
def acceptance_rate(self):
|
||||
return 0 if self.offered == 0 else (self.outgoing/self.offered)
|
||||
|
||||
def _update_counts(self):
|
||||
if not self._hm_counts_synced:
|
||||
hm = self.handled_messages; del hm
|
||||
|
||||
if not self._um_counts_synced:
|
||||
um = self.unhandled_messages; del um
|
||||
|
||||
def add_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].append(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def add_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].append(self.destination_hash)
|
||||
self._um_count += 1
|
||||
|
||||
def remove_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].remove(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def remove_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].remove(self.destination_hash)
|
||||
self._um_counts_synced = False
|
||||
|
||||
def __str__(self):
|
||||
if self.destination_hash:
|
||||
|
@ -1,10 +1,15 @@
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import math
|
||||
import random
|
||||
import base64
|
||||
import atexit
|
||||
import signal
|
||||
import threading
|
||||
|
||||
from collections import deque
|
||||
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
@ -32,9 +37,12 @@ class LXMRouter:
|
||||
|
||||
NODE_ANNOUNCE_DELAY = 20
|
||||
|
||||
MAX_PEERS = 50
|
||||
AUTOPEER = True
|
||||
AUTOPEER_MAXDEPTH = 4
|
||||
FASTEST_N_RANDOM_POOL = 2
|
||||
ROTATION_HEADROOM_PCT = 10
|
||||
ROTATION_AR_MAX = 0.5
|
||||
|
||||
PROPAGATION_LIMIT = 256
|
||||
DELIVERY_LIMIT = 1000
|
||||
@ -58,11 +66,16 @@ class LXMRouter:
|
||||
|
||||
PR_ALL_MESSAGES = 0x00
|
||||
|
||||
STATS_GET_PATH = "/pn/get/stats"
|
||||
|
||||
|
||||
### Developer-facing API ##############################
|
||||
#######################################################
|
||||
|
||||
def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None, propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT, enforce_ratchets = False, enforce_stamps = False):
|
||||
def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None,
|
||||
propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, enforce_ratchets=False,
|
||||
enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False):
|
||||
|
||||
random.seed(os.urandom(10))
|
||||
|
||||
self.pending_inbound = []
|
||||
@ -83,6 +96,7 @@ class LXMRouter:
|
||||
self.processing_count = 0
|
||||
|
||||
self.propagation_node = False
|
||||
self.propagation_node_start_time = None
|
||||
|
||||
if storagepath == None:
|
||||
raise ValueError("LXMF cannot be initialised without a storage path")
|
||||
@ -93,6 +107,9 @@ class LXMRouter:
|
||||
self.outbound_propagation_node = None
|
||||
self.outbound_propagation_link = None
|
||||
|
||||
if delivery_limit == None:
|
||||
delivery_limit = LXMRouter.DELIVERY_LIMIT
|
||||
|
||||
self.message_storage_limit = None
|
||||
self.information_storage_limit = None
|
||||
self.propagation_per_transfer_limit = propagation_limit
|
||||
@ -107,6 +124,7 @@ class LXMRouter:
|
||||
self.propagation_transfer_progress = 0.0
|
||||
self.propagation_transfer_last_result = None
|
||||
self.propagation_transfer_max_messages = None
|
||||
self.prioritise_rotating_unreachable_peers = False
|
||||
self.active_propagation_links = []
|
||||
self.locally_delivered_transient_ids = {}
|
||||
self.locally_processed_transient_ids = {}
|
||||
@ -116,12 +134,18 @@ class LXMRouter:
|
||||
self.cost_file_lock = threading.Lock()
|
||||
self.ticket_file_lock = threading.Lock()
|
||||
self.stamp_gen_lock = threading.Lock()
|
||||
self.exit_handler_running = False
|
||||
|
||||
if identity == None:
|
||||
identity = RNS.Identity()
|
||||
|
||||
self.identity = identity
|
||||
self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
self.control_destination = None
|
||||
self.client_propagation_messages_received = 0
|
||||
self.client_propagation_messages_served = 0
|
||||
self.unpeered_propagation_incoming = 0
|
||||
self.unpeered_propagation_rx_bytes = 0
|
||||
|
||||
if autopeer != None:
|
||||
self.autopeer = autopeer
|
||||
@ -133,9 +157,32 @@ class LXMRouter:
|
||||
else:
|
||||
self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH
|
||||
|
||||
if max_peers == None:
|
||||
self.max_peers = LXMRouter.MAX_PEERS
|
||||
else:
|
||||
if type(max_peers) == int and max_peers >= 0:
|
||||
self.max_peers = max_peers
|
||||
else:
|
||||
raise ValueError(f"Invalid value for max_peers: {max_peers}")
|
||||
|
||||
self.from_static_only = from_static_only
|
||||
if type(static_peers) != list:
|
||||
raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}")
|
||||
else:
|
||||
for static_peer in static_peers:
|
||||
if type(static_peer) != bytes:
|
||||
raise ValueError(f"Invalid static peer destination hash: {static_peer}")
|
||||
else:
|
||||
if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
|
||||
raise ValueError(f"Invalid static peer destination hash: {static_peer}")
|
||||
|
||||
self.static_peers = static_peers
|
||||
|
||||
self.peers = {}
|
||||
self.propagation_entries = {}
|
||||
|
||||
self.peer_distribution_queue = deque()
|
||||
|
||||
RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self))
|
||||
RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self))
|
||||
|
||||
@ -220,6 +267,8 @@ class LXMRouter:
|
||||
RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
atexit.register(self.exit_handler)
|
||||
signal.signal(signal.SIGINT, self.sigint_handler)
|
||||
signal.signal(signal.SIGTERM, self.sigterm_handler)
|
||||
|
||||
job_thread = threading.Thread(target=self.jobloop)
|
||||
job_thread.setDaemon(True)
|
||||
@ -232,10 +281,12 @@ class LXMRouter:
|
||||
def announce_propagation_node(self):
|
||||
def delayed_announce():
|
||||
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
|
||||
node_state = self.propagation_node and not self.from_static_only
|
||||
announce_data = [
|
||||
self.propagation_node, # Boolean flag signalling propagation node state
|
||||
node_state, # Boolean flag signalling propagation node state
|
||||
int(time.time()), # Current node timebase
|
||||
self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes
|
||||
None, # How many more inbound peers this node wants
|
||||
]
|
||||
|
||||
data = msgpack.packb(announce_data)
|
||||
@ -427,6 +478,8 @@ class LXMRouter:
|
||||
os.makedirs(self.messagepath)
|
||||
|
||||
self.propagation_entries = {}
|
||||
|
||||
st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE)
|
||||
for filename in os.listdir(self.messagepath):
|
||||
components = filename.split("_")
|
||||
if len(components) == 2:
|
||||
@ -443,41 +496,94 @@ class LXMRouter:
|
||||
file.close()
|
||||
|
||||
self.propagation_entries[transient_id] = [
|
||||
destination_hash,
|
||||
filepath,
|
||||
received,
|
||||
msg_size,
|
||||
destination_hash, # 0: Destination hash
|
||||
filepath, # 1: Storage location
|
||||
received, # 2: Receive timestamp
|
||||
msg_size, # 3: Message size
|
||||
[], # 4: Handled peers
|
||||
[], # 5: Unhandled peers
|
||||
]
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st))
|
||||
RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE)
|
||||
RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
|
||||
st = time.time();
|
||||
|
||||
if os.path.isfile(self.storagepath+"/peers"):
|
||||
peers_file = open(self.storagepath+"/peers", "rb")
|
||||
peers_data = peers_file.read()
|
||||
peers_file.close()
|
||||
|
||||
if len(peers_data) > 0:
|
||||
serialised_peers = msgpack.unpackb(peers_data)
|
||||
del peers_data
|
||||
|
||||
for serialised_peer in serialised_peers:
|
||||
while len(serialised_peers) > 0:
|
||||
serialised_peer = serialised_peers.pop()
|
||||
peer = LXMPeer.from_bytes(serialised_peer, self)
|
||||
del serialised_peer
|
||||
if peer.destination_hash in self.static_peers and peer.last_heard == 0:
|
||||
# TODO: Allow path request responses through announce handler
|
||||
# momentarily here, so peering config can be updated even if
|
||||
# the static peer is not available to directly send an announce.
|
||||
RNS.Transport.request_path(peer.destination_hash)
|
||||
if peer.identity != None:
|
||||
self.peers[peer.destination_hash] = peer
|
||||
lim_str = ", no transfer limit"
|
||||
if peer.propagation_transfer_limit != None:
|
||||
lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit"
|
||||
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
|
||||
RNS.log("Rebuilt peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(peer.unhandled_message_count)+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
|
||||
else:
|
||||
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
|
||||
del peer
|
||||
|
||||
del serialised_peers
|
||||
|
||||
if len(self.static_peers) > 0:
|
||||
for static_peer in self.static_peers:
|
||||
if not static_peer in self.peers:
|
||||
RNS.log(f"Activating static peering with {RNS.prettyhexrep(static_peer)}", RNS.LOG_NOTICE)
|
||||
self.peers[static_peer] = LXMPeer(self, static_peer)
|
||||
if self.peers[static_peer].last_heard == 0:
|
||||
# TODO: Allow path request responses through announce handler
|
||||
# momentarily here, so peering config can be updated even if
|
||||
# the static peer is not available to directly send an announce.
|
||||
RNS.Transport.request_path(static_peer)
|
||||
|
||||
RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
|
||||
|
||||
try:
|
||||
if os.path.isfile(self.storagepath+"/node_stats"):
|
||||
node_stats_file = open(self.storagepath+"/node_stats", "rb")
|
||||
data = node_stats_file.read()
|
||||
node_stats_file.close()
|
||||
node_stats = msgpack.unpackb(data)
|
||||
|
||||
if not type(node_stats) == dict:
|
||||
RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR)
|
||||
else:
|
||||
self.client_propagation_messages_received = node_stats["client_propagation_messages_received"]
|
||||
self.client_propagation_messages_served = node_stats["client_propagation_messages_served"]
|
||||
self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"]
|
||||
self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"]
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Could not load local node stats. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
self.propagation_node = True
|
||||
self.propagation_node_start_time = time.time()
|
||||
self.propagation_destination.set_link_established_callback(self.propagation_link_established)
|
||||
self.propagation_destination.set_packet_callback(self.propagation_packet)
|
||||
|
||||
self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
|
||||
self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)
|
||||
|
||||
self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash])
|
||||
|
||||
if self.message_storage_limit != None:
|
||||
limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit)
|
||||
else:
|
||||
@ -580,6 +686,76 @@ class LXMRouter:
|
||||
return False
|
||||
|
||||
|
||||
### Propagation Node Control ##########################
|
||||
#######################################################
|
||||
|
||||
def compile_stats(self):
|
||||
if not self.propagation_node:
|
||||
return None
|
||||
else:
|
||||
peer_stats = {}
|
||||
for peer_id in self.peers.copy():
|
||||
peer = self.peers[peer_id]
|
||||
peer_stats[peer_id] = {
|
||||
"type": "static" if peer_id in self.static_peers else "discovered",
|
||||
"state": peer.state,
|
||||
"alive": peer.alive,
|
||||
"last_heard": int(peer.last_heard),
|
||||
"next_sync_attempt": peer.next_sync_attempt,
|
||||
"last_sync_attempt": peer.last_sync_attempt,
|
||||
"sync_backoff": peer.sync_backoff,
|
||||
"peering_timebase": peer.peering_timebase,
|
||||
"ler": int(peer.link_establishment_rate),
|
||||
"str": int(peer.sync_transfer_rate),
|
||||
"transfer_limit": peer.propagation_transfer_limit,
|
||||
"network_distance": RNS.Transport.hops_to(peer_id),
|
||||
"rx_bytes": peer.rx_bytes,
|
||||
"tx_bytes": peer.tx_bytes,
|
||||
"messages": {
|
||||
"offered": peer.offered,
|
||||
"outgoing": peer.outgoing,
|
||||
"incoming": peer.incoming,
|
||||
"unhandled": peer.unhandled_message_count
|
||||
},
|
||||
}
|
||||
|
||||
node_stats = {
|
||||
"identity_hash": self.identity.hash,
|
||||
"destination_hash": self.propagation_destination.hash,
|
||||
"uptime": time.time()-self.propagation_node_start_time,
|
||||
"delivery_limit": self.delivery_per_transfer_limit,
|
||||
"propagation_limit": self.propagation_per_transfer_limit,
|
||||
"autopeer_maxdepth": self.autopeer_maxdepth,
|
||||
"from_static_only": self.from_static_only,
|
||||
"messagestore": {
|
||||
"count": len(self.propagation_entries),
|
||||
"bytes": self.message_storage_size(),
|
||||
"limit": self.message_storage_limit,
|
||||
},
|
||||
"clients" : {
|
||||
"client_propagation_messages_received": self.client_propagation_messages_received,
|
||||
"client_propagation_messages_served": self.client_propagation_messages_served,
|
||||
},
|
||||
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
|
||||
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
|
||||
"static_peers": len(self.static_peers),
|
||||
"discovered_peers": len(self.peers)-len(self.static_peers),
|
||||
"total_peers": len(self.peers),
|
||||
"max_peers": self.max_peers,
|
||||
"peers": peer_stats,
|
||||
}
|
||||
|
||||
return node_stats
|
||||
|
||||
def stats_get_request(self, path, data, request_id, remote_identity, requested_at):
|
||||
if remote_identity == None:
|
||||
return LXMPeer.ERROR_NO_IDENTITY
|
||||
elif remote_identity.hash != self.identity.hash:
|
||||
return LXMPeer.ERROR_NO_ACCESS
|
||||
else:
|
||||
return self.compile_stats()
|
||||
|
||||
|
||||
### Utility & Maintenance #############################
|
||||
#######################################################
|
||||
|
||||
@ -589,39 +765,64 @@ class LXMRouter:
|
||||
JOB_TRANSIENT_INTERVAL = 60
|
||||
JOB_STORE_INTERVAL = 120
|
||||
JOB_PEERSYNC_INTERVAL = 12
|
||||
JOB_PEERINGEST_INTERVAL= JOB_PEERSYNC_INTERVAL
|
||||
JOB_ROTATE_INTERVAL = 56*JOB_PEERINGEST_INTERVAL
|
||||
def jobs(self):
|
||||
self.processing_count += 1
|
||||
if not self.exit_handler_running:
|
||||
self.processing_count += 1
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
|
||||
self.process_outbound()
|
||||
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
|
||||
self.process_outbound()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_STAMPS_INTERVAL == 0:
|
||||
threading.Thread(target=self.process_deferred_stamps, daemon=True).start()
|
||||
if self.processing_count % LXMRouter.JOB_STAMPS_INTERVAL == 0:
|
||||
threading.Thread(target=self.process_deferred_stamps, daemon=True).start()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
|
||||
self.clean_links()
|
||||
if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
|
||||
self.clean_links()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0:
|
||||
self.clean_transient_id_caches()
|
||||
if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0:
|
||||
self.clean_transient_id_caches()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
|
||||
self.clean_message_store()
|
||||
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
|
||||
if self.propagation_node == True:
|
||||
self.clean_message_store()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
|
||||
self.sync_peers()
|
||||
if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0:
|
||||
if self.propagation_node == True:
|
||||
self.flush_queues()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0:
|
||||
if self.propagation_node == True:
|
||||
self.rotate_peers()
|
||||
|
||||
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
|
||||
if self.propagation_node == True:
|
||||
self.sync_peers()
|
||||
|
||||
def jobloop(self):
|
||||
while (True):
|
||||
# TODO: Improve this to scheduling, so manual
|
||||
# triggers can delay next run
|
||||
|
||||
try:
|
||||
self.jobs()
|
||||
except Exception as e:
|
||||
RNS.log("An error ocurred while running LXMF Router jobs.", RNS.LOG_ERROR)
|
||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
time.sleep(LXMRouter.PROCESSING_INTERVAL)
|
||||
|
||||
def flush_queues(self):
|
||||
if len(self.peers) > 0:
|
||||
self.flush_peer_distribution_queue()
|
||||
RNS.log("Calculating peer distribution queue mappings...", RNS.LOG_DEBUG); st = time.time()
|
||||
for peer_id in self.peers.copy():
|
||||
if peer_id in self.peers:
|
||||
peer = self.peers[peer_id]
|
||||
if peer.queued_items():
|
||||
peer.process_queues()
|
||||
|
||||
RNS.log(f"Distribution queue mapping completed in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
|
||||
|
||||
def clean_links(self):
|
||||
closed_links = []
|
||||
for link_hash in self.direct_links:
|
||||
@ -693,6 +894,11 @@ class LXMRouter:
|
||||
self.save_outbound_stamp_costs()
|
||||
threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start()
|
||||
|
||||
def get_wanted_inbound_peers(self):
|
||||
# TODO: Implement/rethink.
|
||||
# Probably not necessary anymore.
|
||||
return None
|
||||
|
||||
def get_announce_app_data(self, destination_hash):
|
||||
if destination_hash in self.delivery_destinations:
|
||||
delivery_destination = self.delivery_destinations[destination_hash]
|
||||
@ -794,12 +1000,12 @@ class LXMRouter:
|
||||
lxm_size = self.propagation_entries[transient_id][3]
|
||||
return lxm_size
|
||||
|
||||
|
||||
def clean_message_store(self):
|
||||
RNS.log("Cleaning message store", RNS.LOG_VERBOSE)
|
||||
# Check and remove expired messages
|
||||
now = time.time()
|
||||
removed_entries = {}
|
||||
for transient_id in self.propagation_entries:
|
||||
for transient_id in self.propagation_entries.copy():
|
||||
entry = self.propagation_entries[transient_id]
|
||||
filepath = entry[1]
|
||||
components = filepath.split("_")
|
||||
@ -807,7 +1013,7 @@ class LXMRouter:
|
||||
if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2:
|
||||
timestamp = float(components[1])
|
||||
if now > timestamp+LXMRouter.MESSAGE_EXPIRY:
|
||||
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG)
|
||||
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_EXTREME)
|
||||
removed_entries[transient_id] = filepath
|
||||
else:
|
||||
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
|
||||
@ -825,7 +1031,7 @@ class LXMRouter:
|
||||
RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
if removed_count > 0:
|
||||
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG)
|
||||
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_VERBOSE)
|
||||
|
||||
# Check size of message store and cull if needed
|
||||
try:
|
||||
@ -837,7 +1043,7 @@ class LXMRouter:
|
||||
bytes_cleaned = 0
|
||||
|
||||
weighted_entries = []
|
||||
for transient_id in self.propagation_entries:
|
||||
for transient_id in self.propagation_entries.copy():
|
||||
weighted_entries.append([
|
||||
self.propagation_entries[transient_id],
|
||||
self.get_weight(transient_id),
|
||||
@ -876,26 +1082,46 @@ class LXMRouter:
|
||||
|
||||
def save_locally_delivered_transient_ids(self):
|
||||
try:
|
||||
if not os.path.isdir(self.storagepath):
|
||||
if len(self.locally_delivered_transient_ids) > 0:
|
||||
if not os.path.isdir(self.storagepath):
|
||||
os.makedirs(self.storagepath)
|
||||
|
||||
with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file:
|
||||
locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))
|
||||
with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file:
|
||||
locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
def save_locally_processed_transient_ids(self):
|
||||
try:
|
||||
if not os.path.isdir(self.storagepath):
|
||||
if len(self.locally_processed_transient_ids) > 0:
|
||||
if not os.path.isdir(self.storagepath):
|
||||
os.makedirs(self.storagepath)
|
||||
|
||||
with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file:
|
||||
locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids))
|
||||
with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file:
|
||||
locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids))
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
def save_node_stats(self):
|
||||
try:
|
||||
if not os.path.isdir(self.storagepath):
|
||||
os.makedirs(self.storagepath)
|
||||
|
||||
with open(self.storagepath+"/node_stats", "wb") as stats_file:
|
||||
node_stats = {
|
||||
"client_propagation_messages_received": self.client_propagation_messages_received,
|
||||
"client_propagation_messages_served": self.client_propagation_messages_served,
|
||||
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
|
||||
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
|
||||
}
|
||||
stats_file.write(msgpack.packb(node_stats))
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
|
||||
def clean_outbound_stamp_costs(self):
|
||||
try:
|
||||
expired = []
|
||||
@ -989,10 +1215,45 @@ class LXMRouter:
|
||||
RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR)
|
||||
|
||||
def exit_handler(self):
|
||||
if self.exit_handler_running:
|
||||
return
|
||||
|
||||
self.exit_handler_running = True
|
||||
|
||||
RNS.log("Tearing down delivery destinations...", RNS.LOG_NOTICE)
|
||||
for destination_hash in self.delivery_destinations:
|
||||
delivery_destination = self.delivery_destinations[destination_hash]
|
||||
delivery_destination.set_packet_callback(None)
|
||||
delivery_destination.set_link_established_callback(None)
|
||||
for link in delivery_destination.links:
|
||||
try:
|
||||
if link.status == RNS.Link.ACTIVE:
|
||||
link.teardown()
|
||||
except Exception as e:
|
||||
RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
|
||||
|
||||
if self.propagation_node:
|
||||
RNS.log("Tearing down propagation node destination...", RNS.LOG_NOTICE)
|
||||
self.propagation_destination.set_link_established_callback(None)
|
||||
self.propagation_destination.set_packet_callback(None)
|
||||
self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH)
|
||||
self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH)
|
||||
self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH)
|
||||
for link in self.active_propagation_links:
|
||||
try:
|
||||
if link.status == RNS.Link.ACTIVE:
|
||||
link.teardown()
|
||||
except Exception as e:
|
||||
RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
|
||||
|
||||
RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE)
|
||||
self.flush_queues()
|
||||
if self.propagation_node:
|
||||
try:
|
||||
st = time.time(); RNS.log(f"Saving {len(self.peers)} peer synchronisation states to storage...", RNS.LOG_NOTICE)
|
||||
serialised_peers = []
|
||||
for peer_id in self.peers:
|
||||
peer_dict = self.peers.copy()
|
||||
for peer_id in peer_dict:
|
||||
peer = self.peers[peer_id]
|
||||
serialised_peers.append(peer.to_bytes())
|
||||
|
||||
@ -1000,13 +1261,28 @@ class LXMRouter:
|
||||
peers_file.write(msgpack.packb(serialised_peers))
|
||||
peers_file.close()
|
||||
|
||||
RNS.log("Saved "+str(len(serialised_peers))+" peers to storage", RNS.LOG_DEBUG)
|
||||
RNS.log(f"Saved {len(serialised_peers)} peers to storage in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_NOTICE)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
self.save_locally_delivered_transient_ids()
|
||||
self.save_locally_processed_transient_ids()
|
||||
self.save_node_stats()
|
||||
|
||||
def sigint_handler(self, signal, frame):
|
||||
if not self.exit_handler_running:
|
||||
RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING)
|
||||
sys.exit(0)
|
||||
else:
|
||||
RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
|
||||
|
||||
def sigterm_handler(self, signal, frame):
|
||||
if not self.exit_handler_running:
|
||||
RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING)
|
||||
sys.exit(0)
|
||||
else:
|
||||
RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
|
||||
|
||||
def __str__(self):
|
||||
return "<LXMRouter "+RNS.hexrep(self.identity.hash, delimit=False)+">"
|
||||
@ -1121,6 +1397,7 @@ class LXMRouter:
|
||||
except Exception as e:
|
||||
RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
self.client_propagation_messages_served += len(response_messages)
|
||||
return response_messages
|
||||
|
||||
|
||||
@ -1341,7 +1618,7 @@ class LXMRouter:
|
||||
### Message Routing & Delivery ########################
|
||||
#######################################################
|
||||
|
||||
def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False):
|
||||
def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False, allow_duplicate=False):
|
||||
try:
|
||||
message = LXMessage.unpack_from_bytes(lxmf_data)
|
||||
if ratchet_id and not message.ratchet_id:
|
||||
@ -1408,7 +1685,7 @@ class LXMRouter:
|
||||
RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
|
||||
return False
|
||||
|
||||
if self.has_message(message.hash):
|
||||
if not allow_duplicate and self.has_message(message.hash):
|
||||
RNS.log(str(self)+" ignored already received message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
|
||||
return False
|
||||
else:
|
||||
@ -1500,7 +1777,7 @@ class LXMRouter:
|
||||
### Peer Sync & Propagation ###########################
|
||||
#######################################################
|
||||
|
||||
def peer(self, destination_hash, timestamp, propagation_transfer_limit):
|
||||
def peer(self, destination_hash, timestamp, propagation_transfer_limit, wanted_inbound_peers = None):
|
||||
if destination_hash in self.peers:
|
||||
peer = self.peers[destination_hash]
|
||||
if timestamp > peer.peering_timebase:
|
||||
@ -1510,14 +1787,18 @@ class LXMRouter:
|
||||
peer.peering_timebase = timestamp
|
||||
peer.last_heard = time.time()
|
||||
peer.propagation_transfer_limit = propagation_transfer_limit
|
||||
RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE)
|
||||
|
||||
else:
|
||||
peer = LXMPeer(self, destination_hash)
|
||||
peer.alive = True
|
||||
peer.last_heard = time.time()
|
||||
peer.propagation_transfer_limit = propagation_transfer_limit
|
||||
self.peers[destination_hash] = peer
|
||||
RNS.log("Peered with "+str(peer.destination))
|
||||
if len(self.peers) < self.max_peers:
|
||||
peer = LXMPeer(self, destination_hash)
|
||||
peer.alive = True
|
||||
peer.last_heard = time.time()
|
||||
peer.propagation_transfer_limit = propagation_transfer_limit
|
||||
self.peers[destination_hash] = peer
|
||||
RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE)
|
||||
else:
|
||||
RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
|
||||
|
||||
def unpeer(self, destination_hash, timestamp = None):
|
||||
if timestamp == None:
|
||||
@ -1530,14 +1811,92 @@ class LXMRouter:
|
||||
self.peers.pop(destination_hash)
|
||||
RNS.log("Broke peering with "+str(peer.destination))
|
||||
|
||||
def rotate_peers(self):
|
||||
try:
|
||||
rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0)))
|
||||
required_drops = len(self.peers) - (self.max_peers - rotation_headroom)
|
||||
if required_drops > 0 and len(self.peers) - required_drops > 1:
|
||||
peers = self.peers.copy()
|
||||
untested_peers = []
|
||||
for peer_id in self.peers:
|
||||
peer = self.peers[peer_id]
|
||||
if peer.last_sync_attempt == 0:
|
||||
untested_peers.append(peer)
|
||||
|
||||
if len(untested_peers) >= rotation_headroom:
|
||||
RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG)
|
||||
return
|
||||
|
||||
fully_synced_peers = {}
|
||||
for peer_id in peers:
|
||||
peer = peers[peer_id]
|
||||
if peer.unhandled_message_count == 0:
|
||||
fully_synced_peers[peer_id] = peer
|
||||
|
||||
if len(fully_synced_peers) > 0:
|
||||
peers = fully_synced_peers
|
||||
ms = "" if len(fully_synced_peers) == 1 else "s"
|
||||
RNS.log(f"Found {len(fully_synced_peers)} fully synced peer{ms}, using as peer rotation pool basis", RNS.LOG_DEBUG)
|
||||
|
||||
culled_peers = []
|
||||
waiting_peers = []
|
||||
unresponsive_peers = []
|
||||
for peer_id in peers:
|
||||
peer = peers[peer_id]
|
||||
if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE:
|
||||
if peer.alive:
|
||||
if peer.offered == 0:
|
||||
# Don't consider for unpeering until at
|
||||
# least one message has been offered
|
||||
pass
|
||||
else:
|
||||
waiting_peers.append(peer)
|
||||
else:
|
||||
unresponsive_peers.append(peer)
|
||||
|
||||
drop_pool = []
|
||||
if len(unresponsive_peers) > 0:
|
||||
drop_pool.extend(unresponsive_peers)
|
||||
if not self.prioritise_rotating_unreachable_peers:
|
||||
drop_pool.extend(waiting_peers)
|
||||
|
||||
else:
|
||||
drop_pool.extend(waiting_peers)
|
||||
|
||||
if len(drop_pool) > 0:
|
||||
drop_count = min(required_drops, len(drop_pool))
|
||||
low_acceptance_rate_peers = sorted(
|
||||
drop_pool,
|
||||
key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ),
|
||||
reverse=False
|
||||
)[0:drop_count]
|
||||
|
||||
dropped_peers = 0
|
||||
for peer in low_acceptance_rate_peers:
|
||||
ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2)
|
||||
if ar < LXMRouter.ROTATION_AR_MAX*100:
|
||||
reachable_str = "reachable" if peer.alive else "unreachable"
|
||||
RNS.log(f"Acceptance rate for {reachable_str} peer {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing}/{peer.offered}, {peer.unhandled_message_count} unhandled messages)", RNS.LOG_DEBUG)
|
||||
self.unpeer(peer.destination_hash)
|
||||
dropped_peers += 1
|
||||
|
||||
ms = "" if dropped_peers == 1 else "s"
|
||||
RNS.log(f"Dropped {dropped_peers} low acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
def sync_peers(self):
|
||||
culled_peers = []
|
||||
waiting_peers = []
|
||||
unresponsive_peers = []
|
||||
for peer_id in self.peers:
|
||||
peer = self.peers[peer_id]
|
||||
peers = self.peers.copy()
|
||||
for peer_id in peers:
|
||||
peer = peers[peer_id]
|
||||
if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
|
||||
culled_peers.append(peer_id)
|
||||
if not peer_id in self.static_peers:
|
||||
culled_peers.append(peer_id)
|
||||
else:
|
||||
if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
|
||||
if peer.alive:
|
||||
@ -1597,10 +1956,23 @@ class LXMRouter:
|
||||
self.active_propagation_links.append(link)
|
||||
|
||||
def propagation_resource_advertised(self, resource):
|
||||
if self.from_static_only:
|
||||
remote_identity = resource.link.get_remote_identity()
|
||||
if remote_identity == None:
|
||||
RNS.log(f"Rejecting propagation resource from unidentified peer", RNS.LOG_DEBUG)
|
||||
return False
|
||||
else:
|
||||
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
remote_hash = remote_destination.hash
|
||||
remote_str = RNS.prettyhexrep(remote_hash)
|
||||
if not remote_hash in self.static_peers:
|
||||
RNS.log(f"Rejecting propagation resource from {remote_str} not in static peers list", RNS.LOG_DEBUG)
|
||||
return False
|
||||
|
||||
size = resource.get_data_size()
|
||||
limit = self.propagation_per_transfer_limit*1000
|
||||
if limit != None and size > limit:
|
||||
RNS.log("Rejecting "+RNS.prettysize(size)+" incoming LXMF propagation resource, since it exceeds the limit of "+RNS.prettysize(limit), RNS.LOG_DEBUG)
|
||||
RNS.log(f"Rejecting {RNS.prettysize(size)} incoming propagation resource, since it exceeds the limit of {RNS.prettysize(limit)}", RNS.LOG_DEBUG)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
@ -1616,6 +1988,7 @@ class LXMRouter:
|
||||
messages = data[1]
|
||||
for lxmf_data in messages:
|
||||
self.lxmf_propagation(lxmf_data)
|
||||
self.client_propagation_messages_received += 1
|
||||
|
||||
packet.prove()
|
||||
|
||||
@ -1627,6 +2000,14 @@ class LXMRouter:
|
||||
if remote_identity == None:
|
||||
return LXMPeer.ERROR_NO_IDENTITY
|
||||
else:
|
||||
if self.from_static_only:
|
||||
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
remote_hash = remote_destination.hash
|
||||
remote_str = RNS.prettyhexrep(remote_hash)
|
||||
if not remote_hash in self.static_peers:
|
||||
RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG)
|
||||
return LXMPeer.ERROR_NO_ACCESS
|
||||
|
||||
try:
|
||||
transient_ids = data
|
||||
wanted_ids = []
|
||||
@ -1649,7 +2030,6 @@ class LXMRouter:
|
||||
return None
|
||||
|
||||
def propagation_resource_concluded(self, resource):
|
||||
RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG)
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
# TODO: The peer this was received from should
|
||||
# have the transient id added to its list of
|
||||
@ -1661,31 +2041,73 @@ class LXMRouter:
|
||||
# This is a series of propagation messages from a peer or originator
|
||||
remote_timebase = data[0]
|
||||
remote_hash = None
|
||||
remote_str = "unknown peer"
|
||||
remote_identity = resource.link.get_remote_identity()
|
||||
|
||||
if remote_identity != None:
|
||||
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
remote_hash = remote_destination.hash
|
||||
remote_str = RNS.prettyhexrep(remote_hash)
|
||||
|
||||
if not remote_hash in self.peers:
|
||||
if self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
|
||||
self.peer(remote_hash, remote_timebase)
|
||||
# TODO: Query cache for an announce and get propagation
|
||||
# transfer limit from that. For now, initialise it to a
|
||||
# sane default value, and wait for an announce to arrive
|
||||
# that will update the peering config to the actual limit.
|
||||
propagation_transfer_limit = LXMRouter.PROPAGATION_LIMIT//4
|
||||
wanted_inbound_peers = None
|
||||
self.peer(remote_hash, remote_timebase, propagation_transfer_limit, wanted_inbound_peers)
|
||||
else:
|
||||
remote_str = f"peer {remote_str}"
|
||||
|
||||
messages = data[1]
|
||||
ms = "" if len(messages) == 1 else "s"
|
||||
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE)
|
||||
for lxmf_data in messages:
|
||||
peer = None
|
||||
transient_id = RNS.Identity.full_hash(lxmf_data)
|
||||
if remote_hash != None and remote_hash in self.peers:
|
||||
transient_id = RNS.Identity.full_hash(lxmf_data)
|
||||
peer = self.peers[remote_hash]
|
||||
peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data]
|
||||
peer.incoming += 1
|
||||
peer.rx_bytes += len(lxmf_data)
|
||||
else:
|
||||
if remote_identity != None:
|
||||
self.unpeered_propagation_incoming += 1
|
||||
self.unpeered_propagation_rx_bytes += len(lxmf_data)
|
||||
else:
|
||||
self.client_propagation_messages_received += 1
|
||||
|
||||
self.lxmf_propagation(lxmf_data, from_peer=peer)
|
||||
if peer != None:
|
||||
peer.queue_handled_message(transient_id)
|
||||
|
||||
self.lxmf_propagation(lxmf_data)
|
||||
else:
|
||||
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False):
|
||||
def enqueue_peer_distribution(self, transient_id, from_peer):
|
||||
self.peer_distribution_queue.append([transient_id, from_peer])
|
||||
|
||||
def flush_peer_distribution_queue(self):
|
||||
if len(self.peer_distribution_queue) > 0:
|
||||
entries = []
|
||||
while len(self.peer_distribution_queue) > 0:
|
||||
entries.append(self.peer_distribution_queue.pop())
|
||||
|
||||
for peer_id in self.peers.copy():
|
||||
if peer_id in self.peers:
|
||||
peer = self.peers[peer_id]
|
||||
for entry in entries:
|
||||
transient_id = entry[0]
|
||||
from_peer = entry[1]
|
||||
if peer != from_peer:
|
||||
peer.queue_unhandled_message(transient_id)
|
||||
|
||||
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None):
|
||||
no_stamp_enforcement = False
|
||||
if is_paper_message:
|
||||
no_stamp_enforcement = True
|
||||
@ -1694,9 +2116,8 @@ class LXMRouter:
|
||||
if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
|
||||
transient_id = RNS.Identity.full_hash(lxmf_data)
|
||||
|
||||
if not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids:
|
||||
if (not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids) or allow_duplicate == True:
|
||||
received = time.time()
|
||||
propagation_entry = [transient_id, received, lxmf_data]
|
||||
destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH]
|
||||
|
||||
self.locally_processed_transient_ids[transient_id] = received
|
||||
@ -1707,7 +2128,7 @@ class LXMRouter:
|
||||
decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data)
|
||||
if decrypted_lxmf_data != None:
|
||||
delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data
|
||||
self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement)
|
||||
self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement, allow_duplicate=allow_duplicate)
|
||||
self.locally_delivered_transient_ids[transient_id] = time.time()
|
||||
|
||||
if signal_local_delivery != None:
|
||||
@ -1720,12 +2141,9 @@ class LXMRouter:
|
||||
msg_file.write(lxmf_data)
|
||||
msg_file.close()
|
||||
|
||||
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data)]
|
||||
|
||||
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG)
|
||||
for peer_id in self.peers:
|
||||
peer = self.peers[peer_id]
|
||||
peer.handle_message(transient_id)
|
||||
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME)
|
||||
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]
|
||||
self.enqueue_peer_distribution(transient_id, from_peer)
|
||||
|
||||
else:
|
||||
# TODO: Add message to sneakernet queues when implemented
|
||||
@ -1745,9 +2163,10 @@ class LXMRouter:
|
||||
except Exception as e:
|
||||
RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG)
|
||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
|
||||
RNS.trace_exception(e)
|
||||
return False
|
||||
|
||||
def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None):
|
||||
def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False):
|
||||
try:
|
||||
if not uri.lower().startswith(LXMessage.URI_SCHEMA+"://"):
|
||||
RNS.log("Cannot ingest LXM, invalid URI provided.", RNS.LOG_ERROR)
|
||||
@ -1757,7 +2176,7 @@ class LXMRouter:
|
||||
lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==")
|
||||
transient_id = RNS.Identity.full_hash(lxmf_data)
|
||||
|
||||
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, is_paper_message=True)
|
||||
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True)
|
||||
if router_propagation_result != False:
|
||||
RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG)
|
||||
return router_propagation_result
|
||||
|
@ -380,7 +380,7 @@ class LXMessage:
|
||||
if self.desired_method == LXMessage.OPPORTUNISTIC:
|
||||
if self.__destination.type == RNS.Destination.SINGLE:
|
||||
if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT:
|
||||
RNS.log(f"Opportunistic delivery was requested for {self}, but content exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
|
||||
RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
|
||||
self.desired_method = LXMessage.DIRECT
|
||||
|
||||
# Set delivery parameters according to delivery method
|
||||
|
@ -35,6 +35,7 @@ import time
|
||||
import os
|
||||
|
||||
from LXMF._version import __version__
|
||||
from LXMF import APP_NAME
|
||||
|
||||
from RNS.vendor.configobj import ConfigObj
|
||||
|
||||
@ -126,7 +127,7 @@ def apply_config():
|
||||
if active_configuration["message_storage_limit"] < 0.005:
|
||||
active_configuration["message_storage_limit"] = 0.005
|
||||
else:
|
||||
active_configuration["message_storage_limit"] = 2000
|
||||
active_configuration["message_storage_limit"] = 500
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
|
||||
@ -140,6 +141,24 @@ def apply_config():
|
||||
else:
|
||||
active_configuration["prioritised_lxmf_destinations"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]:
|
||||
static_peers = lxmd_config["propagation"].as_list("static_peers")
|
||||
active_configuration["static_peers"] = []
|
||||
for static_peer in static_peers:
|
||||
active_configuration["static_peers"].append(bytes.fromhex(static_peer))
|
||||
else:
|
||||
active_configuration["static_peers"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]:
|
||||
active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers")
|
||||
else:
|
||||
active_configuration["max_peers"] = None
|
||||
|
||||
if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]:
|
||||
active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only")
|
||||
else:
|
||||
active_configuration["from_static_only"] = False
|
||||
|
||||
# Load various settings
|
||||
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
|
||||
targetloglevel = lxmd_config["logging"].as_int("loglevel")
|
||||
@ -305,7 +324,10 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
||||
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
|
||||
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
|
||||
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
|
||||
)
|
||||
max_peers = active_configuration["max_peers"],
|
||||
static_peers = active_configuration["static_peers"],
|
||||
from_static_only = active_configuration["from_static_only"])
|
||||
|
||||
message_router.register_delivery_callback(lxmf_delivery)
|
||||
|
||||
for destination_hash in active_configuration["ignored_lxmf_destinations"]:
|
||||
@ -362,13 +384,13 @@ def jobs():
|
||||
try:
|
||||
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
|
||||
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
last_peer_announce = time.time()
|
||||
|
||||
if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None:
|
||||
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE)
|
||||
message_router.announce_propagation_node()
|
||||
last_node_announce = time.time()
|
||||
|
||||
@ -381,7 +403,7 @@ def deferred_start_jobs():
|
||||
global active_configuration, last_peer_announce, last_node_announce
|
||||
global message_router, lxmf_destination
|
||||
time.sleep(DEFFERED_JOBS_DELAY)
|
||||
RNS.log("Running deferred start jobs")
|
||||
RNS.log("Running deferred start jobs", RNS.LOG_DEBUG)
|
||||
if active_configuration["peer_announce_at_start"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
@ -394,6 +416,190 @@ def deferred_start_jobs():
|
||||
last_node_announce = time.time()
|
||||
threading.Thread(target=jobs, daemon=True).start()
|
||||
|
||||
def query_status(identity, timeout=5, exit_on_fail=False):
|
||||
control_destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
if exit_on_fail:
|
||||
RNS.log("Getting lxmd statistics timed out, exiting now", RNS.LOG_ERROR)
|
||||
exit(200)
|
||||
else:
|
||||
return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
|
||||
if not RNS.Transport.has_path(control_destination.hash):
|
||||
RNS.Transport.request_path(control_destination.hash)
|
||||
while not RNS.Transport.has_path(control_destination.hash):
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link = RNS.Link(control_destination)
|
||||
while not link.status == RNS.Link.ACTIVE:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.identify(identity)
|
||||
request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None)
|
||||
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.teardown()
|
||||
return request_receipt.get_response()
|
||||
|
||||
def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = 0, timeout=5, show_status=False, show_peers=False, identity_path=None):
|
||||
global configpath, identitypath, storagedir, lxmdir
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
targetlogdest = RNS.LOG_STDOUT
|
||||
|
||||
if identity_path == None:
|
||||
if configdir == None:
|
||||
if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"):
|
||||
configdir = "/etc/lxmd"
|
||||
elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(Reticulum.userdir+"/.config/lxmd/config"):
|
||||
configdir = RNS.Reticulum.userdir+"/.config/lxmd"
|
||||
else:
|
||||
configdir = RNS.Reticulum.userdir+"/.lxmd"
|
||||
|
||||
configpath = configdir+"/config"
|
||||
identitypath = configdir+"/identity"
|
||||
identity = None
|
||||
|
||||
if not os.path.isdir(configdir):
|
||||
RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR)
|
||||
exit(201)
|
||||
if not os.path.isfile(identitypath):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identitypath)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
else:
|
||||
if not os.path.isfile(identity_path):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identity_path)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
if targetloglevel == None:
|
||||
targetloglevel = 3
|
||||
if verbosity != 0 or quietness != 0:
|
||||
targetloglevel = targetloglevel+verbosity-quietness
|
||||
|
||||
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
|
||||
response = query_status(identity, timeout=timeout, exit_on_fail=True)
|
||||
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
|
||||
RNS.log("Remote received no identity")
|
||||
exit(203)
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
|
||||
RNS.log("Access denied")
|
||||
exit(204)
|
||||
else:
|
||||
s = response
|
||||
mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2)
|
||||
ms_util = f"{mutil}%"
|
||||
if s["from_static_only"]:
|
||||
who_str = "static peers only"
|
||||
else:
|
||||
who_str = "all nodes"
|
||||
|
||||
available_peers = 0
|
||||
unreachable_peers = 0
|
||||
peered_incoming = 0
|
||||
peered_outgoing = 0
|
||||
peered_rx_bytes = 0
|
||||
peered_tx_bytes = 0
|
||||
for peer_id in s["peers"]:
|
||||
p = s["peers"][peer_id]
|
||||
pm = p["messages"]
|
||||
peered_incoming += pm["incoming"]
|
||||
peered_outgoing += pm["outgoing"]
|
||||
peered_rx_bytes += p["rx_bytes"]
|
||||
peered_tx_bytes += p["tx_bytes"]
|
||||
if p["alive"]:
|
||||
available_peers += 1
|
||||
else:
|
||||
unreachable_peers += 1
|
||||
|
||||
total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"]
|
||||
total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"]
|
||||
df = round(peered_outgoing/total_incoming, 2)
|
||||
|
||||
dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"])
|
||||
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
|
||||
|
||||
if show_status:
|
||||
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
|
||||
ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
|
||||
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
|
||||
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
|
||||
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
|
||||
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
|
||||
print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit")
|
||||
print(f"")
|
||||
print(f"Peers : {stp} total (peer limit is {smp})")
|
||||
print(f" {sdp} discovered, {ssp} static")
|
||||
print(f" {available_peers} available, {unreachable_peers} unreachable")
|
||||
print(f"")
|
||||
print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})")
|
||||
print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})")
|
||||
print(f" {upi} messages received from unpeered nodes ({uprx})")
|
||||
print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})")
|
||||
print(f" {cprr} propagation messages received directly from clients")
|
||||
print(f" {cprs} propagation messages served to clients")
|
||||
print(f" Distribution factor is {df}")
|
||||
print(f"")
|
||||
|
||||
if show_peers:
|
||||
if not show_status:
|
||||
print("")
|
||||
|
||||
for peer_id in s["peers"]:
|
||||
ind = " "
|
||||
p = s["peers"][peer_id]
|
||||
if p["type"] == "static":
|
||||
t = "Static peer "
|
||||
elif p["type"] == "discovered":
|
||||
t = "Discovered peer "
|
||||
else:
|
||||
t = "Unknown peer "
|
||||
a = "Available" if p["alive"] == True else "Unreachable"
|
||||
h = max(time.time()-p["last_heard"], 0)
|
||||
hops = p["network_distance"]
|
||||
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
|
||||
pm = p["messages"]
|
||||
if p["last_sync_attempt"] != 0:
|
||||
lsa = p["last_sync_attempt"]
|
||||
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
|
||||
else:
|
||||
ls = "never synced"
|
||||
|
||||
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000)
|
||||
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
|
||||
pmi = pm["incoming"]; pmuh = pm["unhandled"]
|
||||
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
|
||||
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
|
||||
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit")
|
||||
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming")
|
||||
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
|
||||
ms = "" if pm["unhandled"] == 1 else "s"
|
||||
print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}")
|
||||
print("")
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
|
||||
@ -404,6 +610,10 @@ def main():
|
||||
parser.add_argument("-v", "--verbose", action="count", default=0)
|
||||
parser.add_argument("-q", "--quiet", action="count", default=0)
|
||||
parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file")
|
||||
parser.add_argument("--status", action="store_true", default=False, help="display node status")
|
||||
parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes")
|
||||
parser.add_argument("--timeout", action="store", default=5, help="timeout in seconds for query operations", type=float)
|
||||
parser.add_argument("--identity", action="store", default=None, help="path to identity used for query request", type=str)
|
||||
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
|
||||
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
|
||||
|
||||
@ -413,15 +623,24 @@ def main():
|
||||
print(__default_lxmd_config__)
|
||||
exit()
|
||||
|
||||
program_setup(
|
||||
configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service
|
||||
)
|
||||
if args.status or args.peers:
|
||||
get_status(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
timeout=args.timeout,
|
||||
show_status=args.status,
|
||||
show_peers=args.peers,
|
||||
identity_path=args.identity)
|
||||
exit()
|
||||
|
||||
program_setup(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
@ -477,9 +696,9 @@ propagation_transfer_max_accepted_size = 256
|
||||
# LXMF prioritises keeping messages that are
|
||||
# new and small. Large and old messages will
|
||||
# be removed first. This setting is optional
|
||||
# and defaults to 2 gigabytes.
|
||||
# and defaults to 500 megabytes.
|
||||
|
||||
# message_storage_limit = 2000
|
||||
# message_storage_limit = 500
|
||||
|
||||
# You can tell the LXMF message router to
|
||||
# prioritise storage for one or more
|
||||
@ -491,6 +710,25 @@ propagation_transfer_max_accepted_size = 256
|
||||
|
||||
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
|
||||
|
||||
# You can configure the maximum number of other
|
||||
# propagation nodes that this node will peer
|
||||
# with automatically. The default is 50.
|
||||
|
||||
# max_peers = 25
|
||||
|
||||
# You can configure a list of static propagation
|
||||
# node peers, that this node will always be
|
||||
# peered with, by specifying a list of
|
||||
# destination hashes.
|
||||
|
||||
# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
|
||||
|
||||
# You can configure the propagation node to
|
||||
# only accept incoming propagation messages
|
||||
# from configured static peers.
|
||||
|
||||
# from_static_only = True
|
||||
|
||||
# By default, any destination is allowed to
|
||||
# connect and download messages, but you can
|
||||
# optionally restrict this. If you enable
|
||||
|
@ -1 +1 @@
|
||||
__version__ = "0.6.0"
|
||||
__version__ = "0.6.3"
|
||||
|
@ -12,6 +12,7 @@ User-facing clients built on LXMF include:
|
||||
|
||||
Community-provided tools and utilities for LXMF include:
|
||||
|
||||
- [LXMFy](https://lxmfy.quad4.io/)
|
||||
- [LXMF-Bot](https://github.com/randogoth/lxmf-bot)
|
||||
- [LXMF Messageboard](https://github.com/chengtripp/lxmf_messageboard)
|
||||
- [LXMEvent](https://github.com/faragher/LXMEvent)
|
||||
|
@ -1,3 +1,2 @@
|
||||
qrcode==7.4.2
|
||||
rns==0.7.8
|
||||
setuptools==70.0.0
|
||||
qrcode>=7.4.2
|
||||
rns>=0.9.1
|
||||
|
Loading…
x
Reference in New Issue
Block a user