From 3d8eaffe9a971f5c7d9645ea68c6585a80078764 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 28 Oct 2025 02:20:22 +0100 Subject: [PATCH] Added WeaveInterface --- RNS/Interfaces/WeaveInterface.py | 901 +++++++++++++++++++++++++++++++ 1 file changed, 901 insertions(+) create mode 100644 RNS/Interfaces/WeaveInterface.py diff --git a/RNS/Interfaces/WeaveInterface.py b/RNS/Interfaces/WeaveInterface.py new file mode 100644 index 0000000..d2d3fc7 --- /dev/null +++ b/RNS/Interfaces/WeaveInterface.py @@ -0,0 +1,901 @@ +import RNS +import threading +import serial +import time + +from collections import deque +from RNS.Interfaces.Interface import Interface + +class HDLC(): + FLAG = 0x7E + ESC = 0x7D + ESC_MASK = 0x20 + + @staticmethod + def escape(data): + data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC^HDLC.ESC_MASK])) + data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG^HDLC.ESC_MASK])) + return data + +class WDCL(): + WDCL_T_DISCOVER = 0x00 + WDCL_T_CONNECT = 0x01 + WDCL_T_CMD = 0x02 + WDCL_T_LOG = 0x03 + WDCL_T_DISP = 0x04 + WDCL_T_ENDPOINT_PKT = 0x05 + WDCL_T_ENCAP_PROTO = 0x06 + + WDCL_BROADCAST = bytes([0xFF, 0xFF, 0xFF, 0xFF]) + + HEADER_MINSIZE = 4+1 + HW_MTU = 1500 + MAX_CHUNK = 32768 + port = None + speed = None + databits = None + parity = None + stopbits = None + serial = None + + def __init__(self, owner, device, port, as_interface=False): + import importlib.util + if importlib.util.find_spec('serial') != None: import serial + else: RNS.panic() + + if port == None: raise ValueError("No port specified") + + self.switch_identity = owner.switch_identity + self.switch_id = self.switch_identity.sig_pub_bytes[-4:] + self.switch_pub_bytes = self.switch_identity.sig_pub_bytes + + self.rxb = 0 + self.txb = 0 + self.owner = owner + self.as_interface = as_interface + self.device = device + self.device.connection = self + self.pyserial = serial + self.serial = None + self.port = port + self.speed = 3000000 + self.databits = 8 + self.parity = serial.PARITY_NONE + self.stopbits = 1 + self.timeout = 100 + self.online = False + self.frame_buffer = b"" + self.next_tx = 0 + self.should_run = True + self.receiver = None + self.frame_queue = deque() + self.id = RNS.Identity.full_hash(port.hwid.encode("utf-8")) + + if self.as_interface: + try: + self.open_port() + + if self.serial.is_open: self.configure_device() + else: raise IOError("Could not open serial port") + + except Exception as e: + RNS.log("Could not open serial port for interface "+str(self), RNS.LOG_ERROR) + RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.log("Reticulum will attempt to bring up this interface periodically", RNS.LOG_ERROR) + if not self.detached and not self.reconnecting: + thread = threading.Thread(target=self.reconnect_port) + thread.daemon = True + thread.start() + + else: + try: self.open_port() + except Exception as e: + self.owner.wlog("Could not open serial port") + raise e + + if self.serial.is_open: self.configure_device() + else: raise IOError("Could not open serial port") + + + def open_port(self): + if self.as_interface: RNS.log(f"Opening serial port {self.port.device}...", RNS.LOG_VERBOSE) + else: self.owner.wlog(f"Opening serial port {self.port.device}...") + self.serial = self.pyserial.Serial( + port = self.port.device, + baudrate = self.speed, + bytesize = self.databits, + parity = self.parity, + stopbits = self.stopbits, + xonxoff = False, + rtscts = False, + timeout = 0.250, + inter_byte_timeout = None, + write_timeout = None, + dsrdtr = False) + + def close(self): + self.should_run = False + if self.serial: + self.serial.close() + if self.as_interface: RNS.LOG((f"Closed serial port {str(self.port.device)} for {str(self)}"), RNS.LOG_VERBOSE) + else: self.owner.wlog(f"Closed serial port {str(self.port.device)} for {str(self)}") + + def configure_device(self): + thread = threading.Thread(target=self.read_loop) + thread.daemon = True + thread.start() + self.online = True + if self.as_interface: RNS.log(f"Serial port {self.port.device} is now open, discovering remote device...", RNS.LOG_VERBOSE) + else: self.owner.wlog("Serial port "+self.port.device+" is now open") + self.device.discover() + + def process_incoming(self, data): + self.rxb += len(data) + if self.device: + while len(self.frame_queue): self.device.incoming_frame(self.frame_queue.pop()) + self.device.incoming_frame(data) + else: self.frame_queue.append(data) + + def process_outgoing(self, data): + if self.online: + data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) + written = self.serial.write(data) + self.txb += len(data) + if written != len(data): + raise IOError("Serial interface only wrote "+str(written)+" bytes of "+str(len(data))) + + def read_loop(self): + try: + while self.serial.is_open: + data_in = self.serial.read(1500) + if len(data_in) > 0: + # TODO: Remove debug + # self.device.receiver.log(f"Read {len(data_in)}") + # self.device.receiver.log(f"Raw:\n {RNS.hexrep(data_in).replace("7e", "[bold red]7e[/bold red]")}") + self.frame_buffer += data_in + flags_remaining = True + while flags_remaining: + frame_start = self.frame_buffer.find(HDLC.FLAG) + if frame_start != -1: + frame_end = self.frame_buffer.find(HDLC.FLAG, frame_start+1) + if frame_end != -1: + frame = self.frame_buffer[frame_start+1:frame_end] + frame = frame.replace(bytes([HDLC.ESC, HDLC.FLAG ^ HDLC.ESC_MASK]), bytes([HDLC.FLAG])) + frame = frame.replace(bytes([HDLC.ESC, HDLC.ESC ^ HDLC.ESC_MASK]), bytes([HDLC.ESC])) + if len(frame) > WDCL.HEADER_MINSIZE: self.process_incoming(frame) + self.frame_buffer = self.frame_buffer[frame_end:] + else: + flags_remaining = False + else: + flags_remaining = False + + except Exception as e: + self.online = False + if self.should_run: + if self.as_interface: + RNS.log("A serial port error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.log("Will attempt to reconnect the interface periodically.", RNS.LOG_ERROR) + else: + self.owner.wlog("A serial port error occurred, the contained exception was: "+str(e)) + self.owner.wlog("Will attempt to reconnect the interface periodically.") + RNS.trace_exception(e) + + self.online = False + try: self.serial.close() + except: pass + if self.should_run: self.reconnect_port() + + def reconnect_port(self): + while not self.online: + try: + time.sleep(5) + if self.as_interface: RNS.log("Attempting to reconnect serial port "+str(self.port.device)+" for "+str(self)+"...", RNS.LOG_INFO) + else: self.owner.wlog("Attempting to reconnect serial port "+str(self.port.device)+" for "+str(self)+"...") + self.open_port() + if self.serial.is_open: self.configure_device() + except Exception as e: + if self.as_interface: RNS.log("Error while reconnecting port, the contained exception was: "+str(e), RNS.LOG_ERROR) + else: self.owner.wlog("Error while reconnecting port, the contained exception was: "+str(e)) + + if self.as_interface: RNS.log("Reconnected serial port for "+str(self), RNS.LOG_IN) + else: self.owner.wlog("Reconnected serial port for "+str(self)) + + def __str__(self): + if self.port.serial_number: sn_str = f" {self.port.serial_number}" + else: sn_str = "" + return f"{self.port.product}{sn_str} (USB)" + +class Cmd(): + WDCL_CMD_ENDPOINT_PKT = 0x0001 + WDCL_CMD_ENDPOINTS_LIST = 0x0100 + WDCL_CMD_REMOTE_DISPLAY = 0x0A00 + WDCL_CMD_REMOTE_INPUT = 0x0A01 + +class Evt(): + ET_MSG = 0x0000 + ET_SYSTEM_BOOT = 0x0001 + ET_CORE_INIT = 0x0002 + ET_DRV_UART_INIT = 0x1000 + ET_DRV_USB_CDC_INIT = 0x1010 + ET_DRV_USB_CDC_HOST_AVAIL = 0x1011 + ET_DRV_USB_CDC_HOST_SUSPEND = 0x1012 + ET_DRV_USB_CDC_HOST_RESUME = 0x1013 + ET_DRV_USB_CDC_CONNECTED = 0x1014 + ET_DRV_USB_CDC_READ_ERR = 0x1015 + ET_DRV_USB_CDC_OVERFLOW = 0x1016 + ET_DRV_USB_CDC_DROPPED = 0x1017 + ET_DRV_USB_CDC_TX_TIMEOUT = 0x1018 + ET_DRV_I2C_INIT = 0x1020 + ET_DRV_NVS_INIT = 0x1030 + ET_DRV_NVS_ERASE = 0x1031 + ET_DRV_CRYPTO_INIT = 0x1040 + ET_DRV_DISPLAY_INIT = 0x1050 + ET_DRV_DISPLAY_BUS_AVAILABLE = 0x1051 + ET_DRV_DISPLAY_IO_CONFIGURED = 0x1052 + ET_DRV_DISPLAY_PANEL_CREATED = 0x1053 + ET_DRV_DISPLAY_PANEL_RESET = 0x1054 + ET_DRV_DISPLAY_PANEL_INIT = 0x1055 + ET_DRV_DISPLAY_PANEL_ENABLE = 0x1056 + ET_DRV_DISPLAY_REMOTE_ENABLE = 0x1057 + ET_DRV_W80211_INIT = 0x1060 + ET_DRV_W80211_INIT = 0x1061 + ET_DRV_W80211_CHANNEL = 0x1062 + ET_DRV_W80211_POWER = 0x1063 + ET_KRN_LOGGER_INIT = 0x2000 + ET_KRN_LOGGER_OUTPUT = 0x2001 + ET_KRN_UI_INIT = 0x2010 + ET_PROTO_WDCL_INIT = 0x3000 + ET_PROTO_WDCL_RUNNING = 0x3001 + ET_PROTO_WDCL_CONNECTION = 0x3002 + ET_PROTO_WEAVE_INIT = 0x3100 + ET_PROTO_WEAVE_RUNNING = 0x3101 + ET_SRVCTL_REMOTE_DISPLAY = 0xA000 + ET_INTERFACE_REGISTERED = 0xD000 + ET_STAT_STATE = 0xE000 + ET_STAT_UPTIME = 0xE001 + ET_STAT_TIMEBASE = 0xE002 + ET_STAT_CPU = 0xE003 + ET_STAT_TASK_CPU = 0xE004 + ET_STAT_MEMORY = 0xE005 + ET_STAT_STORAGE = 0xE006 + ET_SYSERR_MEM_EXHAUSTED = 0xF000 + + IF_TYPE_USB = 0x01 + IF_TYPE_UART = 0x02 + IF_TYPE_W80211 = 0x03 + IF_TYPE_BLE = 0x04 + IF_TYPE_LORA = 0x05 + IF_TYPE_ETHERNET = 0x06 + IF_TYPE_WIFI = 0x07 + IF_TYPE_TCP = 0x08 + IF_TYPE_UDP = 0x09 + IF_TYPE_IR = 0x0A + IF_TYPE_AFSK = 0x0B + IF_TYPE_GPIO = 0x0C + IF_TYPE_SPI = 0x0D + IF_TYPE_I2C = 0x0E + IF_TYPE_CAN = 0x0F + IF_TYPE_DMA = 0x10 + + event_descriptions = { + ET_SYSTEM_BOOT: "System boot", + ET_CORE_INIT: "Core initialization", + ET_DRV_UART_INIT: "UART driver initialization", + ET_DRV_USB_CDC_INIT: "USB CDC driver initialization", + ET_DRV_USB_CDC_HOST_AVAIL: "USB CDC host became available", + ET_DRV_USB_CDC_HOST_SUSPEND: "USB CDC host suspend", + ET_DRV_USB_CDC_HOST_RESUME: "USB CDC host resume", + ET_DRV_USB_CDC_CONNECTED: "USB CDC host connection", + ET_DRV_USB_CDC_READ_ERR: "USB CDC read error", + ET_DRV_USB_CDC_OVERFLOW: "USB CDC overflow occurred", + ET_DRV_USB_CDC_DROPPED: "USB CDC dropped bytes", + ET_DRV_USB_CDC_TX_TIMEOUT: "USB CDC TX flush timeout", + ET_DRV_I2C_INIT: "I2C driver initialization", + ET_DRV_NVS_INIT: "NVS driver initialization", + ET_DRV_CRYPTO_INIT: "Cryptography driver initialization", + ET_DRV_W80211_INIT: "W802.11 driver initialization", + ET_DRV_W80211_CHANNEL: "W802.11 channel configuration", + ET_DRV_W80211_POWER: "W802.11 TX power configuration", + ET_DRV_DISPLAY_INIT: "Display driver initialization", + ET_DRV_DISPLAY_BUS_AVAILABLE: "Display bus availability", + ET_DRV_DISPLAY_IO_CONFIGURED: "Display I/O configuration", + ET_DRV_DISPLAY_PANEL_CREATED: "Display panel allocation", + ET_DRV_DISPLAY_PANEL_RESET: "Display panel reset", + ET_DRV_DISPLAY_PANEL_INIT: "Display panel initialization", + ET_DRV_DISPLAY_PANEL_ENABLE: "Display panel activation", + ET_DRV_DISPLAY_REMOTE_ENABLE: "Remote display output activation", + ET_KRN_LOGGER_INIT: "Logging service initialization", + ET_KRN_LOGGER_OUTPUT: "Logging service output activation", + ET_KRN_UI_INIT: "User interface service initialization", + ET_PROTO_WDCL_INIT: "WDCL protocol initialization", + ET_PROTO_WDCL_RUNNING: "WDCL protocol activation", + ET_PROTO_WDCL_CONNECTION: "WDCL host connection", + ET_PROTO_WEAVE_INIT: "Weave protocol initialization", + ET_PROTO_WEAVE_RUNNING: "Weave protocol activation", + ET_SRVCTL_REMOTE_DISPLAY: "Remote display service control event", + ET_INTERFACE_REGISTERED: "Interface registration", + ET_SYSERR_MEM_EXHAUSTED: "System memory exhausted", + } + + interface_types = { + IF_TYPE_USB: "usb", + IF_TYPE_UART: "uart", + IF_TYPE_W80211: "mw", + IF_TYPE_BLE: "ble", + IF_TYPE_LORA: "lora", + IF_TYPE_ETHERNET: "eth", + IF_TYPE_WIFI: "wifi", + IF_TYPE_TCP: "tcp", + IF_TYPE_UDP: "udp", + IF_TYPE_IR: "ir", + IF_TYPE_AFSK: "afsk", + IF_TYPE_GPIO: "gpio", + IF_TYPE_SPI: "spi", + IF_TYPE_I2C: "i2c", + IF_TYPE_CAN: "can", + IF_TYPE_DMA: "dma", + } + + channel_descriptions = { + 1: "Channel 1 (2412 MHz)", + 2: "Channel 2 (2417 MHz)", + 3: "Channel 3 (2422 MHz)", + 4: "Channel 4 (2427 MHz)", + 5: "Channel 5 (2432 MHz)", + 6: "Channel 6 (2437 MHz)", + 7: "Channel 7 (2442 MHz)", + 8: "Channel 8 (2447 MHz)", + 9: "Channel 9 (2452 MHz)", + 10: "Channel 10 (2457 MHz)", + 11: "Channel 11 (2462 MHz)", + 12: "Channel 12 (2467 MHz)", + 13: "Channel 13 (2472 MHz)", + 14: "Channel 14 (2484 MHz)", + } + + LOG_FORCE = 0 + LOG_CRITICAL = 1 + LOG_ERROR = 2 + LOG_WARNING = 3 + LOG_NOTICE = 4 + LOG_INFO = 5 + LOG_VERBOSE = 6 + LOG_DEBUG = 7 + LOG_EXTREME = 8 + LOG_SYSTEM = 9 + + levels = { + LOG_FORCE: "Forced", + LOG_CRITICAL: "Critical", + LOG_ERROR: "Error", + LOG_WARNING: "Warning", + LOG_NOTICE: "Notice", + LOG_INFO: "Info", + LOG_VERBOSE: "Verbose", + LOG_DEBUG: "Debug", + LOG_EXTREME: "Extreme", + LOG_SYSTEM: "System", + } + + task_descriptions = { + "taskLVGL": "Driver: UI Renderer", + "ui_service": "Service: User Interface", + "TinyUSB": "Driver: USB", + "drv_w80211": "Driver: W802.11", + "system_stats": "System: Stats", + "core": "System: Core", + "protocol_wdcl": "Protocol: WDCL", + "protocol_weave": "Protocol: Weave", + "tiT": "Protocol: TCP/IP", + "ipc0": "System: CPU 0 IPC", + "ipc1": "System: CPU 1 IPC", + "esp_timer": "Driver: Timers", + "Tmr Svc": "Service: Timers", + "kernel_logger": "Service: Logging", + "remote_display": "Service: Remote Display", + "wifi": "System: WiFi Hardware", + "sys_evt": "System: Kernel Events", + } + + @staticmethod + def level(level): + if level in Evt.levels: return Evt.levels[level] + else: return "Unknown" + +class LogFrame(): + timestamp = None + level = None + event = None + data = b"" + + def __init__(self, timestamp=None, level=None, event=None, data=b""): + self.timestamp = timestamp; self.level = level + self.event = event; self.data = data + +class WeaveEndpoint(): + QUEUE_LEN = 1024 + + def __init__(self, endpoint_addr): + self.endpoint_addr = endpoint_addr + self.alive = time.time() + self.received = deque(maxlen=WeaveEndpoint.QUEUE_LEN) + + def receive(self, data): + self.received.append(data) + +class WeaveDevice(): + STATLEN_MAX = 120 + STAT_UPDATE_THROTTLE = 0.5 + + WEAVE_SWITCH_ID_LEN = 4 + WEAVE_ENDPOINT_ID_LEN = 8 + WEAVE_FLOWSEQ_LEN = 2 + WEAVE_HMAC_LEN = 8 + WEAVE_AUTH_LEN = WEAVE_ENDPOINT_ID_LEN+WEAVE_HMAC_LEN + + WEAVE_PUBKEY_SIZE = 32 + WEAVE_PRVKEY_SIZE = 64 + WEAVE_SIGNATURE_LEN = 64 + + def __init__(self, as_interface=False): + self.identity = None + self.receiver = None + self.switch_id = None + self.owner = None + self.as_interface = as_interface + self.endpoints = {} + self.active_tasks = {} + self.cpu_load = 0 + self.memory_total = 0 + self.memory_free = 0 + self.memory_used = 0 + self.memory_used_pct = 0 + self.log_queue = deque() + self.memory_stats = deque(maxlen=WeaveDevice.STATLEN_MAX) + self.cpu_stats = deque(maxlen=WeaveDevice.STATLEN_MAX) + self.display_buffer = bytearray(0) + self.update_display = False + + self.next_update_memory = 0 + self.next_update_cpu = 0 + + def wdcl_send(self, packet_type, data): + if not self.switch_id: + if self.as_interface: RNS.log("Attempt to transmit on {self} while remote Weave device identity is unknown", RNS.LOG_ERROR) + else: self.receiver.log("Error: Attempt to transmit while remote Weave device identity is unknown") + else: + frame = self.switch_id + frame += bytes([packet_type]) + frame += data + self.connection.process_outgoing(frame) + + def wdcl_broadcast(self, packet_type, data): + frame = WDCL.WDCL_BROADCAST + frame += bytes([packet_type]) + frame += data + self.connection.process_outgoing(frame) + + def wdcl_send_command(self, command, data): + frame = b"" + frame += bytes([command>>8, (command & 0xFF)]) + frame += data + self.wdcl_send(WDCL.WDCL_T_CMD, frame) + + def discover(self): + self.wdcl_broadcast(WDCL.WDCL_T_DISCOVER, self.connection.switch_id) + + def handshake(self): + if self.identity == None: + if self.as_interface: RNS.log("Attempt to perform handshake on {self} before remote device discovery completion", RNS.LOG_ERROR) + else: self.receiver.log("Attempt to perform handshake before remote device discovery completion") + else: + signed_id = self.switch_id + signature = self.connection.switch_identity.sign(signed_id) + data = self.connection.switch_pub_bytes + data += signature + self.wdcl_send(WDCL.WDCL_T_CONNECT, data) + if not self.as_interface: self.receiver.log("Connection handshake sent") + + def capture_stats_cpu(self): + self.cpu_stats.append({"timestamp": time.time(), "cpu_load": self.cpu_load}) + if self.receiver and self.receiver.ready and len(self.memory_stats) > 1: self.receiver.stats_update("cpu") + + def capture_stats_memory(self): + self.memory_stats.append({"timestamp": time.time(), "memory_used": self.memory_used}) + if self.receiver and self.receiver.ready and len(self.memory_stats) > 1: self.receiver.stats_update("memory") + + def get_cpu_stats(self): + tbegin = None + stats = {"timestamps": [], "values": [], "max": 100, "unit": "%"} + for i in range(0, len(self.cpu_stats)): + if tbegin == None: tbegin = self.cpu_stats[len(self.cpu_stats)-1]["timestamp"] + stats["timestamps"].append(self.cpu_stats[i]["timestamp"]-tbegin) + stats["values"].append(self.cpu_stats[i]["cpu_load"]) + + return stats + + def get_memory_stats(self): + tbegin = None + stats = {"timestamps": [], "values": [], "max": self.memory_total, "unit": "B"} + for i in range(0, len(self.memory_stats)): + if tbegin == None: tbegin = self.memory_stats[len(self.memory_stats)-1]["timestamp"] + stats["timestamps"].append(self.memory_stats[i]["timestamp"]-tbegin) + stats["values"].append(self.memory_stats[i]["memory_used"]) + + return stats + + def get_active_tasks(self): + active_tasks = {} + now = time.time() + for task_id in self.active_tasks: + if not task_id.startswith("IDLE"): + task_description = task_id + if task_id in Evt.task_descriptions: task_description = Evt.task_descriptions[task_id] + if now - self.active_tasks[task_id]["timestamp"] < 5: + active_tasks[task_description] = self.active_tasks[task_id] + + return active_tasks + + def disconnect_display(self): + self.wdcl_send_command(Cmd.WDCL_CMD_REMOTE_DISPLAY, bytes([0x00])) + self.update_display = False + + def connect_display(self): + self.wdcl_send_command(Cmd.WDCL_CMD_REMOTE_DISPLAY, bytes([0x01])) + self.update_display = True + + def endpoint_alive(self, endpoint_id): + if not endpoint_id in self.endpoints: self.endpoints[endpoint_id] = WeaveEndpoint(endpoint_id) + else: self.endpoints[endpoint_id].alive = time.time() + + def deliver_packet(self, endpoint_id, data): + self.wdcl_send_command(Cmd.WDCL_CMD_ENDPOINT_PKT, endpoint_id+data) + + def received_packet(self, source, data): + self.endpoint_alive(source) + self.endpoints[source].receive(data) + + def incoming_frame(self, data): + if len(data) > self.WEAVE_SWITCH_ID_LEN+2 and data[self.WEAVE_SWITCH_ID_LEN] == WDCL.WDCL_T_ENDPOINT_PKT and data[:self.WEAVE_SWITCH_ID_LEN] == self.connection.switch_id: + payload = data[self.WEAVE_SWITCH_ID_LEN+1:-self.WEAVE_ENDPOINT_ID_LEN] + src_endpoint = data[-self.WEAVE_ENDPOINT_ID_LEN:] + self.received_packet(src_endpoint, data) + + elif len(data) > self.WEAVE_SWITCH_ID_LEN+1 and data[self.WEAVE_SWITCH_ID_LEN] == WDCL.WDCL_T_DISCOVER: + discovery_response_len = self.WEAVE_SWITCH_ID_LEN+1+self.WEAVE_PUBKEY_SIZE+self.WEAVE_SIGNATURE_LEN + if len(data) == discovery_response_len: + signed_id = data[:self.WEAVE_SWITCH_ID_LEN] + remote_pub_key = data[self.WEAVE_SWITCH_ID_LEN+1:self.WEAVE_SWITCH_ID_LEN+1+self.WEAVE_PUBKEY_SIZE] + remote_switch_id = remote_pub_key[-4:] + remote_signature = data[self.WEAVE_SWITCH_ID_LEN+1+self.WEAVE_PUBKEY_SIZE:self.WEAVE_SWITCH_ID_LEN+1+self.WEAVE_PUBKEY_SIZE+self.WEAVE_SIGNATURE_LEN] + remote_identity = RNS.Identity(create_keys=False) + remote_identity.load_public_key(remote_pub_key*2) + if remote_identity.validate(remote_signature, signed_id): + if self.as_interface: RNS.log(f"Remote Weave device {RNS.hexrep(remote_switch_id)} discovered", RNS.LOG_VERBOSE) + else: self.receiver.log(f"Remote Weave device {RNS.hexrep(remote_switch_id)} discovered") + self.identity = remote_identity + self.switch_id = remote_switch_id + self.handshake() + else: + if self.as_interface: RNS.LOG("Invalid remote device discovery response received", RNS.LOG_ERROR) + else: self.receiver.log("Invalid remote device discovery response received") + + elif len(data) > self.WEAVE_SWITCH_ID_LEN+1 and data[self.WEAVE_SWITCH_ID_LEN] == WDCL.WDCL_T_LOG: + fd = data[self.WEAVE_SWITCH_ID_LEN+2:] + ts = fd[1] << 24 | fd[2] << 16 | fd[3] << 8 | fd[4] + lvl = fd[5]; evt = fd[6] << 8 | fd[7]; data = fd[8:] + self.log_handle(LogFrame(timestamp=ts/1000.0, level=lvl, event=evt, data=data)) + + elif len(data) > self.WEAVE_SWITCH_ID_LEN+10 and data[self.WEAVE_SWITCH_ID_LEN] == WDCL.WDCL_T_DISP: + fd = data[self.WEAVE_SWITCH_ID_LEN+1:] + cf = fd[0] + ofs = fd[1] << 24 | fd[2] << 16 | fd[3] << 8 | fd[4] + dsz = fd[5] << 24 | fd[6] << 16 | fd[7] << 8 | fd[8] + fbf = fd[9:] + + w = 128; h = 64 + + if dsz > len(self.display_buffer): self.display_buffer = bytearray(dsz) + self.display_buffer[ofs:ofs+len(fbf)] = fbf + + if self.receiver and self.receiver.ready and ofs+len(fbf) == dsz: + if self.update_display: self.receiver.display_update(self.display_buffer, w, h) + + def log_handle(self, frame): + # Handle system event signalling + if frame.event == Evt.ET_STAT_TASK_CPU: self.active_tasks[frame.data[1:].decode("utf-8")] = { "cpu_load": frame.data[0], "timestamp": time.time() } + elif frame.event == Evt.ET_STAT_CPU: + self.cpu_load = frame.data[0] + self.capture_stats_cpu() + elif frame.event == Evt.ET_STAT_MEMORY: + self.memory_free = int.from_bytes(frame.data[:4]) + self.memory_total = int.from_bytes(frame.data[4:]) + self.memory_used = self.memory_total-self.memory_free + self.memory_used_pct = round((self.memory_used/self.memory_total)*100, 2) + self.capture_stats_memory() + + # Handle generic messages and unmapped events + else: + ts = RNS.prettytime(frame.timestamp) + if frame.event == Evt.ET_MSG: + if len(frame.data): data_string = frame.data.decode("utf-8") + else: data_string = "" + rendered = f"[{ts}] [{Evt.level(frame.level)}]: {data_string}" + + else: + if frame.event in Evt.event_descriptions: event_description = Evt.event_descriptions[frame.event] + else: event_description = f"0x{RNS.hexrep(frame.event, delimit=False)}" + + if frame.event == Evt.ET_INTERFACE_REGISTERED: + if len(frame.data) >= 2: + interface_index = frame.data[0]; interface_type = frame.data[1] + type_name = "phy" + if interface_type in Evt.interface_types: type_name = Evt.interface_types[interface_type] + data_string = f": {type_name}{interface_index}" + else: data_string = "" + else: + if len(frame.data): + data_string = f": {RNS.hexrep(frame.data)}" + if frame.event == Evt.ET_DRV_USB_CDC_CONNECTED: + if frame.data[0] == 0x01: data_string = ": Connected" + elif frame.data[0] == 0x00: data_string = ": Disconnected" + elif frame.event == Evt.ET_DRV_W80211_CHANNEL: + if frame.data[0] in Evt.channel_descriptions: data_string = f": {Evt.channel_descriptions[frame.data[0]]}" + else: data_string = f": {RNS.hexrep(frame.data)}" + elif frame.event == Evt.ET_DRV_W80211_POWER: + tx_power = frame.data[0]*0.25 + data_string = f": {tx_power} dBm ({int(10**(tx_power/10))} mW)" + elif frame.event >= Evt.ET_CORE_INIT and frame.event <= Evt.ET_PROTO_WEAVE_RUNNING: + if frame.data[0] == 0x01: data_string = ": Success" + elif frame.data[0] == 0x00: + if frame.level == Evt.LOG_ERROR: data_string = ": Failure" + else: data_string = ": Stopped" + else: data_string = f": {RNS.hexrep(frame.data)}" + + else: data_string = "" + + rendered = f"[{ts}] [{Evt.level(frame.level)}] [{event_description}]{data_string}" + + if self.receiver and self.receiver.ready: + while len(self.log_queue): self.receiver.log(self.log_queue.pop()) + self.receiver.log(rendered) + else: self.log_queue.append(rendered) + +class WeaveInterface(Interface): + HW_MTU = 1196 + FIXED_MTU = True + + DEFAULT_IFAC_SIZE = 16 + + PEERING_TIMEOUT = 20.0 + + BITRATE_GUESS = 1*1000*1000 + + MULTI_IF_DEQUE_LEN = 48 + MULTI_IF_DEQUE_TTL = 0.75 + + def __init__(self, owner, configuration): + c = Interface.get_config_obj(configuration) + name = c["name"] + port = c["port"] + configured_bitrate = c["configured_bitrate"] if "configured_bitrate" in c else None + + from RNS.Interfaces import netinfo + super().__init__() + self.netinfo = netinfo + + self.HW_MTU = AutoInterface.HW_MTU + self.IN = True + self.OUT = False + self.name = name + self.port = port + self.device = WeaveDevice(as_interface=True) + self.connection = WDCL(owner=None, device=self.device, port=self.port, as_interface=True) + self.owner = owner + self.online = False + self.final_init_done = False + self.peers = {} + self.timed_out_interfaces = {} + self.spawned_interfaces = {} + self.write_lock = threading.Lock() + self.mif_deque = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN) + self.mif_deque_times = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN) + + self.announce_rate_target = None + self.peer_job_interval = WeaveInterface.PEERING_TIMEOUT*1.1 + self.peering_timeout = WeaveInterface.PEERING_TIMEOUT + + self.receives = True + if configured_bitrate != None: self.bitrate = configured_bitrate + else: self.bitrate = WeaveInterface.BITRATE_GUESS + + self.final_init() + + def final_init(self): + job_thread = threading.Thread(target=self.peer_jobs) + job_thread.daemon = True + job_thread.start() + + time.sleep(peering_wait) + + self.online = True + self.final_init_done = True + + def peer_jobs(self): + while True: + time.sleep(self.peer_job_interval) + now = time.time() + timed_out_peers = [] + + # Check for timed out peers + for peer_addr in self.peers: + peer = self.peers[peer_addr] + last_heard = peer[1] + if now > last_heard+self.peering_timeout: + timed_out_peers.append(peer_addr) + + # Remove any timed out peers + for peer_addr in timed_out_peers: + removed_peer = self.peers.pop(peer_addr) + if peer_addr in self.spawned_interfaces: + spawned_interface = self.spawned_interfaces[peer_addr] + spawned_interface.detach() + spawned_interface.teardown() + RNS.log(str(self)+" removed peer "+str(peer_addr)+" on "+str(removed_peer[0]), RNS.LOG_DEBUG) + + @property + def peer_count(self): + return len(self.spawned_interfaces) + + def add_peer(self, addr, ifname): + if addr in self.link_local_addresses: + ifname = None + for interface_name in self.adopted_interfaces: + if self.adopted_interfaces[interface_name] == addr: + ifname = interface_name + + if ifname != None: + self.multicast_echoes[ifname] = time.time() + else: + RNS.log(str(self)+" received multicast echo on unexpected interface "+str(ifname), RNS.LOG_WARNING) + + else: + if not addr in self.peers: + self.peers[addr] = [ifname, time.time()] + + spawned_interface = AutoInterfacePeer(self, addr, ifname) + spawned_interface.OUT = self.OUT + spawned_interface.IN = self.IN + spawned_interface.parent_interface = self + spawned_interface.bitrate = self.bitrate + + spawned_interface.ifac_size = self.ifac_size + spawned_interface.ifac_netname = self.ifac_netname + spawned_interface.ifac_netkey = self.ifac_netkey + if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None: + ifac_origin = b"" + if spawned_interface.ifac_netname != None: + ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8")) + if spawned_interface.ifac_netkey != None: + ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8")) + + ifac_origin_hash = RNS.Identity.full_hash(ifac_origin) + spawned_interface.ifac_key = RNS.Cryptography.hkdf( + length=64, + derive_from=ifac_origin_hash, + salt=RNS.Reticulum.IFAC_SALT, + context=None + ) + spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key) + spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key)) + + spawned_interface.announce_rate_target = self.announce_rate_target + spawned_interface.announce_rate_grace = self.announce_rate_grace + spawned_interface.announce_rate_penalty = self.announce_rate_penalty + spawned_interface.mode = self.mode + spawned_interface.HW_MTU = self.HW_MTU + spawned_interface.online = True + RNS.Transport.interfaces.append(spawned_interface) + if addr in self.spawned_interfaces: + self.spawned_interfaces[addr].detach() + self.spawned_interfaces[addr].teardown() + self.spawned_interfaces.pop(spawned_interface) + self.spawned_interfaces[addr] = spawned_interface + + RNS.log(str(self)+" added peer "+str(addr)+" on "+str(ifname), RNS.LOG_DEBUG) + else: + self.refresh_peer(addr) + + def refresh_peer(self, addr): + try: + self.peers[addr][1] = time.time() + except Exception as e: + RNS.log(f"An error occurred while refreshing peer {RNS.hexrep(addr)} on {self}: {e}", RNS.LOG_ERROR) + + def process_incoming(self, data, addr=None): + if self.online and addr in self.spawned_interfaces: + self.spawned_interfaces[addr].process_incoming(data, addr) + + def process_outgoing(self,data): + pass + + def should_ingress_limit(self): + return False + + def detach(self): + self.online = False + + def __str__(self): + return "WeaveInterface["+self.name+"]" + +class WeaveInterfacePeer(Interface): + + def __init__(self, owner, addr, ifname): + super().__init__() + self.owner = owner + self.parent_interface = owner + self.addr = addr + self.ifname = ifname + self.peer_addr = None + self.addr_info = None + self.HW_MTU = self.owner.HW_MTU + self.FIXED_MTU = self.owner.FIXED_MTU + + def __str__(self): + return f"WeaveInterfacePeer[{self.ifname}/{self.addr}]" + + def process_incoming(self, data, addr=None): + if self.online and self.owner.online: + data_hash = RNS.Identity.full_hash(data) + deque_hit = False + if data_hash in self.owner.mif_deque: + for te in self.owner.mif_deque_times: + if te[0] == data_hash and time.time() < te[1]+AutoInterface.MULTI_IF_DEQUE_TTL: + deque_hit = True + break + + if not deque_hit: + self.owner.refresh_peer(self.addr) + self.owner.mif_deque.append(data_hash) + self.owner.mif_deque_times.append([data_hash, time.time()]) + self.rxb += len(data) + self.owner.rxb += len(data) + self.owner.owner.inbound(data, self) + + def process_outgoing(self, data): + if self.online: + with self.owner.write_lock: + try: + # TODO: Push to weave endpoint + self.txb += len(data) + self.owner.txb += len(data) + except Exception as e: + RNS.log("Could not transmit on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + + def detach(self): + self.online = False + self.detached = True + + def teardown(self): + if not self.detached: + RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down.", RNS.LOG_ERROR) + if RNS.Reticulum.panic_on_interface_error: + RNS.panic() + + else: + RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE) + + self.online = False + self.OUT = False + self.IN = False + + if self.addr in self.owner.spawned_interfaces: + try: self.owner.spawned_interfaces.pop(self.addr) + except Exception as e: + RNS.log(f"Could not remove {self} from parent interface on detach. The contained exception was: {e}", RNS.LOG_ERROR) + + if self in RNS.Transport.interfaces: + RNS.Transport.interfaces.remove(self) + + def should_ingress_limit(self): + return False \ No newline at end of file