mirror of
https://github.com/markqvist/NomadNet.git
synced 2025-05-17 13:20:25 -04:00
3196 lines
No EOL
116 KiB
Python
3196 lines
No EOL
116 KiB
Python
import RNS
|
|
import time
|
|
from math import log10, pow
|
|
|
|
from nomadnet.vendor.additional_urwid_widgets.FormWidgets import *
|
|
from nomadnet.vendor.AsciiChart import AsciiChart
|
|
|
|
### GYLPHS ###
|
|
INTERFACE_GLYPHS = {
|
|
# Glyph name # Plain # Unicode # Nerd Font
|
|
("NetworkInterfaceType", "(IP)", "\U0001f5a7", "\U000f0200"),
|
|
("SerialInterfaceType", "(<->)", "\u2194", "\U000f065c"),
|
|
("RNodeInterfaceType", "(R)" , "\u16b1", "\U000f043a"),
|
|
("OtherInterfaceType", "(#)" , "\U0001f67e", "\ued95"),
|
|
}
|
|
|
|
### HELPER ###
|
|
PLATFORM_IS_LINUX = False
|
|
try:
|
|
PLATFORM_IS_LINUX = (RNS.vendor.platformutils.is_android() or
|
|
RNS.vendor.platformutils.is_linux())
|
|
except Exception:
|
|
pass
|
|
|
|
def _get_interface_icon(glyphset, iface_type):
|
|
glyphset_index = 1 # Default to unicode
|
|
if glyphset == "plain":
|
|
glyphset_index = 0 # plain
|
|
elif glyphset == "nerdfont":
|
|
glyphset_index = 2 # nerdfont
|
|
|
|
type_to_glyph_tuple = {
|
|
"BackboneInterface": "NetworkInterfaceType",
|
|
"AutoInterface": "NetworkInterfaceType",
|
|
"TCPClientInterface": "NetworkInterfaceType",
|
|
"TCPServerInterface": "NetworkInterfaceType",
|
|
"UDPInterface": "NetworkInterfaceType",
|
|
"I2PInterface": "NetworkInterfaceType",
|
|
|
|
"RNodeInterface": "RNodeInterfaceType",
|
|
"RNodeMultiInterface": "RNodeInterfaceType",
|
|
|
|
"SerialInterface": "SerialInterfaceType",
|
|
"KISSInterface": "SerialInterfaceType",
|
|
"AX25KISSInterface": "SerialInterfaceType",
|
|
|
|
"PipeInterface": "OtherInterfaceType"
|
|
}
|
|
|
|
glyph_tuple_name = type_to_glyph_tuple.get(iface_type, "OtherInterfaceType")
|
|
|
|
for glyph_tuple in INTERFACE_GLYPHS:
|
|
if glyph_tuple[0] == glyph_tuple_name:
|
|
return glyph_tuple[glyphset_index + 1]
|
|
|
|
# Fallback
|
|
return "(#)" if glyphset == "plain" else "\U0001f67e" if glyphset == "unicode" else "\ued95"
|
|
|
|
def format_bytes(bytes_value):
|
|
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
|
|
size = float(bytes_value)
|
|
unit_index = 0
|
|
|
|
while size >= 1024.0 and unit_index < len(units) - 1:
|
|
size /= 1024.0
|
|
unit_index += 1
|
|
|
|
if unit_index == 0:
|
|
return f"{int(size)} {units[unit_index]}"
|
|
else:
|
|
return f"{size:.1f} {units[unit_index]}"
|
|
|
|
|
|
### PORT FUNCTIONS ###
|
|
PYSERIAL_AVAILABLE = False # If NomadNet is installed on environments with rnspure instead of rns, pyserial won't be available
|
|
try:
|
|
import serial.tools.list_ports
|
|
PYSERIAL_AVAILABLE = True
|
|
except ImportError:
|
|
class DummyPort:
|
|
def __init__(self, device, description=None, manufacturer=None, hwid=None):
|
|
self.device = device
|
|
self.description = description or device
|
|
self.manufacturer = manufacturer
|
|
self.hwid = hwid
|
|
self.vid = None
|
|
self.pid = None
|
|
|
|
def get_port_info():
|
|
if not PYSERIAL_AVAILABLE:
|
|
return []
|
|
|
|
try:
|
|
ports = serial.tools.list_ports.comports()
|
|
port_info = []
|
|
|
|
# Ports are sorted into categories for dropdown, priority ports appear first
|
|
priority_ports = [] # USB, ACM, bluetooth, etc
|
|
standard_ports = [] # COM, tty/s ports
|
|
|
|
for port in ports:
|
|
desc = f"{port.device}"
|
|
if port.description and port.description != port.device:
|
|
desc += f" ({port.description})"
|
|
if port.manufacturer:
|
|
desc += f" - {port.manufacturer}"
|
|
|
|
is_standard = (
|
|
port.device.startswith("COM") or # windows
|
|
"/dev/ttyS" in port.device or # Linux
|
|
"Serial" in port.description
|
|
)
|
|
|
|
port_data = {
|
|
'device': port.device,
|
|
'description': desc,
|
|
'hwid': port.hwid,
|
|
'vid': port.vid,
|
|
'pid': port.pid,
|
|
'is_standard': is_standard
|
|
}
|
|
|
|
if is_standard:
|
|
standard_ports.append(port_data)
|
|
else:
|
|
priority_ports.append(port_data)
|
|
|
|
priority_ports.sort(key=lambda x: x['device'])
|
|
standard_ports.sort(key=lambda x: x['device'])
|
|
|
|
return priority_ports + standard_ports
|
|
except Exception as e:
|
|
RNS.log(f"error accessing serial ports: {str(e)}", RNS.LOG_ERROR)
|
|
return []
|
|
|
|
def get_port_field():
|
|
if not PYSERIAL_AVAILABLE:
|
|
return {
|
|
"config_key": "port",
|
|
"type": "edit",
|
|
"label": "Port: ",
|
|
"default": "",
|
|
"placeholder": "/dev/ttyUSB0 or COM port (pyserial not installed)",
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
}
|
|
|
|
port_info = get_port_info()
|
|
|
|
if len(port_info) > 1:
|
|
options = [p['description'] for p in port_info]
|
|
device_map = {p['description']: p['device'] for p in port_info}
|
|
|
|
return {
|
|
"config_key": "port",
|
|
"type": "dropdown",
|
|
"label": "Port: ",
|
|
"options": options,
|
|
"default": options[0] if options else "",
|
|
"validation": ["required"],
|
|
"transform": lambda x: device_map[x]
|
|
}
|
|
else:
|
|
# single or no ports - use text field
|
|
default = port_info[0]['device'] if port_info else ""
|
|
placeholder = "/dev/ttyXXX (or COM port on Windows)"
|
|
|
|
return {
|
|
"config_key": "port",
|
|
"type": "edit",
|
|
"label": "Port: ",
|
|
"default": default,
|
|
"placeholder": placeholder,
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
}
|
|
|
|
### RNODE ####
|
|
def calculate_rnode_parameters(bandwidth, spreading_factor, coding_rate, noise_floor=6, antenna_gain=0,
|
|
transmit_power=17):
|
|
crn = {
|
|
5: 1,
|
|
6: 2,
|
|
7: 3,
|
|
8: 4,
|
|
}
|
|
coding_rate_n = crn.get(coding_rate, 1)
|
|
|
|
sfn = {
|
|
5: -2.5,
|
|
6: -5,
|
|
7: -7.5,
|
|
8: -10,
|
|
9: -12.5,
|
|
10: -15,
|
|
11: -17.5,
|
|
12: -20
|
|
}
|
|
|
|
data_rate = spreading_factor * (
|
|
(4 / (4 + coding_rate_n)) / (pow(2, spreading_factor) / (bandwidth / 1000))) * 1000
|
|
|
|
sensitivity = -174 + 10 * log10(bandwidth) + noise_floor + (sfn.get(spreading_factor, 0))
|
|
|
|
if bandwidth == 203125 or bandwidth == 406250 or bandwidth > 500000:
|
|
sensitivity = -165.6 + 10 * log10(bandwidth) + noise_floor + (sfn.get(spreading_factor, 0))
|
|
|
|
link_budget = (transmit_power - sensitivity) + antenna_gain
|
|
|
|
if data_rate < 1000:
|
|
data_rate_str = f"{data_rate:.0f} bps"
|
|
else:
|
|
data_rate_str = f"{(data_rate / 1000):.2f} kbps"
|
|
|
|
return {
|
|
"data_rate": data_rate_str,
|
|
"link_budget": f"{link_budget:.1f} dB",
|
|
"sensitivity": f"{sensitivity:.1f} dBm",
|
|
"raw_data_rate": data_rate,
|
|
"raw_link_budget": link_budget,
|
|
"raw_sensitivity": sensitivity
|
|
}
|
|
|
|
class RNodeCalculator(urwid.WidgetWrap):
|
|
def __init__(self, parent_view):
|
|
self.parent_view = parent_view
|
|
self.update_alarm = None
|
|
|
|
self.data_rate_widget = urwid.Text("Data Rate: Calculating...")
|
|
self.link_budget_widget = urwid.Text("Link Budget: Calculating...")
|
|
self.sensitivity_widget = urwid.Text("Sensitivity: Calculating...")
|
|
|
|
self.noise_floor_edit = urwid.Edit("", "0")
|
|
self.antenna_gain_edit = urwid.Edit("", "0")
|
|
|
|
layout = urwid.Pile([
|
|
urwid.Divider("-"),
|
|
|
|
urwid.Columns([
|
|
(28, urwid.Text(("key", "Enter Noise Floor (dB): "), align="right")),
|
|
self.noise_floor_edit
|
|
]),
|
|
|
|
urwid.Columns([
|
|
(28, urwid.Text(("key", "Enter Antenna Gain (dBi): "), align="right")),
|
|
self.antenna_gain_edit
|
|
]),
|
|
|
|
urwid.Divider(),
|
|
|
|
urwid.Text(("connected_status", "On-Air Calculations:"), align="left"),
|
|
self.data_rate_widget,
|
|
self.link_budget_widget,
|
|
self.sensitivity_widget,
|
|
urwid.Divider(),
|
|
|
|
urwid.Text([
|
|
"These calculations will update as you change RNode parameters"
|
|
])
|
|
])
|
|
|
|
super().__init__(layout)
|
|
|
|
self.connect_all_field_signals()
|
|
|
|
self.update_calculation()
|
|
|
|
def connect_all_field_signals(self):
|
|
urwid.connect_signal(self.noise_floor_edit, 'change', self._queue_update)
|
|
urwid.connect_signal(self.antenna_gain_edit, 'change', self._queue_update)
|
|
rnode_fields = ['bandwidth', 'spreadingfactor', 'codingrate', 'txpower']
|
|
|
|
for field_name in rnode_fields:
|
|
if field_name in self.parent_view.fields:
|
|
|
|
field_widget = self.parent_view.fields[field_name]['widget']
|
|
|
|
if hasattr(field_widget, 'edit_text'):
|
|
urwid.connect_signal(field_widget, 'change', self._queue_update)
|
|
elif hasattr(field_widget, '_emit') and 'change' in getattr(field_widget, 'signals', []):
|
|
urwid.connect_signal(field_widget, 'change', self._queue_update)
|
|
|
|
def _queue_update(self, widget, new_text):
|
|
if self.update_alarm is not None:
|
|
try:
|
|
self.parent_view.parent.app.ui.loop.remove_alarm(self.update_alarm)
|
|
except:
|
|
pass
|
|
|
|
self.update_alarm = self.parent_view.parent.app.ui.loop.set_alarm_in(
|
|
0.3, self._delayed_update)
|
|
|
|
def _delayed_update(self, loop, user_data):
|
|
self.update_alarm = None
|
|
self.update_calculation()
|
|
|
|
def update_calculation(self):
|
|
try:
|
|
|
|
try:
|
|
bandwidth_widget = self.parent_view.fields.get('bandwidth', {}).get('widget')
|
|
bandwidth = int(bandwidth_widget.get_value()) if bandwidth_widget else 125000
|
|
except (ValueError, AttributeError):
|
|
bandwidth = 125000
|
|
|
|
try:
|
|
sf_widget = self.parent_view.fields.get('spreadingfactor', {}).get('widget')
|
|
spreading_factor = int(sf_widget.get_value()) if sf_widget else 7
|
|
except (ValueError, AttributeError):
|
|
spreading_factor = 7
|
|
|
|
try:
|
|
cr_widget = self.parent_view.fields.get('codingrate', {}).get('widget')
|
|
coding_rate = int(cr_widget.get_value()) if cr_widget else 5
|
|
if isinstance(coding_rate, str) and ":" in coding_rate:
|
|
coding_rate = int(coding_rate.split(":")[1])
|
|
except (ValueError, AttributeError):
|
|
coding_rate = 5
|
|
|
|
try:
|
|
txpower_widget = self.parent_view.fields.get('txpower', {}).get('widget')
|
|
if hasattr(txpower_widget, 'edit_text'):
|
|
txpower_text = txpower_widget.edit_text.strip()
|
|
txpower = int(txpower_text) if txpower_text else 17
|
|
else:
|
|
txpower = int(txpower_widget.get_value()) if txpower_widget else 17
|
|
except (ValueError, AttributeError):
|
|
txpower = 17
|
|
|
|
try:
|
|
noise_floor_text = self.noise_floor_edit.edit_text.strip()
|
|
noise_floor = int(noise_floor_text) if noise_floor_text else 0
|
|
except (ValueError, AttributeError):
|
|
noise_floor = 0
|
|
|
|
try:
|
|
antenna_gain_text = self.antenna_gain_edit.edit_text.strip()
|
|
antenna_gain = int(antenna_gain_text) if antenna_gain_text else 0
|
|
except (ValueError, AttributeError):
|
|
antenna_gain = 0
|
|
|
|
result = calculate_rnode_parameters(
|
|
bandwidth=bandwidth,
|
|
spreading_factor=spreading_factor,
|
|
coding_rate=coding_rate,
|
|
noise_floor=noise_floor,
|
|
antenna_gain=antenna_gain,
|
|
transmit_power=txpower
|
|
)
|
|
|
|
self.data_rate_widget.set_text(f"Data Rate: {result['data_rate']}")
|
|
self.link_budget_widget.set_text(f"Link Budget: {result['link_budget']}")
|
|
self.sensitivity_widget.set_text(f"Sensitivity: {result['sensitivity']}")
|
|
|
|
except (ValueError, KeyError, TypeError) as e:
|
|
self.data_rate_widget.set_text(f"Data Rate: Waiting for parameters...")
|
|
self.link_budget_widget.set_text(f"Link Budget: Waiting for valid parameters...")
|
|
self.sensitivity_widget.set_text(f"Sensitivity: Waiting for parameters...")
|
|
|
|
### INTERFACE FIELDS ###
|
|
COMMON_INTERFACE_OPTIONS = [
|
|
{
|
|
"config_key": "network_name",
|
|
"type": "edit",
|
|
"label": "Virtual Network Name: ",
|
|
"placeholder": "Optional virtual network name",
|
|
"default": "",
|
|
"validation": [],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "passphrase",
|
|
"type": "edit",
|
|
"label": "IFAC Passphrase: ",
|
|
"placeholder": "IFAC authentication passphrase",
|
|
"default": "",
|
|
"validation": [],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "ifac_size",
|
|
"type": "edit",
|
|
"label": "IFAC Size: ",
|
|
"placeholder": "8 - 512",
|
|
"default": "",
|
|
"validation": ['number'],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "bitrate",
|
|
"type": "edit",
|
|
"label": "Inferred Bitrate: ",
|
|
"placeholder": "Automatically determined",
|
|
"default": "",
|
|
"validation": ['number'],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
]
|
|
|
|
INTERFACE_FIELDS = {
|
|
"BackboneInterface": [
|
|
{
|
|
"config_key": "listen_on",
|
|
"type": "edit",
|
|
"label": "Listen On: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 0.0.0.0",
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "port",
|
|
"type": "edit",
|
|
"label": "Port: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 4242",
|
|
"validation": ["number"],
|
|
"transform": lambda x: int(x.strip()) if x.strip() else None
|
|
},
|
|
{
|
|
"config_key": "device",
|
|
"type": "edit",
|
|
"label": "Device: ",
|
|
"default": "",
|
|
"placeholder": "e.g., eth0",
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "remote",
|
|
"type": "edit",
|
|
"label": "Remote: ",
|
|
"default": "",
|
|
"placeholder": "e.g., a remote TCPServerInterface location",
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "target_host",
|
|
"type": "edit",
|
|
"label": "Target Host: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 201:5d78:af73:5caf:a4de:a79f:3278:71e5",
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "port",
|
|
"type": "edit",
|
|
"label": "Target Port: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 4242",
|
|
"validation": ["number"],
|
|
"transform": lambda x: int(x.strip()) if x.strip() else None
|
|
},
|
|
{
|
|
"config_key": "prefer_ipv6",
|
|
"type": "checkbox",
|
|
"label": "",
|
|
"default": False,
|
|
"validation": [],
|
|
"transform": lambda x: bool(x)
|
|
},
|
|
],
|
|
"AutoInterface": [
|
|
{
|
|
|
|
},
|
|
{
|
|
"additional_options": [
|
|
{
|
|
"config_key": "devices",
|
|
"type": "multilist",
|
|
"label": "Devices: ",
|
|
"validation": [],
|
|
"transform": lambda x: ",".join(x)
|
|
},
|
|
{
|
|
"config_key": "ignored_devices",
|
|
"type": "multilist",
|
|
"label": "Ignored Devices: ",
|
|
"validation": [],
|
|
"transform": lambda x: ",".join(x)
|
|
},
|
|
{
|
|
"config_key": "group_id",
|
|
"type": "edit",
|
|
"label": "Group ID: ",
|
|
"default": "",
|
|
"placeholder": "e.g., my_custom_network",
|
|
"validation": [],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "discovery_scope",
|
|
"type": "dropdown",
|
|
"label": "Discovery Scope: ",
|
|
"options": ["None", "link", "admin", "site", "organisation", "global"],
|
|
"default": "None",
|
|
"validation": [],
|
|
"transform": lambda x: "" if x == "None" else x.strip()
|
|
}
|
|
]
|
|
},
|
|
],
|
|
"I2PInterface": [
|
|
{
|
|
"config_key": "peers",
|
|
"type": "multilist",
|
|
"label": "Peers: ",
|
|
"placeholder": "",
|
|
"validation": ["required"],
|
|
"transform": lambda x: ",".join(x)
|
|
}
|
|
],
|
|
"TCPServerInterface": [
|
|
{
|
|
"config_key": "listen_ip",
|
|
"type": "edit",
|
|
"label": "Listen IP: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 0.0.0.0",
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "listen_port",
|
|
"type": "edit",
|
|
"label": "Listen Port: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 4242",
|
|
"validation": ["number"],
|
|
"transform": lambda x: int(x.strip()) if x.strip() else None
|
|
},
|
|
{
|
|
"additional_options": [
|
|
{
|
|
"config_key": "prefer_ipv6",
|
|
"type": "checkbox",
|
|
"label": "Prefer IPv6?",
|
|
"default": False,
|
|
"validation": [],
|
|
"transform": lambda x: bool(x)
|
|
},
|
|
{
|
|
"config_key": "i2p_tunneled",
|
|
"type": "checkbox",
|
|
"label": "I2P Tunneled?",
|
|
"default": False,
|
|
"validation": [],
|
|
"transform": lambda x: bool(x)
|
|
},
|
|
{
|
|
"config_key": "device",
|
|
"type": "edit",
|
|
"label": "Device: ",
|
|
"placeholder": "A specific network device to listen on - e.g. eth0",
|
|
"default": "",
|
|
"validation": [],
|
|
"transform": lambda x: x.strip() if x.strip() else None
|
|
},
|
|
{
|
|
"config_key": "port",
|
|
"type": "edit",
|
|
"label": "Port: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 4242",
|
|
"validation": ["number"],
|
|
"transform": lambda x: int(x.strip()) if x.strip() else None
|
|
},
|
|
]
|
|
}
|
|
],
|
|
"TCPClientInterface": [
|
|
{
|
|
"config_key": "target_host",
|
|
"type": "edit",
|
|
"label": "Target Host: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 127.0.0.1",
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "target_port",
|
|
"type": "edit",
|
|
"label": "Target Port: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 8080",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x.strip()) if x.strip() else None
|
|
},
|
|
{
|
|
"additional_options": [
|
|
{
|
|
"config_key": "i2p_tunneled",
|
|
"type": "checkbox",
|
|
"label": "I2P Tunneled?",
|
|
"default": False,
|
|
"validation": [],
|
|
"transform": lambda x: bool(x)
|
|
},
|
|
{
|
|
"config_key": "kiss_framing",
|
|
"type": "checkbox",
|
|
"label": "KISS Framing?",
|
|
"default": False,
|
|
"validation": [],
|
|
"transform": lambda x: bool(x)
|
|
}
|
|
]
|
|
}
|
|
],
|
|
"UDPInterface": [
|
|
{
|
|
"config_key": "listen_ip",
|
|
"type": "edit",
|
|
"label": "Listen IP: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 0.0.0.0",
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "listen_port",
|
|
"type": "edit",
|
|
"label": "Listen Port: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 4242",
|
|
"validation": ["number"],
|
|
"transform": lambda x: int(x.strip()) if x.strip() else None
|
|
},
|
|
{
|
|
"config_key": "forward_ip",
|
|
"type": "edit",
|
|
"label": "Forward IP: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 255.255.255.255",
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "forward_port",
|
|
"type": "edit",
|
|
"label": "Forward Port: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 4242",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x.strip()) if x.strip() else None
|
|
},
|
|
{
|
|
"additional_options": [
|
|
{
|
|
"config_key": "device",
|
|
"type": "edit",
|
|
"label": "Device: ",
|
|
"placeholder": "A specific network device to listen on - e.g. eth0",
|
|
"default": "",
|
|
"validation": [],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "port",
|
|
"type": "edit",
|
|
"label": "Port: ",
|
|
"default": "",
|
|
"placeholder": "e.g., 4242",
|
|
"validation": ["number"],
|
|
"transform": lambda x: int(x.strip()) if x.strip() else None
|
|
},
|
|
]
|
|
}
|
|
],
|
|
"RNodeInterface": [
|
|
get_port_field(),
|
|
{
|
|
"config_key": "frequency",
|
|
"type": "edit",
|
|
"label": "Frequency (MHz): ",
|
|
"default": "",
|
|
"placeholder": "868.5",
|
|
"validation": ["required", "float"],
|
|
"transform": lambda x: int(float(x.strip()) * 1000000) if x.strip() else 868500000
|
|
},
|
|
{
|
|
"config_key": "txpower",
|
|
"type": "edit",
|
|
"label": "Transmit Power (dBm): ",
|
|
"default": "",
|
|
"placeholder": "17",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x.strip()) if x.strip() else 17
|
|
},
|
|
{
|
|
"config_key": "bandwidth",
|
|
"type": "dropdown",
|
|
"label": "Bandwidth (Hz): ",
|
|
"options": ["7800", "10400", "15600", "20800", "31250", "41700", "62500", "125000", "250000",
|
|
"500000", "1625000"],
|
|
"default": "7800",
|
|
"validation": ["required"],
|
|
"transform": lambda x: int(x)
|
|
},
|
|
{
|
|
"config_key": "spreadingfactor",
|
|
"type": "dropdown",
|
|
"label": "Spreading Factor: ",
|
|
"options": ["7", "8", "9", "10", "11", "12"],
|
|
"default": "7",
|
|
"validation": ["required"],
|
|
"transform": lambda x: int(x)
|
|
},
|
|
{
|
|
"config_key": "codingrate",
|
|
"type": "dropdown",
|
|
"label": "Coding Rate: ",
|
|
"options": ["4:5", "4:6", "4:7", "4:8"],
|
|
"default": "4:5",
|
|
"validation": ["required"],
|
|
"transform": lambda x: int(x.split(":")[1])
|
|
},
|
|
{
|
|
"additional_options": [
|
|
{
|
|
"config_key": "id_callsign",
|
|
"type": "edit",
|
|
"label": "Callsign: ",
|
|
"default": "",
|
|
"placeholder": "e.g. MYCALL-0",
|
|
"validation": [""],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "id_interval",
|
|
"type": "edit",
|
|
"label": "ID Interval (Seconds): ",
|
|
"placeholder": "e.g. 600",
|
|
"default": "",
|
|
"validation": ['number'],
|
|
"transform": lambda x: "" if x == "" else int(x)
|
|
},
|
|
{
|
|
"config_key": "airtime_limit_long",
|
|
"type": "edit",
|
|
"label": "Airtime Limit Long (Seconds): ",
|
|
"placeholder": "e.g. 1.5",
|
|
"default": "",
|
|
"validation": ['number'],
|
|
"transform": lambda x: "" if x == "" else int(x)
|
|
},
|
|
{
|
|
"config_key": "airtime_limit_short",
|
|
"type": "edit",
|
|
"label": "Airtime Limit Short (Seconds): ",
|
|
"placeholder": "e.g. 33",
|
|
"default": "",
|
|
"validation": ['number'],
|
|
"transform": lambda x: "" if x == "" else int(x)
|
|
},
|
|
]
|
|
}
|
|
],
|
|
"RNodeMultiInterface": [
|
|
get_port_field(),
|
|
{
|
|
"config_key": "subinterfaces",
|
|
"type": "multitable",
|
|
"fields": {
|
|
"frequency": {
|
|
"label": "Freq (Hz)",
|
|
"type": "edit",
|
|
"validation": ["required", "float"],
|
|
"transform": lambda x: int(x) if x else None
|
|
},
|
|
"bandwidth": {
|
|
"label": "BW (Hz)",
|
|
"type": "edit",
|
|
"options": ["7800", "10400", "15600", "20800", "31250", "41700", "62500", "125000", "250000", "500000", "1625000"],
|
|
"transform": lambda x: int(x) if x else None
|
|
},
|
|
"txpower": {
|
|
"label": "TX (dBm)",
|
|
"type": "edit",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x) if x else None
|
|
},
|
|
"vport": {
|
|
"label": "V.Port",
|
|
"type": "edit",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x) if x else None
|
|
},
|
|
"spreadingfactor": {
|
|
"label": "SF",
|
|
"type": "edit",
|
|
"transform": lambda x: int(x) if x else None
|
|
},
|
|
"codingrate": {
|
|
"label": "CR",
|
|
"type": "edit",
|
|
"transform": lambda x: int(x) if x else None
|
|
}
|
|
},
|
|
"validation": ["required"],
|
|
"transform": lambda x: x
|
|
},
|
|
{
|
|
"additional_options": [
|
|
{
|
|
"config_key": "id_callsign",
|
|
"type": "edit",
|
|
"label": "Callsign: ",
|
|
"default": "",
|
|
"placeholder": "e.g. MYCALL-0",
|
|
"validation": [""],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "id_interval",
|
|
"type": "edit",
|
|
"label": "ID Interval (Seconds): ",
|
|
"placeholder": "e.g. 600",
|
|
"default": "",
|
|
"validation": ['number'],
|
|
"transform": lambda x: "" if x == "" else int(x)
|
|
}
|
|
]
|
|
}
|
|
],
|
|
"SerialInterface": [
|
|
get_port_field(),
|
|
{
|
|
"config_key": "speed",
|
|
"type": "edit",
|
|
"label": "Speed (bps): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 115200",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "databits",
|
|
"type": "edit",
|
|
"label": "Databits: ",
|
|
"default": "",
|
|
"placeholder": "e.g. 8",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "parity",
|
|
"type": "edit",
|
|
"label": "Parity: ",
|
|
"default": "",
|
|
"placeholder": "",
|
|
"validation": ["number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "stopbits",
|
|
"type": "edit",
|
|
"label": "Stopbits: ",
|
|
"default": "",
|
|
"placeholder": "e.g. 1",
|
|
"validation": ["number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
],
|
|
"PipeInterface": [
|
|
{
|
|
"config_key": "command",
|
|
"type": "edit",
|
|
"label": "Command: ",
|
|
"default": "",
|
|
"placeholder": "e.g. netcat -l 5757",
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "respawn_delay",
|
|
"type": "edit",
|
|
"label": "Respawn Delay (seconds): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 5",
|
|
"validation": ["number"],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
],
|
|
"KISSInterface": [
|
|
get_port_field(),
|
|
{
|
|
"config_key": "speed",
|
|
"type": "edit",
|
|
"label": "Speed (bps): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 115200",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "databits",
|
|
"type": "edit",
|
|
"label": "Databits: ",
|
|
"default": "",
|
|
"placeholder": "e.g. 8",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "parity",
|
|
"type": "edit",
|
|
"label": "Parity: ",
|
|
"default": "",
|
|
"placeholder": "",
|
|
"validation": ["number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "stopbits",
|
|
"type": "edit",
|
|
"label": "Stopbits: ",
|
|
"default": "",
|
|
"placeholder": "e.g. 1",
|
|
"validation": ["number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "preamble",
|
|
"type": "edit",
|
|
"label": "Preamble (miliseconds): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 150",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "txtail",
|
|
"type": "edit",
|
|
"label": "TX Tail (miliseconds): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 10",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "slottime",
|
|
"type": "edit",
|
|
"label": "slottime (miliseconds): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 20",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "persistence",
|
|
"type": "edit",
|
|
"label": "Persistence (miliseconds): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 200",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"additional_options": [
|
|
{
|
|
"config_key": "id_callsign",
|
|
"type": "edit",
|
|
"label": "ID Callsign: ",
|
|
"default": "",
|
|
"placeholder": "e.g. MYCALL-0",
|
|
"validation": [""],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "id_interval",
|
|
"type": "edit",
|
|
"label": "ID Interval (Seconds): ",
|
|
"placeholder": "e.g. 600",
|
|
"default": "",
|
|
"validation": ['number'],
|
|
"transform": lambda x: "" if x == "" else int(x)
|
|
},
|
|
{
|
|
"config_key": "flow_control",
|
|
"type": "checkbox",
|
|
"label": "Flow Control ",
|
|
"validation": [],
|
|
"transform": lambda x: "" if x == "" else bool(x)
|
|
},
|
|
]
|
|
}
|
|
],
|
|
"AX25KISSInterface": [
|
|
get_port_field(),
|
|
{
|
|
"config_key": "callsign",
|
|
"type": "edit",
|
|
"label": "Callsign: ",
|
|
"default": "",
|
|
"placeholder": "e.g. NO1CLL",
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "ssid",
|
|
"type": "edit",
|
|
"label": "SSID: ",
|
|
"default": "",
|
|
"placeholder": "e.g. 0",
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "speed",
|
|
"type": "edit",
|
|
"label": "Speed (bps): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 115200",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "databits",
|
|
"type": "edit",
|
|
"label": "Databits: ",
|
|
"default": "",
|
|
"placeholder": "e.g. 8",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "parity",
|
|
"type": "edit",
|
|
"label": "Parity: ",
|
|
"default": "",
|
|
"placeholder": "",
|
|
"validation": ["number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "stopbits",
|
|
"type": "edit",
|
|
"label": "Stopbits: ",
|
|
"default": "",
|
|
"placeholder": "e.g. 1",
|
|
"validation": ["number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "preamble",
|
|
"type": "edit",
|
|
"label": "Preamble (miliseconds): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 150",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "txtail",
|
|
"type": "edit",
|
|
"label": "TX Tail (miliseconds): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 10",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "slottime",
|
|
"type": "edit",
|
|
"label": "Slottime (miliseconds): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 20",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"config_key": "persistence",
|
|
"type": "edit",
|
|
"label": "Persistence (miliseconds): ",
|
|
"default": "",
|
|
"placeholder": "e.g. 200",
|
|
"validation": ["required", "number"],
|
|
"transform": lambda x: "" if x == "" else int(x.strip())
|
|
},
|
|
{
|
|
"additional_options": [
|
|
{
|
|
"config_key": "flow_control",
|
|
"type": "checkbox",
|
|
"label": "Flow Control ",
|
|
"validation": [],
|
|
"transform": lambda x: "" if x == "" else bool(x)
|
|
},
|
|
]
|
|
}
|
|
],
|
|
"CustomInterface": [
|
|
{
|
|
"config_key": "type",
|
|
"type": "edit",
|
|
"label": "Interface Type: ",
|
|
"default": "",
|
|
"placeholder": "Name of custom interface class",
|
|
"validation": ["required"],
|
|
"transform": lambda x: x.strip()
|
|
},
|
|
{
|
|
"config_key": "custom_parameters",
|
|
"type": "keyvaluepairs",
|
|
"label": "Parameters: ",
|
|
"validation": [],
|
|
"transform": lambda x: x
|
|
},
|
|
],
|
|
"default": [
|
|
{
|
|
|
|
},
|
|
]
|
|
}
|
|
|
|
### INTERFACE WIDGETS ####
|
|
class SelectableInterfaceItem(urwid.WidgetWrap):
|
|
def __init__(self, parent, name, is_connected, is_enabled, iface_type, tx, rx, icon="?", iface_options=None):
|
|
self.parent = parent
|
|
self._selectable = True
|
|
self.icon = icon
|
|
self.name = name
|
|
self.is_connected = is_connected
|
|
self.is_enabled = is_enabled
|
|
self.iface_options = iface_options
|
|
|
|
|
|
if is_enabled:
|
|
enabled_txt = ("connected_status", "Enabled")
|
|
else:
|
|
enabled_txt = ("disconnected_status", "Disabled")
|
|
|
|
if is_connected:
|
|
connected_txt = ("connected_status", "Connected")
|
|
else:
|
|
connected_txt = ("disconnected_status", "Disconnected")
|
|
|
|
self.selection_txt = urwid.Text(" ")
|
|
self.title_widget = urwid.Text(("interface_title", f"{icon} {name}"))
|
|
|
|
title_content = urwid.Columns([
|
|
(4, self.selection_txt),
|
|
self.title_widget,
|
|
])
|
|
|
|
self.tx_widget = urwid.Text(("value", format_bytes(tx)))
|
|
self.rx_widget = urwid.Text(("value", format_bytes(rx)))
|
|
|
|
self.status_widget = urwid.Text(enabled_txt)
|
|
self.connection_widget = urwid.Text(connected_txt)
|
|
|
|
rows = [
|
|
urwid.Columns([
|
|
(10, urwid.Text(("key", "Status: "))),
|
|
(10, self.status_widget),
|
|
(3, urwid.Text(" | ")),
|
|
self.connection_widget,
|
|
]),
|
|
|
|
urwid.Columns([
|
|
(10, urwid.Text(("key", "Type:"))),
|
|
urwid.Text(("value", iface_type)),
|
|
]),
|
|
|
|
urwid.Divider("-"),
|
|
|
|
urwid.Columns([
|
|
(10, urwid.Text(("key", "TX:"))),
|
|
(15, self.tx_widget),
|
|
(10, urwid.Text(("key", "RX:"))),
|
|
self.rx_widget,
|
|
]),
|
|
]
|
|
|
|
pile_contents = [title_content] + rows
|
|
|
|
pile = urwid.Pile(pile_contents)
|
|
|
|
padded_body = urwid.Padding(pile, left=2, right=2)
|
|
|
|
box = urwid.LineBox(
|
|
padded_body,
|
|
title=None,
|
|
#todo
|
|
tlcorner="╭", tline="─",
|
|
trcorner="╮", lline="│",
|
|
rline="│", blcorner="╰",
|
|
bline="─", brcorner="╯"
|
|
)
|
|
|
|
super().__init__(box)
|
|
|
|
def update_status_display(self):
|
|
if self.is_enabled:
|
|
self.status_widget.set_text(("connected_status", "Enabled"))
|
|
else:
|
|
self.status_widget.set_text(("disconnected_status", "Disabled"))
|
|
|
|
def selectable(self):
|
|
return True
|
|
|
|
def render(self, size, focus=False):
|
|
self.selection_txt.set_text(self.parent.g['selected'] if focus else self.parent.g['unselected'])
|
|
|
|
if focus:
|
|
self.title_widget.set_text(
|
|
("interface_title_selected", f"{self.icon} {self.name}"))
|
|
else:
|
|
self.title_widget.set_text(("interface_title", f"{self.icon} {self.name}"))
|
|
|
|
return super().render(size, focus=focus)
|
|
|
|
def keypress(self, size, key):
|
|
if key == "up":
|
|
listbox = self.parent.box_adapter._original_widget
|
|
walker = listbox.body
|
|
|
|
interface_items = [i for i, item in enumerate(walker)
|
|
if isinstance(item, SelectableInterfaceItem)]
|
|
|
|
if interface_items and walker[listbox.focus_position] is self and \
|
|
listbox.focus_position == interface_items[0]:
|
|
self.parent.app.ui.main_display.frame.focus_position = "header"
|
|
return None
|
|
elif key == "enter":
|
|
self.parent.switch_to_show_interface(self.name)
|
|
return None
|
|
return key
|
|
|
|
def update_stats(self, tx, rx):
|
|
self.tx_widget.set_text(("value", format_bytes(tx)))
|
|
self.rx_widget.set_text(("value", format_bytes(rx)))
|
|
|
|
class InterfaceOptionItem(urwid.WidgetWrap):
|
|
def __init__(self, parent_display, label, value):
|
|
self.parent_display = parent_display
|
|
self.label = label
|
|
self.value = value
|
|
self._selectable = True
|
|
|
|
text_widget = urwid.Text(label, align="left")
|
|
super().__init__(urwid.AttrMap(text_widget, "list_normal", focus_map="list_focus"))
|
|
|
|
def selectable(self):
|
|
return True
|
|
|
|
def keypress(self, size, key):
|
|
if key == "enter":
|
|
self.parent_display.dismiss_dialog()
|
|
self.parent_display.switch_to_add_interface(self.value)
|
|
return None
|
|
return super().keypress(size, key)
|
|
|
|
class InterfaceBandwidthChart:
|
|
|
|
def __init__(self, history_length=60, glyphset="unicode"):
|
|
self.history_length = history_length
|
|
self.glyphset = glyphset
|
|
self.rx_rates = [0] * history_length
|
|
self.tx_rates = [0] * history_length
|
|
|
|
self.prev_rx = None
|
|
self.prev_tx = None
|
|
self.prev_time = None
|
|
|
|
self.max_rx_rate = 1
|
|
self.max_tx_rate = 1
|
|
|
|
self.first_update = True
|
|
self.initialization_complete = False
|
|
self.stabilization_updates = 2
|
|
self.update_count = 0
|
|
|
|
self.peak_rx_for_display = 0
|
|
self.peak_tx_for_display = 0
|
|
|
|
def update(self, rx_bytes, tx_bytes):
|
|
current_time = time.time()
|
|
|
|
if self.prev_rx is None or self.first_update:
|
|
self.prev_rx = rx_bytes
|
|
self.prev_tx = tx_bytes
|
|
self.prev_time = current_time
|
|
self.first_update = False
|
|
return
|
|
|
|
time_delta = max(0.1, current_time - self.prev_time)
|
|
|
|
rx_delta = max(0, rx_bytes - self.prev_rx) / time_delta
|
|
tx_delta = max(0, tx_bytes - self.prev_tx) / time_delta
|
|
|
|
self.prev_rx = rx_bytes
|
|
self.prev_tx = tx_bytes
|
|
self.prev_time = current_time
|
|
|
|
self.update_count += 1
|
|
|
|
self.rx_rates.pop(0)
|
|
self.tx_rates.pop(0)
|
|
self.rx_rates.append(rx_delta)
|
|
self.tx_rates.append(tx_delta)
|
|
|
|
if self.update_count >= self.stabilization_updates:
|
|
self.initialization_complete = True
|
|
|
|
self.peak_rx_for_display = max(self.peak_rx_for_display, rx_delta)
|
|
self.peak_tx_for_display = max(self.peak_tx_for_display, tx_delta)
|
|
|
|
|
|
current_rx_max = max(self.rx_rates)
|
|
current_tx_max = max(self.tx_rates)
|
|
|
|
self.max_rx_rate = max(1, current_rx_max)
|
|
self.max_tx_rate = max(1, current_tx_max)
|
|
|
|
def get_charts(self, height=8):
|
|
chart = AsciiChart(glyphset=self.glyphset)
|
|
|
|
rx_data = self.rx_rates.copy()
|
|
tx_data = self.tx_rates.copy()
|
|
|
|
peak_rx = self.peak_rx_for_display if self.initialization_complete else 0
|
|
peak_tx = self.peak_tx_for_display if self.initialization_complete else 0
|
|
|
|
peak_rx_str = format_bytes(peak_rx)
|
|
peak_tx_str = format_bytes(peak_tx)
|
|
|
|
rx_chart = chart.plot(
|
|
[rx_data],
|
|
{
|
|
'height': height,
|
|
'format': '{:8.1f}',
|
|
'min': 0,
|
|
'max': self.max_rx_rate * 1.1,
|
|
}
|
|
)
|
|
|
|
tx_chart = chart.plot(
|
|
[tx_data],
|
|
{
|
|
'height': height,
|
|
'format': '{:8.1f}',
|
|
'min': 0,
|
|
'max': self.max_tx_rate * 1.1,
|
|
}
|
|
)
|
|
|
|
return rx_chart, tx_chart, peak_rx_str, peak_tx_str
|
|
|
|
|
|
class ResponsiveChartContainer(urwid.WidgetWrap):
|
|
|
|
def __init__(self, rx_box, tx_box, min_cols_for_horizontal=100):
|
|
self.rx_box = rx_box
|
|
self.tx_box = tx_box
|
|
self.min_cols_for_horizontal = min_cols_for_horizontal
|
|
|
|
self.horizontal_layout = urwid.Columns([
|
|
(urwid.WEIGHT, 1, self.rx_box),
|
|
(urwid.WEIGHT, 1, self.tx_box)
|
|
])
|
|
|
|
self.vertical_layout = urwid.Pile([
|
|
self.rx_box,
|
|
self.tx_box
|
|
])
|
|
|
|
self.layout = urwid.WidgetPlaceholder(self.horizontal_layout)
|
|
|
|
super().__init__(self.layout)
|
|
|
|
def render(self, size, focus=False):
|
|
maxcol = size[0] if len(size) > 0 else 0
|
|
|
|
if maxcol >= self.min_cols_for_horizontal and self.layout.original_widget is not self.horizontal_layout:
|
|
self.layout.original_widget = self.horizontal_layout
|
|
elif maxcol < self.min_cols_for_horizontal and self.layout.original_widget is not self.vertical_layout:
|
|
self.layout.original_widget = self.vertical_layout
|
|
|
|
return super().render(size, focus)
|
|
|
|
### URWID FILLER ###
|
|
class InterfaceFiller(urwid.WidgetWrap):
|
|
def __init__(self, widget, app):
|
|
self.app = app
|
|
self.filler = urwid.Filler(widget, urwid.TOP)
|
|
super().__init__(self.filler)
|
|
|
|
def keypress(self, size, key):
|
|
if key == "ctrl a":
|
|
# add interface
|
|
self.app.ui.main_display.sub_displays.interface_display.add_interface()
|
|
return
|
|
elif key == "ctrl x":
|
|
# remove Interface
|
|
self.app.ui.main_display.sub_displays.interface_display.remove_selected_interface()
|
|
return
|
|
elif key == "ctrl e":
|
|
# edit interface
|
|
self.app.ui.main_display.sub_displays.interface_display.edit_selected_interface()
|
|
return None
|
|
elif key == "ctrl w":
|
|
# open config file editor
|
|
self.app.ui.main_display.sub_displays.interface_display.open_config_editor()
|
|
return None
|
|
|
|
return super().keypress(size, key)
|
|
|
|
### VIEWS ###
|
|
class AddInterfaceView(urwid.WidgetWrap):
|
|
def __init__(self, parent, iface_type):
|
|
self.parent = parent
|
|
self.iface_type = iface_type
|
|
self.fields = {}
|
|
self.port_pile = None
|
|
self.additional_fields = {}
|
|
self.additional_pile_contents = []
|
|
self.common_fields = {}
|
|
|
|
self.parent.shortcuts_display.set_add_interface_shortcuts()
|
|
|
|
name_field = FormEdit(
|
|
config_key="name",
|
|
placeholder="Enter interface name",
|
|
validation_types=["required"]
|
|
)
|
|
self.fields['name'] = {
|
|
'label': "Name: ",
|
|
'widget': name_field
|
|
}
|
|
|
|
config = INTERFACE_FIELDS.get(iface_type, INTERFACE_FIELDS["default"])
|
|
iface_fields = [field for field in config if "config_key" in field]
|
|
|
|
for field in iface_fields:
|
|
self._initialize_field(field)
|
|
|
|
self._initialize_additional_fields(config)
|
|
|
|
self._initialize_common_fields()
|
|
|
|
pile_items = self._build_form_layout(iface_fields)
|
|
|
|
form_pile = urwid.Pile(pile_items)
|
|
form_filler = urwid.Filler(form_pile, valign="top")
|
|
form_box = urwid.LineBox(
|
|
form_filler,
|
|
title="Add Interface",
|
|
tlcorner="╭", tline="─",
|
|
trcorner="╮", lline="│",
|
|
rline="│", blcorner="╰",
|
|
bline="─", brcorner="╯"
|
|
)
|
|
|
|
background = urwid.SolidFill(" ")
|
|
self.overlay = urwid.Overlay(
|
|
top_w=form_box,
|
|
bottom_w=background,
|
|
align='center',
|
|
width=('relative', 85),
|
|
valign='middle',
|
|
height=('relative', 85),
|
|
)
|
|
super().__init__(self.overlay)
|
|
|
|
def _initialize_field(self, field):
|
|
if field["type"] == "dropdown":
|
|
widget = FormDropdown(
|
|
config_key=field["config_key"],
|
|
label=field.get("label", ""),
|
|
options=field["options"],
|
|
default=field.get("default"),
|
|
validation_types=field.get("validation", []),
|
|
transform=field.get("transform")
|
|
)
|
|
elif field["type"] == "checkbox":
|
|
widget = FormCheckbox(
|
|
config_key=field["config_key"],
|
|
label=field.get("label", ""),
|
|
state=field.get("default", False),
|
|
validation_types=field.get("validation", []),
|
|
transform=field.get("transform")
|
|
)
|
|
elif field["type"] == "multilist":
|
|
widget = FormMultiList(
|
|
config_key=field["config_key"],
|
|
placeholder=field.get("placeholder", ""),
|
|
validation_types=field.get("validation", []),
|
|
transform=field.get("transform")
|
|
)
|
|
elif field["type"] == "multitable":
|
|
widget = FormMultiTable(
|
|
config_key=field["config_key"],
|
|
fields=field.get("fields", {}),
|
|
validation_types=field.get("validation", []),
|
|
transform=field.get("transform")
|
|
)
|
|
elif field["type"] == "keyvaluepairs":
|
|
widget = FormKeyValuePairs(
|
|
config_key=field["config_key"],
|
|
validation_types=field.get("validation", []),
|
|
transform=field.get("transform")
|
|
)
|
|
else:
|
|
widget = FormEdit(
|
|
config_key=field["config_key"],
|
|
caption="",
|
|
edit_text=field.get("default", ""),
|
|
placeholder=field.get("placeholder", ""),
|
|
validation_types=field.get("validation", []),
|
|
transform=field.get("transform")
|
|
)
|
|
|
|
label = field.get("label", "")
|
|
if not label:
|
|
label = " ".join(word.capitalize() for word in field["config_key"].split('_')) + ": "
|
|
|
|
self.fields[field["config_key"]] = {
|
|
'label': label,
|
|
'widget': widget
|
|
}
|
|
|
|
def _initialize_additional_fields(self, config):
|
|
for field in config:
|
|
if isinstance(field, dict) and "additional_options" in field:
|
|
for option in field["additional_options"]:
|
|
if option["type"] == "checkbox":
|
|
widget = FormCheckbox(
|
|
config_key=option["config_key"],
|
|
label=option.get("label", ""),
|
|
state=option.get("default", False),
|
|
validation_types=option.get("validation", []),
|
|
transform=option.get("transform")
|
|
)
|
|
elif option["type"] == "dropdown":
|
|
widget = FormDropdown(
|
|
config_key=option["config_key"],
|
|
label=option.get("label", ""),
|
|
options=option["options"],
|
|
default=option.get("default"),
|
|
validation_types=option.get("validation", []),
|
|
transform=option.get("transform")
|
|
)
|
|
elif option["type"] == "multilist":
|
|
widget = FormMultiList(
|
|
config_key=option["config_key"],
|
|
placeholder=option.get("placeholder", ""),
|
|
validation_types=option.get("validation", []),
|
|
transform=option.get("transform")
|
|
)
|
|
else:
|
|
widget = FormEdit(
|
|
config_key=option["config_key"],
|
|
caption="",
|
|
edit_text=str(option.get("default", "")),
|
|
placeholder=option.get("placeholder", ""),
|
|
validation_types=option.get("validation", []),
|
|
transform=option.get("transform")
|
|
)
|
|
|
|
label = option.get("label", "")
|
|
if not label:
|
|
label = " ".join(word.capitalize() for word in option["config_key"].split('_')) + ": "
|
|
|
|
self.additional_fields[option["config_key"]] = {
|
|
'label': label,
|
|
'widget': widget,
|
|
'type': option["type"]
|
|
}
|
|
|
|
def _initialize_common_fields(self):
|
|
if self.parent.app.rns.transport_enabled():
|
|
# Transport mode options
|
|
COMMON_INTERFACE_OPTIONS.extend([
|
|
{
|
|
"config_key": "outgoing",
|
|
"type": "checkbox",
|
|
"label": "Allow outgoing traffic",
|
|
"default": True,
|
|
"validation": [],
|
|
"transform": lambda x: bool(x)
|
|
},
|
|
{
|
|
"config_key": "mode",
|
|
"type": "dropdown",
|
|
"label": "Interface Mode: ",
|
|
"options": ["full", "gateway", "access_point", "roaming", "boundary"],
|
|
"default": "full",
|
|
"validation": [],
|
|
"transform": lambda x: x
|
|
},
|
|
{
|
|
"config_key": "announce_cap",
|
|
"type": "edit",
|
|
"label": "Announce Cap: ",
|
|
"placeholder": "Default: 2.0",
|
|
"default": "",
|
|
"validation": ["float"],
|
|
"transform": lambda x: float(x) if x.strip() else 2.0
|
|
}
|
|
])
|
|
|
|
for option in COMMON_INTERFACE_OPTIONS:
|
|
if option["type"] == "checkbox":
|
|
widget = FormCheckbox(
|
|
config_key=option["config_key"],
|
|
label=option["label"],
|
|
state=option.get("default", False),
|
|
validation_types=option.get("validation", []),
|
|
transform=option.get("transform")
|
|
)
|
|
elif option["type"] == "dropdown":
|
|
widget = FormDropdown(
|
|
config_key=option["config_key"],
|
|
label=option["label"],
|
|
options=option["options"],
|
|
default=option.get("default"),
|
|
validation_types=option.get("validation", []),
|
|
transform=option.get("transform")
|
|
)
|
|
else:
|
|
widget = FormEdit(
|
|
config_key=option["config_key"],
|
|
caption="",
|
|
edit_text=str(option.get("default", "")),
|
|
placeholder=option.get("placeholder", ""),
|
|
validation_types=option.get("validation", []),
|
|
transform=option.get("transform")
|
|
)
|
|
|
|
self.common_fields[option["config_key"]] = {
|
|
'label': option["label"],
|
|
'widget': widget,
|
|
'type': option["type"]
|
|
}
|
|
|
|
def _on_rnode_field_change(self, widget, new_value):
|
|
if hasattr(self, 'rnode_calculator') and self.calculator_visible:
|
|
self.rnode_calculator.update_calculation()
|
|
|
|
def _build_form_layout(self, iface_fields):
|
|
pile_items = []
|
|
pile_items.append(urwid.Text(
|
|
("form_title", f"Add new {_get_interface_icon(self.parent.glyphset, self.iface_type)} {self.iface_type}"),
|
|
align="center"))
|
|
pile_items.append(urwid.Divider("─"))
|
|
|
|
for key in ["name"] + [f["config_key"] for f in iface_fields]:
|
|
field = self.fields[key]
|
|
widget = field["widget"]
|
|
|
|
# Special case for multitable and keyvaluepairs - they already have their own layout
|
|
if isinstance(widget, (FormMultiTable, FormKeyValuePairs)):
|
|
pile_items.append(urwid.Text(("key", field["label"]), align="left"))
|
|
pile_items.append(widget)
|
|
pile_items.append(urwid.Padding(widget.error_widget, left=2))
|
|
continue
|
|
|
|
field_pile = urwid.Pile([
|
|
urwid.Columns([
|
|
(26, urwid.Text(("key", field["label"]), align="right")),
|
|
widget,
|
|
]),
|
|
urwid.Padding(widget.error_widget, left=24)
|
|
])
|
|
|
|
if self.iface_type in ["RNodeInterface", "RNodeMultiInterface", "SerialInterface", "AX25KISSInterface",
|
|
"KISSInterface"] and key == "port":
|
|
refresh_btn = urwid.Button("Refresh Ports", on_press=self.refresh_ports)
|
|
refresh_btn = urwid.AttrMap(refresh_btn, "button_normal", focus_map="button_focus")
|
|
refresh_row = urwid.Padding(refresh_btn, left=26, width=20)
|
|
field_pile.contents.append((refresh_row, field_pile.options()))
|
|
self.port_pile = field_pile
|
|
|
|
pile_items.append(field_pile)
|
|
|
|
self.more_options_visible = False
|
|
self.more_options_button = urwid.Button("Show more options", on_press=self.toggle_more_options)
|
|
self.more_options_button = urwid.AttrMap(self.more_options_button, "button_normal", focus_map="button_focus")
|
|
self.more_options_widget = urwid.Pile([])
|
|
|
|
self.ifac_options_visible = False
|
|
self.ifac_options_button = urwid.Button("Show IFAC options", on_press=self.toggle_ifac_options)
|
|
self.ifac_options_button = urwid.AttrMap(self.ifac_options_button, "button_normal", focus_map="button_focus")
|
|
self.ifac_options_widget = urwid.Pile([])
|
|
|
|
if self.iface_type in ["RNodeInterface"]:
|
|
self.calculator_button = urwid.Button("Show On-Air Calculations", on_press=self.toggle_calculator)
|
|
self.calculator_button = urwid.AttrMap(self.calculator_button, "button_normal", focus_map="button_focus")
|
|
|
|
save_btn = urwid.Button("Save", on_press=self.on_save)
|
|
back_btn = urwid.Button("Cancel", on_press=self.on_back)
|
|
button_row = urwid.Columns([
|
|
(urwid.WEIGHT, 0.45, save_btn),
|
|
(urwid.WEIGHT, 0.1, urwid.Text("")),
|
|
(urwid.WEIGHT, 0.45, back_btn),
|
|
])
|
|
|
|
pile_items.extend([
|
|
urwid.Divider(),
|
|
self.more_options_button,
|
|
self.more_options_widget,
|
|
])
|
|
if self.iface_type in ["RNodeInterface"]:
|
|
self.rnode_calculator = RNodeCalculator(self)
|
|
self.calculator_visible = False
|
|
self.calculator_widget = urwid.Pile([])
|
|
pile_items.extend([
|
|
self.calculator_button,
|
|
self.calculator_widget,
|
|
])
|
|
pile_items.extend([
|
|
urwid.Divider("─"),
|
|
button_row,
|
|
])
|
|
|
|
return pile_items
|
|
|
|
def toggle_more_options(self, button):
|
|
if self.more_options_visible:
|
|
self.more_options_widget.contents = []
|
|
button.base_widget.set_label("Show more options")
|
|
self.more_options_visible = False
|
|
else:
|
|
pile_contents = []
|
|
|
|
if self.additional_fields:
|
|
for key, field in self.additional_fields.items():
|
|
widget = field['widget']
|
|
|
|
if field['type'] == "checkbox":
|
|
centered_widget = urwid.Columns([
|
|
('weight', 1, urwid.Text("")),
|
|
('pack', widget),
|
|
('weight', 1, urwid.Text(""))
|
|
])
|
|
field_pile = urwid.Pile([
|
|
centered_widget,
|
|
urwid.Padding(widget.error_widget, left=24)
|
|
])
|
|
else:
|
|
field_pile = urwid.Pile([
|
|
urwid.Columns([
|
|
(26, urwid.Text(("key", field["label"]), align="right")),
|
|
widget
|
|
]),
|
|
urwid.Padding(widget.error_widget, left=24)
|
|
])
|
|
|
|
pile_contents.append(field_pile)
|
|
|
|
if self.additional_fields and self.common_fields:
|
|
pile_contents.append(urwid.Divider("─"))
|
|
|
|
if self.common_fields:
|
|
for key, field in self.common_fields.items():
|
|
widget = field['widget']
|
|
|
|
if field['type'] == "checkbox":
|
|
centered_widget = urwid.Columns([
|
|
('weight', 1, urwid.Text("")),
|
|
('pack', widget),
|
|
('weight', 1, urwid.Text(""))
|
|
])
|
|
field_pile = urwid.Pile([
|
|
centered_widget,
|
|
urwid.Padding(widget.error_widget, left=24)
|
|
])
|
|
else:
|
|
field_pile = urwid.Pile([
|
|
urwid.Columns([
|
|
(26, urwid.Text(("key", field["label"]), align="right")),
|
|
widget
|
|
]),
|
|
urwid.Padding(widget.error_widget, left=24)
|
|
])
|
|
|
|
pile_contents.append(field_pile)
|
|
|
|
if pile_contents:
|
|
self.more_options_widget.contents = [(w, self.more_options_widget.options()) for w in pile_contents]
|
|
else:
|
|
self.more_options_widget.contents = [(
|
|
urwid.Text("No additional options available", align="center"),
|
|
self.more_options_widget.options()
|
|
)]
|
|
|
|
button.base_widget.set_label("Hide more options")
|
|
self.more_options_visible = True
|
|
|
|
def toggle_ifac_options(self, button):
|
|
if self.ifac_options_visible:
|
|
self.ifac_options_widget.contents = []
|
|
button.base_widget.set_label("Show IFAC options")
|
|
self.ifac_options_visible = False
|
|
else:
|
|
dummy = urwid.Text("IFAC (Interface Access Codes)", align="left")
|
|
self.ifac_options_widget.contents = [(dummy, self.more_options_widget.options())]
|
|
button.base_widget.set_label("Hide IFAC options")
|
|
self.ifac_options_visible = True
|
|
|
|
def toggle_calculator(self, button):
|
|
if self.calculator_visible:
|
|
self.calculator_widget.contents = []
|
|
button.base_widget.set_label("Show On-Air Calculations")
|
|
self.calculator_visible = False
|
|
else:
|
|
calculator_contents = [self.rnode_calculator]
|
|
|
|
self.calculator_widget.contents = [(w, self.calculator_widget.options()) for w in calculator_contents]
|
|
|
|
self.rnode_calculator.update_calculation()
|
|
|
|
button.base_widget.set_label("Hide On-Air Calculations")
|
|
self.calculator_visible = True
|
|
|
|
def refresh_ports(self, button):
|
|
if self.port_pile is not None:
|
|
# Get fresh port config
|
|
port_field = get_port_field()
|
|
|
|
if port_field["type"] == "dropdown":
|
|
widget = FormDropdown(
|
|
config_key=port_field["config_key"],
|
|
label=port_field["label"],
|
|
options=port_field["options"],
|
|
default=port_field.get("default"),
|
|
validation_types=port_field.get("validation", []),
|
|
transform=port_field.get("transform")
|
|
)
|
|
else:
|
|
widget = FormEdit(
|
|
config_key=port_field["config_key"],
|
|
caption="",
|
|
edit_text=port_field.get("default", ""),
|
|
placeholder=port_field.get("placeholder", ""),
|
|
validation_types=port_field.get("validation", []),
|
|
transform=port_field.get("transform")
|
|
)
|
|
|
|
self.fields["port"] = {
|
|
'label': port_field["label"],
|
|
'widget': widget
|
|
}
|
|
|
|
columns = urwid.Columns([
|
|
(26, urwid.Text(("key", port_field["label"]), align="right")),
|
|
widget
|
|
])
|
|
self.port_pile.contents[0] = (columns, self.port_pile.options())
|
|
|
|
self.port_pile.contents[1] = (urwid.Padding(widget.error_widget, left=24), self.port_pile.options())
|
|
|
|
def validate_all(self):
|
|
all_valid = True
|
|
|
|
# validate main fields
|
|
for field in self.fields.values():
|
|
if not field["widget"].validate():
|
|
all_valid = False
|
|
|
|
# validate additional iface fields
|
|
for field in self.additional_fields.values():
|
|
if not field["widget"].validate():
|
|
all_valid = False
|
|
|
|
# validate common fields
|
|
for field in self.common_fields.values():
|
|
if not field["widget"].validate():
|
|
all_valid = False
|
|
|
|
return all_valid
|
|
|
|
def on_save(self, button):
|
|
all_valid = self.validate_all()
|
|
|
|
name = self.fields['name']["widget"].get_value() or "Untitled interface"
|
|
|
|
existing_interfaces = self.parent.app.rns.config['interfaces']
|
|
if name in existing_interfaces:
|
|
self.fields['name']["widget"].error = f"Interface name '{name}' already exists"
|
|
self.fields['name']["widget"].error_widget.set_text(("error", self.fields['name']["widget"].error))
|
|
all_valid = False
|
|
|
|
if not all_valid:
|
|
return
|
|
|
|
if self.iface_type == "CustomInterface":
|
|
custom_type = self.fields.get('type', {}).get('widget').get_value()
|
|
interface_config = {
|
|
"type": custom_type,
|
|
"interface_enabled": True
|
|
}
|
|
else:
|
|
interface_config = {
|
|
"type": self.iface_type,
|
|
"interface_enabled": True
|
|
}
|
|
|
|
for field_key, field in self.fields.items():
|
|
if field_key not in ["name", "custom_parameters", "type"]:
|
|
widget = field["widget"]
|
|
value = widget.get_value()
|
|
|
|
if field_key == "subinterfaces" and self.iface_type == "RNodeMultiInterface" and isinstance(value,
|
|
dict):
|
|
for subname, subconfig in value.items():
|
|
interface_config[f"{subname}"] = subconfig
|
|
elif value is not None and value != "":
|
|
interface_config[widget.config_key] = value
|
|
|
|
if self.iface_type == "CustomInterface" and "custom_parameters" in self.fields:
|
|
custom_params = self.fields["custom_parameters"]["widget"].get_value()
|
|
if isinstance(custom_params, dict):
|
|
for param_key, param_value in custom_params.items():
|
|
interface_config[param_key] = param_value
|
|
|
|
for field_key, field in self.additional_fields.items():
|
|
widget = field["widget"]
|
|
value = widget.get_value()
|
|
if value is not None and value != "":
|
|
interface_config[widget.config_key] = value
|
|
|
|
for field_key, field in self.common_fields.items():
|
|
widget = field["widget"]
|
|
value = widget.get_value()
|
|
if value is not None and value != "":
|
|
interface_config[widget.config_key] = value
|
|
|
|
try:
|
|
interfaces = self.parent.app.rns.config['interfaces']
|
|
interfaces[name] = interface_config
|
|
self.parent.app.rns.config.write()
|
|
|
|
display_type = custom_type if self.iface_type == "CustomInterface" else self.iface_type
|
|
|
|
new_item = SelectableInterfaceItem(
|
|
parent=self.parent,
|
|
name=name,
|
|
is_connected=False, # will always be false until restart
|
|
is_enabled=True,
|
|
iface_type=display_type,
|
|
tx=0,
|
|
rx=0,
|
|
icon=_get_interface_icon(self.parent.glyphset, display_type),
|
|
iface_options=interface_config
|
|
)
|
|
|
|
self.parent.interface_items.append(new_item)
|
|
self.parent._rebuild_list()
|
|
|
|
self.show_message(f"Interface {name} added. Restart NomadNet to start using this interface")
|
|
|
|
except Exception as e:
|
|
print(f"Error saving interface: {str(e)}")
|
|
self.show_message(f"Error: {str(e)}", title="Error")
|
|
|
|
def on_back(self, button):
|
|
self.parent.switch_to_list()
|
|
|
|
def show_message(self, message, title="Notice"):
|
|
def dismiss_dialog(button):
|
|
self.parent.switch_to_list()
|
|
|
|
dialog = DialogLineBox(
|
|
urwid.Pile([
|
|
urwid.Text(message, align="center"),
|
|
urwid.Divider(),
|
|
urwid.Button("OK", on_press=dismiss_dialog)
|
|
]),
|
|
title=title
|
|
)
|
|
|
|
overlay = urwid.Overlay(
|
|
dialog,
|
|
self.parent.interfaces_display,
|
|
align='center',
|
|
width=100,
|
|
valign='middle',
|
|
height=8,
|
|
min_width=1,
|
|
min_height=1
|
|
)
|
|
|
|
self.parent.widget = overlay
|
|
self.parent.app.ui.main_display.update_active_sub_display()
|
|
|
|
|
|
class EditInterfaceView(AddInterfaceView):
|
|
def __init__(self, parent, iface_name):
|
|
self.parent = parent
|
|
self.iface_name = iface_name
|
|
|
|
self.interface_config = parent.app.rns.config['interfaces'][iface_name]
|
|
config_type = self.interface_config.get("type", "Unknown")
|
|
|
|
# check if this is a custom interface type
|
|
known_types = list(INTERFACE_FIELDS.keys())
|
|
if config_type not in known_types:
|
|
self.original_type = config_type
|
|
self.iface_type = "CustomInterface"
|
|
else:
|
|
self.original_type = None
|
|
self.iface_type = config_type
|
|
|
|
super().__init__(parent, self.iface_type)
|
|
|
|
self.overlay.top_w.title_widget.set_text(f"Edit Interface: {iface_name}")
|
|
|
|
self._populate_form_fields()
|
|
|
|
def _populate_form_fields(self):
|
|
self.fields['name']['widget'].edit_text = self.iface_name
|
|
|
|
if self.original_type and self.iface_type == "CustomInterface":
|
|
if "type" in self.fields:
|
|
self.fields["type"]["widget"].edit_text = self.original_type
|
|
|
|
if "custom_parameters" in self.fields:
|
|
custom_params = {}
|
|
standard_keys = ["type", "interface_enabled", "enabled", "description",
|
|
"network_name", "bitrate", "passphrase", "ifac_size",
|
|
"mode", "outgoing", "announce_cap"]
|
|
|
|
for key, value in self.interface_config.items():
|
|
if key not in standard_keys:
|
|
custom_params[key] = value
|
|
|
|
self.fields["custom_parameters"]["widget"].set_value(custom_params)
|
|
|
|
for key, field in self.fields.items():
|
|
if key not in ['name', 'type', 'custom_parameters']:
|
|
widget = field['widget']
|
|
|
|
if key == "subinterfaces" and isinstance(widget, FormMultiTable):
|
|
self._populate_subinterfaces(widget)
|
|
continue
|
|
|
|
if key in self.interface_config:
|
|
value = self.interface_config[key]
|
|
|
|
if key == 'frequency':
|
|
value = float(value) / 1000000
|
|
value = f"{value:.6f}".rstrip('0').rstrip('.') if '.' in f"{value:.6f}" else f"{value}"
|
|
|
|
self._set_field_value(widget, value)
|
|
|
|
for key, field in self.additional_fields.items():
|
|
if key in self.interface_config:
|
|
self._set_field_value(field['widget'], self.interface_config[key])
|
|
|
|
for key, field in self.common_fields.items():
|
|
if key in self.interface_config:
|
|
self._set_field_value(field['widget'], self.interface_config[key])
|
|
|
|
def _set_field_value(self, widget, value):
|
|
if hasattr(widget, 'edit_text'):
|
|
widget.edit_text = str(value)
|
|
elif hasattr(widget, 'set_state'):
|
|
checkbox_state = value if isinstance(value, bool) else value.strip().lower() not in (
|
|
'false', 'off', 'no', '0')
|
|
widget.set_state(checkbox_state)
|
|
elif isinstance(widget, FormDropdown):
|
|
str_value = str(value)
|
|
if str_value in widget.options:
|
|
widget.selected = str_value
|
|
widget.main_button.base_widget.set_text(str_value)
|
|
else:
|
|
# Try to match after transform
|
|
for opt in widget.options:
|
|
try:
|
|
if widget.transform(opt) == value:
|
|
widget.selected = opt
|
|
widget.main_button.base_widget.set_text(opt)
|
|
break
|
|
except:
|
|
pass
|
|
elif isinstance(widget, FormMultiList):
|
|
self._populate_multilist(widget, value)
|
|
|
|
def _populate_multilist(self, widget, value):
|
|
items = []
|
|
if isinstance(value, str):
|
|
items = [item.strip() for item in value.split(',') if item.strip()]
|
|
elif isinstance(value, list):
|
|
items = value
|
|
|
|
while len(widget.entries) > 1:
|
|
widget.remove_entry(None, widget.entries[-1])
|
|
|
|
if items:
|
|
first_entry = widget.entries[0]
|
|
first_edit = first_entry.contents[0][0]
|
|
if len(items) > 0:
|
|
first_edit.edit_text = items[0]
|
|
|
|
for i in range(1, len(items)):
|
|
widget.add_entry(None)
|
|
entry = widget.entries[i]
|
|
edit_widget = entry.contents[0][0]
|
|
edit_widget.edit_text = items[i]
|
|
|
|
def _populate_subinterfaces(self, widget):
|
|
subinterfaces = {}
|
|
|
|
for key, value in self.interface_config.items():
|
|
if isinstance(value, dict):
|
|
if key.startswith('[[[') and key.endswith(']]]'):
|
|
clean_key = key[3:-3] # removes [[[...]]]
|
|
subinterfaces[clean_key] = value
|
|
elif key not in ["type", "interface_enabled", "enabled", "port",
|
|
"id_callsign", "id_interval"]:
|
|
subinterfaces[key] = value
|
|
|
|
if subinterfaces:
|
|
widget.set_value(subinterfaces)
|
|
|
|
def on_save(self, button):
|
|
if not self.validate_all():
|
|
return
|
|
|
|
new_name = self.fields['name']["widget"].get_value() or self.iface_name
|
|
|
|
if new_name != self.iface_name and new_name in self.parent.app.rns.config['interfaces']:
|
|
self.fields['name']["widget"].error = f"Interface name '{new_name}' already exists"
|
|
self.fields['name']["widget"].error_widget.set_text(("error", self.fields['name']["widget"].error))
|
|
return
|
|
|
|
if self.iface_type == "CustomInterface":
|
|
interface_type = self.fields.get('type', {}).get('widget').get_value()
|
|
else:
|
|
interface_type = self.iface_type
|
|
|
|
updated_config = {
|
|
"type": interface_type,
|
|
"interface_enabled": True
|
|
}
|
|
|
|
for field_key, field in self.fields.items():
|
|
if field_key not in ["name", "custom_parameters", "type", "subinterfaces"]:
|
|
widget = field["widget"]
|
|
value = widget.get_value()
|
|
if value is not None and value != "":
|
|
updated_config[widget.config_key] = value
|
|
|
|
if self.iface_type == "CustomInterface" and "custom_parameters" in self.fields:
|
|
custom_params = self.fields["custom_parameters"]["widget"].get_value()
|
|
if isinstance(custom_params, dict):
|
|
for param_key, param_value in custom_params.items():
|
|
updated_config[param_key] = param_value
|
|
|
|
elif self.iface_type == "RNodeMultiInterface" and "subinterfaces" in self.fields:
|
|
subinterfaces = self.fields["subinterfaces"]["widget"].get_value()
|
|
for subname, subconfig in subinterfaces.items():
|
|
updated_config[subname] = subconfig
|
|
|
|
for field_key, field in self.additional_fields.items():
|
|
widget = field["widget"]
|
|
value = widget.get_value()
|
|
if value is not None and value != "":
|
|
updated_config[widget.config_key] = value
|
|
|
|
for field_key, field in self.common_fields.items():
|
|
widget = field["widget"]
|
|
value = widget.get_value()
|
|
if value is not None and value != "":
|
|
updated_config[widget.config_key] = value
|
|
|
|
try:
|
|
interfaces = self.parent.app.rns.config['interfaces']
|
|
|
|
if new_name != self.iface_name:
|
|
del interfaces[self.iface_name]
|
|
interfaces[new_name] = updated_config
|
|
|
|
for i, item in enumerate(self.parent.interface_items):
|
|
if item.name == self.iface_name:
|
|
self.parent.interface_items[i].name = new_name
|
|
break
|
|
else:
|
|
interfaces[self.iface_name] = updated_config
|
|
|
|
self.parent.app.rns.config.write()
|
|
|
|
display_type = interface_type
|
|
|
|
for item in self.parent.interface_items:
|
|
if item.name == new_name:
|
|
item.iface_type = display_type
|
|
break
|
|
|
|
self.parent._rebuild_list()
|
|
self.show_message(f"Interface {new_name} updated. Restart NomadNet for these changes to take effect")
|
|
|
|
except Exception as e:
|
|
print(f"Error saving interface: {str(e)}")
|
|
self.show_message(f"Error updating interface: {str(e)}", title="Error")
|
|
|
|
|
|
class ShowInterface(urwid.WidgetWrap):
|
|
def __init__(self, parent, iface_name):
|
|
self.parent = parent
|
|
self.iface_name = iface_name
|
|
self.started = False
|
|
self.g = self.parent.app.ui.glyphs
|
|
|
|
# get config
|
|
self.interface_config = self.parent.app.rns.config['interfaces'][iface_name]
|
|
iface_type = self.interface_config.get("type", "Unknown")
|
|
|
|
self.parent.shortcuts_display.set_show_interface_shortcuts()
|
|
|
|
self.config_rows = []
|
|
|
|
self.history_length=60
|
|
|
|
# get interface stats
|
|
interface_stats = self.parent.app.rns.get_interface_stats()
|
|
stats_lookup = {iface['short_name']: iface for iface in interface_stats['interfaces']}
|
|
self.stats = stats_lookup.get(iface_name, {})
|
|
|
|
self.tx = self.stats.get("txb", 0)
|
|
self.rx = self.stats.get("rxb", 0)
|
|
self.is_connected = self.stats.get("status", False)
|
|
self.is_enabled = (str(self.interface_config.get("enabled")).lower() != 'false' and
|
|
str(self.interface_config.get("interface_enabled")).lower() != 'false')
|
|
|
|
header_content = [
|
|
urwid.Text(("interface_title", f"Interface: {iface_name}"), align="center"),
|
|
urwid.Divider("=")
|
|
]
|
|
header = urwid.Pile(header_content)
|
|
|
|
self.edit_button = urwid.Button("Edit", on_press=self.on_edit)
|
|
self.back_button = urwid.Button("Back", on_press=self.on_back)
|
|
|
|
self.toggle_button = urwid.Button(
|
|
"Disable" if self.is_enabled else "Enable",
|
|
on_press=self.on_toggle_enabled
|
|
)
|
|
|
|
button_row = urwid.Columns([
|
|
(urwid.WEIGHT, 0.3, self.back_button),
|
|
(urwid.WEIGHT, 0.05, urwid.Text("")),
|
|
(urwid.WEIGHT, 0.3, self.toggle_button),
|
|
(urwid.WEIGHT, 0.05, urwid.Text("")),
|
|
(urwid.WEIGHT, 0.3, self.edit_button),
|
|
])
|
|
|
|
footer_content = [
|
|
urwid.Divider("="),
|
|
button_row
|
|
]
|
|
footer = urwid.Pile(footer_content)
|
|
|
|
# status widgets
|
|
self.status_text = urwid.Text(("connected_status" if self.is_enabled else "disconnected_status",
|
|
"Enabled" if self.is_enabled else "Disabled"))
|
|
|
|
self.status_indicator = urwid.Text(("connected_status" if self.is_enabled else "disconnected_status",
|
|
self.parent.g['selected'] if self.is_enabled else self.parent.g[
|
|
'unselected']))
|
|
|
|
self.connection_text = urwid.Text(("connected_status" if self.is_connected else "disconnected_status",
|
|
"Connected" if self.is_connected else "Disconnected"))
|
|
|
|
self.info_rows = [
|
|
urwid.Columns([
|
|
(10, urwid.Text(("key", "Type:"))),
|
|
urwid.Text(("value", f"{_get_interface_icon(self.parent.glyphset, iface_type)} {iface_type}")),
|
|
]),
|
|
urwid.Columns([
|
|
(10, urwid.Text(("key", "Status:"))),
|
|
(4, self.status_indicator),
|
|
(8, self.status_text),
|
|
(3, urwid.Text(" | ")),
|
|
self.connection_text,
|
|
]),
|
|
urwid.Divider("-")
|
|
]
|
|
|
|
self.tx_text = urwid.Text(("value", format_bytes(self.tx)))
|
|
self.rx_text = urwid.Text(("value", format_bytes(self.rx)))
|
|
|
|
self.stat_row = urwid.Columns([
|
|
(10, urwid.Text(("key", "TX:"))),
|
|
(15, self.tx_text),
|
|
(10, urwid.Text(("key", "RX:"))),
|
|
self.rx_text,
|
|
])
|
|
|
|
self.info_rows.append(self.stat_row)
|
|
self.info_rows.append(urwid.Divider("-"))
|
|
|
|
self.bandwidth_chart = InterfaceBandwidthChart(history_length=self.history_length, glyphset=self.parent.glyphset)
|
|
self.bandwidth_chart.update(self.rx, self.tx)
|
|
|
|
self.rx_chart_text = urwid.Text("Loading RX data...", align='left')
|
|
self.tx_chart_text = urwid.Text("Loading TX data...", align='left')
|
|
self.rx_peak_text = urwid.Text("Peak: 0 B/s", align='right')
|
|
self.tx_peak_text = urwid.Text("Peak: 0 B/s", align='right')
|
|
|
|
self.rx_box = urwid.LineBox(
|
|
urwid.Pile([
|
|
urwid.AttrMap(self.rx_chart_text, "rx"),
|
|
self.rx_peak_text
|
|
]),
|
|
title="RX Traffic (60s)"
|
|
)
|
|
|
|
self.tx_box = urwid.LineBox(
|
|
urwid.Pile([
|
|
urwid.AttrMap(self.tx_chart_text, "tx"),
|
|
self.tx_peak_text
|
|
]),
|
|
title="TX Traffic (60s)"
|
|
)
|
|
|
|
self.horizontal_charts = urwid.Columns([
|
|
(urwid.WEIGHT, 1, self.rx_box),
|
|
(urwid.WEIGHT, 1, self.tx_box)
|
|
])
|
|
|
|
self.vertical_charts = urwid.Pile([
|
|
self.rx_box,
|
|
self.tx_box
|
|
])
|
|
|
|
self.disconnected_message = urwid.Filler(
|
|
urwid.Text(("disconnected_status",
|
|
"Charts not available - Interface is not connected"),
|
|
align="center"),
|
|
valign="top"
|
|
)
|
|
self.disconnected_box = urwid.LineBox(self.disconnected_message, title="Bandwidth Charts")
|
|
|
|
self.charts_widget = self.vertical_charts
|
|
self.is_horizontal = False
|
|
|
|
screen = urwid.raw_display.Screen()
|
|
screen_cols, _ = screen.get_cols_rows()
|
|
# RNS.log(screen_cols)
|
|
if screen_cols >= 145:
|
|
self.charts_widget = self.horizontal_charts
|
|
self.is_horizontal = True
|
|
|
|
if not self.is_connected:
|
|
self.charts_widget = self.disconnected_box
|
|
|
|
connection_params = []
|
|
radio_params = []
|
|
network_params = []
|
|
ifac_params = []
|
|
other_params = []
|
|
|
|
# Sort parameters into groups
|
|
for key, value in self.interface_config.items():
|
|
# skip empty
|
|
if value is None or value == "":
|
|
continue
|
|
|
|
# Skip these keys as their shown elsewhere
|
|
if key in ["type", "interface_enabled", "enabled", 'selected_interface_mode', 'name']:
|
|
continue
|
|
|
|
# Connection parameters
|
|
elif key in ["port", "listen_ip", "listen_port", "target_host", "target_port", "device"]:
|
|
connection_params.append((key, value))
|
|
|
|
# Radio parameters
|
|
elif key in ["frequency", "bandwidth", "spreadingfactor", "codingrate", "txpower"]:
|
|
radio_params.append((key, value))
|
|
|
|
# Network parameters
|
|
elif key in ["network_name", "bitrate", "peers", "group_id", "multicast_address_type",
|
|
"discovery_scope", "announce_cap", "mode"]:
|
|
network_params.append((key, value))
|
|
|
|
# IFAC parameters
|
|
elif key in ["passphrase", "ifac_size", "ifac_netname", "ifac_netkey"]:
|
|
ifac_params.append((key, value))
|
|
|
|
else:
|
|
other_params.append((key, value))
|
|
|
|
def create_param_row(key, value):
|
|
if isinstance(value, bool):
|
|
value_str = "Yes" if value else "No"
|
|
elif key == "frequency":
|
|
int_value = int(value)
|
|
value_str = f"{int_value / 1000000:.3f} MHz"
|
|
elif key == "bandwidth":
|
|
int_value = int(value)
|
|
value_str = f"{int_value / 1000:.1f} kHz"
|
|
else:
|
|
value_str = str(value)
|
|
# format display keys: "listen_port" => Listen Port
|
|
display_key = key.replace('_', ' ').title()
|
|
|
|
return urwid.Columns([
|
|
(18, urwid.Text(("key", f"{display_key}:"))),
|
|
urwid.Text(("value", value_str)),
|
|
])
|
|
|
|
if connection_params:
|
|
connection_params.sort(key=lambda x: x[0])
|
|
self.config_rows.append(urwid.Text(("interface_title", "Connection Parameters"), align="left"))
|
|
for key, value in connection_params:
|
|
self.config_rows.append(create_param_row(key, value))
|
|
self.config_rows.append(urwid.Divider("-"))
|
|
|
|
if radio_params:
|
|
radio_params.sort(key=lambda x: x[0])
|
|
self.config_rows.append(urwid.Text(("interface_title", "Radio Parameters"), align="left"))
|
|
for key, value in radio_params:
|
|
self.config_rows.append(create_param_row(key, value))
|
|
self.config_rows.append(urwid.Divider("-"))
|
|
|
|
if network_params:
|
|
network_params.sort(key=lambda x: x[0])
|
|
self.config_rows.append(urwid.Text(("interface_title", "Network Parameters"), align="left"))
|
|
for key, value in network_params:
|
|
self.config_rows.append(create_param_row(key, value))
|
|
self.config_rows.append(urwid.Divider("-"))
|
|
|
|
if ifac_params:
|
|
ifac_params.sort(key=lambda x: x[0])
|
|
self.config_rows.append(urwid.Text(("interface_title", "IFAC Parameters"), align="left"))
|
|
for key, value in ifac_params:
|
|
self.config_rows.append(create_param_row(key, value))
|
|
self.config_rows.append(urwid.Divider("-"))
|
|
|
|
if other_params:
|
|
other_params.sort(key=lambda x: x[0])
|
|
self.config_rows.append(urwid.Text(("interface_title", "Additional Parameters"), align="left"))
|
|
for key, value in other_params:
|
|
self.config_rows.append(create_param_row(key, value))
|
|
self.config_rows.append(urwid.Divider("-"))
|
|
|
|
if not self.config_rows:
|
|
self.config_rows.append(urwid.Text("No additional parameters", align="center"))
|
|
|
|
body_content = []
|
|
body_content.extend(self.info_rows)
|
|
body_content.append(
|
|
self.charts_widget)
|
|
body_content.append(urwid.Divider("-"))
|
|
body_content.extend(self.config_rows)
|
|
|
|
body_pile = urwid.Pile(body_content)
|
|
body_padding = urwid.Padding(body_pile, left=2, right=2)
|
|
|
|
body = urwid.ListBox(urwid.SimpleListWalker([body_padding]))
|
|
|
|
self.frame = urwid.Frame(
|
|
body=body,
|
|
header=header,
|
|
footer=footer
|
|
)
|
|
|
|
self.content_box = urwid.LineBox(self.frame)
|
|
|
|
super().__init__(self.content_box)
|
|
|
|
def update_status_display(self):
|
|
if self.is_enabled:
|
|
self.status_indicator.set_text(("connected_status", self.parent.g['selected']))
|
|
self.status_text.set_text(("connected_status", "Enabled"))
|
|
else:
|
|
self.status_indicator.set_text(("disconnected_status", self.parent.g['unselected']))
|
|
self.status_text.set_text(("disconnected_status", "Disabled"))
|
|
|
|
def update_connection_display(self, is_connected):
|
|
old_connection_state = self.is_connected
|
|
self.is_connected = is_connected
|
|
|
|
self.connection_text.set_text(("connected_status" if self.is_connected else "disconnected_status",
|
|
"Connected" if self.is_connected else "Disconnected"))
|
|
|
|
if old_connection_state != self.is_connected:
|
|
body_pile = self.frame.body.body[0].original_widget
|
|
|
|
chart_index = None
|
|
for i, (widget, options) in enumerate(body_pile.contents):
|
|
if (widget == self.horizontal_charts or
|
|
widget == self.vertical_charts or
|
|
widget == self.disconnected_box):
|
|
chart_index = i
|
|
break
|
|
|
|
if chart_index is not None:
|
|
if self.is_connected:
|
|
new_widget = self.horizontal_charts if self.is_horizontal else self.vertical_charts
|
|
if not self.started:
|
|
self.start()
|
|
else:
|
|
new_widget = self.disconnected_box
|
|
self.started = False
|
|
|
|
body_pile.contents[chart_index] = (new_widget, body_pile.options())
|
|
|
|
def on_toggle_enabled(self, button):
|
|
action = "disable" if self.is_enabled else "enable"
|
|
|
|
def on_confirm_yes(confirm_button):
|
|
self.parent.app.ui.main_display.frame.body = self.parent.app.ui.main_display.sub_displays.active().widget
|
|
|
|
self.is_enabled = not self.is_enabled
|
|
|
|
self.toggle_button.set_label("Disable" if self.is_enabled else "Enable")
|
|
|
|
if "interface_enabled" in self.interface_config:
|
|
self.interface_config["interface_enabled"] = self.is_enabled
|
|
else:
|
|
self.interface_config["enabled"] = self.is_enabled
|
|
|
|
try:
|
|
interfaces = self.parent.app.rns.config['interfaces']
|
|
|
|
interfaces[self.iface_name] = self.interface_config
|
|
|
|
self.parent.app.rns.config.write()
|
|
|
|
self.update_status_display()
|
|
|
|
for item in self.parent.interface_items:
|
|
if item.name == self.iface_name:
|
|
item.is_enabled = self.is_enabled
|
|
item.update_status_display()
|
|
|
|
if hasattr(self.parent.app.ui, 'loop') and self.parent.app.ui.loop is not None:
|
|
self.parent.app.ui.loop.draw_screen()
|
|
|
|
self.show_restart_required_message()
|
|
|
|
except Exception as e:
|
|
self.show_error_message(f"Error updating interface: {str(e)}")
|
|
|
|
def on_confirm_no(confirm_button):
|
|
self.parent.app.ui.main_display.frame.body = self.parent.app.ui.main_display.sub_displays.active().widget
|
|
|
|
confirm_text = urwid.Text((
|
|
"interface_title",
|
|
f"Are you sure you want to {action} the {self.iface_name} interface?"
|
|
), align="center")
|
|
|
|
yes_button = urwid.Button("Yes", on_press=on_confirm_yes)
|
|
no_button = urwid.Button("No", on_press=on_confirm_no)
|
|
|
|
buttons_row = urwid.Columns([
|
|
(urwid.WEIGHT, 0.45, yes_button),
|
|
(urwid.WEIGHT, 0.1, urwid.Text("")),
|
|
(urwid.WEIGHT, 0.45, no_button),
|
|
])
|
|
|
|
pile = urwid.Pile([
|
|
confirm_text,
|
|
urwid.Divider(),
|
|
buttons_row
|
|
])
|
|
|
|
dialog = DialogLineBox(pile, title="Confirm")
|
|
|
|
overlay = urwid.Overlay(
|
|
dialog,
|
|
self.parent.app.ui.main_display.frame.body,
|
|
align='center',
|
|
width=50,
|
|
valign='middle',
|
|
height=7
|
|
)
|
|
|
|
self.parent.app.ui.main_display.frame.body = overlay
|
|
|
|
def show_restart_required_message(self):
|
|
|
|
def dismiss_dialog(button):
|
|
self.parent.app.ui.main_display.frame.body = self.parent.app.ui.main_display.sub_displays.active().widget
|
|
|
|
dialog = DialogLineBox(
|
|
urwid.Pile([
|
|
urwid.Text(
|
|
f"Interface {self.iface_name} has been " +
|
|
("enabled" if self.is_enabled else "disabled") +
|
|
".\nRestart required for changes to take effect.",
|
|
align="center"
|
|
),
|
|
urwid.Divider(),
|
|
urwid.Button("OK", on_press=dismiss_dialog)
|
|
]),
|
|
title="Notice"
|
|
)
|
|
|
|
overlay = urwid.Overlay(
|
|
dialog,
|
|
self.parent.app.ui.main_display.frame.body,
|
|
align='center',
|
|
width=50,
|
|
valign='middle',
|
|
height=8
|
|
)
|
|
|
|
self.parent.app.ui.main_display.frame.body = overlay
|
|
|
|
def show_error_message(self, message):
|
|
|
|
def dismiss_dialog(button):
|
|
self.parent.app.ui.main_display.frame.body = self.parent.app.ui.main_display.sub_displays.active().widget
|
|
|
|
dialog = DialogLineBox(
|
|
urwid.Pile([
|
|
urwid.Text(message, align="center"),
|
|
urwid.Divider(),
|
|
urwid.Button("OK", on_press=dismiss_dialog)
|
|
]),
|
|
title="Error"
|
|
)
|
|
|
|
overlay = urwid.Overlay(
|
|
dialog,
|
|
self.parent.app.ui.main_display.frame.body,
|
|
align='center',
|
|
width=50,
|
|
valign='middle',
|
|
height=8
|
|
)
|
|
|
|
self.parent.app.ui.main_display.frame.body = overlay
|
|
|
|
def keypress(self, size, key):
|
|
if key == 'tab':
|
|
if self.frame.focus_position == 'body':
|
|
self.frame.focus_position = 'footer'
|
|
footer_pile = self.frame.footer
|
|
if isinstance(footer_pile, urwid.Pile):
|
|
footer_pile.focus_position = 1 # button row
|
|
button_row = footer_pile.contents[1][0]
|
|
if isinstance(button_row, urwid.Columns):
|
|
button_row.focus_position = 0
|
|
return None
|
|
elif self.frame.focus_position == 'footer':
|
|
footer_pile = self.frame.footer
|
|
if isinstance(footer_pile, urwid.Pile):
|
|
button_row = footer_pile.contents[1][0]
|
|
if isinstance(button_row, urwid.Columns):
|
|
# If on first button (Back), move to Toggle button
|
|
if button_row.focus_position == 0:
|
|
button_row.focus_position = 2 # skip spacer
|
|
return None
|
|
# If on toggle button, move to Edit button
|
|
elif button_row.focus_position == 2:
|
|
button_row.focus_position = 4
|
|
return None
|
|
# if on edit button wrap back to toggle button
|
|
elif button_row.focus_position == 4:
|
|
button_row.focus_position = 0
|
|
return None
|
|
elif key == 'shift tab':
|
|
if self.frame.focus_position == 'footer':
|
|
self.frame.focus_position = 'body'
|
|
return None
|
|
elif self.frame.focus_position == 'footer':
|
|
footer_pile = self.frame.footer
|
|
if isinstance(footer_pile, urwid.Pile):
|
|
button_row = footer_pile.contents[1][0]
|
|
if isinstance(button_row, urwid.Columns):
|
|
if button_row.focus_position == 4: # edit button
|
|
button_row.focus_position = 2 # toggle button
|
|
return None
|
|
elif button_row.focus_position == 2:
|
|
button_row.focus_position = 0 # back button
|
|
return None
|
|
elif button_row.focus_position == 0: # back button
|
|
self.frame.focus_position = 'body'
|
|
return None
|
|
elif key == 'down':
|
|
if self.frame.focus_position == 'body':
|
|
result = super().keypress(size, key)
|
|
# if the key wasn't consumed, we're at the bottom
|
|
if result == 'down':
|
|
self.frame.focus_position = 'footer'
|
|
footer_pile = self.frame.footer
|
|
if isinstance(footer_pile, urwid.Pile):
|
|
footer_pile.focus_position = 1 # button row
|
|
button_row = footer_pile.contents[1][0]
|
|
if isinstance(button_row, urwid.Columns):
|
|
button_row.focus_position = 0 # focus on back button
|
|
return None
|
|
return result
|
|
elif key == 'up':
|
|
if self.frame.focus_position == 'footer':
|
|
self.frame.focus_position = 'body'
|
|
listbox = self.frame.body
|
|
if hasattr(listbox, 'body') and len(listbox.body) > 0:
|
|
listbox.focus_position = len(listbox.body) - 1
|
|
return None
|
|
elif self.frame.focus_position == 'body':
|
|
result = super().keypress(size, key)
|
|
# if the key wasn't consumed, we're at the top
|
|
if result == 'up':
|
|
pass
|
|
return result
|
|
elif key == "h" and self.is_connected: # horizontal layout
|
|
if not self.is_horizontal:
|
|
self.switch_to_horizontal()
|
|
return None
|
|
elif key == "v" and self.is_connected: # vertical layout
|
|
if self.is_horizontal:
|
|
self.switch_to_vertical()
|
|
return None
|
|
|
|
return super().keypress(size, key)
|
|
|
|
def switch_to_horizontal(self):
|
|
if not self.is_connected:
|
|
return
|
|
|
|
self.is_horizontal = True
|
|
|
|
body_pile = self.frame.body.body[0].original_widget
|
|
for i, (widget, options) in enumerate(body_pile.contents):
|
|
if widget == self.vertical_charts:
|
|
body_pile.contents[i] = (self.horizontal_charts, options)
|
|
self.charts_widget = self.horizontal_charts
|
|
break
|
|
|
|
def switch_to_vertical(self):
|
|
if not self.is_connected:
|
|
return
|
|
|
|
self.is_horizontal = False
|
|
body_pile = self.frame.body.body[0].original_widget
|
|
for i, (widget, options) in enumerate(body_pile.contents):
|
|
if widget == self.horizontal_charts:
|
|
body_pile.contents[i] = (self.vertical_charts, options)
|
|
self.charts_widget = self.vertical_charts
|
|
break
|
|
|
|
def start(self):
|
|
if not self.started and self.is_connected:
|
|
self.started = True
|
|
self.parent.app.ui.loop.set_alarm_in(1, self.update_bandwidth_charts)
|
|
|
|
def update_bandwidth_charts(self, loop, user_data):
|
|
if not self.started:
|
|
return
|
|
|
|
try:
|
|
interface_stats = self.parent.app.rns.get_interface_stats()
|
|
stats_lookup = {iface['short_name']: iface for iface in interface_stats['interfaces']}
|
|
stats = stats_lookup.get(self.iface_name, {})
|
|
|
|
tx = stats.get("txb", self.tx)
|
|
rx = stats.get("rxb", self.rx)
|
|
|
|
new_connection_status = stats.get("status", False)
|
|
if new_connection_status != self.is_connected:
|
|
self.update_connection_display(new_connection_status)
|
|
|
|
if not self.is_connected:
|
|
return
|
|
|
|
self.tx_text.set_text(("value", format_bytes(tx)))
|
|
self.rx_text.set_text(("value", format_bytes(rx)))
|
|
|
|
self.bandwidth_chart.update(rx, tx)
|
|
|
|
rx_chart, tx_chart, peak_rx, peak_tx = self.bandwidth_chart.get_charts(height=8)
|
|
|
|
self.rx_chart_text.set_text(rx_chart)
|
|
self.tx_chart_text.set_text(tx_chart)
|
|
self.rx_peak_text.set_text(f"Peak: {peak_rx}")
|
|
self.tx_peak_text.set_text(f"Peak: {peak_tx}")
|
|
|
|
self.tx = tx
|
|
self.rx = rx
|
|
except Exception as e:
|
|
if not hasattr(self.parent,
|
|
'disconnect_overlay') or self.parent.widget is not self.parent.disconnect_overlay:
|
|
dialog_text = urwid.Pile([
|
|
urwid.Text(("disconnected_status", "(!) RNS Instance Disconnected"), align="center"),
|
|
urwid.Text("Waiting to Reconnect...", align="center")
|
|
])
|
|
dialog_content = urwid.Filler(dialog_text)
|
|
dialog_box = urwid.LineBox(dialog_content)
|
|
|
|
self.parent.disconnect_overlay = urwid.Overlay(
|
|
dialog_box,
|
|
self,
|
|
align='center',
|
|
width=35,
|
|
valign='middle',
|
|
height=4
|
|
)
|
|
|
|
self.parent.widget = self.parent.disconnect_overlay
|
|
self.parent.app.ui.main_display.update_active_sub_display()
|
|
self.started = False
|
|
finally:
|
|
if self.started:
|
|
loop.set_alarm_in(1, self.update_bandwidth_charts)
|
|
|
|
def on_back(self, button):
|
|
self.started = False
|
|
self.parent.switch_to_list()
|
|
|
|
def on_edit(self, button):
|
|
self.started = False
|
|
self.parent.switch_to_edit_interface(self.iface_name)
|
|
|
|
### MAIN DISPLAY ###
|
|
class InterfaceDisplay:
|
|
def __init__(self, app):
|
|
self.app = app
|
|
self.started = False
|
|
self.interface_items = []
|
|
self.glyphset = self.app.config["textui"]["glyphs"]
|
|
self.g = self.app.ui.glyphs
|
|
|
|
self.terminal_cols, self.terminal_rows = urwid.raw_display.Screen().get_cols_rows()
|
|
self.iface_row_offset = 4
|
|
self.list_rows = self.terminal_rows - self.iface_row_offset
|
|
|
|
interfaces = app.rns.config['interfaces']
|
|
processed_interfaces = {}
|
|
|
|
for interface_name, interface in interfaces.items():
|
|
interface_data = interface.copy()
|
|
|
|
# handle sub-interfaces for RNodeMultiInterface
|
|
if interface_data.get("type") == "RNodeMultiInterface":
|
|
sub_interfaces = []
|
|
for sub_name, sub_config in interface_data.items():
|
|
if sub_name not in {"type", "port", "interface_enabled", "selected_interface_mode",
|
|
"configured_bitrate"}:
|
|
if isinstance(sub_config, dict):
|
|
sub_config["name"] = sub_name
|
|
sub_interfaces.append(sub_config)
|
|
|
|
# add sub-interfaces to the main interface data
|
|
interface_data["sub_interfaces"] = sub_interfaces
|
|
|
|
for sub in sub_interfaces:
|
|
del interface_data[sub["name"]]
|
|
|
|
processed_interfaces[interface_name] = interface_data
|
|
|
|
interface_stats = app.rns.get_interface_stats()
|
|
stats_lookup = {interface['short_name']: interface for interface in interface_stats['interfaces']}
|
|
# print(stats_lookup)
|
|
for interface_name, interface_data in processed_interfaces.items():
|
|
# configobj false values
|
|
is_enabled = str(interface_data.get("enabled")).lower() not in ('false', 'off', 'no', '0') and str(interface_data.get("interface_enabled")).lower() not in ('false', 'off', 'no', '0')
|
|
|
|
iface_type = interface_data.get("type", "Unknown")
|
|
icon = _get_interface_icon(self.glyphset, iface_type)
|
|
|
|
stats_for_interface = stats_lookup.get(interface_name)
|
|
|
|
if stats_for_interface:
|
|
tx = stats_for_interface.get("txb", 0)
|
|
rx = stats_for_interface.get("rxb", 0)
|
|
is_connected = stats_for_interface["status"]
|
|
else:
|
|
tx = 0
|
|
rx = 0
|
|
is_connected = False
|
|
|
|
item = SelectableInterfaceItem(
|
|
parent=self,
|
|
name=interface_data.get("name", interface_name),
|
|
is_connected=is_connected,
|
|
is_enabled=is_enabled,
|
|
iface_type=iface_type,
|
|
tx=tx,
|
|
rx=rx,
|
|
icon=icon
|
|
)
|
|
|
|
self.interface_items.append(item)
|
|
|
|
interface_header = urwid.Text(("interface_title", "Interfaces"), align="center")
|
|
if len(self.interface_items) == 0:
|
|
interface_header = urwid.Text(
|
|
("interface_title", "No interfaces found. Press Ctrl + A to add a new interface "), align="center")
|
|
|
|
|
|
list_contents = [
|
|
interface_header,
|
|
urwid.Divider(),
|
|
] + self.interface_items
|
|
|
|
self.list_walker = urwid.SimpleFocusListWalker(list_contents)
|
|
self.list_box = urwid.ListBox(self.list_walker)
|
|
|
|
self.box_adapter = urwid.BoxAdapter(self.list_box, self.list_rows)
|
|
|
|
|
|
pile = urwid.Pile([self.box_adapter])
|
|
self.interfaces_display = InterfaceFiller(pile, self.app)
|
|
self.shortcuts_display = InterfaceDisplayShortcuts(self.app)
|
|
self.widget = self.interfaces_display
|
|
|
|
def start(self):
|
|
# started from Main.py
|
|
self.started = True
|
|
self.app.ui.loop.set_alarm_in(1, self.poll_stats)
|
|
self.app.ui.loop.set_alarm_in(5, self.check_terminal_size)
|
|
|
|
def switch_to_edit_interface(self, iface_name):
|
|
self.edit_interface_view = EditInterfaceView(self, iface_name)
|
|
self.widget = self.edit_interface_view
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
|
|
def edit_selected_interface(self):
|
|
focus_widget, focus_position = self.box_adapter._original_widget.body.get_focus()
|
|
|
|
if not isinstance(focus_widget, SelectableInterfaceItem):
|
|
return
|
|
|
|
selected_item = focus_widget
|
|
interface_name = selected_item.name
|
|
|
|
self.switch_to_edit_interface(interface_name)
|
|
|
|
def check_terminal_size(self, loop, user_data):
|
|
new_cols, new_rows = loop.screen.get_cols_rows()
|
|
|
|
if new_rows != self.terminal_rows or new_cols != self.terminal_cols:
|
|
self.terminal_cols, self.terminal_rows = new_cols, new_rows
|
|
self.list_rows = self.terminal_rows - self.iface_row_offset
|
|
|
|
self.box_adapter.height = self.list_rows
|
|
|
|
loop.draw_screen()
|
|
|
|
if self.started:
|
|
loop.set_alarm_in(5, self.check_terminal_size)
|
|
|
|
def poll_stats(self, loop, user_data):
|
|
try:
|
|
if hasattr(self, 'disconnect_overlay') and self.widget is self.disconnect_overlay:
|
|
self.widget = self.interfaces_display
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
|
|
interface_stats = self.app.rns.get_interface_stats()
|
|
stats_lookup = {iface['short_name']: iface for iface in interface_stats['interfaces']}
|
|
for item in self.interface_items:
|
|
# use interface name as the key
|
|
stats_for_interface = stats_lookup.get(item.name)
|
|
if stats_for_interface:
|
|
tx = stats_for_interface.get("txb", 0)
|
|
rx = stats_for_interface.get("rxb", 0)
|
|
item.update_stats(tx, rx)
|
|
except Exception as e:
|
|
if not hasattr(self, 'disconnect_overlay') or self.widget is not self.disconnect_overlay:
|
|
dialog_text = urwid.Pile([
|
|
urwid.Text(("disconnected_status", "(!) RNS Instance Disconnected"), align="center"),
|
|
urwid.Text(("Waiting to Reconnect..."), align="center")
|
|
])
|
|
dialog_content = urwid.Filler(dialog_text)
|
|
dialog_box = urwid.LineBox(dialog_content)
|
|
|
|
self.disconnect_overlay = urwid.Overlay(
|
|
dialog_box,
|
|
self.interfaces_display,
|
|
align='center',
|
|
width=35,
|
|
valign='middle',
|
|
height=4
|
|
)
|
|
|
|
if self.widget is self.interfaces_display:
|
|
self.widget = self.disconnect_overlay
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
finally:
|
|
if self.started:
|
|
loop.set_alarm_in(1, self.poll_stats)
|
|
|
|
def shortcuts(self):
|
|
return self.shortcuts_display
|
|
|
|
def switch_to_show_interface(self, iface_name):
|
|
show_interface = ShowInterface(self, iface_name)
|
|
self.widget = show_interface
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
|
|
show_interface.start()
|
|
|
|
def switch_to_list(self):
|
|
self.shortcuts_display.reset_shortcuts()
|
|
self.widget = self.interfaces_display
|
|
self._rebuild_list()
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
|
|
def add_interface(self):
|
|
dialog_widgets = []
|
|
|
|
def add_heading(txt):
|
|
dialog_widgets.append(urwid.Text(("interface_title", txt), align="left"))
|
|
|
|
def add_option(label, value):
|
|
item = InterfaceOptionItem(self, label, value)
|
|
dialog_widgets.append(item)
|
|
|
|
# Get the icons based on plain, unicode, nerdfont glyphset
|
|
network_icon = _get_interface_icon(self.glyphset, "AutoInterface")
|
|
rnode_icon = _get_interface_icon(self.glyphset, "RNodeInterface")
|
|
serial_icon = _get_interface_icon(self.glyphset, "SerialInterface")
|
|
other_icon = _get_interface_icon(self.glyphset, "PipeInterface")
|
|
|
|
add_heading(f"{network_icon} IP Networks")
|
|
add_option("Auto Interface", "AutoInterface")
|
|
add_option("TCP Client Interface", "TCPClientInterface")
|
|
add_option("TCP Server Interface", "TCPServerInterface")
|
|
add_option("UDP Interface", "UDPInterface")
|
|
add_option("I2P Interface", "I2PInterface")
|
|
if PLATFORM_IS_LINUX:
|
|
add_option("Backbone Interface", "BackboneInterface")
|
|
|
|
if PYSERIAL_AVAILABLE:
|
|
add_heading(f"{rnode_icon} RNodes")
|
|
add_option("RNode Interface", "RNodeInterface")
|
|
add_option("RNode Multi Interface", "RNodeMultiInterface")
|
|
|
|
add_heading(f"{serial_icon} Hardware")
|
|
add_option("Serial Interface", "SerialInterface")
|
|
add_option("KISS Interface", "KISSInterface")
|
|
add_option("AX.25 KISS Interface", "AX25KISSInterface")
|
|
|
|
add_heading(f"{other_icon} Other")
|
|
add_option("Pipe Interface", "PipeInterface")
|
|
add_option("Custom Interface", "CustomInterface")
|
|
|
|
|
|
listbox = urwid.ListBox(urwid.SimpleFocusListWalker(dialog_widgets))
|
|
dialog = DialogLineBox(listbox, parent=self, title="Select Interface Type")
|
|
|
|
overlay = urwid.Overlay(
|
|
dialog,
|
|
self.interfaces_display,
|
|
align='center',
|
|
width=('relative', 50),
|
|
valign='middle',
|
|
height=('relative', 50),
|
|
min_width=20,
|
|
min_height=15,
|
|
left=2,
|
|
right=2
|
|
)
|
|
self.widget = overlay
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
|
|
def switch_to_add_interface(self, iface_type):
|
|
self.add_interface_view = AddInterfaceView(self, iface_type)
|
|
self.widget = self.add_interface_view
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
|
|
def remove_selected_interface(self):
|
|
focus_widget, focus_position = self.box_adapter._original_widget.body.get_focus()
|
|
if not isinstance(focus_widget, SelectableInterfaceItem):
|
|
return
|
|
|
|
selected_item = focus_widget
|
|
interface_name = selected_item.name
|
|
|
|
def on_confirm_yes(button):
|
|
try:
|
|
if interface_name in self.app.rns.config['interfaces']:
|
|
del self.app.rns.config['interfaces'][interface_name]
|
|
self.app.rns.config.write()
|
|
|
|
if selected_item in self.interface_items:
|
|
self.interface_items.remove(selected_item)
|
|
|
|
self._rebuild_list()
|
|
self.dismiss_dialog()
|
|
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
def on_confirm_no(button):
|
|
self.dismiss_dialog()
|
|
|
|
confirm_text = urwid.Text(("interface_title", f"Remove interface {interface_name}?"), align="center")
|
|
yes_button = urwid.Button("Yes", on_press=on_confirm_yes)
|
|
no_button = urwid.Button("No", on_press=on_confirm_no)
|
|
|
|
buttons_row = urwid.Columns([
|
|
(urwid.WEIGHT, 0.45, yes_button),
|
|
(urwid.WEIGHT, 0.1, urwid.Text("")),
|
|
(urwid.WEIGHT, 0.45, no_button),
|
|
])
|
|
|
|
pile = urwid.Pile([
|
|
confirm_text,
|
|
buttons_row
|
|
])
|
|
|
|
dialog = DialogLineBox(pile, parent=self, title="?")
|
|
|
|
overlay = urwid.Overlay(
|
|
dialog,
|
|
self.interfaces_display,
|
|
align='center',
|
|
width=('relative', 35),
|
|
valign='middle',
|
|
height=(5),
|
|
min_width=5,
|
|
left=2,
|
|
right=2
|
|
)
|
|
dialog.original_widget.focus_position = 1 # columns row
|
|
buttons_row = dialog.original_widget.contents[1][0]
|
|
buttons_row.focus_position = 2 # second button "No"
|
|
|
|
self.widget = overlay
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
|
|
def dismiss_dialog(self):
|
|
self.widget = self.interfaces_display
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
|
|
def _rebuild_list(self):
|
|
interface_header = urwid.Text(("interface_title", f"Interfaces ({len(self.interface_items)})"), align="center")
|
|
if len(self.interface_items) == 0:
|
|
interface_header = urwid.Text(("interface_title", "No interfaces found. Press Ctrl + A to add a new interface "), align="center")
|
|
|
|
new_list = [
|
|
interface_header,
|
|
urwid.Divider(),
|
|
] + self.interface_items
|
|
# RNS.log(f"items: {self.interface_items}")
|
|
|
|
walker = urwid.SimpleFocusListWalker(new_list)
|
|
self.box_adapter._original_widget.body = walker
|
|
self.box_adapter._original_widget.focus_position = len(new_list) - 1
|
|
|
|
def open_config_editor(self):
|
|
import platform
|
|
|
|
editor_cmd = self.app.config["textui"]["editor"]
|
|
|
|
if platform.system() == "Darwin" and editor_cmd == "editor":
|
|
editor_cmd = "nano"
|
|
|
|
editor_term = urwid.Terminal(
|
|
(editor_cmd, self.app.rns.configpath),
|
|
encoding='utf-8',
|
|
main_loop=self.app.ui.loop,
|
|
)
|
|
|
|
def quit_term(*args, **kwargs):
|
|
self.widget = self.interfaces_display
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
self.app.ui.main_display.request_redraw()
|
|
|
|
urwid.connect_signal(editor_term, 'closed', quit_term)
|
|
|
|
editor_box = urwid.LineBox(editor_term, title="Editing RNS Config")
|
|
self.widget = editor_box
|
|
self.app.ui.main_display.update_active_sub_display()
|
|
self.app.ui.main_display.frame.focus_position = "body"
|
|
editor_term.change_focus(True)
|
|
|
|
### SHORTCUTS ###
|
|
class InterfaceDisplayShortcuts:
|
|
def __init__(self, app):
|
|
self.app = app
|
|
self.default_shortcuts = "[C-a] Add Interface [C-e] Edit Interface [C-x] Remove Interface [Enter] Show Interface [C-w] Open Text Editor"
|
|
self.current_shortcuts = self.default_shortcuts
|
|
self.widget = urwid.AttrMap(
|
|
urwid.Text(self.current_shortcuts),
|
|
"shortcutbar"
|
|
)
|
|
|
|
def update_shortcuts(self, new_shortcuts):
|
|
self.current_shortcuts = new_shortcuts
|
|
self.widget.original_widget.set_text(new_shortcuts)
|
|
|
|
def reset_shortcuts(self):
|
|
self.update_shortcuts(self.default_shortcuts)
|
|
|
|
def set_show_interface_shortcuts(self):
|
|
show_shortcuts = "[Up/Down] Navigate [Tab] Switch Focus [h] Horizontal Charts [v] Vertical Charts "
|
|
self.update_shortcuts(show_shortcuts)
|
|
|
|
def set_add_interface_shortcuts(self):
|
|
add_shortcuts = "[Up/Down] Navigate Fields [Enter] Select Option"
|
|
self.update_shortcuts(add_shortcuts)
|
|
|
|
def set_edit_interface_shortcuts(self):
|
|
edit_shortcuts = "[Up/Down] Navigate Fields [Enter] Select Option"
|
|
self.update_shortcuts(edit_shortcuts) |