Propagation node stamp cost handling

This commit is contained in:
Mark Qvist 2025-10-30 14:08:39 +01:00
parent b572723a5e
commit 9beeafb0c8
3 changed files with 102 additions and 58 deletions

View file

@ -70,6 +70,16 @@ class LXMPeer:
except: peer.propagation_sync_limit = peer.propagation_transfer_limit
else: peer.propagation_sync_limit = peer.propagation_transfer_limit
if "propagation_stamp_cost" in dictionary:
try: peer.propagation_stamp_cost = int(dictionary["propagation_stamp_cost"])
except: peer.propagation_stamp_cost = None
else: peer.propagation_stamp_cost = None
if "propagation_stamp_cost_flexibility" in dictionary:
try: peer.propagation_stamp_cost_flexibility = int(dictionary["propagation_stamp_cost_flexibility"])
except: peer.propagation_stamp_cost_flexibility = None
else: peer.propagation_stamp_cost_flexibility = None
if "sync_strategy" in dictionary:
try: peer.sync_strategy = int(dictionary["sync_strategy"])
except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY
@ -118,9 +128,11 @@ class LXMPeer:
dictionary["sync_transfer_rate"] = self.sync_transfer_rate
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
dictionary["propagation_sync_limit"] = self.propagation_sync_limit
dictionary["propagation_stamp_cost"] = self.propagation_stamp_cost
dictionary["propagation_stamp_cost_flexibility"] = self.propagation_stamp_cost_flexibility
dictionary["sync_strategy"] = self.sync_strategy
dictionary["last_sync_attempt"] = self.last_sync_attempt
dictionary["offered"] = self.offered
dictionary["offered"] = self.offered
dictionary["outgoing"] = self.outgoing
dictionary["incoming"] = self.incoming
dictionary["rx_bytes"] = self.rx_bytes
@ -153,11 +165,14 @@ class LXMPeer:
self.peering_timebase = 0
self.link_establishment_rate = 0
self.sync_transfer_rate = 0
self.propagation_transfer_limit = None
self.propagation_sync_limit = None
self.currently_transferring_messages = None
self.handled_messages_queue = deque()
self.unhandled_messages_queue = deque()
self.propagation_transfer_limit = None
self.propagation_sync_limit = None
self.propagation_stamp_cost = None
self.propagation_stamp_cost_flexibility = None
self.currently_transferring_messages = None
self.handled_messages_queue = deque()
self.unhandled_messages_queue = deque()
self.offered = 0 # Messages offered to this peer
self.outgoing = 0 # Messages transferred to this peer

View file

@ -44,6 +44,9 @@ class LXMRouter:
ROTATION_HEADROOM_PCT = 10
ROTATION_AR_MAX = 0.5
PROPAGATION_COST = 12
PROPAGATION_COST_MIN = 10
PROPAGATION_COST_FLEX = 3
PROPAGATION_LIMIT = 256
SYNC_LIMIT = PROPAGATION_LIMIT*40
DELIVERY_LIMIT = 1000
@ -76,7 +79,8 @@ class LXMRouter:
def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None,
propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT,
enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None,
from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT):
from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT,
propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX):
random.seed(os.urandom(10))
@ -101,8 +105,7 @@ class LXMRouter:
self.propagation_node = False
self.propagation_node_start_time = None
if storagepath == None:
raise ValueError("LXMF cannot be initialised without a storage path")
if storagepath == None: raise ValueError("LXMF cannot be initialised without a storage path")
else:
self.storagepath = storagepath+"/lxmf"
self.ratchetpath = self.storagepath+"/ratchets"
@ -117,6 +120,8 @@ class LXMRouter:
self.propagation_per_transfer_limit = propagation_limit
self.propagation_per_sync_limit = sync_limit
self.delivery_per_transfer_limit = delivery_limit
self.propagation_stamp_cost = propagation_cost
self.propagation_stamp_cost_flexibility = propagation_cost_flexibility
self.enforce_ratchets = enforce_ratchets
self._enforce_stamps = enforce_stamps
self.pending_deferred_stamps = {}
@ -153,34 +158,24 @@ class LXMRouter:
self.unpeered_propagation_incoming = 0
self.unpeered_propagation_rx_bytes = 0
if autopeer != None:
self.autopeer = autopeer
else:
self.autopeer = LXMRouter.AUTOPEER
if autopeer != None: self.autopeer = autopeer
else: self.autopeer = LXMRouter.AUTOPEER
if autopeer_maxdepth != None:
self.autopeer_maxdepth = autopeer_maxdepth
else:
self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH
if autopeer_maxdepth != None: self.autopeer_maxdepth = autopeer_maxdepth
else: self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH
if max_peers == None:
self.max_peers = LXMRouter.MAX_PEERS
if max_peers == None: self.max_peers = LXMRouter.MAX_PEERS
else:
if type(max_peers) == int and max_peers >= 0:
self.max_peers = max_peers
else:
raise ValueError(f"Invalid value for max_peers: {max_peers}")
if type(max_peers) == int and max_peers >= 0: self.max_peers = max_peers
else: raise ValueError(f"Invalid value for max_peers: {max_peers}")
self.from_static_only = from_static_only
if type(static_peers) != list:
raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}")
if type(static_peers) != list: raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}")
else:
for static_peer in static_peers:
if type(static_peer) != bytes:
raise ValueError(f"Invalid static peer destination hash: {static_peer}")
if type(static_peer) != bytes: raise ValueError(f"Invalid static peer destination hash: {static_peer}")
else:
if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
raise ValueError(f"Invalid static peer destination hash: {static_peer}")
if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Invalid static peer destination hash: {static_peer}")
self.static_peers = static_peers
@ -288,11 +283,12 @@ class LXMRouter:
def delayed_announce():
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
node_state = self.propagation_node and not self.from_static_only
stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility]
announce_data = [
node_state, # Boolean flag signalling propagation node state
int(time.time()), # Current node timebase
self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes
None, # How many more inbound peers this node wants
stamp_cost, # Propagation stamp cost for this node
self.propagation_per_sync_limit, # Limit for incoming propagation node syncs
]
@ -716,6 +712,8 @@ class LXMRouter:
"str": int(peer.sync_transfer_rate),
"transfer_limit": peer.propagation_transfer_limit,
"sync_limit": peer.propagation_sync_limit,
"target_stamp_cost": peer.propagation_stamp_cost,
"stamp_cost_flexibility": peer.propagation_stamp_cost_flexibility,
"network_distance": RNS.Transport.hops_to(peer_id),
"rx_bytes": peer.rx_bytes,
"tx_bytes": peer.tx_bytes,
@ -734,6 +732,8 @@ class LXMRouter:
"delivery_limit": self.delivery_per_transfer_limit,
"propagation_limit": self.propagation_per_transfer_limit,
"sync_limit": self.propagation_per_sync_limit,
"target_stamp_cost": self.propagation_stamp_cost,
"stamp_cost_flexibility": self.propagation_stamp_cost_flexibility,
"autopeer_maxdepth": self.autopeer_maxdepth,
"from_static_only": self.from_static_only,
"messagestore": {
@ -757,12 +757,9 @@ class LXMRouter:
return node_stats
def stats_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
elif remote_identity.hash != self.identity.hash:
return LXMPeer.ERROR_NO_ACCESS
else:
return self.compile_stats()
if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY
elif remote_identity.hash != self.identity.hash: return LXMPeer.ERROR_NO_ACCESS
else: return self.compile_stats()
### Utility & Maintenance #############################

View file

@ -150,6 +150,20 @@ def apply_config():
else:
active_configuration["propagation_sync_max_accepted_size"] = 256*40
if "propagation" in lxmd_config and "propagation_stamp_cost_target" in lxmd_config["propagation"]:
active_configuration["propagation_stamp_cost_target"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_target")
if active_configuration["propagation_stamp_cost_target"] < LXMF.LXMRouter.PROPAGATION_COST_MIN:
active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST_MIN
else:
active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST
if "propagation" in lxmd_config and "propagation_stamp_cost_flexibility" in lxmd_config["propagation"]:
active_configuration["propagation_stamp_cost_flexibility"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_flexibility")
if active_configuration["propagation_stamp_cost_flexibility"] < 0:
active_configuration["propagation_stamp_cost_flexibility"] = 0
else:
active_configuration["propagation_stamp_cost_flexibility"] = LXMF.LXMRouter.PROPAGATION_COST_FLEX
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
else:
@ -337,6 +351,8 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
autopeer = active_configuration["autopeer"],
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
propagation_cost = active_configuration["propagation_stamp_cost_target"],
propagation_cost_flexibility = active_configuration["propagation_stamp_cost_flexibility"],
sync_limit = active_configuration["propagation_sync_max_accepted_size"],
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
max_peers = active_configuration["max_peers"],
@ -557,13 +573,15 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness =
print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}")
if show_status:
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"])
ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"])
mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"]
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"]
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit")
print(f"Required propagation stamp cost is {psc}, flexibility is {scf}")
print(f"")
print(f"Peers : {stp} total (peer limit is {smp})")
print(f" {sdp} discovered, {ssp} static")
@ -690,24 +708,6 @@ autopeer = yes
autopeer_maxdepth = 4
# The maximum accepted transfer size per in-
# coming propagation message, in kilobytes.
# This sets the upper limit for the size of
# single messages accepted onto this node.
propagation_message_max_accepted_size = 256
# The maximum accepted transfer size per in-
# coming propagation node sync.
#
# If a node wants to propagate a larger number
# of messages to this node, than what can fit
# within this limit, it will prioritise sending
# the smallest messages first, and try again
# with any remaining messages at a later point.
propagation_sync_max_accepted_size = 256
# The maximum amount of storage to use for
# the LXMF Propagation Node message store,
# specified in megabytes. When this limit
@ -720,6 +720,38 @@ propagation_sync_max_accepted_size = 256
# message_storage_limit = 500
# The maximum accepted transfer size per in-
# coming propagation message, in kilobytes.
# This sets the upper limit for the size of
# single messages accepted onto this node.
# propagation_message_max_accepted_size = 256
# The maximum accepted transfer size per in-
# coming propagation node sync.
#
# If a node wants to propagate a larger number
# of messages to this node, than what can fit
# within this limit, it will prioritise sending
# the smallest messages first, and try again
# with any remaining messages at a later point.
# propagation_sync_max_accepted_size = 10240
# You can configure the target stamp cost
# required to deliver messages via this node.
# propagation_stamp_cost_target = 12
# If set higher than 0, the stamp cost flexi-
# bility option will make this node accept
# messages with a lower stamp cost than the
# target from other propagation nodes (but
# not from peers directly). This allows the
# network to gradually adjust stamp cost.
# propagation_stamp_cost_flexibility = 3
# You can tell the LXMF message router to
# prioritise storage for one or more
# destinations. If the message store reaches