diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index a3ede88..88dc164 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -283,6 +283,9 @@ class Reticulum: if not os.path.isdir(Reticulum.cachepath): os.makedirs(Reticulum.cachepath) + if not os.path.isdir(os.path.join(Reticulum.cachepath, "announces")): + os.makedirs(os.path.join(Reticulum.cachepath, "announces")) + if not os.path.isdir(Reticulum.resourcepath): os.makedirs(Reticulum.resourcepath) diff --git a/RNS/Transport.py b/RNS/Transport.py index bed5dee..d4f23ef 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -139,6 +139,8 @@ class Transport: announces_check_interval = 1.0 pending_prs_last_checked = 0.0 pending_prs_check_interval = 30.0 + cache_last_cleaned = 0.0 + cache_clean_interval = 60.0 hashlist_maxsize = 1000000 tables_last_culled = 0.0 tables_cull_interval = 5.0 @@ -199,14 +201,16 @@ class Transport: Transport.control_destinations.append(Transport.remote_management_destination) Transport.control_hashes.append(Transport.remote_management_destination.hash) RNS.log("Enabled remote management on "+str(Transport.remote_management_destination), RNS.LOG_NOTICE) + + # Defer cleaning packet cache for 30 seconds + Transport.cache_last_cleaned = time.time() + 60 + # Start job loops Transport.jobs_running = False - thread = threading.Thread(target=Transport.jobloop, daemon=True) - thread.start() - - thread = threading.Thread(target=Transport.count_traffic_loop, daemon=True) - thread.start() + threading.Thread(target=Transport.jobloop, daemon=True).start() + threading.Thread(target=Transport.count_traffic_loop, daemon=True).start() + # Load transport-related data if RNS.Reticulum.transport_enabled(): path_table_path = RNS.Reticulum.storagepath+"/destination_table" tunnel_table_path = RNS.Reticulum.storagepath+"/tunnels" @@ -228,7 +232,7 @@ class Transport: expires = serialised_entry[4] random_blobs = serialised_entry[5] receiving_interface = Transport.find_interface_from_hash(serialised_entry[6]) - announce_packet = Transport.get_cached_packet(serialised_entry[7]) + announce_packet = Transport.get_cached_packet(serialised_entry[7], packet_type="announce") if announce_packet != None and receiving_interface != None: announce_packet.unpack() @@ -280,7 +284,7 @@ class Transport: expires = serialised_entry[4] random_blobs = list(set(serialised_entry[5])) receiving_interface = Transport.find_interface_from_hash(serialised_entry[6]) - announce_packet = Transport.get_cached_packet(serialised_entry[7]) + announce_packet = Transport.get_cached_packet(serialised_entry[7], packet_type="announce") if announce_packet != None: announce_packet.unpack() @@ -293,8 +297,9 @@ class Transport: tunnel_path = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet.packet_hash] tunnel_paths[destination_hash] = tunnel_path - tunnel = [tunnel_id, None, tunnel_paths, expires] - Transport.tunnels[tunnel_id] = tunnel + if len(tunnel_paths) > 0: + tunnel = [tunnel_id, None, tunnel_paths, expires] + Transport.tunnels[tunnel_id] = tunnel if len(Transport.path_table) == 1: specifier = "entry" else: specifier = "entries" @@ -507,7 +512,6 @@ class Transport: if time.time() > Transport.pending_prs_last_checked+Transport.pending_prs_check_interval: for destination_hash in Transport.pending_local_path_requests.copy(): if not Transport.pending_local_path_requests[destination_hash] in Transport.interfaces: - RNS.log(f"Removing {RNS.Transport.pending_local_path_requests[destination_hash]} for pending path request", RNS.LOG_CRITICAL) # TODO: Remove debug Transport.pending_local_path_requests.pop(destination_hash) Transport.pending_prs_last_checked = time.time() @@ -530,11 +534,9 @@ class Transport: if time.time() > reverse_entry[IDX_RT_TIMESTAMP] + Transport.REVERSE_TIMEOUT: stale_reverse_entries.append(truncated_packet_hash) elif not reverse_entry[IDX_RT_OUTB_IF] in Transport.interfaces: - RNS.log(f"Removing reverse table entry since next-hop interface disappeared", RNS.LOG_CRITICAL) # TODO: Remove stale_reverse_entries.append(truncated_packet_hash) elif not reverse_entry[IDX_RT_RCVD_IF] in Transport.interfaces: stale_reverse_entries.append(truncated_packet_hash) - RNS.log(f"Removing reverse table entry since prev-hop interface disappeared", RNS.LOG_CRITICAL) # TODO: Remove # Cull the link table according to timeout stale_links = [] @@ -545,10 +547,8 @@ class Transport: if time.time() > link_entry[IDX_LT_TIMESTAMP] + Transport.LINK_TIMEOUT: stale_links.append(link_id) elif not link_entry[IDX_LT_NH_IF] in Transport.interfaces: - RNS.log(f"Removing link {RNS.prettyhexrep(link_id)} since next-hop interface disappeared", RNS.LOG_CRITICAL) # TODO: Remove stale_links.append(link_id) elif not link_entry[IDX_LT_RCVD_IF] in Transport.interfaces: - RNS.log(f"Removing link {RNS.prettyhexrep(link_id)} since prev-hop interface disappeared", RNS.LOG_CRITICAL) # TODO: Remove stale_links.append(link_id) else: if time.time() > link_entry[IDX_LT_PROOF_TMO]: @@ -666,8 +666,7 @@ class Transport: RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME) else: if tunnel_entry[IDX_TT_IF] and not tunnel_entry[IDX_TT_IF] in Transport.interfaces: - # TODO: Reset loglevel - RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_CRITICAL) + RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_EXTREME) tunnel_entry[IDX_TT_IF] = None stale_tunnel_paths = [] @@ -745,12 +744,17 @@ class Transport: Transport.tables_last_culled = time.time() + # Run interface-related jobs if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval: Transport.prioritize_interfaces() for interface in Transport.interfaces: interface.process_held_announces() Transport.interface_last_jobs = time.time() + # Clean packet caches + if time.time() > Transport.cache_last_cleaned+Transport.cache_clean_interval: + Transport.clean_cache() + if should_collect: gc.collect() else: @@ -1061,13 +1065,6 @@ class Transport: Transport.packet_hashlist.add(packet.packet_hash) stored_hash = True - # TODO: Re-evaluate potential for blocking - # def send_packet(): - # Transport.transmit(interface, packet.raw) - # thread = threading.Thread(target=send_packet) - # thread.daemon = True - # thread.start() - Transport.transmit(interface, packet.raw) if packet.packet_type == RNS.Packet.ANNOUNCE: interface.sent_announce() @@ -1757,7 +1754,7 @@ class Transport: new_announce.hops = packet.hops new_announce.send() - if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True) + if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True, packet_type="announce") path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash] Transport.path_table[packet.destination_hash] = path_table_entry RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG) @@ -2043,7 +2040,7 @@ class Transport: @staticmethod def void_tunnel_interface(tunnel_id): if tunnel_id in Transport.tunnels: - RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_CRITICAL) # TODO: Reset loglevel + RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_EXTREME) Transport.tunnels[tunnel_id][IDX_TT_IF] = None @staticmethod @@ -2176,21 +2173,47 @@ class Transport: return False + @staticmethod + def clean_cache(): + if not Transport.owner.is_connected_to_shared_instance: + Transport.clean_announce_cache() + Transport.cache_last_cleaned = time.time() + + @staticmethod + def clean_announce_cache(): + st = time.time() + target_path = os.path.join(RNS.Reticulum.cachepath, "announces") + active_paths = [Transport.path_table[dst_hash][6] for dst_hash in Transport.path_table] + tunnel_paths = list(set([path_dict[dst_hash][6] for path_dict in [Transport.tunnels[tunnel_id][2] for tunnel_id in Transport.tunnels] for dst_hash in path_dict])) + removed = 0 + for packet_hash in os.listdir(target_path): + remove = False + full_path = os.path.join(target_path, packet_hash) + if os.path.isfile(full_path): + try: target_hash = bytes.fromhex(packet_hash) + except: remove = True + if (not target_hash in active_paths) and (not target_hash in tunnel_paths): remove = True + if remove: os.unlink(full_path); removed += 1 + + if removed > 0: + RNS.log(f"Removed {removed} cached announces in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG) + # When caching packets to storage, they are written # exactly as they arrived over their interface. This # means that they have not had their hop count # increased yet! Take note of this when reading from # the packet cache. @staticmethod - def cache(packet, force_cache=False): - if RNS.Transport.should_cache(packet) or force_cache: + def cache(packet, force_cache=False, packet_type=None): + if force_cache or RNS.Transport.should_cache(packet): try: packet_hash = RNS.hexrep(packet.get_hash(), delimit=False) interface_reference = None - if packet.receiving_interface != None: - interface_reference = str(packet.receiving_interface) + if packet.receiving_interface != None: interface_reference = str(packet.receiving_interface) + if packet_type == "announce": cachepath = os.path.join(RNS.Reticulum.cachepath, "announces", packet_hash) + else: cachepath = os.path.join(RNS.Reticulum.cachepath, packet_hash) - file = open(RNS.Reticulum.cachepath+"/"+packet_hash, "wb") + file = open(cachepath, "wb") file.write(umsgpack.packb([packet.raw, interface_reference])) file.close() @@ -2198,10 +2221,11 @@ class Transport: RNS.log("Error writing packet to cache. The contained exception was: "+str(e), RNS.LOG_ERROR) @staticmethod - def get_cached_packet(packet_hash): + def get_cached_packet(packet_hash, packet_type=None): try: packet_hash = RNS.hexrep(packet_hash, delimit=False) - path = RNS.Reticulum.cachepath+"/"+packet_hash + if packet_type == "announce": path = os.path.join(RNS.Reticulum.cachepath, "announces", packet_hash) + else: path = os.path.join(RNS.Reticulum.cachepath, packet_hash) if os.path.isfile(path): file = open(path, "rb") @@ -2552,12 +2576,12 @@ class Transport: RNS.log("Answering path request for "+RNS.prettyhexrep(destination_hash)+interface_str+", destination is local to this system", RNS.LOG_DEBUG) elif (RNS.Reticulum.transport_enabled() or is_from_local_client) and (destination_hash in Transport.path_table): - packet = Transport.get_cached_packet(Transport.path_table[destination_hash][IDX_PT_PACKET]) + packet = Transport.get_cached_packet(Transport.path_table[destination_hash][IDX_PT_PACKET], packet_type="announce") next_hop = Transport.path_table[destination_hash][IDX_PT_NEXT_HOP] received_from = Transport.path_table[destination_hash][IDX_PT_RVCD_IF] if packet == None: - RNS.log("Could not retrieve announce packet from cache while answering path request for "+RNS.prettyhexrep(destination_hash)) + RNS.log("Could not retrieve announce packet from cache while answering path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_ERROR) elif attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING and attached_interface == received_from: RNS.log("Not answering path request on roaming-mode interface, since next hop is on same roaming-mode interface", RNS.LOG_DEBUG)