Announce cache handling

This commit is contained in:
Mark Qvist 2025-04-09 00:01:08 +02:00
parent 581b16f87c
commit b267687c7f
2 changed files with 61 additions and 34 deletions

View File

@ -283,6 +283,9 @@ class Reticulum:
if not os.path.isdir(Reticulum.cachepath):
os.makedirs(Reticulum.cachepath)
if not os.path.isdir(os.path.join(Reticulum.cachepath, "announces")):
os.makedirs(os.path.join(Reticulum.cachepath, "announces"))
if not os.path.isdir(Reticulum.resourcepath):
os.makedirs(Reticulum.resourcepath)

View File

@ -139,6 +139,8 @@ class Transport:
announces_check_interval = 1.0
pending_prs_last_checked = 0.0
pending_prs_check_interval = 30.0
cache_last_cleaned = 0.0
cache_clean_interval = 60.0
hashlist_maxsize = 1000000
tables_last_culled = 0.0
tables_cull_interval = 5.0
@ -199,14 +201,16 @@ class Transport:
Transport.control_destinations.append(Transport.remote_management_destination)
Transport.control_hashes.append(Transport.remote_management_destination.hash)
RNS.log("Enabled remote management on "+str(Transport.remote_management_destination), RNS.LOG_NOTICE)
# Defer cleaning packet cache for 30 seconds
Transport.cache_last_cleaned = time.time() + 60
# Start job loops
Transport.jobs_running = False
thread = threading.Thread(target=Transport.jobloop, daemon=True)
thread.start()
thread = threading.Thread(target=Transport.count_traffic_loop, daemon=True)
thread.start()
threading.Thread(target=Transport.jobloop, daemon=True).start()
threading.Thread(target=Transport.count_traffic_loop, daemon=True).start()
# Load transport-related data
if RNS.Reticulum.transport_enabled():
path_table_path = RNS.Reticulum.storagepath+"/destination_table"
tunnel_table_path = RNS.Reticulum.storagepath+"/tunnels"
@ -228,7 +232,7 @@ class Transport:
expires = serialised_entry[4]
random_blobs = serialised_entry[5]
receiving_interface = Transport.find_interface_from_hash(serialised_entry[6])
announce_packet = Transport.get_cached_packet(serialised_entry[7])
announce_packet = Transport.get_cached_packet(serialised_entry[7], packet_type="announce")
if announce_packet != None and receiving_interface != None:
announce_packet.unpack()
@ -280,7 +284,7 @@ class Transport:
expires = serialised_entry[4]
random_blobs = list(set(serialised_entry[5]))
receiving_interface = Transport.find_interface_from_hash(serialised_entry[6])
announce_packet = Transport.get_cached_packet(serialised_entry[7])
announce_packet = Transport.get_cached_packet(serialised_entry[7], packet_type="announce")
if announce_packet != None:
announce_packet.unpack()
@ -293,8 +297,9 @@ class Transport:
tunnel_path = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet.packet_hash]
tunnel_paths[destination_hash] = tunnel_path
tunnel = [tunnel_id, None, tunnel_paths, expires]
Transport.tunnels[tunnel_id] = tunnel
if len(tunnel_paths) > 0:
tunnel = [tunnel_id, None, tunnel_paths, expires]
Transport.tunnels[tunnel_id] = tunnel
if len(Transport.path_table) == 1: specifier = "entry"
else: specifier = "entries"
@ -507,7 +512,6 @@ class Transport:
if time.time() > Transport.pending_prs_last_checked+Transport.pending_prs_check_interval:
for destination_hash in Transport.pending_local_path_requests.copy():
if not Transport.pending_local_path_requests[destination_hash] in Transport.interfaces:
RNS.log(f"Removing {RNS.Transport.pending_local_path_requests[destination_hash]} for pending path request", RNS.LOG_CRITICAL) # TODO: Remove debug
Transport.pending_local_path_requests.pop(destination_hash)
Transport.pending_prs_last_checked = time.time()
@ -530,11 +534,9 @@ class Transport:
if time.time() > reverse_entry[IDX_RT_TIMESTAMP] + Transport.REVERSE_TIMEOUT:
stale_reverse_entries.append(truncated_packet_hash)
elif not reverse_entry[IDX_RT_OUTB_IF] in Transport.interfaces:
RNS.log(f"Removing reverse table entry since next-hop interface disappeared", RNS.LOG_CRITICAL) # TODO: Remove
stale_reverse_entries.append(truncated_packet_hash)
elif not reverse_entry[IDX_RT_RCVD_IF] in Transport.interfaces:
stale_reverse_entries.append(truncated_packet_hash)
RNS.log(f"Removing reverse table entry since prev-hop interface disappeared", RNS.LOG_CRITICAL) # TODO: Remove
# Cull the link table according to timeout
stale_links = []
@ -545,10 +547,8 @@ class Transport:
if time.time() > link_entry[IDX_LT_TIMESTAMP] + Transport.LINK_TIMEOUT:
stale_links.append(link_id)
elif not link_entry[IDX_LT_NH_IF] in Transport.interfaces:
RNS.log(f"Removing link {RNS.prettyhexrep(link_id)} since next-hop interface disappeared", RNS.LOG_CRITICAL) # TODO: Remove
stale_links.append(link_id)
elif not link_entry[IDX_LT_RCVD_IF] in Transport.interfaces:
RNS.log(f"Removing link {RNS.prettyhexrep(link_id)} since prev-hop interface disappeared", RNS.LOG_CRITICAL) # TODO: Remove
stale_links.append(link_id)
else:
if time.time() > link_entry[IDX_LT_PROOF_TMO]:
@ -666,8 +666,7 @@ class Transport:
RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME)
else:
if tunnel_entry[IDX_TT_IF] and not tunnel_entry[IDX_TT_IF] in Transport.interfaces:
# TODO: Reset loglevel
RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_CRITICAL)
RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_EXTREME)
tunnel_entry[IDX_TT_IF] = None
stale_tunnel_paths = []
@ -745,12 +744,17 @@ class Transport:
Transport.tables_last_culled = time.time()
# Run interface-related jobs
if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval:
Transport.prioritize_interfaces()
for interface in Transport.interfaces:
interface.process_held_announces()
Transport.interface_last_jobs = time.time()
# Clean packet caches
if time.time() > Transport.cache_last_cleaned+Transport.cache_clean_interval:
Transport.clean_cache()
if should_collect: gc.collect()
else:
@ -1061,13 +1065,6 @@ class Transport:
Transport.packet_hashlist.add(packet.packet_hash)
stored_hash = True
# TODO: Re-evaluate potential for blocking
# def send_packet():
# Transport.transmit(interface, packet.raw)
# thread = threading.Thread(target=send_packet)
# thread.daemon = True
# thread.start()
Transport.transmit(interface, packet.raw)
if packet.packet_type == RNS.Packet.ANNOUNCE:
interface.sent_announce()
@ -1757,7 +1754,7 @@ class Transport:
new_announce.hops = packet.hops
new_announce.send()
if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True)
if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True, packet_type="announce")
path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash]
Transport.path_table[packet.destination_hash] = path_table_entry
RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG)
@ -2043,7 +2040,7 @@ class Transport:
@staticmethod
def void_tunnel_interface(tunnel_id):
if tunnel_id in Transport.tunnels:
RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_CRITICAL) # TODO: Reset loglevel
RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_EXTREME)
Transport.tunnels[tunnel_id][IDX_TT_IF] = None
@staticmethod
@ -2176,21 +2173,47 @@ class Transport:
return False
@staticmethod
def clean_cache():
if not Transport.owner.is_connected_to_shared_instance:
Transport.clean_announce_cache()
Transport.cache_last_cleaned = time.time()
@staticmethod
def clean_announce_cache():
st = time.time()
target_path = os.path.join(RNS.Reticulum.cachepath, "announces")
active_paths = [Transport.path_table[dst_hash][6] for dst_hash in Transport.path_table]
tunnel_paths = list(set([path_dict[dst_hash][6] for path_dict in [Transport.tunnels[tunnel_id][2] for tunnel_id in Transport.tunnels] for dst_hash in path_dict]))
removed = 0
for packet_hash in os.listdir(target_path):
remove = False
full_path = os.path.join(target_path, packet_hash)
if os.path.isfile(full_path):
try: target_hash = bytes.fromhex(packet_hash)
except: remove = True
if (not target_hash in active_paths) and (not target_hash in tunnel_paths): remove = True
if remove: os.unlink(full_path); removed += 1
if removed > 0:
RNS.log(f"Removed {removed} cached announces in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
# When caching packets to storage, they are written
# exactly as they arrived over their interface. This
# means that they have not had their hop count
# increased yet! Take note of this when reading from
# the packet cache.
@staticmethod
def cache(packet, force_cache=False):
if RNS.Transport.should_cache(packet) or force_cache:
def cache(packet, force_cache=False, packet_type=None):
if force_cache or RNS.Transport.should_cache(packet):
try:
packet_hash = RNS.hexrep(packet.get_hash(), delimit=False)
interface_reference = None
if packet.receiving_interface != None:
interface_reference = str(packet.receiving_interface)
if packet.receiving_interface != None: interface_reference = str(packet.receiving_interface)
if packet_type == "announce": cachepath = os.path.join(RNS.Reticulum.cachepath, "announces", packet_hash)
else: cachepath = os.path.join(RNS.Reticulum.cachepath, packet_hash)
file = open(RNS.Reticulum.cachepath+"/"+packet_hash, "wb")
file = open(cachepath, "wb")
file.write(umsgpack.packb([packet.raw, interface_reference]))
file.close()
@ -2198,10 +2221,11 @@ class Transport:
RNS.log("Error writing packet to cache. The contained exception was: "+str(e), RNS.LOG_ERROR)
@staticmethod
def get_cached_packet(packet_hash):
def get_cached_packet(packet_hash, packet_type=None):
try:
packet_hash = RNS.hexrep(packet_hash, delimit=False)
path = RNS.Reticulum.cachepath+"/"+packet_hash
if packet_type == "announce": path = os.path.join(RNS.Reticulum.cachepath, "announces", packet_hash)
else: path = os.path.join(RNS.Reticulum.cachepath, packet_hash)
if os.path.isfile(path):
file = open(path, "rb")
@ -2552,12 +2576,12 @@ class Transport:
RNS.log("Answering path request for "+RNS.prettyhexrep(destination_hash)+interface_str+", destination is local to this system", RNS.LOG_DEBUG)
elif (RNS.Reticulum.transport_enabled() or is_from_local_client) and (destination_hash in Transport.path_table):
packet = Transport.get_cached_packet(Transport.path_table[destination_hash][IDX_PT_PACKET])
packet = Transport.get_cached_packet(Transport.path_table[destination_hash][IDX_PT_PACKET], packet_type="announce")
next_hop = Transport.path_table[destination_hash][IDX_PT_NEXT_HOP]
received_from = Transport.path_table[destination_hash][IDX_PT_RVCD_IF]
if packet == None:
RNS.log("Could not retrieve announce packet from cache while answering path request for "+RNS.prettyhexrep(destination_hash))
RNS.log("Could not retrieve announce packet from cache while answering path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_ERROR)
elif attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING and attached_interface == received_from:
RNS.log("Not answering path request on roaming-mode interface, since next hop is on same roaming-mode interface", RNS.LOG_DEBUG)