Added propagation transfer limit options

This commit is contained in:
Mark Qvist 2024-03-01 22:37:54 +01:00
parent c7489dc0fa
commit 64050d39bf
4 changed files with 160 additions and 48 deletions

View File

@ -37,7 +37,10 @@ class LXMFPropagationAnnounceHandler:
node_timebase = data[1]
propagation_transfer_limit = None
if len(data) >= 3:
propagation_transfer_limit = data[2]
try:
propagation_transfer_limit = float(data[2])
except:
propagation_transfer_limit = None
if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:

View File

@ -48,6 +48,14 @@ class LXMPeer:
else:
peer.link_establishment_rate = 0
if "propagation_transfer_limit" in dictionary:
try:
peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"])
except Exception as e:
peer.propagation_transfer_limit = None
else:
peer.propagation_transfer_limit = None
for transient_id in dictionary["handled_ids"]:
if transient_id in router.propagation_entries:
peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
@ -65,6 +73,7 @@ class LXMPeer:
dictionary["last_heard"] = self.last_heard
dictionary["destination_hash"] = self.destination_hash
dictionary["link_establishment_rate"] = self.link_establishment_rate
dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit
handled_ids = []
for transient_id in self.handled_messages:
@ -87,12 +96,14 @@ class LXMPeer:
self.sync_backoff = 0
self.peering_timebase = 0
self.link_establishment_rate = 0
self.propagation_transfer_limit = None
self.link = None
self.state = LXMPeer.IDLE
self.unhandled_messages = {}
self.handled_messages = {}
self.last_offer = []
self.router = router
self.destination_hash = destination_hash
@ -133,11 +144,17 @@ class LXMPeer:
self.sync_backoff = 0
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
unhandled_entries = []
unhandled_ids = []
purged_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
unhandled_ids.append(transient_id)
unhandled_entry = [
transient_id,
self.router.get_weight(transient_id),
self.router.get_size(transient_id),
]
unhandled_entries.append(unhandled_entry)
else:
purged_ids.append(transient_id)
@ -145,8 +162,21 @@ class LXMPeer:
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
self.unhandled_messages.pop(transient_id)
unhandled_entries.sort(key=lambda e: e[1], reverse=False)
cumulative_size = 0
for unhandled_entry in unhandled_entries:
transient_id = unhandled_entry[0]
weight = unhandled_entry[1]
lxm_size = unhandled_entry[2]
if self.propagation_transfer_limit != None and cumulative_size + lxm_size > (self.propagation_transfer_limit*1000):
pass
else:
cumulative_size += lxm_size
unhandled_ids.append(transient_id)
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
self.last_offer = unhandled_ids
self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT
else:
@ -175,33 +205,31 @@ class LXMPeer:
if response == LXMPeer.ERROR_NO_IDENTITY:
if self.link != None:
RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG)
self.link.indentify()
self.link.identify()
self.state = LXMPeer.LINK_READY
self.sync()
elif response == False:
# Peer already has all advertised messages
for transient_id in self.unhandled_messages:
message_entry = self.unhandled_messages[transient_id]
self.handled_messages[transient_id] = message_entry
self.unhandled_messages = {}
for transient_id in self.last_offer:
if transient_id in self.unhandled_messages:
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
elif response == True:
# Peer wants all advertised messages
for transient_id in self.unhandled_messages:
for transient_id in self.last_offer:
wanted_messages.append(self.unhandled_messages[transient_id])
wanted_message_ids.append(transient_id)
else:
# Peer wants some advertised messages
peer_had_messages = []
for transient_id in self.unhandled_messages.copy():
for transient_id in self.last_offer.copy():
# If the peer did not want the message, it has
# already received it from another peer.
if not transient_id in response:
message_entry = self.unhandled_messages.pop(transient_id)
self.handled_messages[transient_id] = message_entry
if transient_id in self.unhandled_messages:
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
for transient_id in response:
wanted_messages.append(self.unhandled_messages[transient_id])

View File

@ -31,30 +31,35 @@ class LXMRouter:
AUTOPEER_MAXDEPTH = 4
FASTEST_N_RANDOM_POOL = 2
PR_PATH_TIMEOUT = 10
PROPAGATION_LIMIT = 256
DELIVERY_LIMIT = 1024
PR_IDLE = 0x00
PR_PATH_REQUESTED = 0x01
PR_LINK_ESTABLISHING = 0x02
PR_LINK_ESTABLISHED = 0x03
PR_REQUEST_SENT = 0x04
PR_RECEIVING = 0x05
PR_RESPONSE_RECEIVED = 0x06
PR_COMPLETE = 0x07
PR_NO_PATH = 0xf0
PR_LINK_FAILED = 0xf1
PR_TRANSFER_FAILED = 0xf2
PR_NO_IDENTITY_RCVD = 0xf3
PR_NO_ACCESS = 0xf4
PR_FAILED = 0xfe
PR_PATH_TIMEOUT = 10
PR_ALL_MESSAGES = 0x00
PR_IDLE = 0x00
PR_PATH_REQUESTED = 0x01
PR_LINK_ESTABLISHING = 0x02
PR_LINK_ESTABLISHED = 0x03
PR_REQUEST_SENT = 0x04
PR_RECEIVING = 0x05
PR_RESPONSE_RECEIVED = 0x06
PR_COMPLETE = 0x07
PR_NO_PATH = 0xf0
PR_LINK_FAILED = 0xf1
PR_TRANSFER_FAILED = 0xf2
PR_NO_IDENTITY_RCVD = 0xf3
PR_NO_ACCESS = 0xf4
PR_FAILED = 0xfe
PR_ALL_MESSAGES = 0x00
### Developer-facing API ##############################
#######################################################
def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None):
def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None,
propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT):
random.seed(os.urandom(10))
self.pending_inbound = []
@ -84,6 +89,8 @@ class LXMRouter:
self.message_storage_limit = None
self.information_storage_limit = None
self.propagation_per_transfer_limit = propagation_limit
self.delivery_per_transfer_limit = delivery_limit
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
@ -152,7 +159,13 @@ class LXMRouter:
def announce_propagation_node(self):
def delayed_announce():
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
data = msgpack.packb([self.propagation_node, int(time.time())])
announce_data = [
self.propagation_node, # Boolean flag signalling propagation node state
int(time.time()), # Current node timebase
self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes
]
data = msgpack.packb(announce_data)
self.propagation_destination.announce(app_data=data)
da_thread = threading.Thread(target=delayed_announce)
@ -319,7 +332,10 @@ class LXMRouter:
peer = LXMPeer.from_bytes(serialised_peer, self)
if peer.identity != None:
self.peers[peer.destination_hash] = peer
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages", RNS.LOG_DEBUG)
lim_str = ", no transfer limit"
if peer.propagation_transfer_limit != None:
lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit"
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
else:
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
@ -522,6 +538,28 @@ class LXMRouter:
self.locally_processed_transient_ids.pop(transient_id)
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from locally processed cache", RNS.LOG_DEBUG)
def get_weight(self, transient_id):
dst_hash = self.propagation_entries[transient_id][0]
lxm_rcvd = self.propagation_entries[transient_id][2]
lxm_size = self.propagation_entries[transient_id][3]
now = time.time()
age_weight = max(1, (now - lxm_rcvd)/60/60/24/4)
if dst_hash in self.prioritised_list:
priority_weight = 0.1
else:
priority_weight = 1.0
weight = priority_weight * age_weight * lxm_size
return weight
def get_size(self, transient_id):
lxm_size = self.propagation_entries[transient_id][3]
return lxm_size
def clean_message_store(self):
# Check and remove expired messages
now = time.time()
@ -563,22 +601,13 @@ class LXMRouter:
bytes_needed = message_storage_size - self.message_storage_limit
bytes_cleaned = 0
now = time.time()
weighted_entries = []
for transient_id in self.propagation_entries:
entry = self.propagation_entries[transient_id]
dst_hash = entry[0]
lxm_rcvd = entry[2]
lxm_size = entry[3]
age_weight = max(1, (now - lxm_rcvd)/60/60/24/4)
if dst_hash in self.prioritised_list:
priority_weight = 0.1
else:
priority_weight = 1.0
weight = priority_weight * age_weight * lxm_size
weighted_entries.append([entry, weight, transient_id])
weighted_entries.append([
self.propagation_entries[transient_id],
self.get_weight(transient_id),
transient_id
])
weighted_entries.sort(key=lambda we: we[1], reverse=True)
@ -961,7 +990,7 @@ class LXMRouter:
### Peer Sync & Propagation ###########################
#######################################################
def peer(self, destination_hash, timestamp):
def peer(self, destination_hash, timestamp, propagation_transfer_limit):
if destination_hash in self.peers:
peer = self.peers[destination_hash]
if timestamp > peer.peering_timebase:
@ -970,11 +999,13 @@ class LXMRouter:
peer.next_sync_attempt = 0
peer.peering_timebase = timestamp
peer.last_heard = time.time()
peer.propagation_transfer_limit = propagation_transfer_limit
else:
peer = LXMPeer(self, destination_hash)
peer.alive = True
peer.last_heard = time.time()
peer.propagation_transfer_limit = propagation_transfer_limit
self.peers[destination_hash] = peer
RNS.log("Peered with "+str(peer.destination))

View File

@ -77,6 +77,13 @@ def apply_config():
active_configuration["peer_announce_interval"] = lxmd_config["lxmf"].as_int("announce_interval")*60
else:
active_configuration["peer_announce_interval"] = None
if "lxmf" in lxmd_config and "delivery_transfer_max_accepted_size" in lxmd_config["lxmf"]:
active_configuration["delivery_transfer_max_accepted_size"] = lxmd_config["lxmf"].as_float("delivery_transfer_max_accepted_size")
if active_configuration["delivery_transfer_max_accepted_size"] < 0.38:
active_configuration["delivery_transfer_max_accepted_size"] = 0.38
else:
active_configuration["delivery_transfer_max_accepted_size"] = 1024
if "lxmf" in lxmd_config and "on_inbound" in lxmd_config["lxmf"]:
active_configuration["on_inbound"] = lxmd_config["lxmf"]["on_inbound"]
@ -121,6 +128,13 @@ def apply_config():
else:
active_configuration["message_storage_limit"] = 2000
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
if active_configuration["propagation_transfer_max_accepted_size"] < 0.38:
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
else:
active_configuration["propagation_transfer_max_accepted_size"] = 256
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
else:
@ -289,6 +303,8 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
storagepath = storagedir,
autopeer = active_configuration["autopeer"],
autopeer_maxdepth = active_configuration["autopeer_maxdepth"],
propagation_limit = active_configuration["propagation_transfer_max_accepted_size"],
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
)
message_router.register_delivery_callback(lxmf_delivery)
@ -418,23 +434,41 @@ __default_lxmd_config__ = """# This is an example LXM Daemon config file.
[propagation]
# Whether to enable propagation node
enable_node = no
# Automatic announce interval in minutes.
# 6 hours by default.
announce_interval = 360
# Whether to announce when the node starts.
announce_at_start = yes
# Wheter to automatically peer with other
# propagation nodes on the network.
autopeer = yes
# The maximum peering depth (in hops) for
# automatically peered nodes.
autopeer_maxdepth = 4
# The maximum accepted transfer size per in-
# coming propagation transfer, in kilobytes.
# This also sets the upper limit for the size
# of single messages accepted onto this node.
#
# If a node wants to propagate a larger number
# of messages to this node, than what can fit
# within this limit, it will prioritise sending
# the smallest messages first, and try again
# with any remaining messages at a later point.
propagation_transfer_max_accepted_size = 256
# The maximum amount of storage to use for
# the LXMF Propagation Node message store,
# specified in megabytes. When this limit
@ -444,6 +478,7 @@ autopeer_maxdepth = 4
# new and small. Large and old messages will
# be removed first. This setting is optional
# and defaults to 2 gigabytes.
# message_storage_limit = 2000
# You can tell the LXMF message router to
@ -453,6 +488,7 @@ autopeer_maxdepth = 4
# keeping messages for destinations specified
# with this option. This setting is optional,
# and generally you do not need to use it.
# prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
# By default, any destination is allowed to
@ -461,6 +497,7 @@ autopeer_maxdepth = 4
# authentication, you must provide a list of
# allowed identity hashes in the a file named
# "allowed" in the lxmd config directory.
auth_required = no
@ -469,23 +506,35 @@ auth_required = no
# The LXM Daemon will create an LXMF destination
# that it can receive messages on. This option sets
# the announced display name for this destination.
display_name = Anonymous Peer
# It is possible to announce the internal LXMF
# destination when the LXM Daemon starts up.
announce_at_start = no
# You can also announce the delivery destination
# at a specified interval. This is not enabled by
# default.
# announce_interval = 360
# The maximum accepted unpacked size for mes-
# sages received directly from other peers,
# specified in kilobytes. Messages larger than
# this will be rejected before the transfer
# begins.
delivery_transfer_max_accepted_size = 1024
# You can configure an external program to be run
# every time a message is received. The program
# will receive as an argument the full path to the
# message saved as a file. The example below will
# simply result in the message getting deleted as
# soon as it has been received.
# on_inbound = rm
@ -499,6 +548,7 @@ announce_at_start = no
# 5: Verbose logging
# 6: Debug logging
# 7: Extreme logging
loglevel = 4
"""