diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index 3a9fc00..9f2519d 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -70,6 +70,16 @@ class LXMPeer: except: peer.propagation_sync_limit = peer.propagation_transfer_limit else: peer.propagation_sync_limit = peer.propagation_transfer_limit + if "propagation_stamp_cost" in dictionary: + try: peer.propagation_stamp_cost = int(dictionary["propagation_stamp_cost"]) + except: peer.propagation_stamp_cost = None + else: peer.propagation_stamp_cost = None + + if "propagation_stamp_cost_flexibility" in dictionary: + try: peer.propagation_stamp_cost_flexibility = int(dictionary["propagation_stamp_cost_flexibility"]) + except: peer.propagation_stamp_cost_flexibility = None + else: peer.propagation_stamp_cost_flexibility = None + if "sync_strategy" in dictionary: try: peer.sync_strategy = int(dictionary["sync_strategy"]) except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY @@ -118,9 +128,11 @@ class LXMPeer: dictionary["sync_transfer_rate"] = self.sync_transfer_rate dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit dictionary["propagation_sync_limit"] = self.propagation_sync_limit + dictionary["propagation_stamp_cost"] = self.propagation_stamp_cost + dictionary["propagation_stamp_cost_flexibility"] = self.propagation_stamp_cost_flexibility dictionary["sync_strategy"] = self.sync_strategy dictionary["last_sync_attempt"] = self.last_sync_attempt - dictionary["offered"] = self.offered + dictionary["offered"] = self.offered dictionary["outgoing"] = self.outgoing dictionary["incoming"] = self.incoming dictionary["rx_bytes"] = self.rx_bytes @@ -153,11 +165,14 @@ class LXMPeer: self.peering_timebase = 0 self.link_establishment_rate = 0 self.sync_transfer_rate = 0 - self.propagation_transfer_limit = None - self.propagation_sync_limit = None - self.currently_transferring_messages = None - self.handled_messages_queue = deque() - self.unhandled_messages_queue = deque() + + self.propagation_transfer_limit = None + self.propagation_sync_limit = None + self.propagation_stamp_cost = None + self.propagation_stamp_cost_flexibility = None + self.currently_transferring_messages = None + self.handled_messages_queue = deque() + self.unhandled_messages_queue = deque() self.offered = 0 # Messages offered to this peer self.outgoing = 0 # Messages transferred to this peer diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 30abaa6..5c02cf0 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -44,6 +44,9 @@ class LXMRouter: ROTATION_HEADROOM_PCT = 10 ROTATION_AR_MAX = 0.5 + PROPAGATION_COST = 12 + PROPAGATION_COST_MIN = 10 + PROPAGATION_COST_FLEX = 3 PROPAGATION_LIMIT = 256 SYNC_LIMIT = PROPAGATION_LIMIT*40 DELIVERY_LIMIT = 1000 @@ -76,7 +79,8 @@ class LXMRouter: def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None, propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT, enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None, - from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT): + from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT, + propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX): random.seed(os.urandom(10)) @@ -101,8 +105,7 @@ class LXMRouter: self.propagation_node = False self.propagation_node_start_time = None - if storagepath == None: - raise ValueError("LXMF cannot be initialised without a storage path") + if storagepath == None: raise ValueError("LXMF cannot be initialised without a storage path") else: self.storagepath = storagepath+"/lxmf" self.ratchetpath = self.storagepath+"/ratchets" @@ -117,6 +120,8 @@ class LXMRouter: self.propagation_per_transfer_limit = propagation_limit self.propagation_per_sync_limit = sync_limit self.delivery_per_transfer_limit = delivery_limit + self.propagation_stamp_cost = propagation_cost + self.propagation_stamp_cost_flexibility = propagation_cost_flexibility self.enforce_ratchets = enforce_ratchets self._enforce_stamps = enforce_stamps self.pending_deferred_stamps = {} @@ -153,34 +158,24 @@ class LXMRouter: self.unpeered_propagation_incoming = 0 self.unpeered_propagation_rx_bytes = 0 - if autopeer != None: - self.autopeer = autopeer - else: - self.autopeer = LXMRouter.AUTOPEER + if autopeer != None: self.autopeer = autopeer + else: self.autopeer = LXMRouter.AUTOPEER - if autopeer_maxdepth != None: - self.autopeer_maxdepth = autopeer_maxdepth - else: - self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH + if autopeer_maxdepth != None: self.autopeer_maxdepth = autopeer_maxdepth + else: self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH - if max_peers == None: - self.max_peers = LXMRouter.MAX_PEERS + if max_peers == None: self.max_peers = LXMRouter.MAX_PEERS else: - if type(max_peers) == int and max_peers >= 0: - self.max_peers = max_peers - else: - raise ValueError(f"Invalid value for max_peers: {max_peers}") + if type(max_peers) == int and max_peers >= 0: self.max_peers = max_peers + else: raise ValueError(f"Invalid value for max_peers: {max_peers}") self.from_static_only = from_static_only - if type(static_peers) != list: - raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}") + if type(static_peers) != list: raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}") else: for static_peer in static_peers: - if type(static_peer) != bytes: - raise ValueError(f"Invalid static peer destination hash: {static_peer}") + if type(static_peer) != bytes: raise ValueError(f"Invalid static peer destination hash: {static_peer}") else: - if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: - raise ValueError(f"Invalid static peer destination hash: {static_peer}") + if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Invalid static peer destination hash: {static_peer}") self.static_peers = static_peers @@ -288,11 +283,12 @@ class LXMRouter: def delayed_announce(): time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) node_state = self.propagation_node and not self.from_static_only + stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility] announce_data = [ node_state, # Boolean flag signalling propagation node state int(time.time()), # Current node timebase self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes - None, # How many more inbound peers this node wants + stamp_cost, # Propagation stamp cost for this node self.propagation_per_sync_limit, # Limit for incoming propagation node syncs ] @@ -716,6 +712,8 @@ class LXMRouter: "str": int(peer.sync_transfer_rate), "transfer_limit": peer.propagation_transfer_limit, "sync_limit": peer.propagation_sync_limit, + "target_stamp_cost": peer.propagation_stamp_cost, + "stamp_cost_flexibility": peer.propagation_stamp_cost_flexibility, "network_distance": RNS.Transport.hops_to(peer_id), "rx_bytes": peer.rx_bytes, "tx_bytes": peer.tx_bytes, @@ -734,6 +732,8 @@ class LXMRouter: "delivery_limit": self.delivery_per_transfer_limit, "propagation_limit": self.propagation_per_transfer_limit, "sync_limit": self.propagation_per_sync_limit, + "target_stamp_cost": self.propagation_stamp_cost, + "stamp_cost_flexibility": self.propagation_stamp_cost_flexibility, "autopeer_maxdepth": self.autopeer_maxdepth, "from_static_only": self.from_static_only, "messagestore": { @@ -757,12 +757,9 @@ class LXMRouter: return node_stats def stats_get_request(self, path, data, request_id, remote_identity, requested_at): - if remote_identity == None: - return LXMPeer.ERROR_NO_IDENTITY - elif remote_identity.hash != self.identity.hash: - return LXMPeer.ERROR_NO_ACCESS - else: - return self.compile_stats() + if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY + elif remote_identity.hash != self.identity.hash: return LXMPeer.ERROR_NO_ACCESS + else: return self.compile_stats() ### Utility & Maintenance ############################# diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index e49bd7a..03d1282 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -150,6 +150,20 @@ def apply_config(): else: active_configuration["propagation_sync_max_accepted_size"] = 256*40 + if "propagation" in lxmd_config and "propagation_stamp_cost_target" in lxmd_config["propagation"]: + active_configuration["propagation_stamp_cost_target"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_target") + if active_configuration["propagation_stamp_cost_target"] < LXMF.LXMRouter.PROPAGATION_COST_MIN: + active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST_MIN + else: + active_configuration["propagation_stamp_cost_target"] = LXMF.LXMRouter.PROPAGATION_COST + + if "propagation" in lxmd_config and "propagation_stamp_cost_flexibility" in lxmd_config["propagation"]: + active_configuration["propagation_stamp_cost_flexibility"] = lxmd_config["propagation"].as_int("propagation_stamp_cost_flexibility") + if active_configuration["propagation_stamp_cost_flexibility"] < 0: + active_configuration["propagation_stamp_cost_flexibility"] = 0 + else: + active_configuration["propagation_stamp_cost_flexibility"] = LXMF.LXMRouter.PROPAGATION_COST_FLEX + if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") else: @@ -337,6 +351,8 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo autopeer = active_configuration["autopeer"], autopeer_maxdepth = active_configuration["autopeer_maxdepth"], propagation_limit = active_configuration["propagation_transfer_max_accepted_size"], + propagation_cost = active_configuration["propagation_stamp_cost_target"], + propagation_cost_flexibility = active_configuration["propagation_stamp_cost_flexibility"], sync_limit = active_configuration["propagation_sync_max_accepted_size"], delivery_limit = active_configuration["delivery_transfer_max_accepted_size"], max_peers = active_configuration["max_peers"], @@ -557,13 +573,15 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = print(f"\nLXMF Propagation Node running on {dhs}, uptime is {uts}") if show_status: - msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"]) - ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"]) + msb = RNS.prettysize(s["messagestore"]["bytes"]); msl = RNS.prettysize(s["messagestore"]["limit"]) + ptl = RNS.prettysize(s["propagation_limit"]*1000); uprx = RNS.prettysize(s["unpeered_propagation_rx_bytes"]) mscnt = s["messagestore"]["count"]; stp = s["total_peers"]; smp = s["max_peers"]; sdp = s["discovered_peers"] - ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"] - cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"] + ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"] + cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"] + psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"] print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})") print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit") + print(f"Required propagation stamp cost is {psc}, flexibility is {scf}") print(f"") print(f"Peers : {stp} total (peer limit is {smp})") print(f" {sdp} discovered, {ssp} static") @@ -690,24 +708,6 @@ autopeer = yes autopeer_maxdepth = 4 -# The maximum accepted transfer size per in- -# coming propagation message, in kilobytes. -# This sets the upper limit for the size of -# single messages accepted onto this node. - -propagation_message_max_accepted_size = 256 - -# The maximum accepted transfer size per in- -# coming propagation node sync. -# -# If a node wants to propagate a larger number -# of messages to this node, than what can fit -# within this limit, it will prioritise sending -# the smallest messages first, and try again -# with any remaining messages at a later point. - -propagation_sync_max_accepted_size = 256 - # The maximum amount of storage to use for # the LXMF Propagation Node message store, # specified in megabytes. When this limit @@ -720,6 +720,38 @@ propagation_sync_max_accepted_size = 256 # message_storage_limit = 500 +# The maximum accepted transfer size per in- +# coming propagation message, in kilobytes. +# This sets the upper limit for the size of +# single messages accepted onto this node. + +# propagation_message_max_accepted_size = 256 + +# The maximum accepted transfer size per in- +# coming propagation node sync. +# +# If a node wants to propagate a larger number +# of messages to this node, than what can fit +# within this limit, it will prioritise sending +# the smallest messages first, and try again +# with any remaining messages at a later point. + +# propagation_sync_max_accepted_size = 10240 + +# You can configure the target stamp cost +# required to deliver messages via this node. + +# propagation_stamp_cost_target = 12 + +# If set higher than 0, the stamp cost flexi- +# bility option will make this node accept +# messages with a lower stamp cost than the +# target from other propagation nodes (but +# not from peers directly). This allows the +# network to gradually adjust stamp cost. + +# propagation_stamp_cost_flexibility = 3 + # You can tell the LXMF message router to # prioritise storage for one or more # destinations. If the message store reaches