Run all BackboneInterface I/O on single epoll instance

This commit is contained in:
Mark Qvist 2025-04-06 18:17:37 +02:00
parent 456eea9c13
commit de3438248f

View File

@ -42,10 +42,17 @@ class HDLC():
class BackboneInterface(Interface):
HW_MTU = 1048576
BITRATE_GUESS = 100_000_000
BITRATE_GUESS = 1_000_000_000
DEFAULT_IFAC_SIZE = 16
AUTOCONFIGURE_MTU = True
listener_filenos = {}
spawned_interfaces = []
spawned_interface_filenos = {}
epoll = None
_job_active = False
_job_lock = threading.Lock()
@staticmethod
def get_address_for_if(name, bind_port, prefer_ipv6=False):
import RNS.vendor.ifaddr.niwrapper as netinfo
@ -95,21 +102,15 @@ class BackboneInterface(Interface):
bindport = int(c["listen_port"]) if "listen_port" in c else None
prefer_ipv6 = c.as_bool("prefer_ipv6") if "prefer_ipv6" in c else False
if port != None:
bindport = port
if port != None: bindport = port
self.HW_MTU = BackboneInterface.HW_MTU
self.online = False
self.listeners = []
self.spawned_interfaces = []
self.IN = True
self.OUT = False
self.name = name
self.detached = False
self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL
self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL
if bindport == None:
raise SystemError(f"No TCP port configured for interface \"{name}\"")
@ -129,16 +130,7 @@ class BackboneInterface(Interface):
self.bind_ip = bind_address[0]
self.owner = owner
# if len(bind_address) == 4:
# try:
# ThreadingTCP6Server.allow_reuse_address = True
# self.server = ThreadingTCP6Server(bind_address, handlerFactory(self.incoming_connection))
# except Exception as e:
# RNS.log(f"Error while binding IPv6 socket for interface, the contained exception was: {e}", RNS.LOG_ERROR)
# raise SystemError("Could not bind IPv6 socket for interface. Please check the specified \"listen_ip\" configuration option")
# else:
self.epoll = select.epoll()
if not BackboneInterface.epoll: BackboneInterface.epoll = select.epoll()
self.add_listener(bind_address)
self.bitrate = self.BITRATE_GUESS
@ -150,89 +142,95 @@ class BackboneInterface(Interface):
def start(self):
RNS.log(f"Starting {self}")
threading.Thread(target=self.__job, daemon=True).start()
if not BackboneInterface._job_active: threading.Thread(target=self.__job, daemon=True).start()
def __job(self):
try:
while True:
events = self.epoll.poll(1)
@staticmethod
def __job():
with BackboneInterface._job_lock:
if BackboneInterface._job_active: return
else:
BackboneInterface._job_active = True
try:
while True:
events = BackboneInterface.epoll.poll(1)
for fileno, event in BackboneInterface.epoll.poll(1):
if fileno in BackboneInterface.spawned_interface_filenos:
spawned_interface = BackboneInterface.spawned_interface_filenos[fileno]
client_socket = spawned_interface.socket
if fileno == client_socket.fileno() and (event & select.EPOLLIN):
try: received_bytes = client_socket.recv(4096)
except Exception as e:
RNS.log(f"Error while reading from {spawned_interface}: {e}", RNS.LOG_ERROR)
received_bytes = b""
for spawned_interface in self.spawned_interfaces:
clientsocket = spawned_interface.socket
for fileno, event in events:
if fileno == clientsocket.fileno() and (event & select.EPOLLIN):
try:
inb = clientsocket.recv(4096)
except Exception as e:
RNS.log(f"Error while reading from {spawned_interface}: {e}", RNS.LOG_ERROR)
inb = b""
if len(received_bytes): spawned_interface.receive(received_bytes)
else:
BackboneInterface.epoll.unregister(fileno); client_socket.close()
spawned_interface.receive(received_bytes)
elif fileno == client_socket.fileno() and (event & select.EPOLLOUT):
try:
written = client_socket.send(spawned_interface.transmit_buffer)
except Exception as e:
RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR)
written = 0
BackboneInterface.epoll.unregister(fileno)
try: client_socket.close()
except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR)
spawned_interface.receive(b"")
if len(inb):
spawned_interface.receive(inb)
else:
self.epoll.unregister(fileno)
clientsocket.close()
spawned_interface.receive(inb)
elif fileno == clientsocket.fileno() and (event & select.EPOLLOUT):
try:
written = clientsocket.send(spawned_interface.transmit_buffer)
except Exception as e:
RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR)
written = 0
spawned_interface.transmit_buffer = spawned_interface.transmit_buffer[written:]
if len(spawned_interface.transmit_buffer) == 0: BackboneInterface.epoll.modify(fileno, select.EPOLLIN)
spawned_interface.parent_interface.txb += written; spawned_interface.txb += written
elif fileno == client_socket.fileno() and event & (select.EPOLLHUP):
BackboneInterface.epoll.unregister(fileno)
try: client_socket.close()
except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR)
spawned_interface.receive(b"")
spawned_interface.transmit_buffer = spawned_interface.transmit_buffer[written:]
if len(spawned_interface.transmit_buffer) == 0: self.epoll.modify(fileno, select.EPOLLIN)
self.txb += written; spawned_interface.txb += written
elif fileno == clientsocket.fileno() and event & (select.EPOLLHUP):
self.epoll.unregister(fileno)
try: clientsocket.close()
except Exception as e:
RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR)
elif fileno in BackboneInterface.listener_filenos:
owner_interface, server_socket = BackboneInterface.listener_filenos[fileno]
if fileno == server_socket.fileno() and (event & select.EPOLLIN):
client_socket, address = server_socket.accept()
client_socket.setblocking(0)
if owner_interface.incoming_connection(client_socket):
BackboneInterface.epoll.register(client_socket.fileno(), select.EPOLLIN)
else:
client_socket.close()
elif fileno == server_socket.fileno() and (event & select.EPOLLHUP):
try: BackboneInterface.epoll.unregister(fileno)
except Exception as e: RNS.log(f"Error while deregistering listener file descriptor {fileno}: {e}", RNS.LOG_ERROR)
spawned_interface.receive(b"")
try: server_socket.close()
except Exception as e: RNS.log(f"Error while closing listener socket for {server_socket}: {e}", RNS.LOG_ERROR)
for serversocket in self.listeners:
for fileno, event in events:
if fileno == serversocket.fileno(): RNS.log(f"Listener {serversocket}, fd {fileno}, event {event}")
if fileno == serversocket.fileno() and (event & select.EPOLLIN):
connection, address = serversocket.accept()
connection.setblocking(0)
if self.incoming_connection(connection):
self.epoll.register(connection.fileno(), select.EPOLLIN)
else:
connection.close()
elif fileno == serversocket.fileno() and (event & select.EPOLLHUP):
try: self.epoll.unregister(fileno)
except Exception as e:
RNS.log(f"Error while unregistering file descriptor {fileno}: {e}", RNS.LOG_ERROR)
except Exception as e:
RNS.log(f"BackboneInterface error: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
try: serversocket.close()
except Exception as e:
RNS.log(f"Error while closing socket for {serversocket}: {e}", RNS.LOG_ERROR)
except Exception as e:
RNS.log(f"{self} error: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
finally:
for owner_interface, serversocket in BackboneInterface.listener_filenos:
fileno = serversocket.fileno()
BackboneInterface.epoll.unregister(fileno)
serversocket.close()
finally:
for serversocket in self.listeners:
self.epoll.unregister(serversocket.fileno())
serversocket.close()
self.epoll.close()
BackboneInterface.listener_filenos.clear()
BackboneInterface.epoll.close()
def add_listener(self, bind_address):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(bind_address)
def add_listener(self, bind_address, socket_type=socket.AF_INET):
if socket_type == socket.AF_INET:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(bind_address)
else: raise TypeError(f"Invalid socket type {socket_type} for {self}")
server_socket.listen(1)
server_socket.setblocking(0)
self.epoll.register(server_socket.fileno(), select.EPOLLIN)
self.listeners.append(server_socket)
RNS.log(f"Listener added: {server_socket}")
BackboneInterface.listener_filenos[server_socket.fileno()] = (self, server_socket)
BackboneInterface.epoll.register(server_socket.fileno(), select.EPOLLIN)
RNS.log(f"{self} listener added: {server_socket}", RNS.LOG_DEBUG) # TODO: Remove debug
def incoming_connection(self, socket):
RNS.log("Accepting incoming connection", RNS.LOG_VERBOSE)
@ -275,9 +273,9 @@ class BackboneInterface(Interface):
spawned_interface.online = True
RNS.log("Spawned new BackBoneClient Interface: "+str(spawned_interface), RNS.LOG_VERBOSE)
RNS.Transport.interfaces.append(spawned_interface)
while spawned_interface in self.spawned_interfaces:
self.spawned_interfaces.remove(spawned_interface)
while spawned_interface in self.spawned_interfaces: self.spawned_interfaces.remove(spawned_interface)
self.spawned_interfaces.append(spawned_interface)
self.spawned_interface_filenos[socket.fileno()] = spawned_interface
return True
@ -293,17 +291,14 @@ class BackboneInterface(Interface):
def detach(self):
self.detached = True
self.online = False
for listener_socket in self.listeners:
if hasattr(listener_socket, "shutdown"):
if callable(listener_socket.shutdown):
try:
# RNS.log("Detaching "+str(self), RNS.LOG_DEBUG)
listener_socket.shutdown(socket.SHUT_RDWR)
except Exception as e:
RNS.log("Error while shutting down server for "+str(self)+": "+str(e))
while len(self.listeners): self.listeners.pop()
detached = []
for fileno in BackboneInterface.listener_filenos:
owner_interface, listener_socket = BackboneInterface.listener_filenos[fileno]
if owner_interface == self:
if hasattr(listener_socket, "shutdown"):
if callable(listener_socket.shutdown):
try: listener_socket.shutdown(socket.SHUT_RDWR)
except Exception as e: RNS.log("Error while shutting down server for "+str(self)+": "+str(e))
def __str__(self):
if ":" in self.bind_ip: