Tunnel table indices

This commit is contained in:
Mark Qvist 2025-04-08 01:35:59 +02:00
parent 194f6aef1d
commit fa31dced22
3 changed files with 81 additions and 24 deletions

@ -231,7 +231,17 @@ class BackboneInterface(Interface):
if len(received_bytes): spawned_interface.receive(received_bytes)
else:
BackboneInterface.deregister_fileno(fileno); client_socket.close()
if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno)
try:
if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno)
except Exception as e: RNS.log(f"Error while removing spawned interface file descriptor from BackboneInterface I/O handler: {e}", RNS.LOG_ERROR)
try:
if spawned_interface.parent_interface:
pif = spawned_interface.parent_interface
if pif.spawned_interfaces != None:
while spawned_interface in pif.spawned_interfaces: pif.spawned_interfaces.remove(spawned_interface)
except Exception as e: RNS.log(f"Error while removing spawned interface from {pif}: {e}", RNS.LOG_ERROR)
spawned_interface.receive(received_bytes)
elif client_socket and fileno == client_socket.fileno() and (event & select.EPOLLOUT):
@ -241,7 +251,18 @@ class BackboneInterface(Interface):
written = 0
if not spawned_interface.detached: RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_DEBUG)
BackboneInterface.deregister_fileno(fileno)
if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno)
try:
if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno)
except Exception as e: RNS.log(f"Error while removing spawned interface file descriptor from BackboneInterface I/O handler: {e}", RNS.LOG_ERROR)
try:
if spawned_interface.parent_interface:
pif = spawned_interface.parent_interface
if pif.spawned_interfaces != None:
while spawned_interface in pif.spawned_interfaces: pif.spawned_interfaces.remove(spawned_interface)
except Exception as e: RNS.log(f"Error while removing spawned interface from {pif}: {e}", RNS.LOG_ERROR)
try: client_socket.close()
except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR)
spawned_interface.receive(b"")
@ -253,7 +274,17 @@ class BackboneInterface(Interface):
elif client_socket and fileno == client_socket.fileno() and event & (select.EPOLLHUP):
BackboneInterface.deregister_fileno(fileno)
if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno)
try:
if fileno in BackboneInterface.spawned_interface_filenos: BackboneInterface.spawned_interface_filenos.pop(fileno)
except Exception as e: RNS.log(f"Error while removing spawned interface file descriptor from BackboneInterface I/O handler: {e}", RNS.LOG_ERROR)
try:
if spawned_interface.parent_interface:
pif = spawned_interface.parent_interface
if pif.spawned_interfaces != None:
while spawned_interface in pif.spawned_interfaces: pif.spawned_interfaces.remove(spawned_interface)
except Exception as e: RNS.log(f"Error while removing spawned interface from {pif}: {e}", RNS.LOG_ERROR)
try: client_socket.close()
except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR)
spawned_interface.receive(b"")

@ -77,6 +77,8 @@ class Interface:
self.HW_MTU = None
self.parent_interface = None
self.spawned_interfaces = None
self.tunnel_id = None
self.ingress_control = True
self.ic_max_held_announces = Interface.MAX_HELD_ANNOUNCES
self.ic_burst_hold = Interface.IC_BURST_HOLD

@ -250,9 +250,11 @@ class Transport:
specifier = "entries"
RNS.log("Loaded "+str(len(Transport.path_table))+" path table "+specifier+" from storage", RNS.LOG_VERBOSE)
gc.collect()
except Exception as e:
RNS.log("Could not load destination table from storage, the contained exception was: "+str(e), RNS.LOG_ERROR)
gc.collect()
if os.path.isfile(tunnel_table_path) and not Transport.owner.is_connected_to_shared_instance:
serialised_tunnels = []
@ -262,10 +264,10 @@ class Transport:
file.close()
for serialised_tunnel in serialised_tunnels:
tunnel_id = serialised_tunnel[0]
interface_hash = serialised_tunnel[1]
serialised_paths = serialised_tunnel[2]
expires = serialised_tunnel[3]
tunnel_id = serialised_tunnel[IDX_TT_TUNNEL_ID]
interface_hash = serialised_tunnel[IDX_TT_IF]
serialised_paths = serialised_tunnel[IDX_TT_PATHS]
expires = serialised_tunnel[IDX_TT_EXPIRES]
tunnel_paths = {}
for serialised_entry in serialised_paths:
@ -274,7 +276,7 @@ class Transport:
received_from = serialised_entry[2]
hops = serialised_entry[3]
expires = serialised_entry[4]
random_blobs = serialised_entry[5]
random_blobs = list(set(serialised_entry[5]))
receiving_interface = Transport.find_interface_from_hash(serialised_entry[6])
announce_packet = Transport.get_cached_packet(serialised_entry[7])
@ -296,9 +298,11 @@ class Transport:
else: specifier = "entries"
RNS.log("Loaded "+str(len(Transport.tunnels))+" tunnel table "+specifier+" from storage", RNS.LOG_VERBOSE)
gc.collect()
except Exception as e:
RNS.log("Could not load tunnel table from storage, the contained exception was: "+str(e), RNS.LOG_ERROR)
gc.collect()
if RNS.Reticulum.probe_destination_enabled():
Transport.probe_destination = RNS.Destination(Transport.identity, RNS.Destination.IN, RNS.Destination.SINGLE, Transport.APP_NAME, "probe")
@ -321,6 +325,8 @@ class Transport:
if hasattr(interface, "wants_tunnel") and interface.wants_tunnel:
Transport.synthesize_tunnel(interface)
gc.collect()
@staticmethod
def prioritize_interfaces():
try:
@ -630,14 +636,19 @@ class Transport:
for tunnel_id in Transport.tunnels:
tunnel_entry = Transport.tunnels[tunnel_id]
expires = tunnel_entry[3]
expires = tunnel_entry[IDX_TT_EXPIRES]
if time.time() > expires:
stale_tunnels.append(tunnel_id)
should_collect = True
RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME)
else:
if tunnel_entry[IDX_TT_IF] and not tunnel_entry[IDX_TT_IF] in Transport.interfaces:
# TODO: Reset loglevel
RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_CRITICAL)
tunnel_entry[IDX_TT_IF] = None
stale_tunnel_paths = []
tunnel_paths = tunnel_entry[2]
tunnel_paths = tunnel_entry[IDX_TT_PATHS]
for tunnel_path in tunnel_paths:
tunnel_path_entry = tunnel_paths[tunnel_path]
@ -1723,7 +1734,7 @@ class Transport:
new_announce.hops = packet.hops
new_announce.send()
Transport.cache(packet, force_cache=True)
if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True)
path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash]
Transport.path_table[packet.destination_hash] = path_table_entry
RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG)
@ -1732,10 +1743,10 @@ class Transport:
# announce to the tunnels table
if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None:
tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id]
paths = tunnel_entry[2]
paths = tunnel_entry[IDX_TT_PATHS]
paths[packet.destination_hash] = path_table_entry
expires = time.time() + Transport.DESTINATION_TIMEOUT
tunnel_entry[3] = expires
tunnel_entry[IDX_TT_EXPIRES] = expires
RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG)
# Call externally registered callbacks from apps
@ -2006,6 +2017,12 @@ class Transport:
RNS.log("An error occurred while validating tunnel establishment packet.", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
@staticmethod
def void_tunnel_interface(tunnel_id):
if tunnel_id in Transport.tunnels:
RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_CRITICAL) # TODO: Reset loglevel
Transport.tunnels[tunnel_id][IDX_TT_IF] = None
@staticmethod
def handle_tunnel(tunnel_id, interface):
expires = time.time() + Transport.DESTINATION_TIMEOUT
@ -2018,10 +2035,10 @@ class Transport:
else:
RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" reappeared. Restoring paths...", RNS.LOG_DEBUG)
tunnel_entry = Transport.tunnels[tunnel_id]
tunnel_entry[1] = interface
tunnel_entry[3] = expires
tunnel_entry[IDX_TT_IF] = interface
tunnel_entry[IDX_TT_EXPIRES] = expires
interface.tunnel_id = tunnel_id
paths = tunnel_entry[2]
paths = tunnel_entry[IDX_TT_PATHS]
deprecated_paths = []
for destination_hash, path_entry in paths.items():
@ -2115,8 +2132,8 @@ class Transport:
:param handler: The announce handler to be deregistered.
"""
while handler in Transport.announce_handlers:
Transport.announce_handlers.remove(handler)
while handler in Transport.announce_handlers: Transport.announce_handlers.remove(handler)
gc.collect()
@staticmethod
def find_interface_from_hash(interface_hash):
@ -2523,12 +2540,8 @@ class Transport:
RNS.log("Not answering path request on roaming-mode interface, since next hop is on same roaming-mode interface", RNS.LOG_DEBUG)
else:
# We increase the hops, since reading a packet
# from cache is equivalent to receiving it again
# over an interface. It is cached with it's non-
# increased hop-count.
packet.unpack()
packet.hops += 1
packet.hops = Transport.path_table[destination_hash][IDX_PT_HOPS]
if requestor_transport_id != None and next_hop == requestor_transport_id:
# TODO: Find a bandwidth efficient way to invalidate our
@ -2725,6 +2738,8 @@ class Transport:
interface.announce_queue = []
RNS.log("Dropped "+na_str+" on "+str(interface), RNS.LOG_VERBOSE)
gc.collect()
@staticmethod
def timebase_from_random_blob(random_blob):
return int.from_bytes(random_blob[5:10], "big")
@ -2779,6 +2794,7 @@ class Transport:
RNS.log("Could not save packet hashlist to storage, the contained exception was: "+str(e), RNS.LOG_ERROR)
Transport.saving_packet_hashlist = False
gc.collect()
@staticmethod
@ -2849,6 +2865,7 @@ class Transport:
RNS.trace_exception(e)
Transport.saving_path_table = False
gc.collect()
@staticmethod
@ -2923,6 +2940,7 @@ class Transport:
RNS.log("Could not save tunnel table to storage, the contained exception was: "+str(e), RNS.LOG_ERROR)
Transport.saving_tunnel_table = False
gc.collect()
@staticmethod
def persist_data():
@ -2972,4 +2990,10 @@ IDX_LT_RCVD_IF = 4
IDX_LT_HOPS = 5
IDX_LT_DSTHASH = 6
IDX_LT_VALIDATED = 7
IDX_LT_PROOF_TMO = 8
IDX_LT_PROOF_TMO = 8
# Transport.tunnels entry indices
IDX_TT_TUNNEL_ID = 0
IDX_TT_IF = 1
IDX_TT_PATHS = 2
IDX_TT_EXPIRES = 3