Added ability to run local shared instance over abstract domain sockets

This commit is contained in:
Mark Qvist 2025-04-07 00:46:40 +02:00
parent 3483de1fc2
commit 4cd94c776a
4 changed files with 116 additions and 57 deletions

View file

@ -146,6 +146,24 @@ class BackboneInterface(Interface):
def ensure_epoll(): def ensure_epoll():
if not BackboneInterface.epoll: BackboneInterface.epoll = select.epoll() if not BackboneInterface.epoll: BackboneInterface.epoll = select.epoll()
@staticmethod
def add_listener(interface, bind_address, socket_type=socket.AF_INET):
BackboneInterface.ensure_epoll()
if socket_type == socket.AF_INET:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(bind_address)
elif socket_type == socket.AF_UNIX:
server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_socket.bind(bind_address)
else: raise TypeError(f"Invalid socket type {socket_type} for {interface}")
server_socket.listen(1)
server_socket.setblocking(0)
BackboneInterface.listener_filenos[server_socket.fileno()] = (interface, server_socket)
BackboneInterface.epoll.register(server_socket.fileno(), select.EPOLLIN)
BackboneInterface.start()
@staticmethod @staticmethod
def add_client_socket(client_socket, interface): def add_client_socket(client_socket, interface):
BackboneInterface.ensure_epoll() BackboneInterface.ensure_epoll()
@ -156,14 +174,28 @@ class BackboneInterface(Interface):
@staticmethod @staticmethod
def register_in(fileno): def register_in(fileno):
# TODO: Remove debug # TODO: Remove debug
RNS.log(f"Registering EPOLL_IN for {fileno}", RNS.LOG_DEBUG) # RNS.log(f"Registering EPOLL_IN for {fileno}", RNS.LOG_DEBUG)
BackboneInterface.epoll.register(fileno, select.EPOLLIN) try: BackboneInterface.epoll.register(fileno, select.EPOLLIN)
except Exception as e:
RNS.log(f"An error occurred while registering EPOLL_IN for file descriptor {fileno}: {e}", RNS.LOG_ERROR)
@staticmethod @staticmethod
def deregister_fileno(fileno): def deregister_fileno(fileno):
# TODO: Remove debug # TODO: Remove debug
RNS.log(f"Deregistering {fileno}", RNS.LOG_DEBUG) # RNS.log(f"Deregistering {fileno}", RNS.LOG_DEBUG)
BackboneInterface.epoll.unregister(fileno) try: BackboneInterface.epoll.unregister(fileno)
except Exception as e:
RNS.log(f"An error occurred while deregistering file descriptor {fileno}: {e}", RNS.LOG_DEBUG)
@staticmethod
def deregister_listeners():
for fileno in BackboneInterface.listener_filenos:
owner_interface, server_socket = BackboneInterface.listener_filenos[fileno]
fileno = server_socket.fileno()
BackboneInterface.deregister_fileno(fileno)
server_socket.close()
BackboneInterface.listener_filenos.clear()
@staticmethod @staticmethod
def tx_ready(interface): def tx_ready(interface):
@ -205,8 +237,8 @@ class BackboneInterface(Interface):
try: try:
written = client_socket.send(spawned_interface.transmit_buffer) written = client_socket.send(spawned_interface.transmit_buffer)
except Exception as e: except Exception as e:
RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR)
written = 0 written = 0
if not spawned_interface.detached: RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR)
BackboneInterface.deregister_fileno(fileno) BackboneInterface.deregister_fileno(fileno)
try: client_socket.close() try: client_socket.close()
except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR) except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR)
@ -243,28 +275,8 @@ class BackboneInterface(Interface):
RNS.trace_exception(e) RNS.trace_exception(e)
finally: finally:
for owner_interface, serversocket in BackboneInterface.listener_filenos: BackboneInterface.deregister_listeners()
fileno = serversocket.fileno()
BackboneInterface.deregister_fileno(fileno)
serversocket.close()
BackboneInterface.listener_filenos.clear()
@staticmethod
def add_listener(interface, bind_address, socket_type=socket.AF_INET):
BackboneInterface.ensure_epoll()
if socket_type == socket.AF_INET:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(bind_address)
else: raise TypeError(f"Invalid socket type {socket_type} for {interface}")
server_socket.listen(1)
server_socket.setblocking(0)
BackboneInterface.listener_filenos[server_socket.fileno()] = (interface, server_socket)
BackboneInterface.epoll.register(server_socket.fileno(), select.EPOLLIN)
BackboneInterface.start()
def incoming_connection(self, socket): def incoming_connection(self, socket):
RNS.log("Accepting incoming connection", RNS.LOG_VERBOSE) RNS.log("Accepting incoming connection", RNS.LOG_VERBOSE)
spawned_configuration = {"name": "Client on "+self.name, "target_host": None, "target_port": None} spawned_configuration = {"name": "Client on "+self.name, "target_host": None, "target_port": None}
@ -331,7 +343,7 @@ class BackboneInterface(Interface):
if hasattr(listener_socket, "shutdown"): if hasattr(listener_socket, "shutdown"):
if callable(listener_socket.shutdown): if callable(listener_socket.shutdown):
try: listener_socket.shutdown(socket.SHUT_RDWR) try: listener_socket.shutdown(socket.SHUT_RDWR)
except Exception as e: RNS.log("Error while shutting down server for "+str(self)+": "+str(e)) except Exception as e: RNS.log("Error while shutting down socket for "+str(self)+": "+str(e))
def __str__(self): def __str__(self):
if ":" in self.bind_ip: if ":" in self.bind_ip:

View file

@ -55,13 +55,16 @@ class LocalClientInterface(Interface):
RECONNECT_WAIT = 8 RECONNECT_WAIT = 8
AUTOCONFIGURE_MTU = True AUTOCONFIGURE_MTU = True
def __init__(self, owner, name, target_port = None, connected_socket=None): def __init__(self, owner, name, target_port = None, connected_socket=None, socket_path=None):
super().__init__() super().__init__()
self.epoll_backend = False self.epoll_backend = False
self.HW_MTU = 262144 self.HW_MTU = 262144
self.online = False self.online = False
if RNS.vendor.platformutils.is_linux(): self.socket_path = f"\0rns/{socket_path}"
else: self.socket_path = None
self.IN = True self.IN = True
self.OUT = False self.OUT = False
self.socket = None self.socket = None
@ -82,10 +85,18 @@ class LocalClientInterface(Interface):
self.target_ip = None self.target_ip = None
self.target_port = None self.target_port = None
self.socket = connected_socket self.socket = connected_socket
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if self.socket.family == socket.AF_INET:
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.is_connected_to_shared_instance = False self.is_connected_to_shared_instance = False
elif self.socket_path != None:
self.receives = True
self.target_ip = None
self.target_port = None
self.connect()
elif target_port != None: elif target_port != None:
self.receives = True self.receives = True
self.target_ip = "127.0.0.1" self.target_ip = "127.0.0.1"
@ -113,9 +124,14 @@ class LocalClientInterface(Interface):
return False return False
def connect(self): def connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if self.socket_path != None:
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.connect((self.target_ip, self.target_port)) self.socket.connect(self.socket_path)
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.socket.connect((self.target_ip, self.target_port))
self.online = True self.online = True
self.is_connected_to_shared_instance = True self.is_connected_to_shared_instance = True
@ -319,12 +335,15 @@ class LocalClientInterface(Interface):
class LocalServerInterface(Interface): class LocalServerInterface(Interface):
AUTOCONFIGURE_MTU = True AUTOCONFIGURE_MTU = True
def __init__(self, owner, bindport=None): def __init__(self, owner, bindport=None, socket_path=None):
super().__init__() super().__init__()
self.epoll_backend = False self.epoll_backend = False
self.online = False self.online = False
self.clients = 0 self.clients = 0
if RNS.vendor.platformutils.is_linux(): self.socket_path = f"\0rns/{socket_path}"
else: self.socket_path = None
self.IN = True self.IN = True
self.OUT = False self.OUT = False
self.name = "Reticulum" self.name = "Reticulum"
@ -333,53 +352,73 @@ class LocalServerInterface(Interface):
if RNS.vendor.platformutils.is_linux(): if RNS.vendor.platformutils.is_linux():
self.epoll_backend = True self.epoll_backend = True
if (bindport != None): if socket_path != None and self.epoll_backend:
self.receives = True
self.bind_ip = None
self.bind_port = None
self.owner = owner
self.is_local_shared_instance = True
BackboneInterface.add_listener(self, self.socket_path, socket_type=socket.AF_UNIX)
elif bindport != None:
self.receives = True self.receives = True
self.bind_ip = "127.0.0.1" self.bind_ip = "127.0.0.1"
self.bind_port = bindport self.bind_port = bindport
def handlerFactory(callback):
def createHandler(*args, **keys):
return LocalInterfaceHandler(callback, *args, **keys)
return createHandler
self.owner = owner self.owner = owner
self.is_local_shared_instance = True self.is_local_shared_instance = True
address = (self.bind_ip, self.bind_port) address = (self.bind_ip, self.bind_port)
if self.epoll_backend: BackboneInterface.add_listener(self, address) if self.epoll_backend: BackboneInterface.add_listener(self, address)
else: else:
def handlerFactory(callback):
def createHandler(*args, **keys):
return LocalInterfaceHandler(callback, *args, **keys)
return createHandler
self.server = ThreadingTCPServer(address, handlerFactory(self.incoming_connection)) self.server = ThreadingTCPServer(address, handlerFactory(self.incoming_connection))
self.server.daemon_threads = True self.server.daemon_threads = True
thread = threading.Thread(target=self.server.serve_forever) thread = threading.Thread(target=self.server.serve_forever)
thread.daemon = True thread.daemon = True
thread.start() thread.start()
self.announce_rate_target = None self.announce_rate_target = None
self.announce_rate_grace = None self.announce_rate_grace = None
self.announce_rate_penalty = None self.announce_rate_penalty = None
self.bitrate = 1000*1000*1000 self.bitrate = 1000*1000*1000
self.online = True self.online = True
def incoming_connection(self, handler): def incoming_connection(self, handler):
if self.epoll_backend: if self.epoll_backend:
socket = handler client_socket = handler
interface_name = str(str(socket.getpeername()[1])) if client_socket.family == socket.AF_INET:
spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=socket) interface_name = str(str(client_socket.getpeername()[1]))
elif client_socket.family == socket.AF_UNIX:
interface_name = f"{self.clients}@{self.socket_path}"
spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=client_socket)
spawned_interface.OUT = self.OUT spawned_interface.OUT = self.OUT
spawned_interface.IN = self.IN spawned_interface.IN = self.IN
spawned_interface.socket = socket spawned_interface.socket = client_socket
spawned_interface.target_ip = socket.getpeername()[0]
spawned_interface.target_port = str(socket.getpeername()[1])
spawned_interface.parent_interface = self spawned_interface.parent_interface = self
spawned_interface.bitrate = self.bitrate spawned_interface.bitrate = self.bitrate
if client_socket.family == socket.AF_INET:
spawned_interface.target_ip = client_socket.getpeername()[0]
spawned_interface.target_port = str(client_socket.getpeername()[1])
elif client_socket.family == socket.AF_UNIX:
spawned_interface.target_ip = None
spawned_interface.target_port = interface_name
spawned_interface.socket_path = self.socket_path
if hasattr(self, "_force_bitrate"): spawned_interface._force_bitrate = self._force_bitrate if hasattr(self, "_force_bitrate"): spawned_interface._force_bitrate = self._force_bitrate
RNS.Transport.interfaces.append(spawned_interface) RNS.Transport.interfaces.append(spawned_interface)
RNS.Transport.local_client_interfaces.append(spawned_interface) RNS.Transport.local_client_interfaces.append(spawned_interface)
BackboneInterface.add_client_socket(client_socket, spawned_interface)
self.clients += 1 self.clients += 1
BackboneInterface.add_client_socket(socket, spawned_interface)
return True return True
else: else:
@ -407,7 +446,8 @@ class LocalServerInterface(Interface):
if from_spawned: self.oa_freq_deque.append(time.time()) if from_spawned: self.oa_freq_deque.append(time.time())
def __str__(self): def __str__(self):
return "Shared Instance["+str(self.bind_port)+"]" if self.socket_path: return "Shared Instance["+str(self.socket_path)+"]"
else: return "Shared Instance["+str(self.bind_port)+"]"
class LocalInterfaceHandler(socketserver.BaseRequestHandler): class LocalInterfaceHandler(socketserver.BaseRequestHandler):
def __init__(self, callback, *args, **keys): def __init__(self, callback, *args, **keys):

View file

@ -251,6 +251,7 @@ class Reticulum:
self.local_interface_port = 37428 self.local_interface_port = 37428
self.local_control_port = 37429 self.local_control_port = 37429
self.local_socket_path = None
self.share_instance = True self.share_instance = True
self.rpc_listener = None self.rpc_listener = None
self.rpc_key = None self.rpc_key = None
@ -351,7 +352,8 @@ class Reticulum:
try: try:
interface = LocalInterface.LocalServerInterface( interface = LocalInterface.LocalServerInterface(
RNS.Transport, RNS.Transport,
self.local_interface_port self.local_interface_port,
socket_path=self.local_socket_path
) )
interface.OUT = True interface.OUT = True
if hasattr(Reticulum, "_force_shared_instance_bitrate"): if hasattr(Reticulum, "_force_shared_instance_bitrate"):
@ -377,7 +379,8 @@ class Reticulum:
interface = LocalInterface.LocalClientInterface( interface = LocalInterface.LocalClientInterface(
RNS.Transport, RNS.Transport,
"Local shared instance", "Local shared instance",
self.local_interface_port) self.local_interface_port,
socket_path=self.local_socket_path)
interface.target_port = self.local_interface_port interface.target_port = self.local_interface_port
interface.OUT = True interface.OUT = True
if hasattr(Reticulum, "_force_shared_instance_bitrate"): if hasattr(Reticulum, "_force_shared_instance_bitrate"):
@ -428,6 +431,9 @@ class Reticulum:
if option == "share_instance": if option == "share_instance":
value = self.config["reticulum"].as_bool(option) value = self.config["reticulum"].as_bool(option)
self.share_instance = value self.share_instance = value
if option == "instance_name":
value = self.config["reticulum"][option]
self.local_socket_path = value
if option == "shared_instance_port": if option == "shared_instance_port":
value = int(self.config["reticulum"][option]) value = int(self.config["reticulum"][option])
self.local_interface_port = value self.local_interface_port = value

View file

@ -29,6 +29,7 @@ import inspect
import threading import threading
from time import sleep from time import sleep
from .vendor import umsgpack as umsgpack from .vendor import umsgpack as umsgpack
from RNS.Interfaces.BackboneInterface import BackboneInterface
class Transport: class Transport:
""" """
@ -2706,8 +2707,8 @@ class Transport:
li.detach() li.detach()
RNS.log("Detaching shared instance", RNS.LOG_DEBUG) RNS.log("Detaching shared instance", RNS.LOG_DEBUG)
if shared_instance_master != None: if shared_instance_master != None: shared_instance_master.detach()
shared_instance_master.detach() BackboneInterface.deregister_listeners()
RNS.log("All interfaces detached", RNS.LOG_DEBUG) RNS.log("All interfaces detached", RNS.LOG_DEBUG)