mirror of
https://github.com/markqvist/LXMF.git
synced 2025-11-18 14:49:58 -05:00
Compare commits
112 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bc7522b63d | ||
|
|
39e398be65 | ||
|
|
ee15e9f0b6 | ||
|
|
00ffbc09fe | ||
|
|
dca6cc2adc | ||
|
|
62038573f1 | ||
|
|
fa2d78c351 | ||
|
|
f18ce9ea99 | ||
|
|
d7a2979dd0 | ||
|
|
0b067914ea | ||
|
|
bc3f4ecff5 | ||
|
|
99830b6e8b | ||
|
|
1b5dc419b5 | ||
|
|
9c5fa4a066 | ||
|
|
383d953e06 | ||
|
|
557887d13f | ||
|
|
e1905b85d7 | ||
|
|
8e3ffb0d2a | ||
|
|
f383450b37 | ||
|
|
747ddbddd5 | ||
|
|
d0f3385f75 | ||
|
|
401328fa16 | ||
|
|
4350a239e4 | ||
|
|
9dc998f149 | ||
|
|
fa9fd2ae01 | ||
|
|
0cebd5886d | ||
|
|
b35b9213a6 | ||
|
|
df6271a026 | ||
|
|
4afb92bf3e | ||
|
|
0a5edb2895 | ||
|
|
704b37dc16 | ||
|
|
606a723e31 | ||
|
|
a44c1f368a | ||
|
|
434267784d | ||
|
|
9c646aead7 | ||
|
|
ebc8bb33c2 | ||
|
|
60bf99d151 | ||
|
|
c84aea745a | ||
|
|
a62ffa12b1 | ||
|
|
6446db4f11 | ||
|
|
81a6d503a3 | ||
|
|
c28d3b1432 | ||
|
|
d8b25e092f | ||
|
|
aca5bf9c14 | ||
|
|
bd6fe9f9d1 | ||
|
|
0f2d3b06c2 | ||
|
|
3f91e44a6d | ||
|
|
8f54d40abf | ||
|
|
9beeafb0c8 | ||
|
|
b572723a5e | ||
|
|
6cf7852271 | ||
|
|
e17263d25a | ||
|
|
16dfbc22cd | ||
|
|
98347d3ad9 | ||
|
|
61b75526d2 | ||
|
|
85d8f4f583 | ||
|
|
5b9f121593 | ||
|
|
416ccf294f | ||
|
|
787cd069dc | ||
|
|
c2207d1eb7 | ||
|
|
a9622e3a33 | ||
|
|
499fe4cc53 | ||
|
|
37e99910ec | ||
|
|
005d71707c | ||
|
|
1bdcf6ad53 | ||
|
|
e6021b8fed | ||
|
|
326c0eed8f | ||
|
|
336792c07a | ||
|
|
570d2c6846 | ||
|
|
1ef4665073 | ||
|
|
d5540b927f | ||
|
|
a6cf585109 | ||
|
|
c0a8f3be49 | ||
|
|
7b4780cfb7 | ||
|
|
b94a712bb6 | ||
|
|
f42ccfc4e9 | ||
|
|
9eca747757 | ||
|
|
b7b6753640 | ||
|
|
40d0b9a5de | ||
|
|
40fc75f559 | ||
|
|
f1d060a92e | ||
|
|
e0e901291e | ||
|
|
886ac69a82 | ||
|
|
e0163e100a | ||
|
|
26a10cce8f | ||
|
|
cec903a4dc | ||
|
|
962d9c90d1 | ||
|
|
6d2eb4f973 | ||
|
|
a8cc5f41cf | ||
|
|
aa57b16cf5 | ||
|
|
cdea838a6c | ||
|
|
fb4bf9b0b9 | ||
|
|
a3e3868f92 | ||
|
|
70186cf8d9 | ||
|
|
fe59b265c5 | ||
|
|
a87458d25f | ||
|
|
35dd70c59e | ||
|
|
a198e96064 | ||
|
|
e3be7e0cfd | ||
|
|
460645cea2 | ||
|
|
f683e03891 | ||
|
|
2c71cea7a0 | ||
|
|
61b1ecce27 | ||
|
|
68257a441f | ||
|
|
e69da2ed2a | ||
|
|
c2a08ef355 | ||
|
|
1430b1ce90 | ||
|
|
1c9c744107 | ||
|
|
bfed126a7c | ||
|
|
44d1d992f8 | ||
|
|
7701f326d9 | ||
|
|
356cb6412f |
14 changed files with 2233 additions and 534 deletions
2
.github/ISSUE_TEMPLATE/🐛-bug-report.md
vendored
2
.github/ISSUE_TEMPLATE/🐛-bug-report.md
vendored
|
|
@ -12,7 +12,7 @@ Before creating a bug report on this issue tracker, you **must** read the [Contr
|
|||
|
||||
- The issue tracker is used by developers of this project. **Do not use it to ask general questions, or for support requests**.
|
||||
- Ideas and feature requests can be made on the [Discussions](https://github.com/markqvist/Reticulum/discussions). **Only** feature requests accepted by maintainers and developers are tracked and included on the issue tracker. **Do not post feature requests here**.
|
||||
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), delete this section from your bug report.
|
||||
- After reading the [Contribution Guidelines](https://github.com/markqvist/Reticulum/blob/master/Contributing.md), **delete this section only** (*"Read the Contribution Guidelines"*) from your bug report, **and fill in all the other sections**.
|
||||
|
||||
**Describe the Bug**
|
||||
A clear and concise description of what the bug is.
|
||||
|
|
|
|||
3
FUNDING.yml
Normal file
3
FUNDING.yml
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
liberapay: Reticulum
|
||||
ko_fi: markqvist
|
||||
custom: "https://unsigned.io/donate"
|
||||
16
LICENSE
16
LICENSE
|
|
@ -1,6 +1,6 @@
|
|||
MIT License
|
||||
Reticulum License
|
||||
|
||||
Copyright (c) 2020 Mark Qvist / unsigned.io
|
||||
Copyright (c) 2020-2025 Mark Qvist
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
|
@ -9,8 +9,16 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
- The Software shall not be used in any kind of system which includes amongst
|
||||
its functions the ability to purposefully do harm to human beings.
|
||||
|
||||
- The Software shall not be used, directly or indirectly, in the creation of
|
||||
an artificial intelligence, machine learning or language model training
|
||||
dataset, including but not limited to any use that contributes to the
|
||||
training or development of such a model or algorithm.
|
||||
|
||||
- The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import time
|
||||
import threading
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
|
|
@ -7,20 +8,21 @@ from .LXMessage import LXMessage
|
|||
|
||||
class LXMFDeliveryAnnounceHandler:
|
||||
def __init__(self, lxmrouter):
|
||||
self.aspect_filter = APP_NAME+".delivery"
|
||||
self.aspect_filter = APP_NAME+".delivery"
|
||||
self.receive_path_responses = True
|
||||
self.lxmrouter = lxmrouter
|
||||
self.lxmrouter = lxmrouter
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data):
|
||||
for lxmessage in self.lxmrouter.pending_outbound:
|
||||
if destination_hash == lxmessage.destination_hash:
|
||||
if destination_hash == lxmessage.destination_hash:
|
||||
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
|
||||
lxmessage.next_delivery_attempt = time.time()
|
||||
|
||||
while self.lxmrouter.processing_outbound:
|
||||
time.sleep(0.1)
|
||||
def outbound_trigger():
|
||||
while self.lxmrouter.processing_outbound: time.sleep(0.1)
|
||||
self.lxmrouter.process_outbound()
|
||||
|
||||
self.lxmrouter.process_outbound()
|
||||
threading.Thread(target=outbound_trigger, daemon=True).start()
|
||||
|
||||
try:
|
||||
stamp_cost = stamp_cost_from_app_data(app_data)
|
||||
|
|
@ -32,32 +34,58 @@ class LXMFDeliveryAnnounceHandler:
|
|||
|
||||
class LXMFPropagationAnnounceHandler:
|
||||
def __init__(self, lxmrouter):
|
||||
self.aspect_filter = APP_NAME+".propagation"
|
||||
self.receive_path_responses = False
|
||||
self.lxmrouter = lxmrouter
|
||||
self.aspect_filter = APP_NAME+".propagation"
|
||||
self.receive_path_responses = True
|
||||
self.lxmrouter = lxmrouter
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data):
|
||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash, is_path_response):
|
||||
try:
|
||||
if type(app_data) == bytes:
|
||||
if self.lxmrouter.propagation_node and self.lxmrouter.autopeer:
|
||||
data = msgpack.unpackb(app_data)
|
||||
if self.lxmrouter.propagation_node:
|
||||
if pn_announce_data_is_valid(app_data):
|
||||
data = msgpack.unpackb(app_data)
|
||||
node_timebase = int(data[1])
|
||||
propagation_enabled = data[2]
|
||||
propagation_transfer_limit = int(data[3])
|
||||
propagation_sync_limit = int(data[4])
|
||||
propagation_stamp_cost = int(data[5][0])
|
||||
propagation_stamp_cost_flexibility = int(data[5][1])
|
||||
peering_cost = int(data[5][2])
|
||||
metadata = data[6]
|
||||
|
||||
if destination_hash in self.lxmrouter.static_peers:
|
||||
static_peer = self.lxmrouter.peers[destination_hash]
|
||||
if not is_path_response or static_peer.last_heard == 0:
|
||||
self.lxmrouter.peer(destination_hash=destination_hash,
|
||||
timestamp=node_timebase,
|
||||
propagation_transfer_limit=propagation_transfer_limit,
|
||||
propagation_sync_limit=propagation_sync_limit,
|
||||
propagation_stamp_cost=propagation_stamp_cost,
|
||||
propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
|
||||
peering_cost=peering_cost,
|
||||
metadata=metadata)
|
||||
|
||||
if pn_announce_data_is_valid(data):
|
||||
node_timebase = data[1]
|
||||
propagation_transfer_limit = None
|
||||
if len(data) >= 3:
|
||||
try:
|
||||
propagation_transfer_limit = float(data[2])
|
||||
except:
|
||||
propagation_transfer_limit = None
|
||||
else:
|
||||
if self.lxmrouter.autopeer and not is_path_response:
|
||||
if propagation_enabled == True:
|
||||
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
|
||||
self.lxmrouter.peer(destination_hash=destination_hash,
|
||||
timestamp=node_timebase,
|
||||
propagation_transfer_limit=propagation_transfer_limit,
|
||||
propagation_sync_limit=propagation_sync_limit,
|
||||
propagation_stamp_cost=propagation_stamp_cost,
|
||||
propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
|
||||
peering_cost=peering_cost,
|
||||
metadata=metadata)
|
||||
|
||||
if data[0] == True:
|
||||
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
|
||||
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
|
||||
else:
|
||||
if destination_hash in self.lxmrouter.peers:
|
||||
RNS.log(f"Peer {self.lxmrouter.peers[destination_hash]} moved outside auto-peering range, breaking peering...")
|
||||
self.lxmrouter.unpeer(destination_hash, node_timebase)
|
||||
|
||||
elif data[0] == False:
|
||||
self.lxmrouter.unpeer(destination_hash, node_timebase)
|
||||
elif propagation_enabled == False:
|
||||
self.lxmrouter.unpeer(destination_hash, node_timebase)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)
|
||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
|
||||
RNS.log(f"The contained exception was: {str(e)}", RNS.LOG_DEBUG)
|
||||
|
|
|
|||
83
LXMF/LXMF.py
83
LXMF/LXMF.py
|
|
@ -91,6 +91,18 @@ RENDERER_MICRON = 0x01
|
|||
RENDERER_MARKDOWN = 0x02
|
||||
RENDERER_BBCODE = 0x03
|
||||
|
||||
# Optional propagation node metadata fields. These
|
||||
# fields may be highly unstable in allocation and
|
||||
# availability until the version 1.0.0 release, so use
|
||||
# at your own risk until then, and expect changes!
|
||||
PN_META_VERSION = 0x00
|
||||
PN_META_NAME = 0x01
|
||||
PN_META_SYNC_STRATUM = 0x02
|
||||
PN_META_SYNC_THROTTLE = 0x03
|
||||
PN_META_AUTH_BAND = 0x04
|
||||
PN_META_UTIL_PRESSURE = 0x05
|
||||
PN_META_CUSTOM = 0xFF
|
||||
|
||||
##########################################################
|
||||
# The following helper functions makes it easier to #
|
||||
# handle and operate on LXMF data in client programs #
|
||||
|
|
@ -99,21 +111,17 @@ RENDERER_BBCODE = 0x03
|
|||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
def display_name_from_app_data(app_data=None):
|
||||
if app_data == None:
|
||||
return None
|
||||
elif len(app_data) == 0:
|
||||
return None
|
||||
if app_data == None: return None
|
||||
elif len(app_data) == 0: return None
|
||||
else:
|
||||
# Version 0.5.0+ announce format
|
||||
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
|
||||
peer_data = msgpack.unpackb(app_data)
|
||||
if type(peer_data) == list:
|
||||
if len(peer_data) < 1:
|
||||
return None
|
||||
if len(peer_data) < 1: return None
|
||||
else:
|
||||
dn = peer_data[0]
|
||||
if dn == None:
|
||||
return None
|
||||
if dn == None: return None
|
||||
else:
|
||||
try:
|
||||
decoded = dn.decode("utf-8")
|
||||
|
|
@ -127,36 +135,61 @@ def display_name_from_app_data(app_data=None):
|
|||
return app_data.decode("utf-8")
|
||||
|
||||
def stamp_cost_from_app_data(app_data=None):
|
||||
if app_data == None or app_data == b"":
|
||||
return None
|
||||
if app_data == None or app_data == b"": return None
|
||||
else:
|
||||
# Version 0.5.0+ announce format
|
||||
if (app_data[0] >= 0x90 and app_data[0] <= 0x9f) or app_data[0] == 0xdc:
|
||||
peer_data = msgpack.unpackb(app_data)
|
||||
if type(peer_data) == list:
|
||||
if len(peer_data) < 2:
|
||||
return None
|
||||
else:
|
||||
return peer_data[1]
|
||||
if len(peer_data) < 2: return None
|
||||
else: return peer_data[1]
|
||||
|
||||
# Original announce format
|
||||
else: return None
|
||||
|
||||
def pn_name_from_app_data(app_data=None):
|
||||
if app_data == None: return None
|
||||
else:
|
||||
if pn_announce_data_is_valid(app_data):
|
||||
data = msgpack.unpackb(app_data)
|
||||
metadata = data[6]
|
||||
if not PN_META_NAME in metadata: return None
|
||||
else:
|
||||
try: return metadata[PN_META_NAME].decode("utf-8")
|
||||
except: return None
|
||||
|
||||
return None
|
||||
|
||||
def pn_stamp_cost_from_app_data(app_data=None):
|
||||
if app_data == None: return None
|
||||
else:
|
||||
if pn_announce_data_is_valid(app_data):
|
||||
data = msgpack.unpackb(app_data)
|
||||
return data[5][0]
|
||||
else:
|
||||
return None
|
||||
|
||||
def pn_announce_data_is_valid(data):
|
||||
try:
|
||||
if type(data) == bytes:
|
||||
data = msgpack.unpackb(data)
|
||||
|
||||
if len(data) < 3:
|
||||
raise ValueError("Invalid announce data: Insufficient peer data")
|
||||
if type(data) != bytes: return False
|
||||
else: data = msgpack.unpackb(data)
|
||||
if len(data) < 7: raise ValueError("Invalid announce data: Insufficient peer data, likely from deprecated LXMF version")
|
||||
else:
|
||||
if data[0] != True and data[0] != False:
|
||||
raise ValueError("Invalid announce data: Indeterminate propagation node status")
|
||||
try:
|
||||
int(data[1])
|
||||
except:
|
||||
raise ValueError("Invalid announce data: Could not decode peer timebase")
|
||||
try: int(data[1])
|
||||
except: raise ValueError("Invalid announce data: Could not decode timebase")
|
||||
if data[2] != True and data[2] != False: raise ValueError("Invalid announce data: Indeterminate propagation node status")
|
||||
try: int(data[3])
|
||||
except: raise ValueError("Invalid announce data: Could not decode propagation transfer limit")
|
||||
try: int(data[4])
|
||||
except: raise ValueError("Invalid announce data: Could not decode propagation sync limit")
|
||||
if type(data[5]) != list: raise ValueError("Invalid announce data: Could not decode stamp costs")
|
||||
try: int(data[5][0])
|
||||
except: raise ValueError("Invalid announce data: Could not decode target stamp cost")
|
||||
try: int(data[5][1])
|
||||
except: raise ValueError("Invalid announce data: Could not decode stamp cost flexibility")
|
||||
try: int(data[5][2])
|
||||
except: raise ValueError("Invalid announce data: Could not decode peering cost")
|
||||
if type(data[6]) != dict: raise ValueError("Invalid announce data: Could not decode metadata")
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG)
|
||||
|
|
|
|||
531
LXMF/LXMPeer.py
531
LXMF/LXMPeer.py
|
|
@ -1,24 +1,38 @@
|
|||
import os
|
||||
import time
|
||||
import threading
|
||||
|
||||
import RNS
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
import LXMF.LXStamper as LXStamper
|
||||
|
||||
from collections import deque
|
||||
from .LXMF import APP_NAME
|
||||
from .LXMF import PN_META_NAME
|
||||
|
||||
class LXMPeer:
|
||||
OFFER_REQUEST_PATH = "/offer"
|
||||
MESSAGE_GET_PATH = "/get"
|
||||
|
||||
IDLE = 0x00
|
||||
LINK_ESTABLISHING = 0x01
|
||||
LINK_READY = 0x02
|
||||
REQUEST_SENT = 0x03
|
||||
RESPONSE_RECEIVED = 0x04
|
||||
IDLE = 0x00
|
||||
LINK_ESTABLISHING = 0x01
|
||||
LINK_READY = 0x02
|
||||
REQUEST_SENT = 0x03
|
||||
RESPONSE_RECEIVED = 0x04
|
||||
RESOURCE_TRANSFERRING = 0x05
|
||||
|
||||
ERROR_NO_IDENTITY = 0xf0
|
||||
ERROR_NO_ACCESS = 0xf1
|
||||
ERROR_NO_IDENTITY = 0xf0
|
||||
ERROR_NO_ACCESS = 0xf1
|
||||
ERROR_INVALID_KEY = 0xf3
|
||||
ERROR_INVALID_DATA = 0xf4
|
||||
ERROR_INVALID_STAMP = 0xf5
|
||||
ERROR_THROTTLED = 0xf6
|
||||
ERROR_NOT_FOUND = 0xfd
|
||||
ERROR_TIMEOUT = 0xfe
|
||||
|
||||
STRATEGY_LAZY = 0x01
|
||||
STRATEGY_PERSISTENT = 0x02
|
||||
DEFAULT_SYNC_STRATEGY = STRATEGY_PERSISTENT
|
||||
|
||||
# Maximum amount of time a peer can
|
||||
# be unreachable before it is removed
|
||||
|
|
@ -38,48 +52,111 @@ class LXMPeer:
|
|||
@staticmethod
|
||||
def from_bytes(peer_bytes, router):
|
||||
dictionary = msgpack.unpackb(peer_bytes)
|
||||
peer_destination_hash = dictionary["destination_hash"]
|
||||
peer_peering_timebase = dictionary["peering_timebase"]
|
||||
peer_alive = dictionary["alive"]
|
||||
peer_last_heard = dictionary["last_heard"]
|
||||
|
||||
peer = LXMPeer(router, dictionary["destination_hash"])
|
||||
peer.peering_timebase = dictionary["peering_timebase"]
|
||||
peer.alive = dictionary["alive"]
|
||||
peer.last_heard = dictionary["last_heard"]
|
||||
if "link_establishment_rate" in dictionary:
|
||||
peer.link_establishment_rate = dictionary["link_establishment_rate"]
|
||||
else:
|
||||
peer.link_establishment_rate = 0
|
||||
peer = LXMPeer(router, peer_destination_hash)
|
||||
peer.peering_timebase = peer_peering_timebase
|
||||
peer.alive = peer_alive
|
||||
peer.last_heard = peer_last_heard
|
||||
|
||||
if "link_establishment_rate" in dictionary: peer.link_establishment_rate = dictionary["link_establishment_rate"]
|
||||
else: peer.link_establishment_rate = 0
|
||||
|
||||
if "sync_transfer_rate" in dictionary:
|
||||
peer.sync_transfer_rate = dictionary["sync_transfer_rate"]
|
||||
else:
|
||||
peer.sync_transfer_rate = 0
|
||||
if "sync_transfer_rate" in dictionary: peer.sync_transfer_rate = dictionary["sync_transfer_rate"]
|
||||
else: peer.sync_transfer_rate = 0
|
||||
|
||||
if "propagation_transfer_limit" in dictionary:
|
||||
try:
|
||||
peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"])
|
||||
except Exception as e:
|
||||
peer.propagation_transfer_limit = None
|
||||
else:
|
||||
peer.propagation_transfer_limit = None
|
||||
try: peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"])
|
||||
except Exception as e: peer.propagation_transfer_limit = None
|
||||
else: peer.propagation_transfer_limit = None
|
||||
|
||||
if "propagation_sync_limit" in dictionary:
|
||||
try: peer.propagation_sync_limit = int(dictionary["propagation_sync_limit"])
|
||||
except: peer.propagation_sync_limit = peer.propagation_transfer_limit
|
||||
else: peer.propagation_sync_limit = peer.propagation_transfer_limit
|
||||
|
||||
if "propagation_stamp_cost" in dictionary:
|
||||
try: peer.propagation_stamp_cost = int(dictionary["propagation_stamp_cost"])
|
||||
except: peer.propagation_stamp_cost = None
|
||||
else: peer.propagation_stamp_cost = None
|
||||
|
||||
if "propagation_stamp_cost_flexibility" in dictionary:
|
||||
try: peer.propagation_stamp_cost_flexibility = int(dictionary["propagation_stamp_cost_flexibility"])
|
||||
except: peer.propagation_stamp_cost_flexibility = None
|
||||
else: peer.propagation_stamp_cost_flexibility = None
|
||||
|
||||
if "peering_cost" in dictionary:
|
||||
try: peer.peering_cost = int(dictionary["peering_cost"])
|
||||
except: peer.peering_cost = None
|
||||
else: peer.peering_cost = None
|
||||
|
||||
if "sync_strategy" in dictionary:
|
||||
try: peer.sync_strategy = int(dictionary["sync_strategy"])
|
||||
except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY
|
||||
else: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY
|
||||
|
||||
if "offered" in dictionary: peer.offered = dictionary["offered"]
|
||||
else: peer.offered = 0
|
||||
if "outgoing" in dictionary: peer.outgoing = dictionary["outgoing"]
|
||||
else: peer.outgoing = 0
|
||||
if "incoming" in dictionary: peer.incoming = dictionary["incoming"]
|
||||
else: peer.incoming = 0
|
||||
if "rx_bytes" in dictionary: peer.rx_bytes = dictionary["rx_bytes"]
|
||||
else: peer.rx_bytes = 0
|
||||
if "tx_bytes" in dictionary: peer.tx_bytes = dictionary["tx_bytes"]
|
||||
else: peer.tx_bytes = 0
|
||||
if "last_sync_attempt" in dictionary: peer.last_sync_attempt = dictionary["last_sync_attempt"]
|
||||
else: peer.last_sync_attempt = 0
|
||||
if "peering_key" in dictionary: peer.peering_key = dictionary["peering_key"]
|
||||
else: peer.peering_key = None
|
||||
if "metadata" in dictionary: peer.metadata = dictionary["metadata"]
|
||||
else: peer.metadata = None
|
||||
|
||||
hm_count = 0
|
||||
for transient_id in dictionary["handled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
peer.add_handled_message(transient_id)
|
||||
hm_count += 1
|
||||
|
||||
um_count = 0
|
||||
for transient_id in dictionary["unhandled_ids"]:
|
||||
if transient_id in router.propagation_entries:
|
||||
peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
|
||||
peer.add_unhandled_message(transient_id)
|
||||
um_count += 1
|
||||
|
||||
peer._hm_count = hm_count
|
||||
peer._um_count = um_count
|
||||
peer._hm_counts_synced = True
|
||||
peer._um_counts_synced = True
|
||||
|
||||
del dictionary
|
||||
return peer
|
||||
|
||||
def to_bytes(self):
|
||||
dictionary = {}
|
||||
dictionary["peering_timebase"] = self.peering_timebase
|
||||
dictionary["alive"] = self.alive
|
||||
dictionary["metadata"] = self.metadata
|
||||
dictionary["last_heard"] = self.last_heard
|
||||
dictionary["sync_strategy"] = self.sync_strategy
|
||||
dictionary["peering_key"] = self.peering_key
|
||||
dictionary["destination_hash"] = self.destination_hash
|
||||
dictionary["link_establishment_rate"] = self.link_establishment_rate
|
||||
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
|
||||
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
|
||||
dictionary["propagation_sync_limit"] = self.propagation_sync_limit
|
||||
dictionary["propagation_stamp_cost"] = self.propagation_stamp_cost
|
||||
dictionary["propagation_stamp_cost_flexibility"] = self.propagation_stamp_cost_flexibility
|
||||
dictionary["peering_cost"] = self.peering_cost
|
||||
dictionary["last_sync_attempt"] = self.last_sync_attempt
|
||||
dictionary["offered"] = self.offered
|
||||
dictionary["outgoing"] = self.outgoing
|
||||
dictionary["incoming"] = self.incoming
|
||||
dictionary["rx_bytes"] = self.rx_bytes
|
||||
dictionary["tx_bytes"] = self.tx_bytes
|
||||
|
||||
handled_ids = []
|
||||
for transient_id in self.handled_messages:
|
||||
|
|
@ -92,24 +169,50 @@ class LXMPeer:
|
|||
dictionary["handled_ids"] = handled_ids
|
||||
dictionary["unhandled_ids"] = unhandled_ids
|
||||
|
||||
return msgpack.packb(dictionary)
|
||||
peer_bytes = msgpack.packb(dictionary)
|
||||
del dictionary
|
||||
|
||||
def __init__(self, router, destination_hash):
|
||||
self.alive = False
|
||||
self.last_heard = 0
|
||||
self.next_sync_attempt = 0
|
||||
self.last_sync_attempt = 0
|
||||
self.sync_backoff = 0
|
||||
self.peering_timebase = 0
|
||||
return peer_bytes
|
||||
|
||||
def __init__(self, router, destination_hash, sync_strategy=DEFAULT_SYNC_STRATEGY):
|
||||
self.alive = False
|
||||
self.last_heard = 0
|
||||
self.sync_strategy = sync_strategy
|
||||
self.peering_key = None
|
||||
self.peering_cost = None
|
||||
self.metadata = None
|
||||
|
||||
self.next_sync_attempt = 0
|
||||
self.last_sync_attempt = 0
|
||||
self.sync_backoff = 0
|
||||
self.peering_timebase = 0
|
||||
self.link_establishment_rate = 0
|
||||
self.sync_transfer_rate = 0
|
||||
self.propagation_transfer_limit = None
|
||||
self.sync_transfer_rate = 0
|
||||
|
||||
self.propagation_transfer_limit = None
|
||||
self.propagation_sync_limit = None
|
||||
self.propagation_stamp_cost = None
|
||||
self.propagation_stamp_cost_flexibility = None
|
||||
self.currently_transferring_messages = None
|
||||
self.handled_messages_queue = deque()
|
||||
self.unhandled_messages_queue = deque()
|
||||
|
||||
self.offered = 0 # Messages offered to this peer
|
||||
self.outgoing = 0 # Messages transferred to this peer
|
||||
self.incoming = 0 # Messages received from this peer
|
||||
self.rx_bytes = 0 # Bytes received from this peer
|
||||
self.tx_bytes = 0 # Bytes sent to this peer
|
||||
|
||||
self._hm_count = 0
|
||||
self._um_count = 0
|
||||
self._hm_counts_synced = False
|
||||
self._um_counts_synced = False
|
||||
|
||||
self._peering_key_lock = threading.Lock()
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
self.unhandled_messages = {}
|
||||
self.handled_messages = {}
|
||||
self.last_offer = []
|
||||
|
||||
self.router = router
|
||||
|
|
@ -118,13 +221,77 @@ class LXMPeer:
|
|||
if self.identity != None:
|
||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
else:
|
||||
self.destination = None
|
||||
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
|
||||
|
||||
def peering_key_ready(self):
|
||||
if not self.peering_cost: return False
|
||||
if type(self.peering_key) == list and len(self.peering_key) == 2:
|
||||
value = self.peering_key[1]
|
||||
if value >= self.peering_cost: return True
|
||||
else:
|
||||
RNS.log(f"Peering key value mismatch for {self}. Current value is {value}, but peer requires {self.peering_cost}. Scheduling regeneration...", RNS.LOG_WARNING)
|
||||
self.peering_key = None
|
||||
|
||||
return False
|
||||
|
||||
def peering_key_value(self):
|
||||
if type(self.peering_key) == list and len(self.peering_key) == 2: return self.peering_key[1]
|
||||
else: return None
|
||||
|
||||
def generate_peering_key(self):
|
||||
if self.peering_cost == None: return False
|
||||
with self._peering_key_lock:
|
||||
if self.peering_key != None: return True
|
||||
else:
|
||||
RNS.log(f"Generating peering key for {self}", RNS.LOG_NOTICE)
|
||||
if self.router.identity == None:
|
||||
RNS.log(f"Could not update peering key for {self} since the local LXMF router identity is not configured", RNS.LOG_ERROR)
|
||||
return False
|
||||
|
||||
if self.identity == None:
|
||||
self.identity = RNS.Identity.recall(destination_hash)
|
||||
if self.identity == None:
|
||||
RNS.log(f"Could not update peering key for {self} since its identity could not be recalled", RNS.LOG_ERROR)
|
||||
return False
|
||||
|
||||
key_material = self.identity.hash+self.router.identity.hash
|
||||
peering_key, value = LXStamper.generate_stamp(key_material, self.peering_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PEERING)
|
||||
if value >= self.peering_cost:
|
||||
self.peering_key = [peering_key, value]
|
||||
RNS.log(f"Peering key successfully generated for {self}", RNS.LOG_NOTICE)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def sync(self):
|
||||
RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
|
||||
self.last_sync_attempt = time.time()
|
||||
|
||||
if time.time() > self.next_sync_attempt:
|
||||
sync_time_reached = time.time() > self.next_sync_attempt
|
||||
stamp_costs_known = self.propagation_stamp_cost != None and self.propagation_stamp_cost_flexibility != None and self.peering_cost != None
|
||||
peering_key_ready = self.peering_key_ready()
|
||||
sync_checks = sync_time_reached and stamp_costs_known and peering_key_ready
|
||||
|
||||
if not sync_checks:
|
||||
try:
|
||||
if not sync_time_reached:
|
||||
postpone_reason = " due to previous failures"
|
||||
if self.last_sync_attempt > self.last_heard: self.alive = False
|
||||
elif not stamp_costs_known:
|
||||
postpone_reason = " since its required stamp costs are not yet known"
|
||||
elif not peering_key_ready:
|
||||
postpone_reason = " since a peering key has not been generated yet"
|
||||
def job(): self.generate_peering_key()
|
||||
threading.Thread(target=job, daemon=True).start()
|
||||
|
||||
delay = self.next_sync_attempt-time.time()
|
||||
postpone_delay = f" for {RNS.prettytime(delay)}" if delay > 0 else ""
|
||||
RNS.log(f"Postponing sync with peer {RNS.prettyhexrep(self.destination_hash)}{postpone_delay}{postpone_reason}", RNS.LOG_DEBUG)
|
||||
except Exception as e:
|
||||
RNS.trace_exception(e)
|
||||
|
||||
else:
|
||||
if not RNS.Transport.has_path(self.destination_hash):
|
||||
RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG)
|
||||
RNS.Transport.request_path(self.destination_hash)
|
||||
|
|
@ -140,7 +307,15 @@ class LXMPeer:
|
|||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
|
||||
if self.destination != None:
|
||||
if len(self.unhandled_messages) == 0:
|
||||
RNS.log(f"Sync requested for {self}, but no unhandled messages exist for peer. Sync complete.", RNS.LOG_DEBUG)
|
||||
return
|
||||
|
||||
if len(self.unhandled_messages) > 0:
|
||||
if self.currently_transferring_messages != None:
|
||||
RNS.log(f"Sync requested for {self}, but current message transfer index was not clear. Aborting.", RNS.LOG_ERROR)
|
||||
return
|
||||
|
||||
if self.state == LXMPeer.IDLE:
|
||||
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
|
||||
self.sync_backoff += LXMPeer.SYNC_BACKOFF_STEP
|
||||
|
|
@ -153,58 +328,69 @@ class LXMPeer:
|
|||
self.alive = True
|
||||
self.last_heard = time.time()
|
||||
self.sync_backoff = 0
|
||||
min_accepted_cost = min(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
|
||||
|
||||
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
|
||||
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing sync offer...", RNS.LOG_DEBUG)
|
||||
unhandled_entries = []
|
||||
unhandled_ids = []
|
||||
purged_ids = []
|
||||
unhandled_ids = []
|
||||
purged_ids = []
|
||||
low_value_ids = []
|
||||
for transient_id in self.unhandled_messages:
|
||||
if transient_id in self.router.propagation_entries:
|
||||
unhandled_entry = [
|
||||
transient_id,
|
||||
self.router.get_weight(transient_id),
|
||||
self.router.get_size(transient_id),
|
||||
]
|
||||
unhandled_entries.append(unhandled_entry)
|
||||
else:
|
||||
purged_ids.append(transient_id)
|
||||
if self.router.get_stamp_value(transient_id) < min_accepted_cost: low_value_ids.append(transient_id)
|
||||
else:
|
||||
unhandled_entry = [ transient_id,
|
||||
self.router.get_weight(transient_id),
|
||||
self.router.get_size(transient_id) ]
|
||||
|
||||
unhandled_entries.append(unhandled_entry)
|
||||
|
||||
else: purged_ids.append(transient_id)
|
||||
|
||||
for transient_id in purged_ids:
|
||||
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
|
||||
self.unhandled_messages.pop(transient_id)
|
||||
RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since it no longer exists in the message store.", RNS.LOG_DEBUG)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
for transient_id in low_value_ids:
|
||||
RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since its stamp value is lower than peer requirement of {min_accepted_cost}.", RNS.LOG_DEBUG)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
unhandled_entries.sort(key=lambda e: e[1], reverse=False)
|
||||
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
|
||||
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
|
||||
for unhandled_entry in unhandled_entries:
|
||||
transient_id = unhandled_entry[0]
|
||||
weight = unhandled_entry[1]
|
||||
lxm_size = unhandled_entry[2]
|
||||
next_size = cumulative_size + (lxm_size+per_message_overhead)
|
||||
if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000):
|
||||
pass
|
||||
else:
|
||||
cumulative_size += (lxm_size+per_message_overhead)
|
||||
unhandled_ids.append(transient_id)
|
||||
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
|
||||
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
|
||||
RNS.log(f"Syncing to peer with per-message limit {RNS.prettysize(self.propagation_transfer_limit*1000)} and sync limit {RNS.prettysize(self.propagation_sync_limit*1000)}") # TODO: Remove debug
|
||||
|
||||
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
|
||||
for unhandled_entry in unhandled_entries:
|
||||
transient_id = unhandled_entry[0]
|
||||
weight = unhandled_entry[1]
|
||||
lxm_size = unhandled_entry[2]
|
||||
lxm_transfer_size = lxm_size+per_message_overhead
|
||||
next_size = cumulative_size + lxm_transfer_size
|
||||
|
||||
if self.propagation_transfer_limit != None and lxm_transfer_size > (self.propagation_transfer_limit*1000):
|
||||
self.remove_unhandled_message(transient_id)
|
||||
self.add_handled_message(transient_id)
|
||||
continue
|
||||
|
||||
if self.propagation_sync_limit != None and next_size >= (self.propagation_sync_limit*1000):
|
||||
continue
|
||||
|
||||
cumulative_size += lxm_transfer_size
|
||||
unhandled_ids.append(transient_id)
|
||||
|
||||
offer = [self.peering_key[0], unhandled_ids]
|
||||
|
||||
RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)} ({RNS.prettysize(len(msgpack.packb(unhandled_ids)))})", RNS.LOG_VERBOSE)
|
||||
self.last_offer = unhandled_ids
|
||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, offer, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||
self.state = LXMPeer.REQUEST_SENT
|
||||
|
||||
else:
|
||||
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
|
||||
|
||||
else:
|
||||
RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG)
|
||||
if self.last_sync_attempt > self.last_heard:
|
||||
self.alive = False
|
||||
RNS.log(f"Could not request sync to peer {RNS.prettyhexrep(self.destination_hash)} since its identity could not be recalled.", RNS.LOG_ERROR)
|
||||
|
||||
def request_failed(self, request_receipt):
|
||||
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
RNS.log(f"Sync request to peer {self.destination} failed", RNS.LOG_DEBUG)
|
||||
if self.link != None: self.link.teardown()
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def offer_response(self, request_receipt):
|
||||
|
|
@ -217,22 +403,35 @@ class LXMPeer:
|
|||
|
||||
if response == LXMPeer.ERROR_NO_IDENTITY:
|
||||
if self.link != None:
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG)
|
||||
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE)
|
||||
self.link.identify()
|
||||
self.state = LXMPeer.LINK_READY
|
||||
self.sync()
|
||||
return
|
||||
|
||||
elif response == LXMPeer.ERROR_NO_ACCESS:
|
||||
RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE)
|
||||
self.router.unpeer(self.destination_hash)
|
||||
return
|
||||
|
||||
elif response == LXMPeer.ERROR_THROTTLED:
|
||||
throttle_time = self.router.PN_STAMP_THROTTLE
|
||||
RNS.log(f"Remote indicated that we're throttled, postponing sync for {RNS.prettytime(throttle_time)}", RNS.LOG_VERBOSE)
|
||||
self.next_sync_attempt = time.time()+throttle_time
|
||||
return
|
||||
|
||||
elif response == False:
|
||||
# Peer already has all advertised messages
|
||||
for transient_id in self.last_offer:
|
||||
if transient_id in self.unhandled_messages:
|
||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
|
||||
elif response == True:
|
||||
# Peer wants all advertised messages
|
||||
for transient_id in self.last_offer:
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
else:
|
||||
|
|
@ -241,18 +440,17 @@ class LXMPeer:
|
|||
# If the peer did not want the message, it has
|
||||
# already received it from another peer.
|
||||
if not transient_id in response:
|
||||
if transient_id in self.unhandled_messages:
|
||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
for transient_id in response:
|
||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
||||
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||
wanted_message_ids.append(transient_id)
|
||||
|
||||
if len(wanted_messages) > 0:
|
||||
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG)
|
||||
RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} wanted {str(len(wanted_messages))} of the available messages", RNS.LOG_VERBOSE)
|
||||
|
||||
lxm_list = []
|
||||
|
||||
for message_entry in wanted_messages:
|
||||
file_path = message_entry[1]
|
||||
if os.path.isfile(file_path):
|
||||
|
|
@ -262,13 +460,15 @@ class LXMPeer:
|
|||
lxm_list.append(lxmf_data)
|
||||
|
||||
data = msgpack.packb([time.time(), lxm_list])
|
||||
RNS.log(f"Total transfer size for this sync is {RNS.prettysize(len(data))}", RNS.LOG_VERBOSE)
|
||||
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
||||
resource.transferred_messages = wanted_message_ids
|
||||
resource.sync_transfer_started = time.time()
|
||||
self.currently_transferring_messages = wanted_message_ids
|
||||
self.current_sync_transfer_started = time.time()
|
||||
self.state = LXMPeer.RESOURCE_TRANSFERRING
|
||||
|
||||
else:
|
||||
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
|
||||
RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} did not request any of the available messages, sync completed", RNS.LOG_VERBOSE)
|
||||
self.offered += len(self.last_offer)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
|
|
@ -287,32 +487,45 @@ class LXMPeer:
|
|||
|
||||
def resource_concluded(self, resource):
|
||||
if resource.status == RNS.Resource.COMPLETE:
|
||||
for transient_id in resource.transferred_messages:
|
||||
message = self.unhandled_messages.pop(transient_id)
|
||||
self.handled_messages[transient_id] = message
|
||||
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
if self.currently_transferring_messages == None:
|
||||
RNS.log(f"Sync transfer completed on {self}, but transferred message index was unavailable. Aborting.", RNS.LOG_ERROR)
|
||||
if self.link != None: self.link.teardown()
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
for transient_id in self.currently_transferring_messages:
|
||||
self.add_handled_message(transient_id)
|
||||
self.remove_unhandled_message(transient_id)
|
||||
|
||||
if self.link != None: self.link.teardown()
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
rate_str = ""
|
||||
if hasattr(resource, "sync_transfer_started") and resource.sync_transfer_started:
|
||||
self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started)
|
||||
if self.current_sync_transfer_started != None:
|
||||
self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-self.current_sync_transfer_started)
|
||||
rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
|
||||
|
||||
RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed"+rate_str, RNS.LOG_DEBUG)
|
||||
self.alive = True
|
||||
RNS.log(f"Syncing {len(self.currently_transferring_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
|
||||
self.alive = True
|
||||
self.last_heard = time.time()
|
||||
self.offered += len(self.last_offer)
|
||||
self.outgoing += len(self.currently_transferring_messages)
|
||||
self.tx_bytes += resource.get_data_size()
|
||||
|
||||
self.currently_transferring_messages = None
|
||||
self.current_sync_transfer_started = None
|
||||
|
||||
if self.sync_strategy == self.STRATEGY_PERSISTENT:
|
||||
if self.unhandled_message_count > 0: self.sync()
|
||||
|
||||
else:
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
|
||||
if self.link != None:
|
||||
self.link.teardown()
|
||||
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
|
||||
if self.link != None: self.link.teardown()
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
self.currently_transferring_messages = None
|
||||
self.current_sync_transfer_started = None
|
||||
|
||||
def link_established(self, link):
|
||||
self.link.identify(self.router.identity)
|
||||
|
|
@ -325,15 +538,105 @@ class LXMPeer:
|
|||
self.sync()
|
||||
|
||||
def link_closed(self, link):
|
||||
self.link = None
|
||||
self.link = None
|
||||
self.state = LXMPeer.IDLE
|
||||
|
||||
def handle_message(self, transient_id):
|
||||
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
|
||||
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
|
||||
def queued_items(self):
|
||||
return len(self.handled_messages_queue) > 0 or len(self.unhandled_messages_queue) > 0
|
||||
|
||||
def __str__(self):
|
||||
if self.destination_hash:
|
||||
return RNS.prettyhexrep(self.destination_hash)
|
||||
def queue_unhandled_message(self, transient_id):
|
||||
self.unhandled_messages_queue.append(transient_id)
|
||||
|
||||
def queue_handled_message(self, transient_id):
|
||||
self.handled_messages_queue.append(transient_id)
|
||||
|
||||
def process_queues(self):
|
||||
if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
|
||||
handled_messages = self.handled_messages
|
||||
unhandled_messages = self.unhandled_messages
|
||||
|
||||
while len(self.handled_messages_queue) > 0:
|
||||
transient_id = self.handled_messages_queue.pop()
|
||||
if not transient_id in handled_messages: self.add_handled_message(transient_id)
|
||||
if transient_id in unhandled_messages: self.remove_unhandled_message(transient_id)
|
||||
|
||||
while len(self.unhandled_messages_queue) > 0:
|
||||
transient_id = self.unhandled_messages_queue.pop()
|
||||
if not transient_id in handled_messages and not transient_id in unhandled_messages:
|
||||
self.add_unhandled_message(transient_id)
|
||||
|
||||
del handled_messages, unhandled_messages
|
||||
|
||||
@property
|
||||
def handled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
hm = list(filter(lambda tid: self.destination_hash in pes[tid][4], pes))
|
||||
self._hm_count = len(hm); del pes
|
||||
self._hm_counts_synced = True
|
||||
return hm
|
||||
|
||||
@property
|
||||
def unhandled_messages(self):
|
||||
pes = self.router.propagation_entries.copy()
|
||||
um = list(filter(lambda tid: self.destination_hash in pes[tid][5], pes))
|
||||
self._um_count = len(um); del pes
|
||||
self._um_counts_synced = True
|
||||
return um
|
||||
|
||||
@property
|
||||
def handled_message_count(self):
|
||||
if not self._hm_counts_synced: self._update_counts()
|
||||
return self._hm_count
|
||||
|
||||
@property
|
||||
def unhandled_message_count(self):
|
||||
if not self._um_counts_synced: self._update_counts()
|
||||
return self._um_count
|
||||
|
||||
@property
|
||||
def acceptance_rate(self):
|
||||
return 0 if self.offered == 0 else (self.outgoing/self.offered)
|
||||
|
||||
def _update_counts(self):
|
||||
if not self._hm_counts_synced:
|
||||
hm = self.handled_messages; del hm
|
||||
|
||||
if not self._um_counts_synced:
|
||||
um = self.unhandled_messages; del um
|
||||
|
||||
def add_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].append(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def add_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if not self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].append(self.destination_hash)
|
||||
self._um_count += 1
|
||||
|
||||
def remove_handled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][4]:
|
||||
self.router.propagation_entries[transient_id][4].remove(self.destination_hash)
|
||||
self._hm_counts_synced = False
|
||||
|
||||
def remove_unhandled_message(self, transient_id):
|
||||
if transient_id in self.router.propagation_entries:
|
||||
if self.destination_hash in self.router.propagation_entries[transient_id][5]:
|
||||
self.router.propagation_entries[transient_id][5].remove(self.destination_hash)
|
||||
self._um_counts_synced = False
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
if type(self.metadata) != dict: return None
|
||||
else:
|
||||
return "<Unknown>"
|
||||
if not PN_META_NAME in self.metadata: return None
|
||||
else:
|
||||
try: return self.metadata[PN_META_NAME].decode("utf-8")
|
||||
except: return None
|
||||
|
||||
def __str__(self):
|
||||
if self.destination_hash: return RNS.prettyhexrep(self.destination_hash)
|
||||
else: return "<Unknown>"
|
||||
1166
LXMF/LXMRouter.py
1166
LXMF/LXMRouter.py
File diff suppressed because it is too large
Load diff
|
|
@ -145,26 +145,32 @@ class LXMessage:
|
|||
|
||||
self.set_fields(fields)
|
||||
|
||||
self.payload = None
|
||||
self.timestamp = None
|
||||
self.signature = None
|
||||
self.hash = None
|
||||
self.packed = None
|
||||
self.state = LXMessage.GENERATING
|
||||
self.method = LXMessage.UNKNOWN
|
||||
self.progress = 0.0
|
||||
self.rssi = None
|
||||
self.snr = None
|
||||
self.q = None
|
||||
self.payload = None
|
||||
self.timestamp = None
|
||||
self.signature = None
|
||||
self.hash = None
|
||||
self.transient_id = None
|
||||
self.packed = None
|
||||
self.state = LXMessage.GENERATING
|
||||
self.method = LXMessage.UNKNOWN
|
||||
self.progress = 0.0
|
||||
self.rssi = None
|
||||
self.snr = None
|
||||
self.q = None
|
||||
|
||||
self.stamp = None
|
||||
self.stamp_cost = stamp_cost
|
||||
self.stamp_value = None
|
||||
self.stamp_valid = False
|
||||
self.stamp_checked = False
|
||||
self.defer_stamp = True
|
||||
self.outbound_ticket = None
|
||||
self.include_ticket = include_ticket
|
||||
self.stamp = None
|
||||
self.stamp_cost = stamp_cost
|
||||
self.stamp_value = None
|
||||
self.stamp_valid = False
|
||||
self.stamp_checked = False
|
||||
self.propagation_stamp = None
|
||||
self.propagation_stamp_value = None
|
||||
self.propagation_stamp_valid = False
|
||||
self.propagation_target_cost = None
|
||||
self.defer_stamp = True
|
||||
self.defer_propagation_stamp = True
|
||||
self.outbound_ticket = None
|
||||
self.include_ticket = include_ticket
|
||||
|
||||
self.propagation_packed = None
|
||||
self.paper_packed = None
|
||||
|
|
@ -184,6 +190,7 @@ class LXMessage:
|
|||
self.resource_representation = None
|
||||
self.__delivery_destination = None
|
||||
self.__delivery_callback = None
|
||||
self.__pn_encrypted_data = None
|
||||
self.failed_callback = None
|
||||
|
||||
self.deferred_stamp_generating = False
|
||||
|
|
@ -268,15 +275,6 @@ class LXMessage:
|
|||
def register_failed_callback(self, callback):
|
||||
self.failed_callback = callback
|
||||
|
||||
@staticmethod
|
||||
def stamp_valid(stamp, target_cost, workblock):
|
||||
target = 0b1 << 256-target_cost
|
||||
result = RNS.Identity.full_hash(workblock+stamp)
|
||||
if int.from_bytes(result, byteorder="big") > target:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def validate_stamp(self, target_cost, tickets=None):
|
||||
if tickets != None:
|
||||
for ticket in tickets:
|
||||
|
|
@ -293,7 +291,7 @@ class LXMessage:
|
|||
return False
|
||||
else:
|
||||
workblock = LXStamper.stamp_workblock(self.message_id)
|
||||
if LXMessage.stamp_valid(self.stamp, target_cost, workblock):
|
||||
if LXStamper.stamp_valid(self.stamp, target_cost, workblock):
|
||||
RNS.log(f"Stamp on {self} validated", RNS.LOG_DEBUG) # TODO: Remove at some point
|
||||
self.stamp_value = LXStamper.stamp_value(workblock, self.stamp)
|
||||
return True
|
||||
|
|
@ -333,10 +331,35 @@ class LXMessage:
|
|||
else:
|
||||
return None
|
||||
|
||||
def pack(self):
|
||||
def get_propagation_stamp(self, target_cost, timeout=None):
|
||||
# If a stamp was already generated, return
|
||||
# it immediately.
|
||||
if self.propagation_stamp != None:
|
||||
return self.propagation_stamp
|
||||
|
||||
# Otherwise, we will need to generate a
|
||||
# valid stamp according to the cost that
|
||||
# the propagation node has specified.
|
||||
else:
|
||||
self.propagation_target_cost = target_cost
|
||||
if self.propagation_target_cost == None:
|
||||
raise ValueError("Cannot generate propagation stamp without configured target propagation cost")
|
||||
|
||||
|
||||
if not self.transient_id: self.pack()
|
||||
generated_stamp, value = LXStamper.generate_stamp(self.transient_id, target_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN)
|
||||
if generated_stamp:
|
||||
self.propagation_stamp = generated_stamp
|
||||
self.propagation_stamp_value = value
|
||||
self.propagation_stamp_valid = True
|
||||
return generated_stamp
|
||||
|
||||
else:
|
||||
return None
|
||||
|
||||
def pack(self, payload_updated=False):
|
||||
if not self.packed:
|
||||
if self.timestamp == None:
|
||||
self.timestamp = time.time()
|
||||
if self.timestamp == None: self.timestamp = time.time()
|
||||
|
||||
self.propagation_packed = None
|
||||
self.paper_packed = None
|
||||
|
|
@ -352,9 +375,8 @@ class LXMessage:
|
|||
|
||||
if not self.defer_stamp:
|
||||
self.stamp = self.get_stamp()
|
||||
if self.stamp != None:
|
||||
self.payload.append(self.stamp)
|
||||
|
||||
if self.stamp != None: self.payload.append(self.stamp)
|
||||
|
||||
signed_part = b""
|
||||
signed_part += hashed_part
|
||||
signed_part += self.hash
|
||||
|
|
@ -380,7 +402,7 @@ class LXMessage:
|
|||
if self.desired_method == LXMessage.OPPORTUNISTIC:
|
||||
if self.__destination.type == RNS.Destination.SINGLE:
|
||||
if content_size > LXMessage.ENCRYPTED_PACKET_MAX_CONTENT:
|
||||
RNS.log(f"Opportunistic delivery was requested for {self}, but content exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
|
||||
RNS.log(f"Opportunistic delivery was requested for {self}, but content of length {content_size} exceeds packet size limit. Falling back to link-based delivery.", RNS.LOG_DEBUG)
|
||||
self.desired_method = LXMessage.DIRECT
|
||||
|
||||
# Set delivery parameters according to delivery method
|
||||
|
|
@ -409,9 +431,14 @@ class LXMessage:
|
|||
elif self.desired_method == LXMessage.PROPAGATED:
|
||||
single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT
|
||||
|
||||
encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
|
||||
self.ratchet_id = self.__destination.latest_ratchet_id
|
||||
self.propagation_packed = msgpack.packb([time.time(), [self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data]])
|
||||
if self.__pn_encrypted_data == None or payload_updated:
|
||||
self.__pn_encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
|
||||
self.ratchet_id = self.__destination.latest_ratchet_id
|
||||
|
||||
lxmf_data = self.packed[:LXMessage.DESTINATION_LENGTH]+self.__pn_encrypted_data
|
||||
self.transient_id = RNS.Identity.full_hash(lxmf_data)
|
||||
if self.propagation_stamp != None: lxmf_data += self.propagation_stamp
|
||||
self.propagation_packed = msgpack.packb([time.time(), [lxmf_data]])
|
||||
|
||||
content_size = len(self.propagation_packed)
|
||||
if content_size <= single_packet_content_limit:
|
||||
|
|
|
|||
|
|
@ -3,25 +3,30 @@ import RNS.vendor.umsgpack as msgpack
|
|||
|
||||
import os
|
||||
import time
|
||||
import math
|
||||
import itertools
|
||||
import multiprocessing
|
||||
|
||||
WORKBLOCK_EXPAND_ROUNDS = 3000
|
||||
WORKBLOCK_EXPAND_ROUNDS = 3000
|
||||
WORKBLOCK_EXPAND_ROUNDS_PN = 1000
|
||||
WORKBLOCK_EXPAND_ROUNDS_PEERING = 25
|
||||
STAMP_SIZE = RNS.Identity.HASHLENGTH//8
|
||||
PN_VALIDATION_POOL_MIN_SIZE = 256
|
||||
|
||||
active_jobs = {}
|
||||
|
||||
def stamp_workblock(message_id):
|
||||
if RNS.vendor.platformutils.is_linux(): multiprocessing.set_start_method("fork")
|
||||
|
||||
def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
|
||||
wb_st = time.time()
|
||||
expand_rounds = WORKBLOCK_EXPAND_ROUNDS
|
||||
workblock = b""
|
||||
for n in range(expand_rounds):
|
||||
workblock += RNS.Cryptography.hkdf(
|
||||
length=256,
|
||||
derive_from=message_id,
|
||||
salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)),
|
||||
context=None,
|
||||
)
|
||||
workblock += RNS.Cryptography.hkdf(length=256,
|
||||
derive_from=material,
|
||||
salt=RNS.Identity.full_hash(material+msgpack.packb(n)),
|
||||
context=None)
|
||||
wb_time = time.time() - wb_st
|
||||
RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)
|
||||
# RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)
|
||||
|
||||
return workblock
|
||||
|
||||
|
|
@ -36,28 +41,70 @@ def stamp_value(workblock, stamp):
|
|||
|
||||
return value
|
||||
|
||||
def generate_stamp(message_id, stamp_cost):
|
||||
def stamp_valid(stamp, target_cost, workblock):
|
||||
target = 0b1 << 256-target_cost
|
||||
result = RNS.Identity.full_hash(workblock+stamp)
|
||||
if int.from_bytes(result, byteorder="big") > target: return False
|
||||
else: return True
|
||||
|
||||
def validate_peering_key(peering_id, peering_key, target_cost):
|
||||
workblock = stamp_workblock(peering_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING)
|
||||
if not stamp_valid(peering_key, target_cost, workblock): return False
|
||||
else: return True
|
||||
|
||||
def validate_pn_stamp(transient_data, target_cost):
|
||||
from .LXMessage import LXMessage
|
||||
if len(transient_data) <= LXMessage.LXMF_OVERHEAD+STAMP_SIZE: return None, None, None, None
|
||||
else:
|
||||
lxm_data = transient_data[:-STAMP_SIZE]
|
||||
stamp = transient_data[-STAMP_SIZE:]
|
||||
transient_id = RNS.Identity.full_hash(lxm_data)
|
||||
workblock = stamp_workblock(transient_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN)
|
||||
|
||||
if not stamp_valid(stamp, target_cost, workblock): return None, None, None, None
|
||||
else:
|
||||
value = stamp_value(workblock, stamp)
|
||||
return transient_id, lxm_data, value, stamp
|
||||
|
||||
def validate_pn_stamps_job_simple(transient_list, target_cost):
|
||||
validated_messages = []
|
||||
for transient_data in transient_list:
|
||||
transient_id, lxm_data, value, stamp_data = validate_pn_stamp(transient_data, target_cost)
|
||||
if transient_id: validated_messages.append([transient_id, lxm_data, value, stamp_data])
|
||||
|
||||
return validated_messages
|
||||
|
||||
def validate_pn_stamps_job_multip(transient_list, target_cost):
|
||||
cores = multiprocessing.cpu_count()
|
||||
pool_count = min(cores, math.ceil(len(transient_list) / PN_VALIDATION_POOL_MIN_SIZE))
|
||||
|
||||
RNS.log(f"Validating {len(transient_list)} stamps using {pool_count} processes...", RNS.LOG_VERBOSE)
|
||||
with multiprocessing.Pool(pool_count) as p:
|
||||
validated_entries = p.starmap(validate_pn_stamp, zip(transient_list, itertools.repeat(target_cost)))
|
||||
|
||||
return [e for e in validated_entries if e[0] != None]
|
||||
|
||||
def validate_pn_stamps(transient_list, target_cost):
|
||||
non_mp_platform = RNS.vendor.platformutils.is_android()
|
||||
if len(transient_list) <= PN_VALIDATION_POOL_MIN_SIZE or non_mp_platform: return validate_pn_stamps_job_simple(transient_list, target_cost)
|
||||
else: return validate_pn_stamps_job_multip(transient_list, target_cost)
|
||||
|
||||
def generate_stamp(message_id, stamp_cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
|
||||
RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG)
|
||||
workblock = stamp_workblock(message_id)
|
||||
workblock = stamp_workblock(message_id, expand_rounds=expand_rounds)
|
||||
|
||||
start_time = time.time()
|
||||
stamp = None
|
||||
rounds = 0
|
||||
value = 0
|
||||
|
||||
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin():
|
||||
stamp, rounds = job_simple(stamp_cost, workblock, message_id)
|
||||
|
||||
elif RNS.vendor.platformutils.is_android():
|
||||
stamp, rounds = job_android(stamp_cost, workblock, message_id)
|
||||
|
||||
else:
|
||||
stamp, rounds = job_linux(stamp_cost, workblock, message_id)
|
||||
if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin(): stamp, rounds = job_simple(stamp_cost, workblock, message_id)
|
||||
elif RNS.vendor.platformutils.is_android(): stamp, rounds = job_android(stamp_cost, workblock, message_id)
|
||||
else: stamp, rounds = job_linux(stamp_cost, workblock, message_id)
|
||||
|
||||
duration = time.time() - start_time
|
||||
speed = rounds/duration
|
||||
if stamp != None:
|
||||
value = stamp_value(workblock, stamp)
|
||||
if stamp != None: value = stamp_value(workblock, stamp)
|
||||
|
||||
RNS.log(f"Stamp with value {value} generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG)
|
||||
|
||||
|
|
@ -113,10 +160,8 @@ def job_simple(stamp_cost, workblock, message_id):
|
|||
def sv(s, c, w):
|
||||
target = 0b1<<256-c; m = w+s
|
||||
result = RNS.Identity.full_hash(m)
|
||||
if int.from_bytes(result, byteorder="big") > target:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
if int.from_bytes(result, byteorder="big") > target: return False
|
||||
else: return True
|
||||
|
||||
while not sv(pstamp, stamp_cost, workblock) and not active_jobs[message_id]:
|
||||
pstamp = os.urandom(256//8); rounds += 1
|
||||
|
|
@ -135,7 +180,8 @@ def job_linux(stamp_cost, workblock, message_id):
|
|||
allow_kill = True
|
||||
stamp = None
|
||||
total_rounds = 0
|
||||
jobs = multiprocessing.cpu_count()
|
||||
cores = multiprocessing.cpu_count()
|
||||
jobs = cores if cores <= 12 else int(cores/2)
|
||||
stop_event = multiprocessing.Event()
|
||||
result_queue = multiprocessing.Queue(1)
|
||||
rounds_queue = multiprocessing.Queue()
|
||||
|
|
@ -310,6 +356,13 @@ def job_android(stamp_cost, workblock, message_id):
|
|||
|
||||
return stamp, total_rounds
|
||||
|
||||
# def stamp_value_linear(workblock, stamp):
|
||||
# value = 0
|
||||
# bits = 256
|
||||
# material = RNS.Identity.full_hash(workblock+stamp)
|
||||
# s = int.from_bytes(material, byteorder="big")
|
||||
# return s.bit_count()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
if len(sys.argv) < 2:
|
||||
|
|
@ -325,4 +378,14 @@ if __name__ == "__main__":
|
|||
RNS.loglevel = RNS.LOG_DEBUG
|
||||
RNS.log("Testing LXMF stamp generation", RNS.LOG_DEBUG)
|
||||
message_id = os.urandom(32)
|
||||
generate_stamp(message_id, cost)
|
||||
generate_stamp(message_id, cost)
|
||||
|
||||
RNS.log("", RNS.LOG_DEBUG)
|
||||
RNS.log("Testing propagation stamp generation", RNS.LOG_DEBUG)
|
||||
message_id = os.urandom(32)
|
||||
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN)
|
||||
|
||||
RNS.log("", RNS.LOG_DEBUG)
|
||||
RNS.log("Testing peering key generation", RNS.LOG_DEBUG)
|
||||
message_id = os.urandom(32)
|
||||
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING)
|
||||
|
|
@ -1,8 +1,8 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
# MIT License
|
||||
# Reticulum License
|
||||
#
|
||||
# Copyright (c) 2016-2022 Mark Qvist / unsigned.io
|
||||
# Copyright (c) 2020-2025 Mark Qvist
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
|
|
@ -11,8 +11,16 @@
|
|||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
# - The Software shall not be used in any kind of system which includes amongst
|
||||
# its functions the ability to purposefully do harm to human beings.
|
||||
#
|
||||
# - The Software shall not be used, directly or indirectly, in the creation of
|
||||
# an artificial intelligence, machine learning or language model training
|
||||
# dataset, including but not limited to any use that contributes to the
|
||||
# training or development of such a model or algorithm.
|
||||
#
|
||||
# - The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
|
|
@ -35,6 +43,7 @@ import time
|
|||
import os
|
||||
|
||||
from LXMF._version import __version__
|
||||
from LXMF import APP_NAME
|
||||
|
||||
from RNS.vendor.configobj import ConfigObj
|
||||
|
||||
|
|
@ -96,6 +105,11 @@ def apply_config():
|
|||
else:
|
||||
active_configuration["enable_propagation_node"] = False
|
||||
|
||||
if "propagation" in lxmd_config and "node_name" in lxmd_config["propagation"]:
|
||||
active_configuration["node_name"] = lxmd_config["propagation"].get("node_name")
|
||||
else:
|
||||
active_configuration["node_name"] = None
|
||||
|
||||
if "propagation" in lxmd_config and "auth_required" in lxmd_config["propagation"]:
|
||||
active_configuration["auth_required"] = lxmd_config["propagation"].as_bool("auth_required")
|
||||
else:
|
||||
|
|
@ -126,7 +140,7 @@ def apply_config():
|
|||
if active_configuration["message_storage_limit"] < 0.005:
|
||||
active_configuration["message_storage_limit"] = 0.005
|
||||
else:
|
||||
active_configuration["message_storage_limit"] = 2000
|
||||
active_configuration["message_storage_limit"] = 500
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
|
||||
|
|
@ -134,11 +148,76 @@ def apply_config():
|
|||
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
|
||||
else:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = 256
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_message_max_accepted_size" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_message_max_accepted_size")
|
||||
if active_configuration["propagation_transfer_max_accepted_size"] < 0.38:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
|
||||
else:
|
||||
active_configuration["propagation_transfer_max_accepted_size"] = 256
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_sync_max_accepted_size" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_sync_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_sync_max_accepted_size")
|
||||
if active_configuration["propagation_sync_max_accepted_size"] < 0.38:
|
||||
active_configuration["propagation_sync_max_accepted_size"] = 0.38
|
||||
else:
|
||||
active_configuration["propagation_sync_max_accepted_size"] = 256*40
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_stamp_cost_target" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_stamp_cost_target"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_target")
|
||||
if active_configuration["propagation_stamp_cost_target"] < LXMF.LXMRouter.PROPAGATION_COST_MIN:
|
||||
active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST_MIN
|
||||
else:
|
||||
active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST
|
||||
|
||||
if "propagation" in lxmd_config and "propagation_stamp_cost_flexibility" in lxmd_config["propagation"]:
|
||||
active_configuration["propagation_stamp_cost_flexibility"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_flexibility")
|
||||
if active_configuration["propagation_stamp_cost_flexibility"] < 0:
|
||||
active_configuration["propagation_stamp_cost_flexibility"] = 0
|
||||
else:
|
||||
active_configuration["propagation_stamp_cost_flexibility"] = LXMF.LXMRouter.PROPAGATION_COST_FLEX
|
||||
|
||||
if "propagation" in lxmd_config and "peering_cost" in lxmd_config["propagation"]:
|
||||
active_configuration["peering_cost"] = lxmd_config["propagation"].as_int("peering_cost")
|
||||
if active_configuration["peering_cost"] < 0:
|
||||
active_configuration["peering_cost"] = 0
|
||||
else:
|
||||
active_configuration["peering_cost"] = LXMF.LXMRouter.PEERING_COST
|
||||
|
||||
if "propagation" in lxmd_config and "remote_peering_cost_max" in lxmd_config["propagation"]:
|
||||
active_configuration["remote_peering_cost_max"] = lxmd_config["propagation"].as_int("remote_peering_cost_max")
|
||||
if active_configuration["remote_peering_cost_max"] < 0:
|
||||
active_configuration["remote_peering_cost_max"] = 0
|
||||
else:
|
||||
active_configuration["remote_peering_cost_max"] = LXMF.LXMRouter.MAX_PEERING_COST
|
||||
|
||||
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
|
||||
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
|
||||
else:
|
||||
active_configuration["prioritised_lxmf_destinations"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "control_allowed" in lxmd_config["propagation"]:
|
||||
active_configuration["control_allowed_identities"] = lxmd_config["propagation"].as_list("control_allowed")
|
||||
else:
|
||||
active_configuration["control_allowed_identities"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]:
|
||||
static_peers = lxmd_config["propagation"].as_list("static_peers")
|
||||
active_configuration["static_peers"] = []
|
||||
for static_peer in static_peers:
|
||||
active_configuration["static_peers"].append(bytes.fromhex(static_peer))
|
||||
else:
|
||||
active_configuration["static_peers"] = []
|
||||
|
||||
if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]:
|
||||
active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers")
|
||||
else:
|
||||
active_configuration["max_peers"] = None
|
||||
|
||||
if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]:
|
||||
active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only")
|
||||
else:
|
||||
active_configuration["from_static_only"] = False
|
||||
|
||||
# Load various settings
|
||||
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
|
||||
|
|
@ -304,8 +383,17 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
|||
autopeer = active_configuration["autopeer"],
|
||||
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
|
||||
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
|
||||
propagation_cost = active_configuration["propagation_stamp_cost_target"],
|
||||
propagation_cost_flexibility = active_configuration["propagation_stamp_cost_flexibility"],
|
||||
peering_cost = active_configuration["peering_cost"],
|
||||
max_peering_cost = active_configuration["remote_peering_cost_max"],
|
||||
sync_limit = active_configuration["propagation_sync_max_accepted_size"],
|
||||
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
|
||||
)
|
||||
max_peers = active_configuration["max_peers"],
|
||||
static_peers = active_configuration["static_peers"],
|
||||
from_static_only = active_configuration["from_static_only"],
|
||||
name = active_configuration["node_name"])
|
||||
|
||||
message_router.register_delivery_callback(lxmf_delivery)
|
||||
|
||||
for destination_hash in active_configuration["ignored_lxmf_destinations"]:
|
||||
|
|
@ -337,13 +425,16 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
|
|||
for dest_str in active_configuration["prioritised_lxmf_destinations"]:
|
||||
try:
|
||||
dest_hash = bytes.fromhex(dest_str)
|
||||
if len(dest_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
|
||||
message_router.prioritise(dest_hash)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Cannot prioritise "+str(dest_str)+", it is not a valid destination hash", RNS.LOG_ERROR)
|
||||
if len(dest_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: message_router.prioritise(dest_hash)
|
||||
except Exception as e: RNS.log("Cannot prioritise "+str(dest_str)+", it is not a valid destination hash", RNS.LOG_ERROR)
|
||||
|
||||
message_router.enable_propagation()
|
||||
|
||||
for ident_str in active_configuration["control_allowed_identities"]:
|
||||
try:
|
||||
identity_hash = bytes.fromhex(ident_str)
|
||||
if len(identity_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: message_router.allow_control(identity_hash)
|
||||
except Exception as e: RNS.log(f"Cannot allow control from {ident_str}, it is not a valid identity hash", RNS.LOG_ERROR)
|
||||
|
||||
RNS.log("LXMF Propagation Node started on "+RNS.prettyhexrep(message_router.propagation_destination.hash))
|
||||
|
||||
|
|
@ -362,13 +453,13 @@ def jobs():
|
|||
try:
|
||||
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
|
||||
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
last_peer_announce = time.time()
|
||||
|
||||
if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None:
|
||||
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
|
||||
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE)
|
||||
message_router.announce_propagation_node()
|
||||
last_node_announce = time.time()
|
||||
|
||||
|
|
@ -381,7 +472,7 @@ def deferred_start_jobs():
|
|||
global active_configuration, last_peer_announce, last_node_announce
|
||||
global message_router, lxmf_destination
|
||||
time.sleep(DEFFERED_JOBS_DELAY)
|
||||
RNS.log("Running deferred start jobs")
|
||||
RNS.log("Running deferred start jobs", RNS.LOG_DEBUG)
|
||||
if active_configuration["peer_announce_at_start"]:
|
||||
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
|
||||
message_router.announce(lxmf_destination.hash)
|
||||
|
|
@ -394,6 +485,379 @@ def deferred_start_jobs():
|
|||
last_node_announce = time.time()
|
||||
threading.Thread(target=jobs, daemon=True).start()
|
||||
|
||||
def _request_sync(identity, destination_hash, remote_identity, timeout=15, exit_on_fail=False):
|
||||
control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
if exit_on_fail:
|
||||
print("Requesting lxmd peer sync timed out, exiting now")
|
||||
exit(200)
|
||||
else:
|
||||
return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
|
||||
if not RNS.Transport.has_path(control_destination.hash):
|
||||
RNS.Transport.request_path(control_destination.hash)
|
||||
while not RNS.Transport.has_path(control_destination.hash):
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link = RNS.Link(control_destination)
|
||||
while not link.status == RNS.Link.ACTIVE:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.identify(identity)
|
||||
request_receipt = link.request(LXMF.LXMRouter.SYNC_REQUEST_PATH, data=destination_hash, response_callback=None, failed_callback=None)
|
||||
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.teardown()
|
||||
return request_receipt.get_response()
|
||||
|
||||
|
||||
def request_sync(target, remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=15, identity_path=None):
|
||||
global configpath, identitypath, storagedir, lxmdir
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
|
||||
try:
|
||||
peer_destination_hash = bytes.fromhex(target)
|
||||
if len(peer_destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters")
|
||||
except Exception as e:
|
||||
print(f"Invalid peer destination hash: {e}")
|
||||
exit(203)
|
||||
remote
|
||||
_remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path)
|
||||
response = _request_sync(identity, peer_destination_hash, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True)
|
||||
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
|
||||
print("Remote received no identity")
|
||||
exit(203)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
|
||||
print("Access denied")
|
||||
exit(204)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_INVALID_DATA:
|
||||
print("Invalid data received by remote")
|
||||
exit(205)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NOT_FOUND:
|
||||
print("The requested peer was not found")
|
||||
exit(206)
|
||||
elif response == None:
|
||||
print("Empty response received")
|
||||
exit(207)
|
||||
else:
|
||||
print(f"Sync requested for peer {RNS.prettyhexrep(peer_destination_hash)}")
|
||||
exit(0)
|
||||
|
||||
def _request_unpeer(identity, destination_hash, remote_identity, timeout=15, exit_on_fail=False):
|
||||
control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
if exit_on_fail:
|
||||
print("Requesting lxmd peering break timed out, exiting now")
|
||||
exit(200)
|
||||
else: return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
|
||||
else: time.sleep(0.1)
|
||||
|
||||
if not RNS.Transport.has_path(control_destination.hash):
|
||||
RNS.Transport.request_path(control_destination.hash)
|
||||
while not RNS.Transport.has_path(control_destination.hash):
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link = RNS.Link(control_destination)
|
||||
while not link.status == RNS.Link.ACTIVE:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.identify(identity)
|
||||
request_receipt = link.request(LXMF.LXMRouter.UNPEER_REQUEST_PATH, data=destination_hash, response_callback=None, failed_callback=None)
|
||||
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
|
||||
tc = check_timeout()
|
||||
if tc:
|
||||
return tc
|
||||
|
||||
link.teardown()
|
||||
return request_receipt.get_response()
|
||||
|
||||
|
||||
def request_unpeer(target, remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=15, identity_path=None):
|
||||
global configpath, identitypath, storagedir, lxmdir
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
|
||||
try:
|
||||
peer_destination_hash = bytes.fromhex(target)
|
||||
if len(peer_destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters")
|
||||
except Exception as e:
|
||||
print(f"Invalid peer destination hash: {e}")
|
||||
exit(203)
|
||||
remote
|
||||
_remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path)
|
||||
response = _request_unpeer(identity, peer_destination_hash, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True)
|
||||
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
|
||||
print("Remote received no identity")
|
||||
exit(203)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
|
||||
print("Access denied")
|
||||
exit(204)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_INVALID_DATA:
|
||||
print("Invalid data received by remote")
|
||||
exit(205)
|
||||
elif response == LXMF.LXMPeer.LXMPeer.ERROR_NOT_FOUND:
|
||||
print("The requested peer was not found")
|
||||
exit(206)
|
||||
elif response == None:
|
||||
print("Empty response received")
|
||||
exit(207)
|
||||
else:
|
||||
print(f"Broke peering with {RNS.prettyhexrep(peer_destination_hash)}")
|
||||
exit(0)
|
||||
|
||||
def query_status(identity, remote_identity=None, timeout=5, exit_on_fail=False):
|
||||
if remote_identity == None: remote_identity = identity
|
||||
control_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
|
||||
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
if exit_on_fail:
|
||||
print("Getting lxmd statistics timed out, exiting now")
|
||||
exit(200)
|
||||
else: return LXMF.LXMPeer.LXMPeer.ERROR_TIMEOUT
|
||||
else: time.sleep(0.1)
|
||||
|
||||
if not RNS.Transport.has_path(control_destination.hash):
|
||||
RNS.Transport.request_path(control_destination.hash)
|
||||
while not RNS.Transport.has_path(control_destination.hash):
|
||||
tc = check_timeout()
|
||||
if tc: return tc
|
||||
|
||||
link = RNS.Link(control_destination)
|
||||
while not link.status == RNS.Link.ACTIVE:
|
||||
tc = check_timeout()
|
||||
if tc: return tc
|
||||
|
||||
link.identify(identity)
|
||||
request_receipt = link.request(LXMF.LXMRouter.STATS_GET_PATH, data=None, response_callback=None, failed_callback=None)
|
||||
while not request_receipt.get_status() == RNS.RequestReceipt.READY:
|
||||
tc = check_timeout()
|
||||
if tc: return tc
|
||||
|
||||
link.teardown()
|
||||
return request_receipt.get_response()
|
||||
|
||||
def get_status(remote=None, configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, timeout=5,
|
||||
show_status=False, show_peers=False, identity_path=None):
|
||||
|
||||
global identity
|
||||
_remote_init(configdir, rnsconfigdir, verbosity, quietness, identity_path)
|
||||
response = query_status(identity, remote_identity=_get_target_identity(remote), timeout=timeout, exit_on_fail=True)
|
||||
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_IDENTITY:
|
||||
print("Remote received no identity")
|
||||
exit(203)
|
||||
if response == LXMF.LXMPeer.LXMPeer.ERROR_NO_ACCESS:
|
||||
print("Access denied")
|
||||
exit(204)
|
||||
elif response == None:
|
||||
print("Empty response received")
|
||||
exit(207)
|
||||
else:
|
||||
s = response
|
||||
mutil = round((s["messagestore"]["bytes"]/s["messagestore"]["limit"])*100, 2)
|
||||
ms_util = f"{mutil}%"
|
||||
if s["from_static_only"]:
|
||||
who_str = "static peers only"
|
||||
else:
|
||||
who_str = "all nodes"
|
||||
|
||||
available_peers = 0
|
||||
unreachable_peers = 0
|
||||
peered_incoming = 0
|
||||
peered_outgoing = 0
|
||||
peered_rx_bytes = 0
|
||||
peered_tx_bytes = 0
|
||||
for peer_id in s["peers"]:
|
||||
p = s["peers"][peer_id]
|
||||
pm = p["messages"]
|
||||
peered_incoming += pm["incoming"]
|
||||
peered_outgoing += pm["outgoing"]
|
||||
peered_rx_bytes += p["rx_bytes"]
|
||||
peered_tx_bytes += p["tx_bytes"]
|
||||
|
||||
if p["alive"]: available_peers += 1
|
||||
else: unreachable_peers += 1
|
||||
|
||||
total_incoming = peered_incoming+s["unpeered_propagation_incoming"]+s["clients"]["client_propagation_messages_received"]
|
||||
total_rx_bytes = peered_rx_bytes+s["unpeered_propagation_rx_bytes"]
|
||||
if total_incoming != 0: df = round(peered_outgoing/total_incoming, 2)
|
||||
else: df = 0
|
||||
|
||||
dhs = RNS.prettyhexrep(s["destination_hash"]); uts = RNS.prettytime(s["uptime"])
|
||||
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
|
||||
|
||||
if show_status:
|
||||
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
|
||||
ptl = RNS.prettysize(s["propagation_limit"]*1000); psl = RNS.prettysize(s["sync_limit"]*1000);
|
||||
uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
|
||||
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
|
||||
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
|
||||
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
|
||||
psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"]
|
||||
pc = s["peering_cost"]; pcm = s["max_peering_cost"]
|
||||
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
|
||||
print(f"Required propagation stamp cost is {psc}, flexibility is {scf}")
|
||||
print(f"Peering cost is {pc}, max remote peering cost is {pcm}")
|
||||
print(f"Accepting propagated messages from {who_str}")
|
||||
print(f"{ptl} message limit, {psl} sync limit")
|
||||
print(f"")
|
||||
print(f"Peers : {stp} total (peer limit is {smp})")
|
||||
print(f" {sdp} discovered, {ssp} static")
|
||||
print(f" {available_peers} available, {unreachable_peers} unreachable")
|
||||
print(f"")
|
||||
print(f"Traffic : {total_incoming} messages received in total ({RNS.prettysize(total_rx_bytes)})")
|
||||
print(f" {peered_incoming} messages received from peered nodes ({RNS.prettysize(peered_rx_bytes)})")
|
||||
print(f" {upi} messages received from unpeered nodes ({uprx})")
|
||||
print(f" {peered_outgoing} messages transferred to peered nodes ({RNS.prettysize(peered_tx_bytes)})")
|
||||
print(f" {cprr} propagation messages received directly from clients")
|
||||
print(f" {cprs} propagation messages served to clients")
|
||||
print(f" Distribution factor is {df}")
|
||||
print(f"")
|
||||
|
||||
if show_peers:
|
||||
if not show_status:
|
||||
print("")
|
||||
|
||||
for peer_id in s["peers"]:
|
||||
ind = " "
|
||||
p = s["peers"][peer_id]
|
||||
if p["type"] == "static":
|
||||
t = "Static peer "
|
||||
elif p["type"] == "discovered":
|
||||
t = "Discovered peer "
|
||||
else:
|
||||
t = "Unknown peer "
|
||||
a = "Available" if p["alive"] == True else "Unreachable"
|
||||
h = max(time.time()-p["last_heard"], 0)
|
||||
hops = p["network_distance"]
|
||||
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
|
||||
pm = p["messages"]; pk = p["peering_key"]
|
||||
pc = p["peering_cost"]; psc = p["target_stamp_cost"]; psf = p["stamp_cost_flexibility"]
|
||||
if pc == None: pc = "unknown"
|
||||
if psc == None: psc = "unknown"
|
||||
if psf == None: psf = "unknown"
|
||||
if pk == None: pk = "Not generated"
|
||||
else: pk = f"Generated, value is {pk}"
|
||||
if p["last_sync_attempt"] != 0:
|
||||
lsa = p["last_sync_attempt"]
|
||||
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
|
||||
else:
|
||||
ls = "never synced"
|
||||
|
||||
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"])
|
||||
stl = RNS.prettysize(p["transfer_limit"]*1000) if p["transfer_limit"] else "Unknown"
|
||||
ssl = RNS.prettysize(p["sync_limit"]*1000) if p["sync_limit"] else "unknown"
|
||||
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
|
||||
pmi = pm["incoming"]; pmuh = pm["unhandled"]; ar = round(p["acceptance_rate"]*100, 2)
|
||||
if p["name"] == None: nn = ""
|
||||
else: nn = p["name"].strip().replace("\n", "").replace("\r", "")
|
||||
if len(nn) > 45: nn = f"{nn[:45]}..."
|
||||
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
|
||||
if len(nn): print(f"{ind*2}Name : {nn}")
|
||||
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
|
||||
print(f"{ind*2}Costs : Propagation {psc} (flex {psf}), peering {pc}")
|
||||
print(f"{ind*2}Sync key : {pk}")
|
||||
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER")
|
||||
print(f"{ind*2}Limits : {stl} message limit, {ssl} sync limit")
|
||||
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming, {ar}% acceptance rate")
|
||||
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
|
||||
ms = "" if pm["unhandled"] == 1 else "s"
|
||||
print(f"{ind*2}Sync state : {pmuh} unhandled message{ms}, {ls}")
|
||||
print("")
|
||||
|
||||
def _get_target_identity(remote=None, timeout=5):
|
||||
global identity
|
||||
timeout = time.time()+timeout
|
||||
def check_timeout():
|
||||
if time.time() > timeout:
|
||||
print("Resolving remote identity timed out, exiting now")
|
||||
exit(200)
|
||||
else: time.sleep(0.1)
|
||||
|
||||
if remote == None: return identity
|
||||
else:
|
||||
try:
|
||||
destination_hash = bytes.fromhex(remote)
|
||||
if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Destination hash length must be {RNS.Identity.TRUNCATED_HASHLENGTH//8*2} characters")
|
||||
except Exception as e:
|
||||
print(f"Invalid remote destination hash: {e}")
|
||||
exit(203)
|
||||
|
||||
remote_identity = RNS.Identity.recall(destination_hash)
|
||||
if remote_identity: return remote_identity
|
||||
else:
|
||||
if not RNS.Transport.has_path(destination_hash):
|
||||
RNS.Transport.request_path(destination_hash)
|
||||
while not RNS.Transport.has_path(destination_hash):
|
||||
tc = check_timeout()
|
||||
if tc: return tc
|
||||
|
||||
return RNS.Identity.recall(destination_hash)
|
||||
|
||||
def _remote_init(configdir=None, rnsconfigdir=None, verbosity=0, quietness=0, identity_path=None):
|
||||
global configpath, identitypath, storagedir, lxmdir, identity
|
||||
global lxmd_config, active_configuration, targetloglevel
|
||||
targetlogdest = RNS.LOG_STDOUT
|
||||
|
||||
if identity_path == None:
|
||||
if configdir == None:
|
||||
if os.path.isdir("/etc/lxmd") and os.path.isfile("/etc/lxmd/config"): configdir = "/etc/lxmd"
|
||||
elif os.path.isdir(RNS.Reticulum.userdir+"/.config/lxmd") and os.path.isfile(Reticulum.userdir+"/.config/lxmd/config"): configdir = RNS.Reticulum.userdir+"/.config/lxmd"
|
||||
else: configdir = RNS.Reticulum.userdir+"/.lxmd"
|
||||
|
||||
configpath = configdir+"/config"
|
||||
identitypath = configdir+"/identity"
|
||||
identity = None
|
||||
|
||||
if not os.path.isdir(configdir):
|
||||
RNS.log("Specified configuration directory does not exist, exiting now", RNS.LOG_ERROR)
|
||||
exit(201)
|
||||
if not os.path.isfile(identitypath):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identitypath)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
else:
|
||||
if not os.path.isfile(identity_path):
|
||||
RNS.log("Identity file not found in specified configuration directory, exiting now", RNS.LOG_ERROR)
|
||||
exit(202)
|
||||
else:
|
||||
identity = RNS.Identity.from_file(identity_path)
|
||||
if identity == None:
|
||||
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
|
||||
exit(4)
|
||||
|
||||
if targetloglevel == None: targetloglevel = 3
|
||||
if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness
|
||||
|
||||
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
|
||||
|
||||
def main():
|
||||
try:
|
||||
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
|
||||
|
|
@ -404,6 +868,13 @@ def main():
|
|||
parser.add_argument("-v", "--verbose", action="count", default=0)
|
||||
parser.add_argument("-q", "--quiet", action="count", default=0)
|
||||
parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file")
|
||||
parser.add_argument("--status", action="store_true", default=False, help="display node status")
|
||||
parser.add_argument("--peers", action="store_true", default=False, help="display peered nodes")
|
||||
parser.add_argument("--sync", action="store", default=None, help="request a sync with the specified peer", type=str)
|
||||
parser.add_argument("-b", "--break", dest="unpeer", action="store", default=None, help="break peering with the specified peer", type=str)
|
||||
parser.add_argument("--timeout", action="store", default=None, help="timeout in seconds for query operations", type=float)
|
||||
parser.add_argument("-r", "--remote", action="store", default=None, help="remote propagation node destination hash", type=str)
|
||||
parser.add_argument("--identity", action="store", default=None, help="path to identity used for remote requests", type=str)
|
||||
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
|
||||
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
|
||||
|
||||
|
|
@ -413,15 +884,50 @@ def main():
|
|||
print(__default_lxmd_config__)
|
||||
exit()
|
||||
|
||||
program_setup(
|
||||
configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service
|
||||
)
|
||||
if args.status or args.peers:
|
||||
if not args.timeout: args.timeout = 5
|
||||
get_status(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
timeout=args.timeout,
|
||||
show_status=args.status,
|
||||
show_peers=args.peers,
|
||||
identity_path=args.identity,
|
||||
remote=args.remote)
|
||||
exit()
|
||||
|
||||
if args.sync:
|
||||
if not args.timeout: args.timeout = 10
|
||||
request_sync(target=args.sync,
|
||||
configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
timeout=args.timeout,
|
||||
identity_path=args.identity,
|
||||
remote=args.remote)
|
||||
exit()
|
||||
|
||||
if args.unpeer:
|
||||
if not args.timeout: args.timeout = 10
|
||||
request_unpeer(target=args.unpeer,
|
||||
configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
timeout=args.timeout,
|
||||
identity_path=args.identity,
|
||||
remote=args.remote)
|
||||
exit()
|
||||
|
||||
program_setup(configdir = args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
run_pn=args.propagation_node,
|
||||
on_inbound=args.on_inbound,
|
||||
verbosity=args.verbose,
|
||||
quietness=args.quiet,
|
||||
service=args.service)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
|
|
@ -437,6 +943,17 @@ __default_lxmd_config__ = """# This is an example LXM Daemon config file.
|
|||
|
||||
enable_node = no
|
||||
|
||||
# You can specify identity hashes for remotes
|
||||
# that are allowed to control and query status
|
||||
# for this propagation node.
|
||||
|
||||
# control_allowed = 7d7e542829b40f32364499b27438dba8, 437229f8e29598b2282b88bad5e44698
|
||||
|
||||
# An optional name for this node, included
|
||||
# in announces.
|
||||
|
||||
# node_name = Anonymous Propagation Node
|
||||
|
||||
# Automatic announce interval in minutes.
|
||||
# 6 hours by default.
|
||||
|
||||
|
|
@ -456,19 +973,6 @@ autopeer = yes
|
|||
|
||||
autopeer_maxdepth = 4
|
||||
|
||||
# The maximum accepted transfer size per in-
|
||||
# coming propagation transfer, in kilobytes.
|
||||
# This also sets the upper limit for the size
|
||||
# of single messages accepted onto this node.
|
||||
#
|
||||
# If a node wants to propagate a larger number
|
||||
# of messages to this node, than what can fit
|
||||
# within this limit, it will prioritise sending
|
||||
# the smallest messages first, and try again
|
||||
# with any remaining messages at a later point.
|
||||
|
||||
propagation_transfer_max_accepted_size = 256
|
||||
|
||||
# The maximum amount of storage to use for
|
||||
# the LXMF Propagation Node message store,
|
||||
# specified in megabytes. When this limit
|
||||
|
|
@ -477,9 +981,57 @@ propagation_transfer_max_accepted_size = 256
|
|||
# LXMF prioritises keeping messages that are
|
||||
# new and small. Large and old messages will
|
||||
# be removed first. This setting is optional
|
||||
# and defaults to 2 gigabytes.
|
||||
# and defaults to 500 megabytes.
|
||||
|
||||
# message_storage_limit = 2000
|
||||
# message_storage_limit = 500
|
||||
|
||||
# The maximum accepted transfer size per in-
|
||||
# coming propagation message, in kilobytes.
|
||||
# This sets the upper limit for the size of
|
||||
# single messages accepted onto this node.
|
||||
|
||||
# propagation_message_max_accepted_size = 256
|
||||
|
||||
# The maximum accepted transfer size per in-
|
||||
# coming propagation node sync.
|
||||
#
|
||||
# If a node wants to propagate a larger number
|
||||
# of messages to this node, than what can fit
|
||||
# within this limit, it will prioritise sending
|
||||
# the smallest messages first, and try again
|
||||
# with any remaining messages at a later point.
|
||||
|
||||
# propagation_sync_max_accepted_size = 10240
|
||||
|
||||
# You can configure the target stamp cost
|
||||
# required to deliver messages via this node.
|
||||
|
||||
# propagation_stamp_cost_target = 16
|
||||
|
||||
# If set higher than 0, the stamp cost flexi-
|
||||
# bility option will make this node accept
|
||||
# messages with a lower stamp cost than the
|
||||
# target from other propagation nodes (but
|
||||
# not from peers directly). This allows the
|
||||
# network to gradually adjust stamp cost.
|
||||
|
||||
# propagation_stamp_cost_flexibility = 3
|
||||
|
||||
# The peering_cost option configures the target
|
||||
# value required for a remote node to peer with
|
||||
# and deliver messages to this node.
|
||||
|
||||
# peering_cost = 18
|
||||
|
||||
# You can configure the maximum peering cost
|
||||
# of remote nodes that this node will peer with.
|
||||
# Setting this to a higher number will allow
|
||||
# this node to peer with other nodes requiring
|
||||
# a higher peering key value, but will require
|
||||
# more computation time during initial peering
|
||||
# when generating the peering key.
|
||||
|
||||
# remote_peering_cost_max = 26
|
||||
|
||||
# You can tell the LXMF message router to
|
||||
# prioritise storage for one or more
|
||||
|
|
@ -491,6 +1043,25 @@ propagation_transfer_max_accepted_size = 256
|
|||
|
||||
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
|
||||
|
||||
# You can configure the maximum number of other
|
||||
# propagation nodes that this node will peer
|
||||
# with automatically. The default is 20.
|
||||
|
||||
# max_peers = 20
|
||||
|
||||
# You can configure a list of static propagation
|
||||
# node peers, that this node will always be
|
||||
# peered with, by specifying a list of
|
||||
# destination hashes.
|
||||
|
||||
# static_peers = e17f833c4ddf8890dd3a79a6fea8161d, 5a2d0029b6e5ec87020abaea0d746da4
|
||||
|
||||
# You can configure the propagation node to
|
||||
# only accept incoming propagation messages
|
||||
# from configured static peers.
|
||||
|
||||
# from_static_only = True
|
||||
|
||||
# By default, any destination is allowed to
|
||||
# connect and download messages, but you can
|
||||
# optionally restrict this. If you enable
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
__version__ = "0.6.0"
|
||||
__version__ = "0.9.3"
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ User-facing clients built on LXMF include:
|
|||
|
||||
Community-provided tools and utilities for LXMF include:
|
||||
|
||||
- [LXMFy](https://lxmfy.quad4.io/)
|
||||
- [LXMF-Bot](https://github.com/randogoth/lxmf-bot)
|
||||
- [LXMF Messageboard](https://github.com/chengtripp/lxmf_messageboard)
|
||||
- [LXMEvent](https://github.com/faragher/LXMEvent)
|
||||
|
|
|
|||
|
|
@ -1,3 +1,2 @@
|
|||
qrcode==7.4.2
|
||||
rns==0.7.8
|
||||
setuptools==70.0.0
|
||||
qrcode>=7.4.2
|
||||
rns>=1.0.0
|
||||
|
|
|
|||
7
setup.py
7
setup.py
|
|
@ -15,9 +15,10 @@ setuptools.setup(
|
|||
long_description_content_type="text/markdown",
|
||||
url="https://github.com/markqvist/lxmf",
|
||||
packages=["LXMF", "LXMF.Utilities"],
|
||||
license="Reticulum License",
|
||||
license_files = ("LICENSE"),
|
||||
classifiers=[
|
||||
"Programming Language :: Python :: 3",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Operating System :: OS Independent",
|
||||
],
|
||||
entry_points= {
|
||||
|
|
@ -25,6 +26,6 @@ setuptools.setup(
|
|||
'lxmd=LXMF.Utilities.lxmd:main',
|
||||
]
|
||||
},
|
||||
install_requires=['rns>=0.9.1'],
|
||||
python_requires='>=3.7',
|
||||
install_requires=["rns>=1.0.1"],
|
||||
python_requires=">=3.7",
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue