Added support for connecting RNode devices over TCP connections
Some checks are pending
Build Reticulum / test (push) Waiting to run
Build Reticulum / package (push) Blocked by required conditions
Build Reticulum / release (push) Blocked by required conditions

This commit is contained in:
Mark Qvist 2025-11-17 00:31:37 +01:00
parent d328ef5ce0
commit 1179757893
2 changed files with 383 additions and 34 deletions

View file

@ -32,6 +32,7 @@ from RNS.Interfaces.Interface import Interface
from time import sleep
import sys
import threading
import socket
import time
import math
import RNS
@ -159,8 +160,11 @@ class RNodeInterface(Interface):
lt_alock = float(c["airtime_limit_long"]) if "airtime_limit_long" in c else None
force_ble = False
ble_name = None
ble_addr = None
ble_name = None
ble_addr = None
force_tcp = False
tcp_host = None
port = c["port"] if "port" in c else None
@ -168,6 +172,7 @@ class RNodeInterface(Interface):
raise ValueError("No port specified for RNode interface")
if port != None:
tcp_uri_scheme = "tcp://"
ble_uri_scheme = "ble://"
if port.lower().startswith(ble_uri_scheme):
force_ble = True
@ -180,6 +185,13 @@ class RNodeInterface(Interface):
else:
ble_name = ble_string
if port.lower().startswith(tcp_uri_scheme):
force_tcp = True
tcp_string = port[len(tcp_uri_scheme):]
port = None
if len(tcp_string) == 0: pass
else: tcp_host = tcp_string
self.HW_MTU = 508
self.pyserial = serial
@ -196,6 +208,14 @@ class RNodeInterface(Interface):
self.reconnecting= False
self.hw_errors = []
self.use_tcp = False
self.tcp = None
self.tcp_host = tcp_host
self.tcp_rx_queue= b""
self.tcp_tx_queue= b""
self.tcp_rx_lock = threading.Lock()
self.tcp_tx_lock = threading.Lock()
self.use_ble = False
self.ble_name = ble_name
self.ble_addr = ble_addr
@ -275,8 +295,8 @@ class RNodeInterface(Interface):
self.interface_ready = False
self.announce_rate_target = None
if force_ble or self.ble_addr != None or self.ble_name != None:
self.use_ble = True
if force_ble or self.ble_addr != None or self.ble_name != None: self.use_ble = True
if force_tcp or self.tcp_host != None: self.use_tcp = True
self.validcfg = True
if (self.frequency < RNodeInterface.FREQ_MIN or self.frequency > RNodeInterface.FREQ_MAX):
@ -341,8 +361,8 @@ class RNodeInterface(Interface):
def open_port(self):
if not self.use_ble:
RNS.log("Opening serial port "+self.port+"...")
if not self.use_ble and not self.use_tcp:
RNS.log(f"Opening serial port {self.port}...")
self.serial = self.pyserial.Serial(
port = self.port,
baudrate = self.speed,
@ -358,19 +378,35 @@ class RNodeInterface(Interface):
)
else:
RNS.log(f"Opening BLE connection for {self}...")
if self.ble != None and self.ble.running == False:
self.ble.close()
self.ble.cleanup()
self.ble = None
if self.use_ble:
RNS.log(f"Opening BLE connection for {self}...")
if self.ble != None and self.ble.running == False:
self.ble.close()
self.ble.cleanup()
self.ble = None
if self.ble == None:
self.ble = BLEConnection(owner=self, target_name=self.ble_name, target_bt_addr=self.ble_addr)
self.serial = self.ble
if self.ble == None:
self.ble = BLEConnection(owner=self, target_name=self.ble_name, target_bt_addr=self.ble_addr)
self.serial = self.ble
open_time = time.time()
while not self.ble.connected and time.time() < open_time + self.ble.CONNECT_TIMEOUT:
time.sleep(1)
open_time = time.time()
while not self.ble.connected and time.time() < open_time + self.ble.CONNECT_TIMEOUT:
time.sleep(1)
if self.use_tcp:
RNS.log(f"Opening TCP connection for {self}...")
if self.tcp != None and self.tcp.running == False:
self.tcp.close()
self.tcp.cleanup()
self.tcp = None
if self.tcp == None:
self.tcp = TCPConnection(owner=self, target_host=self.tcp_host)
self.serial = self.tcp
open_time = time.time()
while not self.tcp.connected and time.time() < open_time + self.tcp.CONNECT_TIMEOUT:
time.sleep(1)
def reset_radio_state(self):
self.r_frequency = None
@ -410,7 +446,7 @@ class RNodeInterface(Interface):
if self.platform == KISS.PLATFORM_ESP32 or self.platform == KISS.PLATFORM_NRF52:
self.display = True
RNS.log("Serial port "+self.port+" is now open")
RNS.log(f"Serial port {self.port} is now open") # TODO: Cleanup this
RNS.log("Configuring RNode interface...", RNS.LOG_VERBOSE)
self.initRadio()
if (self.validateRadioState()):
@ -617,10 +653,9 @@ class RNodeInterface(Interface):
def validateRadioState(self):
RNS.log("Waiting for radio configuration validation for "+str(self)+"...", RNS.LOG_VERBOSE)
if self.use_ble:
sleep(1.00)
else:
sleep(0.25)
if self.use_ble: sleep(1.00)
elif self.use_tcp: sleep(1.5)
else: sleep(0.25)
if self.use_ble and self.ble != None and self.ble.device_disappeared:
RNS.log(f"Device disappeared during radio state validation for {self}", RNS.LOG_ERROR)
@ -1120,7 +1155,7 @@ class RNodeInterface(Interface):
self.reconnecting = False
if self.online:
RNS.log("Reconnected serial port for "+str(self))
RNS.log(f"Reconnected port for {self}")
def detach(self):
self.detached = True
@ -1133,6 +1168,9 @@ class RNodeInterface(Interface):
RNS.log(f"An error occurred while detaching {self}: {e}", RNS.LOG_ERROR)
if self.use_ble: self.ble.close()
if self.use_tcp:
time.sleep(0.5)
self.tcp.close()
def should_ingress_limit(self):
return False
@ -1153,6 +1191,17 @@ class RNodeInterface(Interface):
def get_battery_percent(self):
return self.r_battery_percent
def tcp_receive(self, data):
with self.tcp_rx_lock: self.tcp_rx_queue += data
def tcp_waiting(self): return len(self.tcp_tx_queue) > 0
def get_tcp_waiting(self, n):
with self.tcp_tx_lock:
data = self.tcp_tx_queue[:n]
self.tcp_tx_queue = self.tcp_tx_queue[n:]
return data
def ble_receive(self, data):
with self.ble_rx_lock:
self.ble_rx_queue += data
@ -1167,7 +1216,7 @@ class RNodeInterface(Interface):
return data
def __str__(self):
return "RNodeInterface["+str(self.name)+"]"
return f"RNodeInterface[{self.name}]"
class BLEConnection():
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
@ -1346,3 +1395,133 @@ class BLEConnection():
RNS.log(f"Error while determining device bond status for {device}, the contained exception was: {e}", RNS.LOG_ERROR)
return False
class TCPConnection():
TARGET_PORT = 7633
CONNECT_TIMEOUT = 2.5
INITIAL_CONNECT_TIMEOUT = 2.5
RECONNECT_WAIT = 4.0
TCP_USER_TIMEOUT = 24
TCP_PROBE_AFTER = 5
TCP_PROBE_INTERVAL = 2
TCP_PROBES = 12
@property
def is_open(self):
return self.connected
@property
def in_waiting(self):
buflen = len(self.owner.tcp_rx_queue)
return buflen > 0
def write(self, data_bytes):
if self.connected and self.socket:
with self.owner.tcp_tx_lock:
if len(self.owner.tcp_tx_queue) > 0:
self.socket.sendall(self.owner.tcp_tx_queue)
self.owner.tcp_tx_queue = b""
self.socket.sendall(data_bytes)
else:
with self.owner.tcp_tx_lock: self.owner.tcp_tx_queue += data_bytes
return len(data_bytes)
def read(self, n):
with self.owner.tcp_rx_lock:
data = self.owner.tcp_rx_queue[:n]
self.owner.tcp_rx_queue = self.owner.tcp_rx_queue[n:]
return data
def close(self):
if self.connected:
RNS.log(f"Disconnecting TCP socket for {self.owner}", RNS.LOG_DEBUG)
self.must_disconnect = True
if self.socket: self.socket.close()
def __init__(self, owner=None, target_host=None):
self.owner = owner
self.target_host = target_host
self.connected = False
self.reconnecting = False
self.running = False
self.should_run = False
self.must_disconnect = False
self.connect_job_running = False
self.should_run = True
self.connection_thread = threading.Thread(target=self.initial_connect, daemon=True).start()
def set_timeouts_linux(self):
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(self.TCP_USER_TIMEOUT * 1000))
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(self.TCP_PROBE_AFTER))
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(self.TCP_PROBE_INTERVAL))
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(self.TCP_PROBES))
def set_timeouts_osx(self):
if hasattr(socket, "TCP_KEEPALIVE"): TCP_KEEPIDLE = socket.TCP_KEEPALIVE
else: TCP_KEEPIDLE = 0x10
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(self.TCP_PROBE_AFTER))
def cleanup(self):
try:
if self.socket: self.socket.close()
except Exception as e:
RNS.log(f"Error while disconnecting TCP socket on cleanup for {self.owner}", RNS.LOG_ERROR)
self.should_run = False
def initial_connect(self):
if self.connect(initial=True): threading.Thread(target=self.read_loop, daemon=True).start()
def connect(self, initial=False):
try:
if initial:
RNS.log(f"Establishing TCP connection to device for {self.owner}...", RNS.LOG_DEBUG)
address_info = socket.getaddrinfo(self.target_host, self.TARGET_PORT, proto=socket.IPPROTO_TCP)[0]
address_family = address_info[0]
target_address = address_info[4]
self.socket = socket.socket(address_family, socket.SOCK_STREAM)
self.socket.settimeout(self.INITIAL_CONNECT_TIMEOUT)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.socket.connect(target_address)
self.socket.settimeout(None)
self.connected = True
RNS.log(f"TCP connection to device for {self.owner} established", RNS.LOG_DEBUG)
if RNS.vendor.platformutils.is_linux(): self.set_timeouts_linux()
elif RNS.vendor.platformutils.is_darwin(): self.set_timeouts_osx()
return True
except Exception as e:
if initial:
RNS.log(f"TCP connection to device for {self.owner} could not be established: {e}", RNS.LOG_ERROR)
return False
else: raise e
def read_loop(self):
try:
data_in = b""
while not self.must_disconnect:
if self.socket: data_in = self.socket.recv(4096)
else: data_in = b""
if len(data_in) > 0: self.owner.tcp_receive(data_in)
else:
self.connected = False
RNS.log(f"The TCP socket for {self} was closed", RNS.LOG_WARNING)
break
except Exception as e:
self.connected = False
RNS.log(f"A TCP read error occurred for {self}, the contained exception was: {e}", RNS.LOG_WARNING)