diff --git a/RNS/Interfaces/BackboneInterface.py b/RNS/Interfaces/BackboneInterface.py index 3181b56..158aa1c 100644 --- a/RNS/Interfaces/BackboneInterface.py +++ b/RNS/Interfaces/BackboneInterface.py @@ -231,7 +231,17 @@ class BackboneInterface(Interface): if len(received_bytes): spawned_interface.receive(received_bytes) else: BackboneInterface.deregister_fileno(fileno); client_socket.close() - if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno) + try: + if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno) + except Exception as e: RNS.log(f"Error while removing spawned interface file descriptor from BackboneInterface I/O handler: {e}", RNS.LOG_ERROR) + + try: + if spawned_interface.parent_interface: + pif = spawned_interface.parent_interface + if pif.spawned_interfaces != None: + while spawned_interface in pif.spawned_interfaces: pif.spawned_interfaces.remove(spawned_interface) + except Exception as e: RNS.log(f"Error while removing spawned interface from {pif}: {e}", RNS.LOG_ERROR) + spawned_interface.receive(received_bytes) elif client_socket and fileno == client_socket.fileno() and (event & select.EPOLLOUT): @@ -241,7 +251,18 @@ class BackboneInterface(Interface): written = 0 if not spawned_interface.detached: RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_DEBUG) BackboneInterface.deregister_fileno(fileno) - if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno) + + try: + if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno) + except Exception as e: RNS.log(f"Error while removing spawned interface file descriptor from BackboneInterface I/O handler: {e}", RNS.LOG_ERROR) + + try: + if spawned_interface.parent_interface: + pif = spawned_interface.parent_interface + if pif.spawned_interfaces != None: + while spawned_interface in pif.spawned_interfaces: pif.spawned_interfaces.remove(spawned_interface) + except Exception as e: RNS.log(f"Error while removing spawned interface from {pif}: {e}", RNS.LOG_ERROR) + try: client_socket.close() except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR) spawned_interface.receive(b"") @@ -253,7 +274,17 @@ class BackboneInterface(Interface): elif client_socket and fileno == client_socket.fileno() and event & (select.EPOLLHUP): BackboneInterface.deregister_fileno(fileno) - if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno) + try: + if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno) + except Exception as e: RNS.log(f"Error while removing spawned interface file descriptor from BackboneInterface I/O handler: {e}", RNS.LOG_ERROR) + + try: + if spawned_interface.parent_interface: + pif = spawned_interface.parent_interface + if pif.spawned_interfaces != None: + while spawned_interface in pif.spawned_interfaces: pif.spawned_interfaces.remove(spawned_interface) + except Exception as e: RNS.log(f"Error while removing spawned interface from {pif}: {e}", RNS.LOG_ERROR) + try: client_socket.close() except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR) spawned_interface.receive(b"") diff --git a/RNS/Interfaces/Interface.py b/RNS/Interfaces/Interface.py index 37008a7..a28c98c 100755 --- a/RNS/Interfaces/Interface.py +++ b/RNS/Interfaces/Interface.py @@ -77,6 +77,8 @@ class Interface: self.HW_MTU = None self.parent_interface = None + self.spawned_interfaces = None + self.tunnel_id = None self.ingress_control = True self.ic_max_held_announces = Interface.MAX_HELD_ANNOUNCES self.ic_burst_hold = Interface.IC_BURST_HOLD diff --git a/RNS/Transport.py b/RNS/Transport.py index d2aebca..b35bf92 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -250,9 +250,11 @@ class Transport: specifier = "entries" RNS.log("Loaded "+str(len(Transport.path_table))+" path table "+specifier+" from storage", RNS.LOG_VERBOSE) + gc.collect() except Exception as e: RNS.log("Could not load destination table from storage, the contained exception was: "+str(e), RNS.LOG_ERROR) + gc.collect() if os.path.isfile(tunnel_table_path) and not Transport.owner.is_connected_to_shared_instance: serialised_tunnels = [] @@ -262,10 +264,10 @@ class Transport: file.close() for serialised_tunnel in serialised_tunnels: - tunnel_id = serialised_tunnel[0] - interface_hash = serialised_tunnel[1] - serialised_paths = serialised_tunnel[2] - expires = serialised_tunnel[3] + tunnel_id = serialised_tunnel[IDX_TT_TUNNEL_ID] + interface_hash = serialised_tunnel[IDX_TT_IF] + serialised_paths = serialised_tunnel[IDX_TT_PATHS] + expires = serialised_tunnel[IDX_TT_EXPIRES] tunnel_paths = {} for serialised_entry in serialised_paths: @@ -274,7 +276,7 @@ class Transport: received_from = serialised_entry[2] hops = serialised_entry[3] expires = serialised_entry[4] - random_blobs = serialised_entry[5] + random_blobs = list(set(serialised_entry[5])) receiving_interface = Transport.find_interface_from_hash(serialised_entry[6]) announce_packet = Transport.get_cached_packet(serialised_entry[7]) @@ -296,9 +298,11 @@ class Transport: else: specifier = "entries" RNS.log("Loaded "+str(len(Transport.tunnels))+" tunnel table "+specifier+" from storage", RNS.LOG_VERBOSE) + gc.collect() except Exception as e: RNS.log("Could not load tunnel table from storage, the contained exception was: "+str(e), RNS.LOG_ERROR) + gc.collect() if RNS.Reticulum.probe_destination_enabled(): Transport.probe_destination = RNS.Destination(Transport.identity, RNS.Destination.IN, RNS.Destination.SINGLE, Transport.APP_NAME, "probe") @@ -321,6 +325,8 @@ class Transport: if hasattr(interface, "wants_tunnel") and interface.wants_tunnel: Transport.synthesize_tunnel(interface) + gc.collect() + @staticmethod def prioritize_interfaces(): try: @@ -630,14 +636,19 @@ class Transport: for tunnel_id in Transport.tunnels: tunnel_entry = Transport.tunnels[tunnel_id] - expires = tunnel_entry[3] + expires = tunnel_entry[IDX_TT_EXPIRES] if time.time() > expires: stale_tunnels.append(tunnel_id) should_collect = True RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME) else: + if tunnel_entry[IDX_TT_IF] and not tunnel_entry[IDX_TT_IF] in Transport.interfaces: + # TODO: Reset loglevel + RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_CRITICAL) + tunnel_entry[IDX_TT_IF] = None + stale_tunnel_paths = [] - tunnel_paths = tunnel_entry[2] + tunnel_paths = tunnel_entry[IDX_TT_PATHS] for tunnel_path in tunnel_paths: tunnel_path_entry = tunnel_paths[tunnel_path] @@ -1723,7 +1734,7 @@ class Transport: new_announce.hops = packet.hops new_announce.send() - Transport.cache(packet, force_cache=True) + if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True) path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash] Transport.path_table[packet.destination_hash] = path_table_entry RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG) @@ -1732,10 +1743,10 @@ class Transport: # announce to the tunnels table if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None: tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id] - paths = tunnel_entry[2] + paths = tunnel_entry[IDX_TT_PATHS] paths[packet.destination_hash] = path_table_entry expires = time.time() + Transport.DESTINATION_TIMEOUT - tunnel_entry[3] = expires + tunnel_entry[IDX_TT_EXPIRES] = expires RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG) # Call externally registered callbacks from apps @@ -2006,6 +2017,12 @@ class Transport: RNS.log("An error occurred while validating tunnel establishment packet.", RNS.LOG_DEBUG) RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + @staticmethod + def void_tunnel_interface(tunnel_id): + if tunnel_id in Transport.tunnels: + RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_CRITICAL) # TODO: Reset loglevel + Transport.tunnels[tunnel_id][IDX_TT_IF] = None + @staticmethod def handle_tunnel(tunnel_id, interface): expires = time.time() + Transport.DESTINATION_TIMEOUT @@ -2018,10 +2035,10 @@ class Transport: else: RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" reappeared. Restoring paths...", RNS.LOG_DEBUG) tunnel_entry = Transport.tunnels[tunnel_id] - tunnel_entry[1] = interface - tunnel_entry[3] = expires + tunnel_entry[IDX_TT_IF] = interface + tunnel_entry[IDX_TT_EXPIRES] = expires interface.tunnel_id = tunnel_id - paths = tunnel_entry[2] + paths = tunnel_entry[IDX_TT_PATHS] deprecated_paths = [] for destination_hash, path_entry in paths.items(): @@ -2115,8 +2132,8 @@ class Transport: :param handler: The announce handler to be deregistered. """ - while handler in Transport.announce_handlers: - Transport.announce_handlers.remove(handler) + while handler in Transport.announce_handlers: Transport.announce_handlers.remove(handler) + gc.collect() @staticmethod def find_interface_from_hash(interface_hash): @@ -2523,12 +2540,8 @@ class Transport: RNS.log("Not answering path request on roaming-mode interface, since next hop is on same roaming-mode interface", RNS.LOG_DEBUG) else: - # We increase the hops, since reading a packet - # from cache is equivalent to receiving it again - # over an interface. It is cached with it's non- - # increased hop-count. packet.unpack() - packet.hops += 1 + packet.hops = Transport.path_table[destination_hash][IDX_PT_HOPS] if requestor_transport_id != None and next_hop == requestor_transport_id: # TODO: Find a bandwidth efficient way to invalidate our @@ -2725,6 +2738,8 @@ class Transport: interface.announce_queue = [] RNS.log("Dropped "+na_str+" on "+str(interface), RNS.LOG_VERBOSE) + gc.collect() + @staticmethod def timebase_from_random_blob(random_blob): return int.from_bytes(random_blob[5:10], "big") @@ -2779,6 +2794,7 @@ class Transport: RNS.log("Could not save packet hashlist to storage, the contained exception was: "+str(e), RNS.LOG_ERROR) Transport.saving_packet_hashlist = False + gc.collect() @staticmethod @@ -2849,6 +2865,7 @@ class Transport: RNS.trace_exception(e) Transport.saving_path_table = False + gc.collect() @staticmethod @@ -2923,6 +2940,7 @@ class Transport: RNS.log("Could not save tunnel table to storage, the contained exception was: "+str(e), RNS.LOG_ERROR) Transport.saving_tunnel_table = False + gc.collect() @staticmethod def persist_data(): @@ -2972,4 +2990,10 @@ IDX_LT_RCVD_IF = 4 IDX_LT_HOPS = 5 IDX_LT_DSTHASH = 6 IDX_LT_VALIDATED = 7 -IDX_LT_PROOF_TMO = 8 \ No newline at end of file +IDX_LT_PROOF_TMO = 8 + +# Transport.tunnels entry indices +IDX_TT_TUNNEL_ID = 0 +IDX_TT_IF = 1 +IDX_TT_PATHS = 2 +IDX_TT_EXPIRES = 3 \ No newline at end of file