diff --git a/RNS/Interfaces/AutoInterface.py b/RNS/Interfaces/AutoInterface.py index 7d62f13..06d9adc 100644 --- a/RNS/Interfaces/AutoInterface.py +++ b/RNS/Interfaces/AutoInterface.py @@ -50,7 +50,7 @@ class AutoInterface(Interface): MULTICAST_PERMANENT_ADDRESS_TYPE = "0" MULTICAST_TEMPORARY_ADDRESS_TYPE = "1" - PEERING_TIMEOUT = 7.5 + PEERING_TIMEOUT = 10.0 ALL_IGNORE_IFS = ["lo0"] DARWIN_IGNORE_IFS = ["awdl0", "llw0", "lo0", "en5"] @@ -117,6 +117,8 @@ class AutoInterface(Interface): self.interface_servers = {} self.multicast_echoes = {} self.timed_out_interfaces = {} + self.spawned_interfaces = {} + self.write_lock = threading.Lock() self.mif_deque = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN) self.mif_deque_times = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN) self.carrier_changed = False @@ -132,7 +134,7 @@ class AutoInterface(Interface): # Increase peering timeout on Android, due to potential # low-power modes implemented on many chipsets. if RNS.vendor.platformutils.is_android(): - self.peering_timeout *= 3 + self.peering_timeout *= 2.5 if allowed_interfaces == None: self.allowed_interfaces = [] @@ -312,7 +314,8 @@ class AutoInterface(Interface): job_thread.daemon = True job_thread.start() - time.sleep(peering_wait) + # TODO: Fix this + # time.sleep(peering_wait) self.online = True @@ -327,8 +330,9 @@ class AutoInterface(Interface): while True: data, ipv6_src = socket.recvfrom(1024) + peering_hash = data[:RNS.Identity.HASHLENGTH//8] expected_hash = RNS.Identity.full_hash(self.group_id+ipv6_src[0].encode("utf-8")) - if data == expected_hash: + if peering_hash == expected_hash: self.add_peer(ipv6_src[0], ifname) else: RNS.log(str(self)+" received peering packet on "+str(ifname)+" from "+str(ipv6_src[0])+", but authentication hash was incorrect.", RNS.LOG_DEBUG) @@ -349,6 +353,10 @@ class AutoInterface(Interface): # Remove any timed out peers for peer_addr in timed_out_peers: removed_peer = self.peers.pop(peer_addr) + if peer_addr in self.spawned_interfaces: + spawned_interface = self.spawned_interfaces[peer_addr] + spawned_interface.detach() + spawned_interface.teardown() RNS.log(str(self)+" removed peer "+str(peer_addr)+" on "+str(removed_peer[0]), RNS.LOG_DEBUG) for ifname in self.adopted_interfaces: @@ -435,6 +443,10 @@ class AutoInterface(Interface): else: pass + @property + def peer_count(self): + return len(self.spawned_interfaces) + def add_peer(self, addr, ifname): if addr in self.link_local_addresses: ifname = None @@ -450,46 +462,62 @@ class AutoInterface(Interface): else: if not addr in self.peers: self.peers[addr] = [ifname, time.time()] + + spawned_interface = AutoInterfacePeer(self, addr, ifname) + spawned_interface.OUT = self.OUT + spawned_interface.IN = self.IN + spawned_interface.parent_interface = self + spawned_interface.bitrate = self.bitrate + + spawned_interface.ifac_size = self.ifac_size + spawned_interface.ifac_netname = self.ifac_netname + spawned_interface.ifac_netkey = self.ifac_netkey + if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None: + ifac_origin = b"" + if spawned_interface.ifac_netname != None: + ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8")) + if spawned_interface.ifac_netkey != None: + ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8")) + + ifac_origin_hash = RNS.Identity.full_hash(ifac_origin) + spawned_interface.ifac_key = RNS.Cryptography.hkdf( + length=64, + derive_from=ifac_origin_hash, + salt=RNS.Reticulum.IFAC_SALT, + context=None + ) + spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key) + spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key)) + + spawned_interface.announce_rate_target = self.announce_rate_target + spawned_interface.announce_rate_grace = self.announce_rate_grace + spawned_interface.announce_rate_penalty = self.announce_rate_penalty + spawned_interface.mode = self.mode + spawned_interface.HW_MTU = self.HW_MTU + spawned_interface.online = True + RNS.Transport.interfaces.append(spawned_interface) + if addr in self.spawned_interfaces: + self.spawned_interfaces[addr].detach() + self.spawned_interfaces[addr].teardown() + self.spawned_interfaces.pop(spawned_interface) + self.spawned_interfaces[addr] = spawned_interface + RNS.log(str(self)+" added peer "+str(addr)+" on "+str(ifname), RNS.LOG_DEBUG) else: self.refresh_peer(addr) def refresh_peer(self, addr): - self.peers[addr][1] = time.time() + try: + self.peers[addr][1] = time.time() + except Exception as e: + RNS.log(f"An error occurred while refreshing peer {addr} on {self}: {e}", RNS.LOG_ERROR) - def process_incoming(self, data): - if self.online: - data_hash = RNS.Identity.full_hash(data) - deque_hit = False - if data_hash in self.mif_deque: - for te in self.mif_deque_times: - if te[0] == data_hash and time.time() < te[1]+AutoInterface.MULTI_IF_DEQUE_TTL: - deque_hit = True - break - - if not deque_hit: - self.mif_deque.append(data_hash) - self.mif_deque_times.append([data_hash, time.time()]) - self.rxb += len(data) - self.owner.inbound(data, self) + def process_incoming(self, data, addr=None): + if self.online and addr in self.spawned_interfaces: + self.spawned_interfaces[addr].process_incoming(data, addr) def process_outgoing(self,data): - if self.online: - for peer in self.peers: - try: - if self.outbound_udp_socket == None: - self.outbound_udp_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - - peer_addr = str(peer)+"%"+str(self.interface_name_to_index(self.peers[peer][0])) - addr_info = socket.getaddrinfo(peer_addr, self.data_port, socket.AF_INET6, socket.SOCK_DGRAM) - self.outbound_udp_socket.sendto(data, addr_info[0][4]) - - except Exception as e: - RNS.log("Could not transmit on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) - - - self.txb += len(data) - + pass # Until per-device sub-interfacing is implemented, # ingress limiting should be disabled on AutoInterface @@ -502,6 +530,83 @@ class AutoInterface(Interface): def __str__(self): return "AutoInterface["+self.name+"]" +class AutoInterfacePeer(Interface): + + def __init__(self, owner, addr, ifname): + super().__init__() + self.owner = owner + self.parent_interface = owner + self.addr = addr + self.ifname = ifname + self.peer_addr = None + self.addr_info = None + self.HW_MTU = self.owner.HW_MTU + self.FIXED_MTU = self.owner.FIXED_MTU + + def __str__(self): + return f"AutoInterfacePeer[{self.ifname}/{self.addr}]" + + def process_incoming(self, data, addr=None): + if self.online and self.owner.online: + data_hash = RNS.Identity.full_hash(data) + deque_hit = False + if data_hash in self.owner.mif_deque: + for te in self.owner.mif_deque_times: + if te[0] == data_hash and time.time() < te[1]+AutoInterface.MULTI_IF_DEQUE_TTL: + deque_hit = True + break + + if not deque_hit: + self.owner.refresh_peer(self.addr) + self.owner.mif_deque.append(data_hash) + self.owner.mif_deque_times.append([data_hash, time.time()]) + self.rxb += len(data) + self.owner.rxb += len(data) + self.owner.owner.inbound(data, self) + + def process_outgoing(self, data): + if self.online: + with self.owner.write_lock: + try: + if self.owner.outbound_udp_socket == None: self.owner.outbound_udp_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + if self.peer_addr == None: self.peer_addr = str(self.addr)+"%"+str(self.owner.interface_name_to_index(self.ifname)) + if self.addr_info == None: addr_info = socket.getaddrinfo(self.peer_addr, self.owner.data_port, socket.AF_INET6, socket.SOCK_DGRAM) + self.owner.outbound_udp_socket.sendto(data, addr_info[0][4]) + self.txb += len(data) + self.owner.txb += len(data) + except Exception as e: + RNS.log("Could not transmit on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + + def detach(self): + self.online = False + self.detached = True + + def teardown(self): + if not self.detached: + RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down.", RNS.LOG_ERROR) + if RNS.Reticulum.panic_on_interface_error: + RNS.panic() + + else: + RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE) + + self.online = False + self.OUT = False + self.IN = False + + if self.addr in self.owner.spawned_interfaces: + try: self.owner.spawned_interfaces.pop(self.addr) + except Exception as e: + RNS.log(f"Could not remove {self} from parent interface on detach. The contained exception was: {e}", RNS.LOG_ERROR) + + if self in RNS.Transport.interfaces: + RNS.Transport.interfaces.remove(self) + + # Until per-device sub-interfacing is implemented, + # ingress limiting should be disabled on AutoInterface + def should_ingress_limit(self): + return False + class AutoInterfaceHandler(socketserver.BaseRequestHandler): def __init__(self, callback, *args, **keys): self.callback = callback @@ -509,4 +614,5 @@ class AutoInterfaceHandler(socketserver.BaseRequestHandler): def handle(self): data = self.request[0] - self.callback(data) + addr = self.client_address[0] + self.callback(data, addr) \ No newline at end of file diff --git a/RNS/Interfaces/Interface.py b/RNS/Interfaces/Interface.py index ccf92b8..6440ba0 100755 --- a/RNS/Interfaces/Interface.py +++ b/RNS/Interfaces/Interface.py @@ -182,12 +182,12 @@ class Interface: RNS.log("An error occurred while processing held announces for "+str(self), RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - def received_announce(self): + def received_announce(self, from_spawned=False): self.ia_freq_deque.append(time.time()) if hasattr(self, "parent_interface") and self.parent_interface != None: self.parent_interface.received_announce(from_spawned=True) - def sent_announce(self): + def sent_announce(self, from_spawned=False): self.oa_freq_deque.append(time.time()) if hasattr(self, "parent_interface") and self.parent_interface != None: self.parent_interface.sent_announce(from_spawned=True) diff --git a/RNS/Utilities/rnstatus.py b/RNS/Utilities/rnstatus.py index 1559585..7ede8d2 100644 --- a/RNS/Utilities/rnstatus.py +++ b/RNS/Utilities/rnstatus.py @@ -250,6 +250,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json= if dispall or not ( name.startswith("LocalInterface[") or name.startswith("TCPInterface[Client") or + name.startswith("AutoInterfacePeer[") or name.startswith("I2PInterfacePeer[Connected peer") or (name.startswith("I2PInterface[") and ("i2p_connectable" in ifstat and ifstat["i2p_connectable"] == False)) ):