diff --git a/LXMF/Handlers.py b/LXMF/Handlers.py index 5671170..e1dba8e 100644 --- a/LXMF/Handlers.py +++ b/LXMF/Handlers.py @@ -41,19 +41,21 @@ class LXMFPropagationAnnounceHandler: def received_announce(self, destination_hash, announced_identity, app_data): try: if type(app_data) == bytes: - if self.lxmrouter.propagation_node and self.lxmrouter.autopeer: + if self.lxmrouter.propagation_node: data = msgpack.unpackb(app_data) - if pn_announce_data_is_valid(data): node_timebase = data[1] propagation_transfer_limit = None + propagation_sync_limit = None wanted_inbound_peers = None + if len(data) >= 5: + try: propagation_sync_limit = int(data[4]) + except Exception as e: propagation_sync_limit = None + if len(data) >= 4: # TODO: Rethink, probably not necessary anymore - # try: - # wanted_inbound_peers = int(data[3]) - # except: - # wanted_inbound_peers = None + # try: wanted_inbound_peers = int(data[3]) + # except: wanted_inbound_peers = None pass if len(data) >= 3: @@ -61,15 +63,24 @@ class LXMFPropagationAnnounceHandler: except: propagation_transfer_limit = None if destination_hash in self.lxmrouter.static_peers: - self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers) + self.lxmrouter.peer(destination_hash=destination_hash, + timestamp=node_timebase, + propagation_transfer_limit=propagation_transfer_limit, + propagation_sync_limit=propagation_sync_limit, + wanted_inbound_peers=wanted_inbound_peers) else: - if data[0] == True: - if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: - self.lxmrouter.peer(destination_hash, node_timebase, propagation_transfer_limit, wanted_inbound_peers) + if self.lxmrouter.autopeer: + if data[0] == True: + if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: + self.lxmrouter.peer(destination_hash=destination_hash, + timestamp=node_timebase, + propagation_transfer_limit=propagation_transfer_limit, + propagation_sync_limit=propagation_sync_limit, + wanted_inbound_peers=wanted_inbound_peers) - elif data[0] == False: - self.lxmrouter.unpeer(destination_hash, node_timebase) + elif data[0] == False: + self.lxmrouter.unpeer(destination_hash, node_timebase) except Exception as e: RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index c1294bd..41ea69a 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -11,16 +11,20 @@ class LXMPeer: OFFER_REQUEST_PATH = "/offer" MESSAGE_GET_PATH = "/get" - IDLE = 0x00 - LINK_ESTABLISHING = 0x01 - LINK_READY = 0x02 - REQUEST_SENT = 0x03 - RESPONSE_RECEIVED = 0x04 + IDLE = 0x00 + LINK_ESTABLISHING = 0x01 + LINK_READY = 0x02 + REQUEST_SENT = 0x03 + RESPONSE_RECEIVED = 0x04 RESOURCE_TRANSFERRING = 0x05 - ERROR_NO_IDENTITY = 0xf0 - ERROR_NO_ACCESS = 0xf1 - ERROR_TIMEOUT = 0xfe + ERROR_NO_IDENTITY = 0xf0 + ERROR_NO_ACCESS = 0xf1 + ERROR_TIMEOUT = 0xfe + + STRATEGY_LAZY = 0x01 + STRATEGY_PERSISTENT = 0x02 + DEFAULT_SYNC_STRATEGY = STRATEGY_PERSISTENT # Maximum amount of time a peer can # be unreachable before it is removed @@ -67,6 +71,16 @@ class LXMPeer: peer.propagation_transfer_limit = None else: peer.propagation_transfer_limit = None + + if "propagation_sync_limit" in dictionary: + try: peer.propagation_sync_limit = int(dictionary["propagation_sync_limit"]) + except: peer.propagation_sync_limit = peer.propagation_transfer_limit + else: peer.propagation_sync_limit = peer.propagation_transfer_limit + + if "sync_strategy" in dictionary: + try: peer.sync_strategy = int(dictionary["sync_strategy"]) + except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY + else: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY if "offered" in dictionary: peer.offered = dictionary["offered"] @@ -127,6 +141,8 @@ class LXMPeer: dictionary["link_establishment_rate"] = self.link_establishment_rate dictionary["sync_transfer_rate"] = self.sync_transfer_rate dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit + dictionary["propagation_sync_limit"] = self.propagation_sync_limit + dictionary["sync_strategy"] = self.sync_strategy dictionary["last_sync_attempt"] = self.last_sync_attempt dictionary["offered"] = self.offered dictionary["outgoing"] = self.outgoing @@ -150,9 +166,11 @@ class LXMPeer: return peer_bytes - def __init__(self, router, destination_hash): + def __init__(self, router, destination_hash, sync_strategy=DEFAULT_SYNC_STRATEGY): self.alive = False self.last_heard = 0 + self.sync_strategy = sync_strategy + self.next_sync_attempt = 0 self.last_sync_attempt = 0 self.sync_backoff = 0 @@ -160,6 +178,8 @@ class LXMPeer: self.link_establishment_rate = 0 self.sync_transfer_rate = 0 self.propagation_transfer_limit = None + self.propagation_sync_limit = None + self.currently_transferring_messages = None self.handled_messages_queue = deque() self.unhandled_messages_queue = deque() @@ -209,6 +229,10 @@ class LXMPeer: if self.destination != None: if len(self.unhandled_messages) > 0: + if self.currently_transferring_messages != None: + RNS.log(f"Sync requested for {self}, but current message transfer index was not clear. Aborting.", RNS.LOG_ERROR) + return + if self.state == LXMPeer.IDLE: RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG) self.sync_backoff += LXMPeer.SYNC_BACKOFF_STEP @@ -244,21 +268,26 @@ class LXMPeer: unhandled_entries.sort(key=lambda e: e[1], reverse=False) per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now cumulative_size = 24 # Initialised to highest reasonable binary structure overhead + RNS.log(f"Syncing to peer with per-message limit {RNS.prettysize(self.propagation_transfer_limit*1000)} and sync limit {RNS.prettysize(self.propagation_sync_limit*1000)}") # TODO: Remove debug for unhandled_entry in unhandled_entries: transient_id = unhandled_entry[0] weight = unhandled_entry[1] lxm_size = unhandled_entry[2] - next_size = cumulative_size + (lxm_size+per_message_overhead) - if self.propagation_transfer_limit != None and next_size > (self.propagation_transfer_limit*1000): - if lxm_size+per_message_overhead > (self.propagation_transfer_limit*1000): - self.remove_unhandled_message(transient_id) - self.add_handled_message(transient_id) - RNS.log(f"Message {RNS.prettyhexrep(transient_id)} exceeds transfer limit for {self}, considering handled", RNS.LOG_DEBUG) - else: - cumulative_size += (lxm_size+per_message_overhead) - unhandled_ids.append(transient_id) + lxm_transfer_size = lxm_size+per_message_overhead + next_size = cumulative_size + lxm_transfer_size - RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_VERBOSE) + if self.propagation_transfer_limit != None and lxm_transfer_size > (self.propagation_transfer_limit*1000): + self.remove_unhandled_message(transient_id) + self.add_handled_message(transient_id) + continue + + if self.propagation_sync_limit != None and next_size >= (self.propagation_sync_limit*1000): + continue + + cumulative_size += lxm_transfer_size + unhandled_ids.append(transient_id) + + RNS.log(f"Offering {len(unhandled_ids)} messages to peer {RNS.prettyhexrep(self.destination.hash)} ({RNS.prettysize(len(msgpack.packb(unhandled_ids)))})", RNS.LOG_VERBOSE) self.last_offer = unhandled_ids self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) self.state = LXMPeer.REQUEST_SENT @@ -327,7 +356,7 @@ class LXMPeer: wanted_message_ids.append(transient_id) if len(wanted_messages) > 0: - RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_VERBOSE) + RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} wanted {str(len(wanted_messages))} of the available messages", RNS.LOG_VERBOSE) lxm_list = [] for message_entry in wanted_messages: @@ -339,13 +368,14 @@ class LXMPeer: lxm_list.append(lxmf_data) data = msgpack.packb([time.time(), lxm_list]) + RNS.log(f"Total transfer size for this sync is {RNS.prettysize(len(data))}", RNS.LOG_VERBOSE) resource = RNS.Resource(data, self.link, callback = self.resource_concluded) - resource.transferred_messages = wanted_message_ids - resource.sync_transfer_started = time.time() + self.currently_transferring_messages = wanted_message_ids + self.current_sync_transfer_started = time.time() self.state = LXMPeer.RESOURCE_TRANSFERRING else: - RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_VERBOSE) + RNS.log(f"Peer {RNS.prettyhexrep(self.destination_hash)} did not request any of the available messages, sync completed", RNS.LOG_VERBOSE) self.offered += len(self.last_offer) if self.link != None: self.link.teardown() @@ -365,7 +395,13 @@ class LXMPeer: def resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: - for transient_id in resource.transferred_messages: + if self.currently_transferring_messages == None: + RNS.log(f"Sync transfer completed on {self}, but transferred message index was unavailable. Aborting.", RNS.LOG_ERROR) + if self.link != None: self.link.teardown() + self.link = None + self.state = LXMPeer.IDLE + + for transient_id in self.currently_transferring_messages: self.add_handled_message(transient_id) self.remove_unhandled_message(transient_id) @@ -376,24 +412,30 @@ class LXMPeer: self.state = LXMPeer.IDLE rate_str = "" - if hasattr(resource, "sync_transfer_started") and resource.sync_transfer_started: - self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-resource.sync_transfer_started) + if self.current_sync_transfer_started != None: + self.sync_transfer_rate = (resource.get_transfer_size()*8)/(time.time()-self.current_sync_transfer_started) rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}" - RNS.log(f"Syncing {len(resource.transferred_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE) + RNS.log(f"Syncing {len(self.currently_transferring_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE) self.alive = True self.last_heard = time.time() self.offered += len(self.last_offer) - self.outgoing += len(resource.transferred_messages) + self.outgoing += len(self.currently_transferring_messages) self.tx_bytes += resource.get_data_size() + + self.currently_transferring_messages = None + self.current_sync_transfer_started = None + + if self.sync_strategy == self.STRATEGY_PERSISTENT: + if self.unhandled_message_count > 0: self.sync() else: RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE) - if self.link != None: - self.link.teardown() - + if self.link != None: self.link.teardown() self.link = None self.state = LXMPeer.IDLE + self.currently_transferring_messages = None + self.current_sync_transfer_started = None def link_established(self, link): self.link.identify(self.router.identity) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index fe16b05..0ad75b7 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -45,6 +45,7 @@ class LXMRouter: ROTATION_AR_MAX = 0.5 PROPAGATION_LIMIT = 256 + SYNC_LIMIT = PROPAGATION_LIMIT*40 DELIVERY_LIMIT = 1000 PR_PATH_TIMEOUT = 10 @@ -73,8 +74,9 @@ class LXMRouter: ####################################################### def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None, - propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, enforce_ratchets=False, - enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False): + propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT, + enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None, + from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT): random.seed(os.urandom(10)) @@ -91,9 +93,10 @@ class LXMRouter: self.auth_required = False self.retain_synced_on_node = False - self.processing_outbound = False - self.processing_inbound = False - self.processing_count = 0 + self.default_sync_strategy = sync_strategy + self.processing_outbound = False + self.processing_inbound = False + self.processing_count = 0 self.propagation_node = False self.propagation_node_start_time = None @@ -107,17 +110,20 @@ class LXMRouter: self.outbound_propagation_node = None self.outbound_propagation_link = None - if delivery_limit == None: - delivery_limit = LXMRouter.DELIVERY_LIMIT + if delivery_limit == None: delivery_limit = LXMRouter.DELIVERY_LIMIT self.message_storage_limit = None self.information_storage_limit = None self.propagation_per_transfer_limit = propagation_limit + self.propagation_per_sync_limit = sync_limit self.delivery_per_transfer_limit = delivery_limit self.enforce_ratchets = enforce_ratchets self._enforce_stamps = enforce_stamps self.pending_deferred_stamps = {} + if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit: + self.propagation_per_sync_limit = self.propagation_per_transfer_limit + self.wants_download_on_path_available_from = None self.wants_download_on_path_available_to = None self.propagation_transfer_state = LXMRouter.PR_IDLE @@ -287,6 +293,7 @@ class LXMRouter: int(time.time()), # Current node timebase self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes None, # How many more inbound peers this node wants + self.propagation_per_sync_limit, # Limit for incoming propagation node syncs ] data = msgpack.packb(announce_data) @@ -546,7 +553,7 @@ class LXMRouter: for static_peer in self.static_peers: if not static_peer in self.peers: RNS.log(f"Activating static peering with {RNS.prettyhexrep(static_peer)}", RNS.LOG_NOTICE) - self.peers[static_peer] = LXMPeer(self, static_peer) + self.peers[static_peer] = LXMPeer(self, static_peer, sync_strategy=self.default_sync_strategy) if self.peers[static_peer].last_heard == 0: # TODO: Allow path request responses through announce handler # momentarily here, so peering config can be updated even if @@ -708,6 +715,7 @@ class LXMRouter: "ler": int(peer.link_establishment_rate), "str": int(peer.sync_transfer_rate), "transfer_limit": peer.propagation_transfer_limit, + "sync_limit": peer.propagation_sync_limit, "network_distance": RNS.Transport.hops_to(peer_id), "rx_bytes": peer.rx_bytes, "tx_bytes": peer.tx_bytes, @@ -725,6 +733,7 @@ class LXMRouter: "uptime": time.time()-self.propagation_node_start_time, "delivery_limit": self.delivery_per_transfer_limit, "propagation_limit": self.propagation_per_transfer_limit, + "sync_limit": self.propagation_per_sync_limit, "autopeer_maxdepth": self.autopeer_maxdepth, "from_static_only": self.from_static_only, "messagestore": { @@ -1777,7 +1786,7 @@ class LXMRouter: ### Peer Sync & Propagation ########################### ####################################################### - def peer(self, destination_hash, timestamp, propagation_transfer_limit, wanted_inbound_peers = None): + def peer(self, destination_hash, timestamp, propagation_transfer_limit, propagation_sync_limit, wanted_inbound_peers = None): if destination_hash in self.peers: peer = self.peers[destination_hash] if timestamp > peer.peering_timebase: @@ -1787,16 +1796,23 @@ class LXMRouter: peer.peering_timebase = timestamp peer.last_heard = time.time() peer.propagation_transfer_limit = propagation_transfer_limit + if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit + else: peer.propagation_sync_limit = propagation_transfer_limit + RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE) else: if len(self.peers) < self.max_peers: - peer = LXMPeer(self, destination_hash) + peer = LXMPeer(self, destination_hash, sync_strategy=self.default_sync_strategy) peer.alive = True peer.last_heard = time.time() peer.propagation_transfer_limit = propagation_transfer_limit + if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit + else: peer.propagation_sync_limit = propagation_transfer_limit + self.peers[destination_hash] = peer RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE) + else: RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG) @@ -1895,18 +1911,14 @@ class LXMRouter: for peer_id in peers: peer = peers[peer_id] if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE: - if not peer_id in self.static_peers: - culled_peers.append(peer_id) + if not peer_id in self.static_peers: culled_peers.append(peer_id) + else: if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: - if peer.alive: - waiting_peers.append(peer) + if peer.alive: waiting_peers.append(peer) else: - if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt: - unresponsive_peers.append(peer) - else: - pass - # RNS.log("Not adding peer "+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG) + if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt: unresponsive_peers.append(peer) + else: pass # RNS.log("Not adding peer "+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG) peer_pool = [] if len(waiting_peers) > 0: @@ -1970,7 +1982,7 @@ class LXMRouter: return False size = resource.get_data_size() - limit = self.propagation_per_transfer_limit*1000 + limit = self.propagation_per_sync_limit*1000 if limit != None and size > limit: RNS.log(f"Rejecting {RNS.prettysize(size)} incoming propagation resource, since it exceeds the limit of {RNS.prettysize(limit)}", RNS.LOG_DEBUG) return False diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index a4ccaf5..e49bd7a 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -135,6 +135,20 @@ def apply_config(): active_configuration["propagation_transfer_max_accepted_size"] = 0.38 else: active_configuration["propagation_transfer_max_accepted_size"] = 256 + + if "propagation" in lxmd_config and "propagation_message_max_accepted_size" in lxmd_config["propagation"]: + active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_message_max_accepted_size") + if active_configuration["propagation_transfer_max_accepted_size"] < 0.38: + active_configuration["propagation_transfer_max_accepted_size"] = 0.38 + else: + active_configuration["propagation_transfer_max_accepted_size"] = 256 + + if "propagation" in lxmd_config and "propagation_sync_max_accepted_size" in lxmd_config["propagation"]: + active_configuration["propagation_sync_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_sync_max_accepted_size") + if active_configuration["propagation_sync_max_accepted_size"] < 0.38: + active_configuration["propagation_sync_max_accepted_size"] = 0.38 + else: + active_configuration["propagation_sync_max_accepted_size"] = 256*40 if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") @@ -323,6 +337,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo autopeer = active_configuration["autopeer"], autopeer_maxdepth = active_configuration["autopeer_maxdepth"], propagation_limit = active_configuration["propagation_transfer_max_accepted_size"], + sync_limit = active_configuration["propagation_sync_max_accepted_size"], delivery_limit = active_configuration["delivery_transfer_max_accepted_size"], max_peers = active_configuration["max_peers"], static_peers = active_configuration["static_peers"], @@ -676,9 +691,14 @@ autopeer = yes autopeer_maxdepth = 4 # The maximum accepted transfer size per in- -# coming propagation transfer, in kilobytes. -# This also sets the upper limit for the size -# of single messages accepted onto this node. +# coming propagation message, in kilobytes. +# This sets the upper limit for the size of +# single messages accepted onto this node. + +propagation_message_max_accepted_size = 256 + +# The maximum accepted transfer size per in- +# coming propagation node sync. # # If a node wants to propagate a larger number # of messages to this node, than what can fit @@ -686,7 +706,7 @@ autopeer_maxdepth = 4 # the smallest messages first, and try again # with any remaining messages at a later point. -propagation_transfer_max_accepted_size = 256 +propagation_sync_max_accepted_size = 256 # The maximum amount of storage to use for # the LXMF Propagation Node message store, diff --git a/LXMF/_version.py b/LXMF/_version.py index 777f190..8088f75 100644 --- a/LXMF/_version.py +++ b/LXMF/_version.py @@ -1 +1 @@ -__version__ = "0.8.0" +__version__ = "0.8.1"