diff --git a/RNS/Interfaces/BackboneInterface.py b/RNS/Interfaces/BackboneInterface.py index ce34b90..d570615 100644 --- a/RNS/Interfaces/BackboneInterface.py +++ b/RNS/Interfaces/BackboneInterface.py @@ -42,10 +42,17 @@ class HDLC(): class BackboneInterface(Interface): HW_MTU = 1048576 - BITRATE_GUESS = 100_000_000 + BITRATE_GUESS = 1_000_000_000 DEFAULT_IFAC_SIZE = 16 AUTOCONFIGURE_MTU = True + listener_filenos = {} + spawned_interfaces = [] + spawned_interface_filenos = {} + epoll = None + _job_active = False + _job_lock = threading.Lock() + @staticmethod def get_address_for_if(name, bind_port, prefer_ipv6=False): import RNS.vendor.ifaddr.niwrapper as netinfo @@ -95,21 +102,15 @@ class BackboneInterface(Interface): bindport = int(c["listen_port"]) if "listen_port" in c else None prefer_ipv6 = c.as_bool("prefer_ipv6") if "prefer_ipv6" in c else False - if port != None: - bindport = port + if port != None: bindport = port self.HW_MTU = BackboneInterface.HW_MTU - self.online = False - self.listeners = [] - self.spawned_interfaces = [] - self.IN = True self.OUT = False self.name = name self.detached = False - - self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL + self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL if bindport == None: raise SystemError(f"No TCP port configured for interface \"{name}\"") @@ -129,16 +130,7 @@ class BackboneInterface(Interface): self.bind_ip = bind_address[0] self.owner = owner - # if len(bind_address) == 4: - # try: - # ThreadingTCP6Server.allow_reuse_address = True - # self.server = ThreadingTCP6Server(bind_address, handlerFactory(self.incoming_connection)) - # except Exception as e: - # RNS.log(f"Error while binding IPv6 socket for interface, the contained exception was: {e}", RNS.LOG_ERROR) - # raise SystemError("Could not bind IPv6 socket for interface. Please check the specified \"listen_ip\" configuration option") - # else: - - self.epoll = select.epoll() + if not BackboneInterface.epoll: BackboneInterface.epoll = select.epoll() self.add_listener(bind_address) self.bitrate = self.BITRATE_GUESS @@ -150,89 +142,95 @@ class BackboneInterface(Interface): def start(self): RNS.log(f"Starting {self}") - threading.Thread(target=self.__job, daemon=True).start() + if not BackboneInterface._job_active: threading.Thread(target=self.__job, daemon=True).start() - def __job(self): - try: - while True: - events = self.epoll.poll(1) + @staticmethod + def __job(): + with BackboneInterface._job_lock: + if BackboneInterface._job_active: return + else: + BackboneInterface._job_active = True + try: + while True: + events = BackboneInterface.epoll.poll(1) + for fileno, event in BackboneInterface.epoll.poll(1): + if fileno in BackboneInterface.spawned_interface_filenos: + spawned_interface = BackboneInterface.spawned_interface_filenos[fileno] + client_socket = spawned_interface.socket + if fileno == client_socket.fileno() and (event & select.EPOLLIN): + try: received_bytes = client_socket.recv(4096) + except Exception as e: + RNS.log(f"Error while reading from {spawned_interface}: {e}", RNS.LOG_ERROR) + received_bytes = b"" - for spawned_interface in self.spawned_interfaces: - clientsocket = spawned_interface.socket - for fileno, event in events: - if fileno == clientsocket.fileno() and (event & select.EPOLLIN): - try: - inb = clientsocket.recv(4096) - except Exception as e: - RNS.log(f"Error while reading from {spawned_interface}: {e}", RNS.LOG_ERROR) - inb = b"" + if len(received_bytes): spawned_interface.receive(received_bytes) + else: + BackboneInterface.epoll.unregister(fileno); client_socket.close() + spawned_interface.receive(received_bytes) + + elif fileno == client_socket.fileno() and (event & select.EPOLLOUT): + try: + written = client_socket.send(spawned_interface.transmit_buffer) + except Exception as e: + RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR) + written = 0 + BackboneInterface.epoll.unregister(fileno) + try: client_socket.close() + except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR) + spawned_interface.receive(b"") - if len(inb): - spawned_interface.receive(inb) - else: - self.epoll.unregister(fileno) - clientsocket.close() - spawned_interface.receive(inb) - - elif fileno == clientsocket.fileno() and (event & select.EPOLLOUT): - try: - written = clientsocket.send(spawned_interface.transmit_buffer) - except Exception as e: - RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR) - written = 0 + spawned_interface.transmit_buffer = spawned_interface.transmit_buffer[written:] + if len(spawned_interface.transmit_buffer) == 0: BackboneInterface.epoll.modify(fileno, select.EPOLLIN) + spawned_interface.parent_interface.txb += written; spawned_interface.txb += written + + elif fileno == client_socket.fileno() and event & (select.EPOLLHUP): + BackboneInterface.epoll.unregister(fileno) + try: client_socket.close() + except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR) + spawned_interface.receive(b"") - spawned_interface.transmit_buffer = spawned_interface.transmit_buffer[written:] - if len(spawned_interface.transmit_buffer) == 0: self.epoll.modify(fileno, select.EPOLLIN) - self.txb += written; spawned_interface.txb += written - - elif fileno == clientsocket.fileno() and event & (select.EPOLLHUP): - self.epoll.unregister(fileno) - try: clientsocket.close() - except Exception as e: - RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR) + elif fileno in BackboneInterface.listener_filenos: + owner_interface, server_socket = BackboneInterface.listener_filenos[fileno] + if fileno == server_socket.fileno() and (event & select.EPOLLIN): + client_socket, address = server_socket.accept() + client_socket.setblocking(0) + if owner_interface.incoming_connection(client_socket): + BackboneInterface.epoll.register(client_socket.fileno(), select.EPOLLIN) + else: + client_socket.close() + + elif fileno == server_socket.fileno() and (event & select.EPOLLHUP): + try: BackboneInterface.epoll.unregister(fileno) + except Exception as e: RNS.log(f"Error while deregistering listener file descriptor {fileno}: {e}", RNS.LOG_ERROR) - spawned_interface.receive(b"") + try: server_socket.close() + except Exception as e: RNS.log(f"Error while closing listener socket for {server_socket}: {e}", RNS.LOG_ERROR) - for serversocket in self.listeners: - for fileno, event in events: - if fileno == serversocket.fileno(): RNS.log(f"Listener {serversocket}, fd {fileno}, event {event}") - if fileno == serversocket.fileno() and (event & select.EPOLLIN): - connection, address = serversocket.accept() - connection.setblocking(0) - if self.incoming_connection(connection): - self.epoll.register(connection.fileno(), select.EPOLLIN) - else: - connection.close() - - elif fileno == serversocket.fileno() and (event & select.EPOLLHUP): - try: self.epoll.unregister(fileno) - except Exception as e: - RNS.log(f"Error while unregistering file descriptor {fileno}: {e}", RNS.LOG_ERROR) + except Exception as e: + RNS.log(f"BackboneInterface error: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) - try: serversocket.close() - except Exception as e: - RNS.log(f"Error while closing socket for {serversocket}: {e}", RNS.LOG_ERROR) - - except Exception as e: - RNS.log(f"{self} error: {e}", RNS.LOG_ERROR) - RNS.trace_exception(e) + finally: + for owner_interface, serversocket in BackboneInterface.listener_filenos: + fileno = serversocket.fileno() + BackboneInterface.epoll.unregister(fileno) + serversocket.close() - finally: - for serversocket in self.listeners: - self.epoll.unregister(serversocket.fileno()) - serversocket.close() - - self.epoll.close() + BackboneInterface.listener_filenos.clear() + BackboneInterface.epoll.close() - def add_listener(self, bind_address): - server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(bind_address) + def add_listener(self, bind_address, socket_type=socket.AF_INET): + if socket_type == socket.AF_INET: + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(bind_address) + else: raise TypeError(f"Invalid socket type {socket_type} for {self}") + server_socket.listen(1) server_socket.setblocking(0) - self.epoll.register(server_socket.fileno(), select.EPOLLIN) - self.listeners.append(server_socket) - RNS.log(f"Listener added: {server_socket}") + BackboneInterface.listener_filenos[server_socket.fileno()] = (self, server_socket) + BackboneInterface.epoll.register(server_socket.fileno(), select.EPOLLIN) + RNS.log(f"{self} listener added: {server_socket}", RNS.LOG_DEBUG) # TODO: Remove debug def incoming_connection(self, socket): RNS.log("Accepting incoming connection", RNS.LOG_VERBOSE) @@ -275,9 +273,9 @@ class BackboneInterface(Interface): spawned_interface.online = True RNS.log("Spawned new BackBoneClient Interface: "+str(spawned_interface), RNS.LOG_VERBOSE) RNS.Transport.interfaces.append(spawned_interface) - while spawned_interface in self.spawned_interfaces: - self.spawned_interfaces.remove(spawned_interface) + while spawned_interface in self.spawned_interfaces: self.spawned_interfaces.remove(spawned_interface) self.spawned_interfaces.append(spawned_interface) + self.spawned_interface_filenos[socket.fileno()] = spawned_interface return True @@ -293,17 +291,14 @@ class BackboneInterface(Interface): def detach(self): self.detached = True self.online = False - for listener_socket in self.listeners: - if hasattr(listener_socket, "shutdown"): - if callable(listener_socket.shutdown): - try: - # RNS.log("Detaching "+str(self), RNS.LOG_DEBUG) - listener_socket.shutdown(socket.SHUT_RDWR) - - except Exception as e: - RNS.log("Error while shutting down server for "+str(self)+": "+str(e)) - - while len(self.listeners): self.listeners.pop() + detached = [] + for fileno in BackboneInterface.listener_filenos: + owner_interface, listener_socket = BackboneInterface.listener_filenos[fileno] + if owner_interface == self: + if hasattr(listener_socket, "shutdown"): + if callable(listener_socket.shutdown): + try: listener_socket.shutdown(socket.SHUT_RDWR) + except Exception as e: RNS.log("Error while shutting down server for "+str(self)+": "+str(e)) def __str__(self): if ":" in self.bind_ip: