From 5392d635dd9148ccaee682da856dcbf9defeedca Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 1 Jan 2026 14:51:33 +0100 Subject: [PATCH] Improved announce processing --- RNS/Transport.py | 89 ++++++++++++++++++++++++------------------------ RNS/__init__.py | 4 ++- 2 files changed, 48 insertions(+), 45 deletions(-) diff --git a/RNS/Transport.py b/RNS/Transport.py index cb52474..c8bba3f 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -467,7 +467,10 @@ class Transport: completed_announces = [] for destination_hash in Transport.announce_table: announce_entry = Transport.announce_table[destination_hash] - if announce_entry[IDX_AT_RETRIES] > Transport.PATHFINDER_R: + if announce_entry[IDX_AT_RETRIES] > 0 and announce_entry[IDX_AT_RETRIES] >= Transport.LOCAL_REBROADCASTS_MAX: + RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME) + completed_announces.append(destination_hash) + elif announce_entry[IDX_AT_RETRIES] > Transport.PATHFINDER_R: RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", retry limit reached", RNS.LOG_EXTREME) completed_announces.append(destination_hash) else: @@ -787,8 +790,7 @@ class Transport: Transport.jobs_running = False - for packet in outgoing: - packet.send() + for packet in outgoing: packet.send() for destination_hash in path_requests: blocked_if = path_requests[destination_hash] @@ -845,8 +847,7 @@ class Transport: @staticmethod def outbound(packet): - while (Transport.jobs_running): - sleep(0.0005) + while (Transport.jobs_running): sleep(0.0005) Transport.jobs_locked = True @@ -1491,17 +1492,17 @@ class Transport: announce_entry = Transport.announce_table[packet.destination_hash] if packet.hops-1 == announce_entry[IDX_AT_HOPS]: - RNS.log("Heard a local rebroadcast of announce for "+RNS.prettyhexrep(packet.destination_hash), RNS.LOG_DEBUG) + RNS.log(f"Heard a rebroadcast of announce for {RNS.prettyhexrep(packet.destination_hash)} on {packet.receiving_interface}", RNS.LOG_EXTREME) announce_entry[IDX_AT_LCL_RBRD] += 1 - if announce_entry[IDX_AT_LCL_RBRD] >= Transport.LOCAL_REBROADCASTS_MAX: - RNS.log("Max local rebroadcasts of announce for "+RNS.prettyhexrep(packet.destination_hash)+" reached, dropping announce from our table", RNS.LOG_DEBUG) - if packet.destination_hash in Transport.announce_table: - Transport.announce_table.pop(packet.destination_hash) + if announce_entry[IDX_AT_RETRIES] > 0: + if announce_entry[IDX_AT_LCL_RBRD] >= Transport.LOCAL_REBROADCASTS_MAX: + RNS.log("Completed announce processing for "+RNS.prettyhexrep(packet.destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME) + if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash) if packet.hops-1 == announce_entry[IDX_AT_HOPS]+1 and announce_entry[IDX_AT_RETRIES] > 0: now = time.time() if now < announce_entry[IDX_AT_RTRNS_TMO]: - RNS.log("Rebroadcasted announce for "+RNS.prettyhexrep(packet.destination_hash)+" has been passed on to another node, no further tries needed", RNS.LOG_DEBUG) + RNS.log("Rebroadcasted announce for "+RNS.prettyhexrep(packet.destination_hash)+" has been passed on to another node, no further tries needed", RNS.LOG_EXTREME) if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash) @@ -1802,16 +1803,13 @@ class Transport: execute_callback = True else: handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity) - if packet.destination_hash == handler_expected_hash: - execute_callback = True + if packet.destination_hash == handler_expected_hash: execute_callback = True # If this is a path response, check whether the # handler wants to receive it. if packet.context == RNS.Packet.PATH_RESPONSE: - if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: - pass - else: - execute_callback = False + if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: pass + else: execute_callback = False if execute_callback: if len(inspect.signature(handler.received_announce).parameters) == 3: @@ -2903,38 +2901,41 @@ class Transport: serialised_destinations = [] for destination_hash in Transport.path_table.copy(): - # Get the destination entry from the destination table - de = Transport.path_table[destination_hash] - interface_hash = de[IDX_PT_RVCD_IF].get_hash() - - # Only store destination table entry if the associated - # interface is still active - interface = Transport.find_interface_from_hash(interface_hash) - if interface != None: + try: # Get the destination entry from the destination table de = Transport.path_table[destination_hash] - timestamp = de[IDX_PT_TIMESTAMP] - received_from = de[IDX_PT_NEXT_HOP] - hops = de[IDX_PT_HOPS] - expires = de[IDX_PT_EXPIRES] - random_blobs = de[IDX_PT_RANDBLOBS] - packet_hash = de[IDX_PT_PACKET] + interface_hash = de[IDX_PT_RVCD_IF].get_hash() - serialised_entry = [ - destination_hash, - timestamp, - received_from, - hops, - expires, - random_blobs, - interface_hash, - packet_hash - ] + # Only store destination table entry if the associated + # interface is still active + interface = Transport.find_interface_from_hash(interface_hash) + if interface != None: + # Get the destination entry from the destination table + de = Transport.path_table[destination_hash] + timestamp = de[IDX_PT_TIMESTAMP] + received_from = de[IDX_PT_NEXT_HOP] + hops = de[IDX_PT_HOPS] + expires = de[IDX_PT_EXPIRES] + random_blobs = de[IDX_PT_RANDBLOBS] + packet_hash = de[IDX_PT_PACKET] - serialised_destinations.append(serialised_entry) + serialised_entry = [ + destination_hash, + timestamp, + received_from, + hops, + expires, + random_blobs, + interface_hash, + packet_hash + ] - # TODO: Reevaluate whether there is any cases where this is needed - # Transport.cache(de[IDX_PT_PACKET], force_cache=True) + serialised_destinations.append(serialised_entry) + + # TODO: Reevaluate whether there is any cases where this is needed + # Transport.cache(de[IDX_PT_PACKET], force_cache=True) + + except Exception as e: RNS.log(f"Skipping persist for path table entry due to error: {e}", RNS.LOG_ERROR) path_table_path = RNS.Reticulum.storagepath+"/destination_table" file = open(path_table_path, "wb") diff --git a/RNS/__init__.py b/RNS/__init__.py index 71a0f63..f21adb5 100755 --- a/RNS/__init__.py +++ b/RNS/__init__.py @@ -143,7 +143,9 @@ def log(msg, level=3, _override_destination = False, pt=False): with logging_lock: if (logdest == LOG_STDOUT or _always_override_destination or _override_destination): if not threading.main_thread().is_alive(): return - else: print(logstring) + else: + try: print(logstring) + except: pass elif (logdest == LOG_FILE and logfile != None): try: