Use epoll backend for LocalInterface

This commit is contained in:
Mark Qvist 2025-04-06 22:50:43 +02:00
parent df3c2cffb3
commit 3483de1fc2
3 changed files with 152 additions and 92 deletions

View file

@ -131,10 +131,8 @@ class BackboneInterface(Interface):
self.bind_ip = bind_address[0]
self.owner = owner
self.add_listener(bind_address)
BackboneInterface.add_listener(bind_address)
self.bitrate = self.BITRATE_GUESS
self.start()
self.online = True
else:
@ -152,7 +150,20 @@ class BackboneInterface(Interface):
def add_client_socket(client_socket, interface):
BackboneInterface.ensure_epoll()
BackboneInterface.spawned_interface_filenos[client_socket.fileno()] = interface
BackboneInterface.epoll.register(client_socket.fileno(), select.EPOLLIN)
BackboneInterface.register_in(client_socket.fileno())
BackboneInterface.start()
@staticmethod
def register_in(fileno):
# TODO: Remove debug
RNS.log(f"Registering EPOLL_IN for {fileno}", RNS.LOG_DEBUG)
BackboneInterface.epoll.register(fileno, select.EPOLLIN)
@staticmethod
def deregister_fileno(fileno):
# TODO: Remove debug
RNS.log(f"Deregistering {fileno}", RNS.LOG_DEBUG)
BackboneInterface.epoll.unregister(fileno)
@staticmethod
def tx_ready(interface):
@ -169,7 +180,7 @@ class BackboneInterface(Interface):
with BackboneInterface._job_lock:
if BackboneInterface._job_active: return
else:
RNS.log(f"Starting BackboneInterface I/O handler") # TODO: Remove debug
RNS.log(f"Starting BackboneInterface I/O handler", RNS.LOG_DEBUG) # TODO: Remove debug
BackboneInterface._job_active = True
BackboneInterface.ensure_epoll()
try:
@ -187,7 +198,7 @@ class BackboneInterface(Interface):
if len(received_bytes): spawned_interface.receive(received_bytes)
else:
BackboneInterface.epoll.unregister(fileno); client_socket.close()
BackboneInterface.deregister_fileno(fileno); client_socket.close()
spawned_interface.receive(received_bytes)
elif fileno == client_socket.fileno() and (event & select.EPOLLOUT):
@ -196,7 +207,7 @@ class BackboneInterface(Interface):
except Exception as e:
RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR)
written = 0
BackboneInterface.epoll.unregister(fileno)
BackboneInterface.deregister_fileno(fileno)
try: client_socket.close()
except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR)
spawned_interface.receive(b"")
@ -207,7 +218,7 @@ class BackboneInterface(Interface):
if spawned_interface.parent_interface: spawned_interface.parent_interface.txb += written
elif fileno == client_socket.fileno() and event & (select.EPOLLHUP):
BackboneInterface.epoll.unregister(fileno)
BackboneInterface.deregister_fileno(fileno)
try: client_socket.close()
except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR)
spawned_interface.receive(b"")
@ -217,11 +228,11 @@ class BackboneInterface(Interface):
if fileno == server_socket.fileno() and (event & select.EPOLLIN):
client_socket, address = server_socket.accept()
client_socket.setblocking(0)
if owner_interface.incoming_connection(client_socket): pass
else: client_socket.close()
if not owner_interface.incoming_connection(client_socket):
client_socket.close()
elif fileno == server_socket.fileno() and (event & select.EPOLLHUP):
try: BackboneInterface.epoll.unregister(fileno)
try: BackboneInterface.deregister_fileno(fileno)
except Exception as e: RNS.log(f"Error while deregistering listener file descriptor {fileno}: {e}", RNS.LOG_ERROR)
try: server_socket.close()
@ -234,23 +245,25 @@ class BackboneInterface(Interface):
finally:
for owner_interface, serversocket in BackboneInterface.listener_filenos:
fileno = serversocket.fileno()
BackboneInterface.epoll.unregister(fileno)
BackboneInterface.deregister_fileno(fileno)
serversocket.close()
BackboneInterface.listener_filenos.clear()
def add_listener(self, bind_address, socket_type=socket.AF_INET):
@staticmethod
def add_listener(interface, bind_address, socket_type=socket.AF_INET):
BackboneInterface.ensure_epoll()
if socket_type == socket.AF_INET:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(bind_address)
else: raise TypeError(f"Invalid socket type {socket_type} for {self}")
else: raise TypeError(f"Invalid socket type {socket_type} for {interface}")
server_socket.listen(1)
server_socket.setblocking(0)
BackboneInterface.listener_filenos[server_socket.fileno()] = (self, server_socket)
BackboneInterface.listener_filenos[server_socket.fileno()] = (interface, server_socket)
BackboneInterface.epoll.register(server_socket.fileno(), select.EPOLLIN)
RNS.log(f"{self} listener added: {server_socket}", RNS.LOG_DEBUG) # TODO: Remove debug
BackboneInterface.start()
def incoming_connection(self, socket):
RNS.log("Accepting incoming connection", RNS.LOG_VERBOSE)
@ -295,7 +308,7 @@ class BackboneInterface(Interface):
RNS.Transport.interfaces.append(spawned_interface)
while spawned_interface in self.spawned_interfaces: self.spawned_interfaces.remove(spawned_interface)
self.spawned_interfaces.append(spawned_interface)
BackboneInterface.add_client_socket(client_socket, spawned_interface)
BackboneInterface.add_client_socket(socket, spawned_interface)
return True
@ -459,8 +472,6 @@ class BackboneClientInterface(Interface):
self.socket.settimeout(None)
BackboneInterface.add_client_socket(self.socket, self)
BackboneInterface.start()
self.online = True
if initial: