diff --git a/RNS/Interfaces/Android/RNodeInterface.py b/RNS/Interfaces/Android/RNodeInterface.py index 9900ae6..9632158 100644 --- a/RNS/Interfaces/Android/RNodeInterface.py +++ b/RNS/Interfaces/Android/RNodeInterface.py @@ -364,7 +364,9 @@ class RNodeInterface(Interface): target_device_address = c["target_device_address"] if "target_device_address" in c else None ble_name = c["ble_name"] if "ble_name" in c else None ble_addr = c["ble_addr"] if "ble_addr" in c else None + tcp_host = c["tcp_host"] if "tcp_host" in c else None force_ble = c["force_ble"] if "force_ble" in c else False + force_tcp = c["force_tcp"] if "force_tcp" in c else False frequency = int(c["frequency"]) if "frequency" in c else 0 bandwidth = int(c["bandwidth"]) if "bandwidth" in c else 0 txpower = int(c["txpower"]) if "txpower" in c else 0 @@ -436,6 +438,14 @@ class RNodeInterface(Interface): self.ble_rx_queue= b"" self.ble_tx_queue= b"" + self.tcp = None + self.use_tcp = False + self.tcp_host = tcp_host + self.tcp_rx_queue= b"" + self.tcp_tx_queue= b"" + self.tcp_rx_lock = threading.Lock() + self.tcp_tx_lock = threading.Lock() + self.frequency = frequency self.bandwidth = bandwidth self.txpower = txpower @@ -511,8 +521,8 @@ class RNodeInterface(Interface): self.port_io_timeout = RNodeInterface.PORT_IO_TIMEOUT self.last_imagedata = None - if force_ble or self.ble_addr != None or self.ble_name != None: - self.use_ble = True + if force_ble or self.ble_addr != None or self.ble_name != None: self.use_ble = True + if force_tcp or self.tcp_host != None: self.use_tcp = True self.validcfg = True if (self.frequency < RNodeInterface.FREQ_MIN or self.frequency > RNodeInterface.FREQ_MAX): @@ -612,7 +622,7 @@ class RNodeInterface(Interface): RNS.log(f"New connection instance: "+str(self.ble), RNS.LOG_DEBUG) def open_port(self): - if not self.use_ble: + if not self.use_ble and not self.use_tcp: if self.port != None: RNS.log("Opening serial port "+self.port+"...") # Get device parameters @@ -680,7 +690,7 @@ class RNodeInterface(Interface): if self.bt_manager != None: self.bt_manager.connect_any_device() - else: + elif self.use_ble: if self.ble == None: self.ble = BLEConnection(owner=self, target_name=self.ble_name, target_bt_addr=self.ble_addr) self.serial = self.ble @@ -689,6 +699,24 @@ class RNodeInterface(Interface): while not self.ble.connected and time.time() < open_time + self.ble.CONNECT_TIMEOUT: time.sleep(1) + elif self.use_tcp: + RNS.log(f"Opening TCP connection for {self}...") + if self.tcp != None and self.tcp.running == False: + self.tcp.close() + self.tcp.cleanup() + self.tcp = None + + if self.tcp == None: + self.tcp = TCPConnection(owner=self, target_host=self.tcp_host) + self.serial = self.tcp + + open_time = time.time() + while not self.tcp.connected and time.time() < open_time + self.tcp.CONNECT_TIMEOUT: + time.sleep(1) + + else: + raise TypeError("No valid device connection type defined for RNode interface") + def configure_device(self): self.resetRadioState() @@ -979,10 +1007,8 @@ class RNodeInterface(Interface): def validateRadioState(self): RNS.log("Waiting for radio configuration validation for "+str(self)+"...", RNS.LOG_VERBOSE) - if not self.platform == KISS.PLATFORM_ESP32: - sleep(1.00); - else: - sleep(2.00); + if not self.platform == KISS.PLATFORM_ESP32: sleep(1.00); + else: sleep(2.00); self.validcfg = True if (self.r_frequency != None and abs(self.frequency - int(self.r_frequency)) > 100): @@ -1533,6 +1559,9 @@ class RNodeInterface(Interface): RNS.log(f"An error occurred while detaching {self}: {e}", RNS.LOG_ERROR) if self.use_ble: self.ble.close() + if self.use_tcp: + time.sleep(0.5) + self.tcp.close() def should_ingress_limit(self): return False @@ -1553,6 +1582,17 @@ class RNodeInterface(Interface): def get_battery_percent(self): return self.r_battery_percent + def tcp_receive(self, data): + with self.tcp_rx_lock: self.tcp_rx_queue += data + + def tcp_waiting(self): return len(self.tcp_tx_queue) > 0 + + def get_tcp_waiting(self, n): + with self.tcp_tx_lock: + data = self.tcp_tx_queue[:n] + self.tcp_tx_queue = self.tcp_tx_queue[n:] + return data + def ble_receive(self, data): with self.ble_rx_lock: self.ble_rx_queue += data @@ -1567,7 +1607,7 @@ class RNodeInterface(Interface): return data def __str__(self): - return "RNodeInterface["+str(self.name)+"]" + return f"RNodeInterface[{self.name}]" class BLEConnection(BluetoothDispatcher): UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e" @@ -1810,4 +1850,134 @@ class BLEConnection(BluetoothDispatcher): def on_characteristic_changed(self, characteristic): if characteristic.getUuid().toString() == BLEConnection.UART_TX_CHAR_UUID: recvd = bytes(characteristic.getValue()) - self.owner.ble_receive(recvd) \ No newline at end of file + self.owner.ble_receive(recvd) + +class TCPConnection(): + TARGET_PORT = 7633 + CONNECT_TIMEOUT = 2.5 + INITIAL_CONNECT_TIMEOUT = 2.5 + RECONNECT_WAIT = 4.0 + + TCP_USER_TIMEOUT = 24 + TCP_PROBE_AFTER = 5 + TCP_PROBE_INTERVAL = 2 + TCP_PROBES = 12 + + @property + def is_open(self): + return self.connected + + @property + def in_waiting(self): + buflen = len(self.owner.tcp_rx_queue) + return buflen > 0 + + def write(self, data_bytes): + if self.connected and self.socket: + with self.owner.tcp_tx_lock: + if len(self.owner.tcp_tx_queue) > 0: + self.socket.sendall(self.owner.tcp_tx_queue) + self.owner.tcp_tx_queue = b"" + + self.socket.sendall(data_bytes) + + else: + with self.owner.tcp_tx_lock: self.owner.tcp_tx_queue += data_bytes + + return len(data_bytes) + + def read(self, n): + with self.owner.tcp_rx_lock: + data = self.owner.tcp_rx_queue[:n] + self.owner.tcp_rx_queue = self.owner.tcp_rx_queue[n:] + return data + + def close(self): + if self.connected: + RNS.log(f"Disconnecting TCP socket for {self.owner}", RNS.LOG_DEBUG) + self.must_disconnect = True + if self.socket: self.socket.close() + + def __init__(self, owner=None, target_host=None): + self.owner = owner + self.target_host = target_host + self.connected = False + self.reconnecting = False + self.running = False + self.should_run = False + self.must_disconnect = False + self.connect_job_running = False + + self.should_run = True + self.connection_thread = threading.Thread(target=self.initial_connect, daemon=True).start() + + def set_timeouts_linux(self): + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(self.TCP_USER_TIMEOUT * 1000)) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(self.TCP_PROBE_AFTER)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(self.TCP_PROBE_INTERVAL)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(self.TCP_PROBES)) + + def set_timeouts_osx(self): + if hasattr(socket, "TCP_KEEPALIVE"): TCP_KEEPIDLE = socket.TCP_KEEPALIVE + else: TCP_KEEPIDLE = 0x10 + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(self.TCP_PROBE_AFTER)) + + def cleanup(self): + try: + if self.socket: self.socket.close() + except Exception as e: + RNS.log(f"Error while disconnecting TCP socket on cleanup for {self.owner}", RNS.LOG_ERROR) + + self.should_run = False + + def initial_connect(self): + if self.connect(initial=True): threading.Thread(target=self.read_loop, daemon=True).start() + + def connect(self, initial=False): + try: + if initial: + RNS.log(f"Establishing TCP connection to device for {self.owner}...", RNS.LOG_DEBUG) + + address_info = socket.getaddrinfo(self.target_host, self.TARGET_PORT, proto=socket.IPPROTO_TCP)[0] + address_family = address_info[0] + target_address = address_info[4] + + self.socket = socket.socket(address_family, socket.SOCK_STREAM) + self.socket.settimeout(self.INITIAL_CONNECT_TIMEOUT) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.socket.connect(target_address) + self.socket.settimeout(None) + self.connected = True + + RNS.log(f"TCP connection to device for {self.owner} established", RNS.LOG_DEBUG) + + if RNS.vendor.platformutils.is_linux(): self.set_timeouts_linux() + elif RNS.vendor.platformutils.is_darwin(): self.set_timeouts_osx() + + return True + + except Exception as e: + if initial: + RNS.log(f"TCP connection to device for {self.owner} could not be established: {e}", RNS.LOG_ERROR) + return False + + else: raise e + + def read_loop(self): + try: + data_in = b"" + while not self.must_disconnect: + if self.socket: data_in = self.socket.recv(4096) + else: data_in = b"" + + if len(data_in) > 0: self.owner.tcp_receive(data_in) + else: + self.connected = False + RNS.log(f"The TCP socket for {self} was closed", RNS.LOG_WARNING) + break + + except Exception as e: + self.connected = False + RNS.log(f"A TCP read error occurred for {self}, the contained exception was: {e}", RNS.LOG_WARNING) \ No newline at end of file diff --git a/RNS/Interfaces/RNodeInterface.py b/RNS/Interfaces/RNodeInterface.py index ee35c01..a5caf26 100644 --- a/RNS/Interfaces/RNodeInterface.py +++ b/RNS/Interfaces/RNodeInterface.py @@ -32,6 +32,7 @@ from RNS.Interfaces.Interface import Interface from time import sleep import sys import threading +import socket import time import math import RNS @@ -159,8 +160,11 @@ class RNodeInterface(Interface): lt_alock = float(c["airtime_limit_long"]) if "airtime_limit_long" in c else None force_ble = False - ble_name = None - ble_addr = None + ble_name = None + ble_addr = None + + force_tcp = False + tcp_host = None port = c["port"] if "port" in c else None @@ -168,6 +172,7 @@ class RNodeInterface(Interface): raise ValueError("No port specified for RNode interface") if port != None: + tcp_uri_scheme = "tcp://" ble_uri_scheme = "ble://" if port.lower().startswith(ble_uri_scheme): force_ble = True @@ -180,6 +185,13 @@ class RNodeInterface(Interface): else: ble_name = ble_string + if port.lower().startswith(tcp_uri_scheme): + force_tcp = True + tcp_string = port[len(tcp_uri_scheme):] + port = None + if len(tcp_string) == 0: pass + else: tcp_host = tcp_string + self.HW_MTU = 508 self.pyserial = serial @@ -196,6 +208,14 @@ class RNodeInterface(Interface): self.reconnecting= False self.hw_errors = [] + self.use_tcp = False + self.tcp = None + self.tcp_host = tcp_host + self.tcp_rx_queue= b"" + self.tcp_tx_queue= b"" + self.tcp_rx_lock = threading.Lock() + self.tcp_tx_lock = threading.Lock() + self.use_ble = False self.ble_name = ble_name self.ble_addr = ble_addr @@ -275,8 +295,8 @@ class RNodeInterface(Interface): self.interface_ready = False self.announce_rate_target = None - if force_ble or self.ble_addr != None or self.ble_name != None: - self.use_ble = True + if force_ble or self.ble_addr != None or self.ble_name != None: self.use_ble = True + if force_tcp or self.tcp_host != None: self.use_tcp = True self.validcfg = True if (self.frequency < RNodeInterface.FREQ_MIN or self.frequency > RNodeInterface.FREQ_MAX): @@ -341,8 +361,8 @@ class RNodeInterface(Interface): def open_port(self): - if not self.use_ble: - RNS.log("Opening serial port "+self.port+"...") + if not self.use_ble and not self.use_tcp: + RNS.log(f"Opening serial port {self.port}...") self.serial = self.pyserial.Serial( port = self.port, baudrate = self.speed, @@ -358,19 +378,35 @@ class RNodeInterface(Interface): ) else: - RNS.log(f"Opening BLE connection for {self}...") - if self.ble != None and self.ble.running == False: - self.ble.close() - self.ble.cleanup() - self.ble = None + if self.use_ble: + RNS.log(f"Opening BLE connection for {self}...") + if self.ble != None and self.ble.running == False: + self.ble.close() + self.ble.cleanup() + self.ble = None - if self.ble == None: - self.ble = BLEConnection(owner=self, target_name=self.ble_name, target_bt_addr=self.ble_addr) - self.serial = self.ble + if self.ble == None: + self.ble = BLEConnection(owner=self, target_name=self.ble_name, target_bt_addr=self.ble_addr) + self.serial = self.ble - open_time = time.time() - while not self.ble.connected and time.time() < open_time + self.ble.CONNECT_TIMEOUT: - time.sleep(1) + open_time = time.time() + while not self.ble.connected and time.time() < open_time + self.ble.CONNECT_TIMEOUT: + time.sleep(1) + + if self.use_tcp: + RNS.log(f"Opening TCP connection for {self}...") + if self.tcp != None and self.tcp.running == False: + self.tcp.close() + self.tcp.cleanup() + self.tcp = None + + if self.tcp == None: + self.tcp = TCPConnection(owner=self, target_host=self.tcp_host) + self.serial = self.tcp + + open_time = time.time() + while not self.tcp.connected and time.time() < open_time + self.tcp.CONNECT_TIMEOUT: + time.sleep(1) def reset_radio_state(self): self.r_frequency = None @@ -410,7 +446,7 @@ class RNodeInterface(Interface): if self.platform == KISS.PLATFORM_ESP32 or self.platform == KISS.PLATFORM_NRF52: self.display = True - RNS.log("Serial port "+self.port+" is now open") + RNS.log(f"Serial port {self.port} is now open") # TODO: Cleanup this RNS.log("Configuring RNode interface...", RNS.LOG_VERBOSE) self.initRadio() if (self.validateRadioState()): @@ -617,10 +653,9 @@ class RNodeInterface(Interface): def validateRadioState(self): RNS.log("Waiting for radio configuration validation for "+str(self)+"...", RNS.LOG_VERBOSE) - if self.use_ble: - sleep(1.00) - else: - sleep(0.25) + if self.use_ble: sleep(1.00) + elif self.use_tcp: sleep(1.5) + else: sleep(0.25) if self.use_ble and self.ble != None and self.ble.device_disappeared: RNS.log(f"Device disappeared during radio state validation for {self}", RNS.LOG_ERROR) @@ -1120,7 +1155,7 @@ class RNodeInterface(Interface): self.reconnecting = False if self.online: - RNS.log("Reconnected serial port for "+str(self)) + RNS.log(f"Reconnected port for {self}") def detach(self): self.detached = True @@ -1133,6 +1168,9 @@ class RNodeInterface(Interface): RNS.log(f"An error occurred while detaching {self}: {e}", RNS.LOG_ERROR) if self.use_ble: self.ble.close() + if self.use_tcp: + time.sleep(0.5) + self.tcp.close() def should_ingress_limit(self): return False @@ -1153,6 +1191,17 @@ class RNodeInterface(Interface): def get_battery_percent(self): return self.r_battery_percent + def tcp_receive(self, data): + with self.tcp_rx_lock: self.tcp_rx_queue += data + + def tcp_waiting(self): return len(self.tcp_tx_queue) > 0 + + def get_tcp_waiting(self, n): + with self.tcp_tx_lock: + data = self.tcp_tx_queue[:n] + self.tcp_tx_queue = self.tcp_tx_queue[n:] + return data + def ble_receive(self, data): with self.ble_rx_lock: self.ble_rx_queue += data @@ -1167,7 +1216,7 @@ class RNodeInterface(Interface): return data def __str__(self): - return "RNodeInterface["+str(self.name)+"]" + return f"RNodeInterface[{self.name}]" class BLEConnection(): UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" @@ -1346,3 +1395,133 @@ class BLEConnection(): RNS.log(f"Error while determining device bond status for {device}, the contained exception was: {e}", RNS.LOG_ERROR) return False + +class TCPConnection(): + TARGET_PORT = 7633 + CONNECT_TIMEOUT = 2.5 + INITIAL_CONNECT_TIMEOUT = 2.5 + RECONNECT_WAIT = 4.0 + + TCP_USER_TIMEOUT = 24 + TCP_PROBE_AFTER = 5 + TCP_PROBE_INTERVAL = 2 + TCP_PROBES = 12 + + @property + def is_open(self): + return self.connected + + @property + def in_waiting(self): + buflen = len(self.owner.tcp_rx_queue) + return buflen > 0 + + def write(self, data_bytes): + if self.connected and self.socket: + with self.owner.tcp_tx_lock: + if len(self.owner.tcp_tx_queue) > 0: + self.socket.sendall(self.owner.tcp_tx_queue) + self.owner.tcp_tx_queue = b"" + + self.socket.sendall(data_bytes) + + else: + with self.owner.tcp_tx_lock: self.owner.tcp_tx_queue += data_bytes + + return len(data_bytes) + + def read(self, n): + with self.owner.tcp_rx_lock: + data = self.owner.tcp_rx_queue[:n] + self.owner.tcp_rx_queue = self.owner.tcp_rx_queue[n:] + return data + + def close(self): + if self.connected: + RNS.log(f"Disconnecting TCP socket for {self.owner}", RNS.LOG_DEBUG) + self.must_disconnect = True + if self.socket: self.socket.close() + + def __init__(self, owner=None, target_host=None): + self.owner = owner + self.target_host = target_host + self.connected = False + self.reconnecting = False + self.running = False + self.should_run = False + self.must_disconnect = False + self.connect_job_running = False + + self.should_run = True + self.connection_thread = threading.Thread(target=self.initial_connect, daemon=True).start() + + def set_timeouts_linux(self): + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(self.TCP_USER_TIMEOUT * 1000)) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(self.TCP_PROBE_AFTER)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(self.TCP_PROBE_INTERVAL)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(self.TCP_PROBES)) + + def set_timeouts_osx(self): + if hasattr(socket, "TCP_KEEPALIVE"): TCP_KEEPIDLE = socket.TCP_KEEPALIVE + else: TCP_KEEPIDLE = 0x10 + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(self.TCP_PROBE_AFTER)) + + def cleanup(self): + try: + if self.socket: self.socket.close() + except Exception as e: + RNS.log(f"Error while disconnecting TCP socket on cleanup for {self.owner}", RNS.LOG_ERROR) + + self.should_run = False + + def initial_connect(self): + if self.connect(initial=True): threading.Thread(target=self.read_loop, daemon=True).start() + + def connect(self, initial=False): + try: + if initial: + RNS.log(f"Establishing TCP connection to device for {self.owner}...", RNS.LOG_DEBUG) + + address_info = socket.getaddrinfo(self.target_host, self.TARGET_PORT, proto=socket.IPPROTO_TCP)[0] + address_family = address_info[0] + target_address = address_info[4] + + self.socket = socket.socket(address_family, socket.SOCK_STREAM) + self.socket.settimeout(self.INITIAL_CONNECT_TIMEOUT) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.socket.connect(target_address) + self.socket.settimeout(None) + self.connected = True + + RNS.log(f"TCP connection to device for {self.owner} established", RNS.LOG_DEBUG) + + if RNS.vendor.platformutils.is_linux(): self.set_timeouts_linux() + elif RNS.vendor.platformutils.is_darwin(): self.set_timeouts_osx() + + return True + + except Exception as e: + if initial: + RNS.log(f"TCP connection to device for {self.owner} could not be established: {e}", RNS.LOG_ERROR) + return False + + else: raise e + + def read_loop(self): + try: + data_in = b"" + while not self.must_disconnect: + if self.socket: data_in = self.socket.recv(4096) + else: data_in = b"" + + if len(data_in) > 0: self.owner.tcp_receive(data_in) + else: + self.connected = False + RNS.log(f"The TCP socket for {self} was closed", RNS.LOG_WARNING) + break + + except Exception as e: + self.connected = False + RNS.log(f"A TCP read error occurred for {self}, the contained exception was: {e}", RNS.LOG_WARNING) \ No newline at end of file