Mirror of https://github.com/markqvist/LXMF.git, synced 2025-04-19 14:55:52 -04:00

Compare commits

No commits in common. "master" and "0.5.1" have entirely different histories.
.github/ISSUE_TEMPLATE/🐛-bug-report.md (vendored): 2 changed lines
@@ -12,7 +12,7 @@ Before creating a bug report on this issue tracker, you **must** read the [Contr
 - The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**.
 - Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**.
-- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**.
+- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), delete this section from your bug report.

 **Describe the Bug**
 A clear and concise description of what the bug is.
LICENSE: 16 changed lines
@@ -1,6 +1,6 @@
-Reticulum License
+MIT License

-Copyright (c) 2020-2025 Mark Qvist
+Copyright (c) 2020 Mark Qvist / unsigned.io

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
@@ -9,16 +9,8 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

-- The Software shall not be used in any kind of system which includes amongst
-  its functions the ability to purposefully do harm to human beings.
-
-- The Software shall not be used, directly or indirectly, in the creation of
-  an artificial intelligence, machine learning or language model training
-  dataset, including but not limited to any use that contributes to the
-  training or development of such a model or algorithm.
-
-- The above copyright notice and this permission notice shall be included in
-  all copies or substantial portions of the Software.
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
@@ -2,7 +2,8 @@ import time
import RNS
import RNS.vendor.umsgpack as msgpack

from .LXMF import APP_NAME, stamp_cost_from_app_data, pn_announce_data_is_valid
from .LXMF import APP_NAME, stamp_cost_from_app_data

from .LXMessage import LXMessage

class LXMFDeliveryAnnounceHandler:
@@ -14,7 +15,7 @@ class LXMFDeliveryAnnounceHandler:
def received_announce(self, destination_hash, announced_identity, app_data):
for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash:
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
if lxmessage.method == LXMessage.DIRECT:
lxmessage.next_delivery_attempt = time.time()

while self.lxmrouter.processing_outbound:
@@ -39,37 +40,23 @@ class LXMFPropagationAnnounceHandler:
def received_announce(self, destination_hash, announced_identity, app_data):
try:
if type(app_data) == bytes:
data = msgpack.unpackb(app_data)

if self.lxmrouter.propagation_node and self.lxmrouter.autopeer:
data = msgpack.unpackb(app_data)
node_timebase = data[1]
propagation_transfer_limit = None
if len(data) >= 3:
try:
propagation_transfer_limit = float(data[2])
except:
propagation_transfer_limit = None

if pn_announce_data_is_valid(data):
node_timebase = data[1]
propagation_transfer_limit = None
wanted_inbound_peers = None
if len(data) >= 4:
# TODO: Rethink, probably not necessary anymore
# try:
# wanted_inbound_peers = int(data[3])
# except:
# wanted_inbound_peers = None
pass
if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)

if len(data) >= 3:
try:
propagation_transfer_limit = float(data[2])
except:
propagation_transfer_limit = None

if destination_hash in self.lxmrouter.static_peers:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)

else:
if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers)

elif data[0] == False:
self.lxmrouter.unpeer(destination_hash, node_timebase)
elif data[0] == False:
self.lxmrouter.unpeer(destination_hash, node_timebase)

except Exception as e:
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)
LXMF/LXMF.py: 44 changed lines
@@ -18,8 +18,6 @@ FIELD_RESULTS = 0x0A
FIELD_GROUP = 0x0B
FIELD_TICKET = 0x0C
FIELD_EVENT = 0x0D
FIELD_RNR_REFS = 0x0E
FIELD_RENDERER = 0x0F

# For usecases such as including custom data structures,
# embedding or encapsulating other data types or protocols
@@ -78,25 +76,12 @@ AM_OPUS_LOSSLESS = 0x19
# determine it itself based on the included data.
AM_CUSTOM = 0xFF

# Message renderer specifications for FIELD_RENDERER.
# The renderer specification is completely optional,
# and only serves as an indication to the receiving
# client on how to render the message contents. It is
# not mandatory to implement, either on sending or
# receiving sides, but is the recommended way to
# signal how to render a message, if non-plaintext
# formatting is used.
RENDERER_PLAIN = 0x00
RENDERER_MICRON = 0x01
RENDERER_MARKDOWN = 0x02
RENDERER_BBCODE = 0x03

##########################################################
# The following helper functions makes it easier to #
# handle and operate on LXMF data in client programs #
##########################################################

import RNS
import RNS.vendor.umsgpack as msgpack
def display_name_from_app_data(app_data=None):
if app_data == None:
@@ -118,8 +103,8 @@ def display_name_from_app_data(app_data=None):
try:
decoded = dn.decode("utf-8")
return decoded
except Exception as e:
RNS.log(f"Could not decode display name in included announce data. The contained exception was: {e}", RNS.LOG_ERROR)
except:
RNS.log("Could not decode display name in included announce data. The contained exception was: {e}", RNS.LOG_ERROR)
return None

# Original announce format
@@ -127,7 +112,7 @@ def display_name_from_app_data(app_data=None):
return app_data.decode("utf-8")

def stamp_cost_from_app_data(app_data=None):
if app_data == None or app_data == b"":
if app_data == None:
return None
else:
# Version 0.5.0+ announce format
@@ -141,25 +126,4 @@ def stamp_cost_from_app_data(app_data=None):

# Original announce format
else:
return None

def pn_announce_data_is_valid(data):
try:
if type(data) == bytes:
data = msgpack.unpackb(data)

if len(data) < 3:
raise ValueError("Invalid announce data: Insufficient peer data")
else:
if data[0] != True and data[0] != False:
raise ValueError("Invalid announce data: Indeterminate propagation node status")
try:
int(data[1])
except:
raise ValueError("Invalid announce data: Could not decode peer timebase")

except Exception as e:
RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG)
return False

return True
return None
LXMF/LXMPeer.py: 256 changed lines
@ -4,7 +4,6 @@ import time
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
from collections import deque
|
||||
from .LXMF import APP_NAME
|
||||
|
||||
class LXMPeer:
|
||||
@ -20,7 +19,6 @@ class LXMPeer:
|
||||
|
||||
ERROR_NO_IDENTITY = 0xf0
|
||||
ERROR_NO_ACCESS = 0xf1
|
||||
ERROR_TIMEOUT = 0xfe
|
||||
|
||||
# Maximum amount of time a peer can
|
||||
# be unreachable before it is removed
|
||||
@ -40,25 +38,15 @@ class LXMPeer:
|
||||
@staticmethod
|
||||
def from_bytes(peer_bytes, router):
|
||||
dictionary = msgpack.unpackb(peer_bytes)
|
||||
peer_destination_hash = dictionary["destination_hash"]
|
||||
peer_peering_timebase = dictionary["peering_timebase"]
|
||||
peer_alive = dictionary["alive"]
|
||||
peer_last_heard = dictionary["last_heard"]
|
||||
|
||||
peer = LXMPeer(router, peer_destination_hash)
|
||||
peer.peering_timebase = peer_peering_timebase
|
||||
peer.alive = peer_alive
|
||||
peer.last_heard = peer_last_heard
|
||||
|
||||
peer = LXMPeer(router, dictionary["destination_hash"])
|
||||
peer.peering_timebase = dictionary["peering_timebase"]
|
||||
peer.alive = dictionary["alive"]
|
||||
peer.last_heard = dictionary["last_heard"]
|
||||
if "link_establishment_rate" in dictionary:
|
||||
peer.link_establishment_rate = dictionary["link_establishment_rate"]
|
||||
else:
|
||||
peer.link_establishment_rate = 0
|
||||
|
||||
if "sync_transfer_rate" in dictionary:
|
||||
peer.sync_transfer_rate = dictionary["sync_transfer_rate"]
|
||||
else:
|
||||
peer.sync_transfer_rate = 0
|
||||
|
||||
if "propagation_transfer_limit" in dictionary:
|
||||
try:
|
||||
@ -67,55 +55,15 @@ class LXMPeer:
|
||||
peer.propagation_transfer_limit = None
|
||||
else:
|
||||
peer.propagation_transfer_limit = None
|
||||
|
||||
if "offered" in dictionary:
|
||||
peer.offered = dictionary["offered"]
|
||||
else:
|
||||
peer.offered = 0
|
||||
|
||||
if "outgoing" in dictionary:
|
||||
peer.outgoing = dictionary["outgoing"]
|
||||
else:
|
||||
peer.outgoing = 0
|
||||
|
||||
if "incoming" in dictionary:
|
||||
peer.incoming = dictionary["incoming"]
|
||||
else:
|
||||
peer.incoming = 0
|
||||
|
||||
if "rx_bytes" in dictionary:
|
||||
peer.rx_bytes = dictionary["rx_bytes"]
|
||||
else:
|
||||
peer.rx_bytes = 0
|
||||
|
||||
if "tx_bytes" in dictionary:
|
||||
peer.tx_bytes = dictionary["tx_bytes"]
|
||||
else:
|
||||
peer.tx_bytes = 0
|
||||
|
||||
if "last_sync_attempt" in dictionary:
|
||||
peer.last_sync_attempt = dictionary["last_sync_attempt"]
|
||||
else:
|
||||
peer.last_sync_attempt = 0
|
||||
|
||||
hm_count = 0
|
||||
for transient_id in dictionary["handled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.add_handled_message(transient_id)
|
||||
hm_count += 1
|
||||
peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
|
||||
um_count = 0
|
||||
for transient_id in dictionary["unhandled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.add_unhandled_message(transient_id)
|
||||
um_count += 1
|
||||
peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
|
||||
peer._hm_count = hm_count
|
||||
peer._um_count = um_count
|
||||
peer._hm_counts_synced = True
|
||||
peer._um_counts_synced = True
|
||||
|
||||
del dictionary
|
||||
return peer
|
||||
|
||||
def to_bytes(self):
|
||||
@ -125,14 +73,7 @@ class LXMPeer:
|
||||
dictionary["last_heard"] = self.last_heard
|
||||
dictionary["destination_hash"] = self.destination_hash
|
||||
dictionary["link_establishment_rate"] = self.link_establishment_rate
|
||||
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
|
||||
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
|
||||
dictionary["last_sync_attempt"] = self.last_sync_attempt
|
||||
dictionary["offered"] = self.offered
|
||||
dictionary["outgoing"] = self.outgoing
|
||||
dictionary["incoming"] = self.incoming
|
||||
dictionary["rx_bytes"] = self.rx_bytes
|
||||
dictionary["tx_bytes"] = self.tx_bytes
|
||||
|
||||
handled_ids = []
|
||||
for transient_id in self.handled_messages:
|
||||
@ -145,10 +86,7 @@ class LXMPeer:
|
||||
dictionary["handled_ids"] = handled_ids
|
||||
dictionary["unhandled_ids"] = unhandled_ids
|
||||
|
||||
peer_bytes = msgpack.packb(dictionary)
|
||||
del dictionary
|
||||
|
||||
return peer_bytes
|
||||
return msgpack.packb(dictionary)
|
||||
|
||||
def __init__(self, router, destination_hash):
|
||||
self.alive = False
|
||||
@ -158,35 +96,19 @@ class LXMPeer:
|
||||
self.sync_backoff = 0
|
||||
self.peering_timebase = 0
|
||||
self.link_establishment_rate = 0
|
||||
self.sync_transfer_rate = 0
|
||||
self.propagation_transfer_limit = None
|
||||
self.handled_messages_queue = deque()
|
||||
self.unhandled_messages_queue = deque()
|
||||
|
||||
self.offered = 0 # Messages offered to this peer
|
||||
self.outgoing = 0 # Messages transferred to this peer
|
||||
self.incoming = 0 # Messages received from this peer
|
||||
self.rx_bytes = 0 # Bytes received from this peer
|
||||
self.tx_bytes = 0 # Bytes sent to this peer
|
||||
|
||||
self._hm_count = 0
|
||||
self._um_count = 0
|
||||
self._hm_counts_synced = False
|
||||
self._um_counts_synced = False
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
self.unhandled_messages = {}
|
||||
self.handled_messages = {}
|
||||
self.last_offer = []
|
||||
|
||||
self.router = router
|
||||
self.destination_hash = destination_hash
|
||||
self.identity = RNS.Identity.recall(destination_hash)
|
||||
if self.identity != None:
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
else:
|
||||
self.destination = None
|
||||
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
|
||||
def sync(self):
|
||||
RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
|
||||
@ -204,10 +126,9 @@ class LXMPeer:
|
||||
else:
|
||||
if self.identity == None:
|
||||
self.identity = RNS.Identity.recall(destination_hash)
|
||||
if self.identity != None:
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
|
||||
if self.destination != None:
|
||||
if self.identity != None:
|
||||
if len(self.unhandled_messages) > 0:
|
||||
if self.state == LXMPeer.IDLE:
|
||||
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
|
||||
@ -239,7 +160,7 @@ class LXMPeer:
|
||||
|
||||
for transient_id in purged_ids:
|
||||
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
self.unhandled_messages.pop(transient_id)
|
||||
|
||||
unhandled_entries.sort(key=lambda e: e[1], reverse=False)
|
||||
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
|
||||
@ -250,17 +171,14 @@ class LXMPeer:
|
||||
lxm_size = unhandled_entry[2]
|
||||
next_size = cumulative_size + (lxm_size+per_message_overhead)
|
||||
if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000):
|
||||
if lxm_size+per_message_overhead > (self.propagation_transfer_limit*1000):
|
||||
self.remove_unhandled_message(transient_id)
|
||||
self.add_handled_message(transient_id)
|
||||
RNS.log(f"Message {RNS.prettyhexrep(transient_id)} exceeds transfer limit for {self}, considering handled", RNS.LOG_DEBUG)
|
||||
pass
|
||||
else:
|
||||
cumulative_size += (lxm_size+per_message_overhead)
|
||||
unhandled_ids.append(transient_id)
|
||||
|
||||
RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_VERBOSE)
|
||||
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
|
||||
self.last_offer = unhandled_ids
|
||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||
self.state = LXMPeer.REQUEST_SENT
|
||||
|
||||
else:
|
||||
@ -288,29 +206,22 @@ class LXMPeer:
|
||||
|
||||
if response == LXMPeer.ERROR_NO_IDENTITY:
|
||||
if self.link != None:
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE)
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG)
|
||||
self.link.identify()
|
||||
self.state = LXMPeer.LINK_READY
|
||||
self.sync()
|
||||
return
|
||||
|
||||
elif response == LXMPeer.ERROR_NO_ACCESS:
|
||||
RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE)
|
||||
self.router.unpeer(self.destination_hash)
|
||||
return
|
||||
|
||||
elif response == False:
|
||||
# Peer already has all advertised messages
|
||||
for transient_id in self.last_offer:
|
||||
if transient_id in self.unhandled_messages:
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
||||
|
||||
|
||||
elif response == True:
|
||||
# Peer wants all advertised messages
|
||||
for transient_id in self.last_offer:
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
else:
|
||||
@ -319,17 +230,18 @@ class LXMPeer:
|
||||
# If the peer did not want the message, it has
|
||||
# already received it from another peer.
|
||||
if not transient_id in response:
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
if transient_id in self.unhandled_messages:
|
||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
||||
|
||||
for transient_id in response:
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
if len(wanted_messages) > 0:
|
||||
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_VERBOSE)
|
||||
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG)
|
||||
|
||||
lxm_list = []
|
||||
|
||||
for message_entry in wanted_messages:
|
||||
file_path = message_entry[1]
|
||||
if os.path.isfile(file_path):
|
||||
@ -341,12 +253,10 @@ class LXMPeer:
|
||||
data = msgpack.packb([time.time(), lxm_list])
|
||||
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
||||
resource.transferred_messages = wanted_message_ids
|
||||
resource.sync_transfer_started = time.time()
|
||||
self.state = LXMPeer.RESOURCE_TRANSFERRING
|
||||
|
||||
else:
|
||||
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_VERBOSE)
|
||||
self.offered += len(self.last_offer)
|
||||
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
@ -366,8 +276,8 @@ class LXMPeer:
|
||||
def resource_concluded(self, resource):
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
for transient_id in resource.transferred_messages:
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
message = self.unhandled_messages.pop(transient_id)
|
||||
self.handled_messages[transient_id] = message
|
||||
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
@ -375,20 +285,12 @@ class LXMPeer:
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
rate_str = ""
|
||||
if hasattr(resource, "sync_transfer_started") and resource.sync_transfer_started:
|
||||
self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started)
|
||||
rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
|
||||
|
||||
RNS.log(f"Syncing {len(resource.transferred_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
|
||||
RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed", RNS.LOG_DEBUG)
|
||||
self.alive = True
|
||||
self.last_heard = time.time()
|
||||
self.offered += len(self.last_offer)
|
||||
self.outgoing += len(resource.transferred_messages)
|
||||
self.tx_bytes += resource.get_data_size()
|
||||
|
||||
else:
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
@ -409,103 +311,9 @@ class LXMPeer:
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def queued_items(self):
|
||||
return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
|
||||
|
||||
def queue_unhandled_message(self, transient_id):
|
||||
self.unhandled_messages_queue.append(transient_id)
|
||||
|
||||
def queue_handled_message(self, transient_id):
|
||||
self.handled_messages_queue.append(transient_id)
|
||||
|
||||
def process_queues(self):
|
||||
if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
|
||||
# TODO: Remove debug
|
||||
# st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue)
|
||||
|
||||
handled_messages = self.handled_messages
|
||||
unhandled_messages = self.unhandled_messages
|
||||
|
||||
while len(self.handled_messages_queue) > 0:
|
||||
transient_id = self.handled_messages_queue.pop()
|
||||
if not transient_id in handled_messages:
|
||||
self.add_handled_message(transient_id)
|
||||
if transient_id in unhandled_messages:
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
while len(self.unhandled_messages_queue) > 0:
|
||||
transient_id = self.unhandled_messages_queue.pop()
|
||||
if not transient_id in handled_messages and not transient_id in unhandled_messages:
|
||||
self.add_unhandled_message(transient_id)
|
||||
|
||||
del handled_messages, unhandled_messages
|
||||
# TODO: Remove debug
|
||||
# RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}")
|
||||
|
||||
@property
|
||||
def handled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
hm = list(filter(lambda tid: self.destination_hash in pes[tid][4], pes))
|
||||
self._hm_count = len(hm); del pes
|
||||
self._hm_counts_synced = True
|
||||
return hm
|
||||
|
||||
@property
|
||||
def unhandled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
um = list(filter(lambda tid: self.destination_hash in pes[tid][5], pes))
|
||||
self._um_count = len(um); del pes
|
||||
self._um_counts_synced = True
|
||||
return um
|
||||
|
||||
@property
|
||||
def handled_message_count(self):
|
||||
if not self._hm_counts_synced:
|
||||
self._update_counts()
|
||||
|
||||
return self._hm_count
|
||||
|
||||
@property
|
||||
def unhandled_message_count(self):
|
||||
if not self._um_counts_synced:
|
||||
self._update_counts()
|
||||
|
||||
return self._um_count
|
||||
|
||||
@property
|
||||
def acceptance_rate(self):
|
||||
return 0 if self.offered == 0 else (self.outgoing/self.offered)
|
||||
|
||||
def _update_counts(self):
|
||||
if not self._hm_counts_synced:
|
||||
hm = self.handled_messages; del hm
|
||||
|
||||
if not self._um_counts_synced:
|
||||
um = self.unhandled_messages; del um
|
||||
|
||||
def add_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].append(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def add_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].append(self.destination_hash)
|
||||
self._um_count += 1
|
||||
|
||||
def remove_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].remove(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def remove_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].remove(self.destination_hash)
|
||||
self._um_counts_synced = False
|
||||
def handle_message(self, transient_id):
|
||||
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
|
||||
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
|
||||
|
||||
def __str__(self):
|
||||
if self.destination_hash:
|
||||
|
File diff suppressed because it is too large
@ -16,10 +16,8 @@ class LXMessage:
|
||||
SENDING = 0x02
|
||||
SENT = 0x04
|
||||
DELIVERED = 0x08
|
||||
REJECTED = 0xFD
|
||||
CANCELLED = 0xFE
|
||||
FAILED = 0xFF
|
||||
states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, REJECTED, CANCELLED, FAILED]
|
||||
states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, FAILED]
|
||||
|
||||
UNKNOWN = 0x00
|
||||
PACKET = 0x01
|
||||
@ -51,20 +49,18 @@ class LXMessage:
|
||||
TICKET_INTERVAL = 1*24*60*60
|
||||
COST_TICKET = 0x100
|
||||
|
||||
# LXMF overhead is 112 bytes per message:
|
||||
# LXMF overhead is 111 bytes per message:
|
||||
# 16 bytes for destination hash
|
||||
# 16 bytes for source hash
|
||||
# 64 bytes for Ed25519 signature
|
||||
# 8 bytes for timestamp
|
||||
# 8 bytes for msgpack structure
|
||||
TIMESTAMP_SIZE = 8
|
||||
STRUCT_OVERHEAD = 8
|
||||
LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + TIMESTAMP_SIZE + STRUCT_OVERHEAD
|
||||
# 7 bytes for msgpack structure
|
||||
LXMF_OVERHEAD = 2*DESTINATION_LENGTH + SIGNATURE_LENGTH + 8 + 7
|
||||
|
||||
# With an MTU of 500, the maximum amount of data
|
||||
# we can send in a single encrypted packet is
|
||||
# 391 bytes.
|
||||
ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU + TIMESTAMP_SIZE
|
||||
# 383 bytes.
|
||||
ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU
|
||||
|
||||
# The max content length we can fit in LXMF message
|
||||
# inside a single RNS packet is the encrypted MDU, minus
|
||||
@ -73,7 +69,7 @@ class LXMessage:
|
||||
# field of the packet, therefore we also add the length
|
||||
# of a destination hash to the calculation. With default
|
||||
# RNS and LXMF parameters, the largest single-packet
|
||||
# LXMF message we can send is 295 bytes. If a message
|
||||
# LXMF message we can send is 288 bytes. If a message
|
||||
# is larger than that, a Reticulum link will be used.
|
||||
ENCRYPTED_PACKET_MAX_CONTENT = ENCRYPTED_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
|
||||
|
||||
@ -83,13 +79,13 @@ class LXMessage:
|
||||
LINK_PACKET_MDU = RNS.Link.MDU
|
||||
|
||||
# Which means that we can deliver single-packet LXMF
|
||||
# messages with content of up to 319 bytes over a link.
|
||||
# messages with content of up to 320 bytes over a link.
|
||||
# If a message is larger than that, LXMF will sequence
|
||||
# and transfer it as a RNS resource over the link instead.
|
||||
LINK_PACKET_MAX_CONTENT = LINK_PACKET_MDU - LXMF_OVERHEAD
|
||||
|
||||
# For plain packets without encryption, we can
|
||||
# fit up to 368 bytes of content.
|
||||
# fit up to 369 bytes of content.
|
||||
PLAIN_PACKET_MDU = RNS.Packet.PLAIN_MDU
|
||||
PLAIN_PACKET_MAX_CONTENT = PLAIN_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH
|
||||
|
||||
@ -133,16 +129,8 @@ class LXMessage:
|
||||
if title == None:
|
||||
title = ""
|
||||
|
||||
if type(title) == bytes:
|
||||
self.set_title_from_bytes(title)
|
||||
else:
|
||||
self.set_title_from_string(title)
|
||||
|
||||
if type(content) == bytes:
|
||||
self.set_content_from_bytes(content)
|
||||
else:
|
||||
self.set_content_from_string(content)
|
||||
|
||||
self.set_title_from_string(title)
|
||||
self.set_content_from_string(content)
|
||||
self.set_fields(fields)
|
||||
|
||||
self.payload = None
|
||||
@ -204,11 +192,7 @@ class LXMessage:
|
||||
self.content = content_bytes
|
||||
|
||||
def content_as_string(self):
|
||||
try:
|
||||
return self.content.decode("utf-8")
|
||||
except Exception as e:
|
||||
RNS.log(f"{self} could not decode message content as string: {e}")
|
||||
return None
|
||||
return self.content.decode("utf-8")
|
||||
|
||||
def set_fields(self, fields):
|
||||
if isinstance(fields, dict) or fields == None:
|
||||
@ -368,22 +352,14 @@ class LXMessage:
|
||||
self.packed += self.signature
|
||||
self.packed += packed_payload
|
||||
self.packed_size = len(self.packed)
|
||||
content_size = len(packed_payload)-LXMessage.TIMESTAMP_SIZE-LXMessage.STRUCT_OVERHEAD
|
||||
content_size = len(packed_payload)
|
||||
|
||||
# If no desired delivery method has been defined,
|
||||
# one will be chosen according to these rules:
|
||||
if self.desired_method == None:
|
||||
self.desired_method = LXMessage.DIRECT
|
||||
|
||||
# If opportunistic delivery was requested, check
|
||||
# that message will fit within packet size limits
|
||||
if self.desired_method == LXMessage.OPPORTUNISTIC:
|
||||
if self.__destination.type == RNS.Destination.SINGLE:
|
||||
if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT:
|
||||
RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
|
||||
self.desired_method = LXMessage.DIRECT
|
||||
# TODO: Expand rules to something more intelligent
|
||||
|
||||
# Set delivery parameters according to delivery method
|
||||
if self.desired_method == LXMessage.OPPORTUNISTIC:
|
||||
if self.__destination.type == RNS.Destination.SINGLE:
|
||||
single_packet_content_limit = LXMessage.ENCRYPTED_PACKET_MAX_CONTENT
|
||||
@ -391,7 +367,7 @@ class LXMessage:
|
||||
single_packet_content_limit = LXMessage.PLAIN_PACKET_MAX_CONTENT
|
||||
|
||||
if content_size > single_packet_content_limit:
|
||||
raise TypeError(f"LXMessage desired opportunistic delivery method, but content of length {content_size} exceeds single-packet content limit of {single_packet_content_limit}.")
|
||||
raise TypeError("LXMessage desired opportunistic delivery method, but content exceeds single-packet size.")
|
||||
else:
|
||||
self.method = LXMessage.OPPORTUNISTIC
|
||||
self.representation = LXMessage.PACKET
|
||||
@ -444,7 +420,6 @@ class LXMessage:
|
||||
if self.method == LXMessage.OPPORTUNISTIC:
|
||||
lxm_packet = self.__as_packet()
|
||||
lxm_packet.send().set_delivery_callback(self.__mark_delivered)
|
||||
self.progress = 0.50
|
||||
self.ratchet_id = lxm_packet.ratchet_id
|
||||
self.state = LXMessage.SENT
|
||||
|
||||
@ -566,27 +541,22 @@ class LXMessage:
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
self.__mark_delivered()
|
||||
else:
|
||||
if resource.status == RNS.Resource.REJECTED:
|
||||
self.state = LXMessage.REJECTED
|
||||
|
||||
elif self.state != LXMessage.CANCELLED:
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
def __propagation_resource_concluded(self, resource):
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
self.__mark_propagated()
|
||||
else:
|
||||
if self.state != LXMessage.CANCELLED:
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
resource.link.teardown()
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
def __link_packet_timed_out(self, packet_receipt):
|
||||
if self.state != LXMessage.CANCELLED:
|
||||
if packet_receipt:
|
||||
packet_receipt.destination.teardown()
|
||||
|
||||
self.state = LXMessage.OUTBOUND
|
||||
if packet_receipt:
|
||||
packet_receipt.destination.teardown()
|
||||
|
||||
self.state = LXMessage.OUTBOUND
|
||||
|
||||
|
||||
def __update_transfer_progress(self, resource):
|
||||
self.progress = 0.10 + (resource.get_progress()*0.90)
|
||||
|
@ -7,8 +7,6 @@ import multiprocessing
|
||||
|
||||
WORKBLOCK_EXPAND_ROUNDS = 3000
|
||||
|
||||
active_jobs = {}
|
||||
|
||||
def stamp_workblock(message_id):
|
||||
wb_st = time.time()
|
||||
expand_rounds = WORKBLOCK_EXPAND_ROUNDS
|
||||
@ -29,7 +27,7 @@ def stamp_value(workblock, stamp):
|
||||
value = 0
|
||||
bits = 256
|
||||
material = RNS.Identity.full_hash(workblock+stamp)
|
||||
i = int.from_bytes(material, byteorder="big")
|
||||
i = int.from_bytes(material)
|
||||
while ((i & (1 << (bits - 1))) == 0):
|
||||
i = (i << 1)
|
||||
value += 1
|
||||
@ -46,56 +44,23 @@ def generate_stamp(message_id, stamp_cost):
|
||||
value = 0
|
||||
|
||||
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
|
||||
stamp, rounds = job_simple(stamp_cost, workblock, message_id)
|
||||
stamp, rounds = job_simple(stamp_cost, workblock)
|
||||
|
||||
elif RNS.vendor.platformutils.is_android():
|
||||
stamp, rounds = job_android(stamp_cost, workblock, message_id)
|
||||
stamp, rounds = job_android(stamp_cost, workblock)
|
||||
|
||||
else:
|
||||
stamp, rounds = job_linux(stamp_cost, workblock, message_id)
|
||||
stamp, rounds = job_linux(stamp_cost, workblock)
|
||||
|
||||
duration = time.time() - start_time
|
||||
speed = rounds/duration
|
||||
if stamp != None:
|
||||
value = stamp_value(workblock, stamp)
|
||||
value = stamp_value(workblock, stamp)
|
||||
|
||||
RNS.log(f"Stamp with value {value} generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG)
|
||||
|
||||
return stamp, value
|
||||
|
||||
def cancel_work(message_id):
|
||||
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
|
||||
try:
|
||||
if message_id in active_jobs:
|
||||
active_jobs[message_id] = True
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
elif RNS.vendor.platformutils.is_android():
|
||||
try:
|
||||
if message_id in active_jobs:
|
||||
active_jobs[message_id] = True
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
else:
|
||||
try:
|
||||
if message_id in active_jobs:
|
||||
stop_event = active_jobs[message_id][0]
|
||||
result_queue = active_jobs[message_id][1]
|
||||
stop_event.set()
|
||||
result_queue.put(None)
|
||||
active_jobs.pop(message_id)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while terminating stamp generation workers: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
def job_simple(stamp_cost, workblock, message_id):
|
||||
def job_simple(stamp_cost, workblock):
|
||||
# A simple, single-process stamp generator.
|
||||
# should work on any platform, and is used
|
||||
# as a fall-back, in case of limited multi-
|
||||
@ -108,8 +73,6 @@ def job_simple(stamp_cost, workblock, message_id):
|
||||
pstamp = os.urandom(256//8)
|
||||
st = time.time()
|
||||
|
||||
active_jobs[message_id] = False;
|
||||
|
||||
def sv(s, c, w):
|
||||
target = 0b1<<256-c; m = w+s
|
||||
result = RNS.Identity.full_hash(m)
|
||||
@ -118,20 +81,15 @@ def job_simple(stamp_cost, workblock, message_id):
|
||||
else:
|
||||
return True
|
||||
|
||||
while not sv(pstamp, stamp_cost, workblock) and not active_jobs[message_id]:
|
||||
while not sv(pstamp, stamp_cost, workblock):
|
||||
pstamp = os.urandom(256//8); rounds += 1
|
||||
if rounds % 2500 == 0:
|
||||
speed = rounds / (time.time()-st)
|
||||
RNS.log(f"Stamp generation running. {rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG)
|
||||
|
||||
if active_jobs[message_id] == True:
|
||||
pstamp = None
|
||||
|
||||
active_jobs.pop(message_id)
|
||||
|
||||
return pstamp, rounds
|
||||
|
||||
def job_linux(stamp_cost, workblock, message_id):
|
||||
def job_linux(stamp_cost, workblock):
|
||||
allow_kill = True
|
||||
stamp = None
|
||||
total_rounds = 0
|
||||
@ -168,8 +126,6 @@ def job_linux(stamp_cost, workblock, message_id):
|
||||
job_procs.append(process)
|
||||
process.start()
|
||||
|
||||
active_jobs[message_id] = [stop_event, result_queue]
|
||||
|
||||
stamp = result_queue.get()
|
||||
RNS.log("Got stamp result from worker", RNS.LOG_DEBUG) # TODO: Remove
|
||||
|
||||
@ -214,7 +170,7 @@ def job_linux(stamp_cost, workblock, message_id):
|
||||
|
||||
return stamp, total_rounds
|
||||
|
||||
def job_android(stamp_cost, workblock, message_id):
|
||||
def job_android(stamp_cost, workblock):
|
||||
# Semaphore support is flaky to non-existent on
|
||||
# Android, so we need to manually dispatch and
|
||||
# manage workloads here, while periodically
|
||||
@ -274,12 +230,10 @@ def job_android(stamp_cost, workblock, message_id):
|
||||
RNS.log(f"Stamp generation worker error: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
active_jobs[message_id] = False;
|
||||
|
||||
RNS.log(f"Dispatching {jobs} workers for stamp generation...", RNS.LOG_DEBUG) # TODO: Remove
|
||||
|
||||
results_dict = wm.dict()
|
||||
while stamp == None and active_jobs[message_id] == False:
|
||||
while stamp == None:
|
||||
job_procs = []
|
||||
try:
|
||||
for pnum in range(jobs):
|
||||
@ -306,8 +260,6 @@ def job_android(stamp_cost, workblock, message_id):
|
||||
RNS.log(f"Stamp generation job error: {e}")
|
||||
RNS.trace_exception(e)
|
||||
|
||||
active_jobs.pop(message_id)
|
||||
|
||||
return stamp, total_rounds
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -35,7 +35,6 @@ import time
|
||||
import os
|
||||
|
||||
from LXMF._version import __version__
|
||||
from LXMF import APP_NAME
|
||||
|
||||
from RNS.vendor.configobj import ConfigObj
|
||||
|
||||
@ -127,7 +126,7 @@ def apply_config():
|
||||
if active_configuration["message_storage_limit"] < 0.005:
|
||||
active_configuration["message_storage_limit"] = 0.005
|
||||
else:
|
||||
active_configuration["message_storage_limit"] = 500
|
||||
active_configuration["message_storage_limit"] = 2000
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
|
||||
@ -141,24 +140,6 @@ def apply_config():
|
||||
else:
|
||||
active_configuration["prioritised_lxmf_destinations"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]:
|
||||
static_peers = lxmd_config["propagation"].as_list("static_peers")
|
||||
active_configuration["static_peers"] = []
|
||||
for static_peer in static_peers:
|
||||
active_configuration["static_peers"].append(bytes.fromhex(static_peer))
|
||||
else:
|
||||
active_configuration["static_peers"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]:
|
||||
active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers")
|
||||
else:
|
||||
active_configuration["max_peers"] = None
|
||||
|
||||
if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]:
|
||||
active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only")
|
||||
else:
|
||||
active_configuration["from_static_only"] = False
|
||||
|
||||
# Load various settings
|
||||
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
|
||||
targetloglevel = lxmd_config["logging"].as_int("loglevel")
|
||||
@ -324,10 +305,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
||||
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
|
||||
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
|
||||
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
|
||||
max_peers = active_configuration["max_peers"],
|
||||
static_peers = active_configuration["static_peers"],
|
||||
from_static_only = active_configuration["from_static_only"])
|
||||
|
||||
)
|
||||
message_router.register_delivery_callback(lxmf_delivery)
|
||||
|
||||
for destination_hash in active_configuration["ignored_lxmf_destinations"]:
|
||||
@ -384,13 +362,13 @@ def jobs():
|
||||
try:
|
||||
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
|
||||
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE)
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
last_peer_announce = time.time()
|
||||
|
||||
if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None:
|
||||
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE)
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
|
||||
message_router.announce_propagation_node()
|
||||
last_node_announce = time.time()
|
||||
|
||||
@ -403,7 +381,7 @@ def deferred_start_jobs():
|
||||
global active_configuration, last_peer_announce, last_node_announce
|
||||
global message_router, lxmf_destination
|
||||
time.sleep(DEFFERED_JOBS_DELAY)
|
||||
RNS.log("Running deferred start jobs", RNS.LOG_DEBUG)
|
||||
RNS.log("Running deferred start jobs")
|
||||
if active_configuration["peer_announce_at_start"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
@ -416,190 +394,6 @@ def deferred_start_jobs():
|
||||
last_node_announce = time.time()
|
||||
threading.Thread(target=jobs, daemon=True).start()
|
||||
|
||||
def query_status(identity, timeout=5, exit_on_fail=False):
|
||||
control_destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
if exit_on_fail:
|
||||
RNS.log("Getting lxmd statistics timed out, exiting now", RNS.LOG_ERROR)
|
||||
exit(200)
|
||||
else:
|
||||
return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
|
||||
if not RNS.Transport.has_path(control_destination.hash):
|
||||
RNS.Transport.request_path(control_destination.hash)
|
||||
while not RNS.Transport.has_path(control_destination.hash):
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link = RNS.Link(control_destination)
|
||||
while not link.status == RNS.Link.ACTIVE:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.identify(identity)
|
||||
request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None)
|
||||
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.teardown()
|
||||
return request_receipt.get_response()
|
||||
|
||||
def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = 0, timeout=5, show_status=False, show_peers=False, identity_path=None):
|
||||
global configpath, identitypath, storagedir, lxmdir
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
targetlogdest = RNS.LOG_STDOUT
|
||||
|
||||
if identity_path == None:
|
||||
if configdir == None:
|
||||
if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"):
|
||||
configdir = "/etc/lxmd"
|
||||
elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(Reticulum.userdir+"/.config/lxmd/config"):
|
||||
configdir = RNS.Reticulum.userdir+"/.config/lxmd"
|
||||
else:
|
||||
configdir = RNS.Reticulum.userdir+"/.lxmd"
|
||||
|
||||
configpath = configdir+"/config"
|
||||
identitypath = configdir+"/identity"
|
||||
identity = None
|
||||
|
||||
if not os.path.isdir(configdir):
|
||||
RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR)
|
||||
exit(201)
|
||||
if not os.path.isfile(identitypath):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identitypath)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
else:
|
||||
if not os.path.isfile(identity_path):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identity_path)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
if targetloglevel == None:
|
||||
targetloglevel = 3
|
||||
if verbosity != 0 or quietness != 0:
|
||||
targetloglevel = targetloglevel+verbosity-quietness
|
||||
|
||||
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
|
||||
response = query_status(identity, timeout=timeout, exit_on_fail=True)
|
||||
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
|
||||
RNS.log("Remote received no identity")
|
||||
exit(203)
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
|
||||
RNS.log("Access denied")
|
||||
exit(204)
|
||||
else:
|
||||
s = response
|
||||
mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2)
|
||||
ms_util = f"{mutil}%"
|
||||
if s["from_static_only"]:
|
||||
who_str = "static peers only"
|
||||
else:
|
||||
who_str = "all nodes"
|
||||
|
||||
available_peers = 0
|
||||
unreachable_peers = 0
|
||||
peered_incoming = 0
|
||||
peered_outgoing = 0
|
||||
peered_rx_bytes = 0
|
||||
peered_tx_bytes = 0
|
||||
for peer_id in s["peers"]:
|
||||
p = s["peers"][peer_id]
|
||||
pm = p["messages"]
|
||||
peered_incoming += pm["incoming"]
|
||||
peered_outgoing += pm["outgoing"]
|
||||
peered_rx_bytes += p["rx_bytes"]
|
||||
peered_tx_bytes += p["tx_bytes"]
|
||||
if p["alive"]:
|
||||
available_peers += 1
|
||||
else:
|
||||
unreachable_peers += 1
|
||||
|
||||
total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"]
|
||||
total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"]
|
||||
df = round(peered_outgoing/total_incoming, 2)
|
||||
|
||||
dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"])
|
||||
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
|
||||
|
||||
if show_status:
|
||||
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
|
||||
ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
|
||||
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
|
||||
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
|
||||
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
|
||||
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
|
||||
print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit")
|
||||
print(f"")
|
||||
print(f"Peers : {stp} total (peer limit is {smp})")
|
||||
print(f" {sdp} discovered, {ssp} static")
|
||||
print(f" {available_peers} available, {unreachable_peers} unreachable")
|
||||
print(f"")
|
||||
print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})")
|
||||
print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})")
|
||||
print(f" {upi} messages received from unpeered nodes ({uprx})")
|
||||
print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})")
|
||||
print(f" {cprr} propagation messages received directly from clients")
|
||||
print(f" {cprs} propagation messages served to clients")
|
||||
print(f" Distribution factor is {df}")
|
||||
print(f"")
|
||||
|
||||
if show_peers:
|
||||
if not show_status:
|
||||
print("")
|
||||
|
||||
for peer_id in s["peers"]:
|
||||
ind = " "
|
||||
p = s["peers"][peer_id]
|
||||
if p["type"] == "static":
|
||||
t = "Static peer "
|
||||
elif p["type"] == "discovered":
|
||||
t = "Discovered peer "
|
||||
else:
|
||||
t = "Unknown peer "
|
||||
a = "Available" if p["alive"] == True else "Unreachable"
|
||||
h = max(time.time()-p["last_heard"], 0)
|
||||
hops = p["network_distance"]
|
||||
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
|
||||
pm = p["messages"]
|
||||
if p["last_sync_attempt"] != 0:
|
||||
lsa = p["last_sync_attempt"]
|
||||
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
|
||||
else:
|
||||
ls = "never synced"
|
||||
|
||||
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000)
|
||||
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
|
||||
pmi = pm["incoming"]; pmuh = pm["unhandled"]
|
||||
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
|
||||
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
|
||||
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit")
|
||||
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming")
|
||||
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
|
||||
ms = "" if pm["unhandled"] == 1 else "s"
|
||||
print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}")
|
||||
print("")
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
|
||||
@ -610,10 +404,6 @@ def main():
|
||||
parser.add_argument("-v", "--verbose", action="count", default=0)
|
||||
parser.add_argument("-q", "--quiet", action="count", default=0)
|
||||
parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file")
|
||||
parser.add_argument("--status", action="store_true", default=False, help="display node status")
|
||||
parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes")
|
||||
parser.add_argument("--timeout", action="store", default=5, help="timeout in seconds for query operations", type=float)
|
||||
parser.add_argument("--identity", action="store", default=None, help="path to identity used for query request", type=str)
|
||||
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
|
||||
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
|
||||
|
||||
@ -623,24 +413,15 @@ def main():
|
||||
print(__default_lxmd_config__)
|
||||
exit()
|
||||
|
||||
if args.status or args.peers:
|
||||
get_status(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
timeout=args.timeout,
|
||||
show_status=args.status,
|
||||
show_peers=args.peers,
|
||||
identity_path=args.identity)
|
||||
exit()
|
||||
|
||||
program_setup(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service)
|
||||
program_setup(
|
||||
configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service
|
||||
)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
@ -696,9 +477,9 @@ propagation_transfer_max_accepted_size = 256
|
||||
# LXMF prioritises keeping messages that are
|
||||
# new and small. Large and old messages will
|
||||
# be removed first. This setting is optional
|
||||
# and defaults to 500 megabytes.
|
||||
# and defaults to 2 gigabytes.
|
||||
|
||||
# message_storage_limit = 500
|
||||
# message_storage_limit = 2000
|
||||
|
||||
# You can tell the LXMF message router to
|
||||
# prioritise storage for one or more
|
||||
@ -710,25 +491,6 @@ propagation_transfer_max_accepted_size = 256
|
||||
|
||||
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
|
||||
|
||||
# You can configure the maximum number of other
|
||||
# propagation nodes that this node will peer
|
||||
# with automatically. The default is 50.
|
||||
|
||||
# max_peers = 25
|
||||
|
||||
# You can configure a list of static propagation
|
||||
# node peers, that this node will always be
|
||||
# peered with, by specifying a list of
|
||||
# destination hashes.
|
||||
|
||||
# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
|
||||
|
||||
# You can configure the propagation node to
|
||||
# only accept incoming propagation messages
|
||||
# from configured static peers.
|
||||
|
||||
# from_static_only = True
|
||||
|
||||
# By default, any destination is allowed to
|
||||
# connect and download messages, but you can
|
||||
# optionally restrict this. If you enable
|
||||
|
@@ -1 +1 @@
-__version__ = "0.6.3"
+__version__ = "0.5.1"
@@ -12,7 +12,6 @@ User-facing clients built on LXMF include:

 Community-provided tools and utilities for LXMF include:

-- [LXMFy](https://lxmfy.quad4.io/)
 - [LXMF-Bot](https://github.com/randogoth/lxmf-bot)
 - [LXMF Messageboard](https://github.com/chengtripp/lxmf_messageboard)
 - [LXMEvent](https://github.com/faragher/LXMEvent)
@@ -29,8 +29,8 @@ def delivery_callback(message):
 RNS.log("\t| Destination instance : "+str(message.get_destination()))
 RNS.log("\t| Transport Encryption : "+str(message.transport_encryption))
 RNS.log("\t| Timestamp : "+time_string)
-RNS.log("\t| Title : "+str(message.title_as_string()))
-RNS.log("\t| Content : "+str(message.content_as_string()))
+RNS.log("\t| Title : "+message.title_as_string())
+RNS.log("\t| Content : "+message.content_as_string())
 RNS.log("\t| Fields : "+str(message.fields))
 if message.ratchet_id:
 RNS.log("\t| Ratchet : "+str(RNS.Identity._get_ratchet_id(message.ratchet_id)))
@@ -69,4 +69,4 @@ while True:

 # input()
 # RNS.log("Requesting messages from propagation node...")
-# router.request_messages_from_propagation_node(identity)
+# router.request_messages_from_propagation_node(identity)
@@ -1,2 +1,3 @@
-qrcode>=7.4.2
-rns>=0.9.1
+qrcode==7.4.2
+rns==0.7.7
+setuptools==70.0.0