Added static peers and peering limit

This commit is contained in:
Mark Qvist 2025-01-22 01:37:09 +01:00
parent c2a08ef355
commit e69da2ed2a
4 changed files with 179 additions and 30 deletions

View File

@ -51,12 +51,16 @@ class LXMFPropagationAnnounceHandler:
except:
propagation_transfer_limit = None
if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
if destination_hash in self.lxmrouter.static_peers:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
elif data[0] == False:
self.lxmrouter.unpeer(destination_hash, node_timebase)
else:
if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit)
elif data[0] == False:
self.lxmrouter.unpeer(destination_hash, node_timebase)
except Exception as e:
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)

View File

@ -66,6 +66,31 @@ class LXMPeer:
peer.propagation_transfer_limit = None
else:
peer.propagation_transfer_limit = None
if "offered" in dictionary:
peer.offered = dictionary["offered"]
else:
peer.offered = 0
if "outgoing" in dictionary:
peer.outgoing = dictionary["outgoing"]
else:
peer.outgoing = 0
if "incoming" in dictionary:
peer.incoming = dictionary["incoming"]
else:
peer.incoming = 0
if "rx_bytes" in dictionary:
peer.rx_bytes = dictionary["rx_bytes"]
else:
peer.rx_bytes = 0
if "tx_bytes" in dictionary:
peer.tx_bytes = dictionary["tx_bytes"]
else:
peer.tx_bytes = 0
hm_count = 0
for transient_id in dictionary["handled_ids"]:
@ -96,6 +121,11 @@ class LXMPeer:
dictionary["link_establishment_rate"] = self.link_establishment_rate
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
dictionary["offered"] = self.offered
dictionary["outgoing"] = self.outgoing
dictionary["incoming"] = self.incoming
dictionary["rx_bytes"] = self.rx_bytes
dictionary["tx_bytes"] = self.tx_bytes
handled_ids = []
for transient_id in self.handled_messages:
@ -126,6 +156,12 @@ class LXMPeer:
self.handled_messages_queue = deque()
self.unhandled_messages_queue = deque()
self.offered = 0 # Messages offered to this peer
self.outgoing = 0 # Messages transferred to this peer
self.incoming = 0 # Messages received from this peer
self.rx_bytes = 0 # Bytes received from this peer
self.tx_bytes = 0 # Bytes sent to this peer
self._hm_count = 0
self._um_count = 0
self._hm_counts_synced = False
@ -212,7 +248,7 @@ class LXMPeer:
cumulative_size += (lxm_size+per_message_overhead)
unhandled_ids.append(transient_id)
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_VERBOSE)
self.last_offer = unhandled_ids
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT
@ -242,10 +278,16 @@ class LXMPeer:
if response == LXMPeer.ERROR_NO_IDENTITY:
if self.link != None:
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG)
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_VERBOSE)
self.link.identify()
self.state = LXMPeer.LINK_READY
self.sync()
return
elif response == LXMPeer.ERROR_NO_ACCESS:
RNS.log("Remote indicated that access was denied, breaking peering", RNS.LOG_VERBOSE)
self.router.unpeer(self.destination_hash)
return
elif response == False:
# Peer already has all advertised messages
@ -275,10 +317,9 @@ class LXMPeer:
wanted_message_ids.append(transient_id)
if len(wanted_messages) > 0:
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG)
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_VERBOSE)
lxm_list = []
for message_entry in wanted_messages:
file_path = message_entry[1]
if os.path.isfile(file_path):
@ -294,7 +335,8 @@ class LXMPeer:
self.state = LXMPeer.RESOURCE_TRANSFERRING
else:
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_VERBOSE)
self.offered += len(self.last_offer)
if self.link != None:
self.link.teardown()
@ -328,12 +370,15 @@ class LXMPeer:
self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started)
rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
RNS.log("Sync to peer "+RNS.prettyhexrep(self.destination_hash)+" completed"+rate_str, RNS.LOG_DEBUG)
RNS.log(f"Syncing {len(resource.transferred_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
self.alive = True
self.last_heard = time.time()
self.offered += len(self.last_offer)
self.outgoing += len(resource.transferred_messages)
self.tx_bytes += resource.get_data_size()
else:
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
if self.link != None:
self.link.teardown()

View File

@ -37,6 +37,7 @@ class LXMRouter:
NODE_ANNOUNCE_DELAY = 20
MAX_PEERS = 50
AUTOPEER = True
AUTOPEER_MAXDEPTH = 4
FASTEST_N_RANDOM_POOL = 2
@ -67,7 +68,10 @@ class LXMRouter:
### Developer-facing API ##############################
#######################################################
def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None, propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT, enforce_ratchets = False, enforce_stamps = False):
def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None,
propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, enforce_ratchets=False,
enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False):
random.seed(os.urandom(10))
self.pending_inbound = []
@ -142,6 +146,27 @@ class LXMRouter:
else:
self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH
if max_peers == None:
self.max_peers = LXMRouter.MAX_PEERS
else:
if type(max_peers) == int and max_peers >= 0:
self.max_peers = max_peers
else:
raise ValueError(f"Invalid value for max_peers: {max_peers}")
self.from_static_only = from_static_only
if type(static_peers) != list:
raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}")
else:
for static_peer in static_peers:
if type(static_peer) != bytes:
raise ValueError(f"Invalid static peer destination hash: {static_peer}")
else:
if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
raise ValueError(f"Invalid static peer destination hash: {static_peer}")
self.static_peers = static_peers
self.peers = {}
self.propagation_entries = {}
@ -245,8 +270,9 @@ class LXMRouter:
def announce_propagation_node(self):
def delayed_announce():
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
node_state = self.propagation_node and not self.from_static_only
announce_data = [
self.propagation_node, # Boolean flag signalling propagation node state
node_state, # Boolean flag signalling propagation node state
int(time.time()), # Current node timebase
self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes
]
@ -485,6 +511,11 @@ class LXMRouter:
serialised_peer = serialised_peers.pop()
peer = LXMPeer.from_bytes(serialised_peer, self)
del serialised_peer
if peer.destination_hash in self.static_peers and peer.last_heard == 0:
# TODO: Allow path request responses through announce handler
# momentarily here, so peering config can be updated even if
# the static peer is not available to directly send an announce.
RNS.Transport.request_path(peer.destination_hash)
if peer.identity != None:
self.peers[peer.destination_hash] = peer
lim_str = ", no transfer limit"
@ -497,6 +528,17 @@ class LXMRouter:
del serialised_peers
if len(self.static_peers) > 0:
for static_peer in self.static_peers:
if not static_peer in self.peers:
RNS.log(f"Activating static peering with {RNS.prettyhexrep(static_peer)}", RNS.LOG_NOTICE)
self.peers[static_peer] = LXMPeer(self, static_peer)
if self.peers[static_peer].last_heard == 0:
# TODO: Allow path request responses through announce handler
# momentarily here, so peering config can be updated even if
# the static peer is not available to directly send an announce.
RNS.Transport.request_path(static_peer)
RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
self.propagation_node = True
@ -643,6 +685,11 @@ class LXMRouter:
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
self.sync_peers()
# def syncstats(self):
# for peer_id in self.peers:
# p = self.peers[peer_id]
# RNS.log(f"{RNS.prettyhexrep(peer_id)} O={p.offered} S={p.outgoing} I={p.incoming} TX={RNS.prettysize(p.tx_bytes)} RX={RNS.prettysize(p.rx_bytes)}")
def jobloop(self):
while (True):
# TODO: Improve this to scheduling, so manual
@ -1070,7 +1117,7 @@ class LXMRouter:
self.flush_queues()
if self.propagation_node:
try:
st = time.time(); RNS.log("Saving peer synchronisation states to storage...", RNS.LOG_NOTICE)
st = time.time(); RNS.log(f"Saving {len(self.peers)} peer synchronisation states to storage...", RNS.LOG_NOTICE)
serialised_peers = []
peer_dict = self.peers.copy()
for peer_id in peer_dict:
@ -1081,7 +1128,7 @@ class LXMRouter:
peers_file.write(msgpack.packb(serialised_peers))
peers_file.close()
RNS.log(f"Saved {len(serialised_peers)} peers to storage in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
RNS.log(f"Saved {len(serialised_peers)} peers to storage in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_NOTICE)
except Exception as e:
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -1605,14 +1652,18 @@ class LXMRouter:
peer.peering_timebase = timestamp
peer.last_heard = time.time()
peer.propagation_transfer_limit = propagation_transfer_limit
RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE)
else:
peer = LXMPeer(self, destination_hash)
peer.alive = True
peer.last_heard = time.time()
peer.propagation_transfer_limit = propagation_transfer_limit
self.peers[destination_hash] = peer
RNS.log("Peered with "+str(peer.destination))
if len(self.peers) < self.max_peers:
peer = LXMPeer(self, destination_hash)
peer.alive = True
peer.last_heard = time.time()
peer.propagation_transfer_limit = propagation_transfer_limit
self.peers[destination_hash] = peer
RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE)
else:
RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
def unpeer(self, destination_hash, timestamp = None):
if timestamp == None:
@ -1633,7 +1684,8 @@ class LXMRouter:
for peer_id in peers:
peer = peers[peer_id]
if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
culled_peers.append(peer_id)
if not peer_id in self.static_peers:
culled_peers.append(peer_id)
else:
if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
if peer.alive:
@ -1693,10 +1745,23 @@ class LXMRouter:
self.active_propagation_links.append(link)
def propagation_resource_advertised(self, resource):
if self.from_static_only:
remote_identity = resource.link.get_remote_identity()
if remote_identity == None:
RNS.log(f"Rejecting propagation resource from unidentified peer", RNS.LOG_DEBUG)
return False
else:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.static_peers:
RNS.log(f"Rejecting propagation resource from {remote_str} not in static peers list", RNS.LOG_DEBUG)
return False
size = resource.get_data_size()
limit = self.propagation_per_transfer_limit*1000
if limit != None and size > limit:
RNS.log("Rejecting "+RNS.prettysize(size)+" incoming LXMF propagation resource, since it exceeds the limit of "+RNS.prettysize(limit), RNS.LOG_DEBUG)
RNS.log(f"Rejecting {RNS.prettysize(size)} incoming propagation resource, since it exceeds the limit of {RNS.prettysize(limit)}", RNS.LOG_DEBUG)
return False
else:
return True
@ -1723,6 +1788,14 @@ class LXMRouter:
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
else:
if self.from_static_only:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.static_peers:
RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG)
return LXMPeer.ERROR_NO_ACCESS
try:
transient_ids = data
wanted_ids = []
@ -1745,7 +1818,6 @@ class LXMRouter:
return None
def propagation_resource_concluded(self, resource):
RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG)
if resource.status == RNS.Resource.COMPLETE:
# TODO: The peer this was received from should
# have the transient id added to its list of
@ -1757,22 +1829,29 @@ class LXMRouter:
# This is a series of propagation messages from a peer or originator
remote_timebase = data[0]
remote_hash = None
remote_str = "unknown peer"
remote_identity = resource.link.get_remote_identity()
if remote_identity != None:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.peers:
if self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
self.peer(remote_hash, remote_timebase)
else:
remote_str = f"peer {remote_str}"
messages = data[1]
RNS.log(f"Received {len(messages)} messages from {remote_str}", RNS.LOG_VERBOSE)
for lxmf_data in messages:
peer = None
transient_id = RNS.Identity.full_hash(lxmf_data)
if remote_hash != None and remote_hash in self.peers:
peer = self.peers[remote_hash]
peer.incoming += 1
peer.rx_bytes += len(lxmf_data)
self.lxmf_propagation(lxmf_data, from_peer=peer)
if peer != None:
@ -1837,7 +1916,7 @@ class LXMRouter:
msg_file.write(lxmf_data)
msg_file.close()
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG)
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME)
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]
self.enqueue_peer_distribution(transient_id, from_peer)

View File

@ -140,6 +140,24 @@ def apply_config():
else:
active_configuration["prioritised_lxmf_destinations"] = []
if "propagation" in lxmd_config and "static_peers" in lxmd_config["propagation"]:
static_peers = lxmd_config["propagation"].as_list("static_peers")
active_configuration["static_peers"] = []
for static_peer in static_peers:
active_configuration["static_peers"].append(bytes.fromhex(static_peer))
else:
active_configuration["static_peers"] = []
if "propagation" in lxmd_config and "max_peers" in lxmd_config["propagation"]:
active_configuration["max_peers"] = lxmd_config["propagation"].as_int("max_peers")
else:
active_configuration["max_peers"] = None
if "propagation" in lxmd_config and "from_static_only" in lxmd_config["propagation"]:
active_configuration["from_static_only"] = lxmd_config["propagation"].as_bool("from_static_only")
else:
active_configuration["from_static_only"] = False
# Load various settings
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
targetloglevel = lxmd_config["logging"].as_int("loglevel")
@ -305,7 +323,10 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
)
max_peers = active_configuration["max_peers"],
static_peers = active_configuration["static_peers"],
from_static_only = active_configuration["from_static_only"])
message_router.register_delivery_callback(lxmf_delivery)
for destination_hash in active_configuration["ignored_lxmf_destinations"]:
@ -362,13 +383,13 @@ def jobs():
try:
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_VERBOSE)
message_router.announce(lxmf_destination.hash)
last_peer_announce = time.time()
if "node_announce_interval" in active_configuration and active_configuration["node_announce_interval"] != None:
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_VERBOSE)
message_router.announce_propagation_node()
last_node_announce = time.time()
@ -381,7 +402,7 @@ def deferred_start_jobs():
global active_configuration, last_peer_announce, last_node_announce
global message_router, lxmf_destination
time.sleep(DEFFERED_JOBS_DELAY)
RNS.log("Running deferred start jobs")
RNS.log("Running deferred start jobs", RNS.LOG_DEBUG)
if active_configuration["peer_announce_at_start"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
message_router.announce(lxmf_destination.hash)